Module tests.common

Functions

def activate_standby_volume(client, volume_name, frontend='blockdev')
def apps_api(request)

Create a new AppsV1API instance.

Returns

A new AppsV1API Instance.

def assert_backup_state(b_actual, b_expected)
def assert_from_assert_error_check_threads(thrd_list)

Check all threads in thrd_list are done and their status

Parameters

thrd_list: thread list created by create_assert_error_check_thread.

def backing_image_feature_supported(client)
def batch_v1_api(request)

Create a new BatchV1Api instance.

Returns

A new BatchV1Api Instance.

def check_all_support_bundle_managers_deleted()
def check_backing_image_disk_map_status(client, bi_name, expect_cnt, expect_disk_state)
def check_backing_image_eviction_failed(name)
def check_backing_image_single_copy_disk_eviction(client, bi_name, old_disk_id)
def check_backing_image_single_copy_node_eviction(client, bi_name, old_node)
def check_block_device_size(volume, size)
def check_csi(core_api)
def check_csi_expansion(core_api)
def check_device_data(dev, data, check_checksum=True)
def check_longhorn(core_api)
def check_pod_existence(api, pod_name, namespace='default')
def check_pv_existence(api, pv_name)
def check_pvc_existence(api, pvc_name, namespace='default')
def check_pvc_in_specific_status(api, pvc_name, status)
def check_recurring_jobs(client, recurring_jobs)
def check_statefulset_existence(api, ss_name, namespace='default')
def check_volume_data(volume, data, check_checksum=True)
def check_volume_endpoint(v)
def check_volume_existence(client, volume_name)
def check_volume_last_backup(client, volume_name, last_backup)
def check_volume_replicas(volume, spec, tag_mapping)

Check the replicas on the volume to ensure that they were scheduled properly. :param volume: The Volume to check. :param spec: The spec to validate the Tag against. :param tag_mapping: The mapping of Nodes to the Tags they have. :raise AssertionError: If the Volume doesn't match all the conditions.

def cleanup_all_backing_images(client)
def cleanup_all_recurring_jobs(client)
def cleanup_all_support_bundles(client)

Clean up all support bundles :param client: The Longhorn client to use in the request.

def cleanup_all_volumes(client)

Clean up all volumes :param client: The Longhorn client to use in the request.

def cleanup_client()
def cleanup_crypto_secret()
def cleanup_disks_on_node(client, node_id, *disks)
def cleanup_host_disk(vol_name)
def cleanup_host_disks(client, *args)
def cleanup_node_disks(client, node_name)
def cleanup_storage_class()
def cleanup_test_disks(client)
def cleanup_volume(client, volume)

Clean up the volume after the test. :param client: The Longhorn client to use in the request. :param volume: The volume to clean up.

def cleanup_volume_by_name(client, vol_name)
def client(request)

Return an individual Longhorn API client for testing.

def clients(request)
def copy_file_to_volume_dev_mb_data(src_path, dest_path, src_offset, dest_offset, size_in_mb, timeout_cnt=5)
def copy_pod_volume_data(api, pod_name, src_path, dest_path)
def core_api(request)

Create a new CoreV1API instance.

Returns

A new CoreV1API Instance.

def crash_engine_process_with_sigkill(client, core_api, volume_name)
def crash_replica_processes(client, api, volname, replicas=None, wait_to_fail=True)
def create_and_check_volume(client, volume_name, num_of_replicas=3, size='16777216', backing_image='', frontend='blockdev', snapshot_data_integrity='ignored', access_mode='rwo')

Create a new volume with the specified parameters. Assert that the new volume is detached and that all of the requested parameters match.

:param client: The Longhorn client to use in the request. :param volume_name: The name of the volume. :param num_of_replicas: The number of replicas the volume should have. :param size: The size of the volume, as a string representing the number of bytes. :param backing_image: The backing image to use for the volume. :param frontend: The frontend to use for the volume. :return: The volume instance created.

def create_and_wait_deployment(apps_api, deployment_manifest)
def create_and_wait_pod(api, pod_manifest)

Creates a new Pod attached to a PersistentVolumeClaim for testing.

The function will block until the Pod is online or until it times out, whichever occurs first. The volume created by the manifest passed in will be mounted to '/data'.

Args

api
An instance of CoreV1API.
pod_name
The name of the Pod.
volume
The volume manifest.
def create_and_wait_statefulset(statefulset_manifest)

Create a new StatefulSet for testing.

This function will block until all replicas in the StatefulSet are online or it times out, whichever occurs first.

def create_assert_error_check_thread(func, *args)

Do func by threading with arguments

Parameters

func: function that want to do things in parallel. args: arguments for function.

def create_backing_image_with_matching_url(client, name, url, minNumberOfCopies=1, nodeSelector=[], diskSelector=[])
def create_backup(client, volname, data={}, labels={})
def create_backup_from_volume_attached_to_pod(client, core_api, volume_name, pod_name, data_path='/data/test', data_size=100)

Write data in the pod and take a backup.

Args

client
The Longhorn client to use in the request.
core_api
An instance of CoreV1API.
pod_name
The name of the Pod.
volume_name
The volume name which is attached to the pod.
data_path
File name suffixed to the mount point. e.g /data/file
data_size
Size of the data to be written in the pod.

Returns

The backup volume name, backup, checksum of data written in the backup

def create_crypto_secret(secret_manifest, namespace='longhorn-system')
def create_deployment_and_write_data(client, core_api, make_deployment_with_pvc, volume_name, size, replica_count, data_size, attach_node_id=None)
def create_host_disk(client, vol_name, size, node_id)
def create_pv_for_volume(client, core_api, volume, pv_name, fs_type='ext4')
def create_pvc(pvc_manifest)
def create_pvc_for_volume(client, core_api, volume, pvc_name, pvc_namespace='default')
def create_pvc_spec(name)

Generate a volume manifest using the given name for the PVC.

This spec is used to test dynamically provisioned PersistentVolumes (those created using a storage class).

def create_recurring_jobs(client, recurring_jobs)
def create_rwx_volume_with_storageclass(client, core_api, storage_class)
def create_snapshot(longhorn_api_client, volume_name)
def create_statefulset(statefulset_manifest)

Create a new StatefulSet for testing.

def create_storage_class(sc_manifest)
def create_support_bundle(client)
def create_volume(client, vol_name, size, node_id, r_num)
def create_volume_and_backup(client, vol_name, vol_size, backup_data_size)
def create_volume_and_write_data(client, volume_name, volume_size='16777216')
  1. Create and attach a volume
  2. Write the data to volume
def crypto_secret(request)
def csi_pv(request)
def csi_pv_backingimage(request)
def csi_pvc_name(request)
def delete_and_wait_deployment(apps_api, deployment_name, namespace='default')
def delete_and_wait_longhorn(client, name)

Delete a volume from Longhorn.

def delete_and_wait_pod(api, pod_name, namespace='default', wait=True)

Delete a specified Pod.

This function does not check if the Pod does exist and will throw an error if a nonexistent Pod is specified.

Args

api
An instance of CoreV1API.
pod_name
The name of the Pod.
def delete_and_wait_pv(api, pv_name)
def delete_and_wait_pvc(api, pvc_name, retry_counts=150)
def delete_and_wait_statefulset(api, client, statefulset)
def delete_and_wait_volume_attachment(storage_api, volume_attachment_name)
def delete_backup(client, volume_name, backup_name)
def delete_backup_backing_image(client, backing_image_name)
def delete_backup_volume(client, volume_name)
def delete_crypto_secret(namespace, name)
def delete_replica_on_test_node(client, volume_name)
def delete_replica_processes(client, api, volname)
def delete_statefulset(apps_api, statefulset)
def delete_storage_class(sc_name)
def delete_support_bundle(node_id, name, client)
def dev_read(dev, start, count)
def dev_write(dev, start, data)
def disable_auto_salvage(client)
def download_support_bundle(node_id, name, client, target_path='')
def enable_default_disk(client)
def exec_command_in_pod(api, command, pod_name, namespace, container=None)

Execute command in the pod.

Args

api
An instance of CoreV1API.
pod_name
The name of the Pod.
command
The command to execute in the pod.
namespace
The namespace where the pod exists.

Returns

The output of the command.

def exec_instance_manager(api, im_name, cmd)
def exec_local(cmd)
def exec_nsenter(cmd, process_name=None)
def expand_and_wait_for_pvc(api, pvc, size)
def fail_replica_expansion(client, api, volname, size, replicas=None)
def find_ancestor_process_by_name(ancestor_name)
def find_backup(client, vol_name, snap_name)

find_backup will look for a backup on the backupstore it's important to note, that this can only be used for completed backups since the backup.cfg will only be written once a backup operation has been completed successfully

def find_backup_volume(client, volume_name)
def find_dockerd_pid()
def find_process_pid(process_name)
def find_replica_for_backup(client, volume_name, backup_id)
def find_self()
def fix_replica_expansion_failure(client, api, volname, size, replicas=None)
def generate_attachment_ticket_id()
def generate_pod_with_pvc_manifest(pod_name, pvc_name)
def generate_random_data(count)
def generate_random_pos(size, used={})
def generate_random_suffix()
def generate_sts_name()
def generate_support_bundle(case_name)

Generate support bundle into folder ./support_bundle/case_name.zip

Won't generate support bundle if current support bundle count greater than MAX_SUPPORT_BINDLE_NUMBER.

Args

case_name
support bundle will named case_name.zip
def generate_volume_name()
def get_all_support_bundle_manager_deployments(apps_api)
def get_apps_api_client()
def get_backupstore_poll_interval()
def get_backupstore_url()
def get_backupstores()
def get_client(address)
def get_clients(hosts)
def get_clone_volume_name(client, source_volume_name)
def get_compatibility_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_core_api_client()
def get_custom_object_api_client()
def get_default_engine_image(client)
def get_deployment_pod_names(core_api, deployment)
def get_device_checksum(dev)
def get_disk_uuid()
def get_engine_host_id(client, vol_name)
def get_engine_image_status_value(client, ei_name)
def get_host_disk_size(disk)
def get_host_replica_count(client, volume_name, host_id, chk_running=False)
def get_iscsi_ip(iscsi)
def get_iscsi_lun(iscsi)
def get_iscsi_port(iscsi)
def get_iscsi_target(iscsi)
def get_k8s_zone_label()
def get_liveness_probe_spec(initial_delay=5, period=5)
def get_longhorn_api_client()
def get_mgr_ips()
def get_node_by_disk_id(client, disk_id)
def get_pod_data_md5sum(api, pod_name, path)
def get_process_info(p_path)
def get_pv_manifest(request)
def get_pvc_manifest(request)
def get_random_client(clients)
def get_scheduling_api_client()
def get_self_host_id()
def get_statefulset_pod_info(api, s_set)
def get_storage_api_client()
def get_support_bundle(node_id, name, client)
def get_support_bundle_url(client)
def get_update_disks(disks)
def get_upgrade_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_version_api_client()
def get_volume_dev_mb_data_md5sum(path, offset_in_mb, length_in_mb)
def get_volume_endpoint(v)
def get_volume_engine(v)
def get_volume_name(api, pvc_name)

Given a PersistentVolumeClaim, return the name of the associated PV.

def get_volume_recurring_jobs_and_groups(volume)
def get_volume_running_replica_cnt(client, volume_name)
def is_backupTarget_nfs(s)
def is_backupTarget_s3(s)
def is_k8s_node_gke_cos(core_api)
def is_k8s_node_label(core_api, label_key, label_value, node_name)
def is_replica_available(r)
def iscsi_login(iscsi_ep)
def iscsi_logout(iscsi_ep)
def json_string_go_to_python(str)
def lazy_umount_disk(mount_path)
def load_k8s_config()
def make_deployment_cpu_request(request)
def make_deployment_with_pvc(request)
def monitor_restore_progress(client, volume_name)
def mount_disk(dev, mount_path)
def node_default_tags()

Assign the Tags under DEFAULT_TAGS to the Longhorn client's Nodes to provide a base set of Tags to work with in the tests. :return: A dictionary mapping a Node's ID to the Tags it has.

def offline_expand_attached_volume(client, volume_name, size='33554432')
def parse_iscsi_endpoint(iscsi)
def pod(request)
def pod_make(request)
def prepare_host_disk(dev, vol_name)
def prepare_pod_with_data_in_mb(client, core_api, csi_pv, pvc, pod_make, volume_name, volume_size='1073741824', num_of_replicas=3, data_path='/data/test', data_size_in_mb=100, add_liveness_probe=True, access_mode='rwo')
def prepare_statefulset_with_data_in_mb(client, core_api, statefulset, sts_name, storage_class, data_path='/data/test', data_size_in_mb=100)
def priority_class(request)
def pvc(request)
def pvc_backingimage(request)
def pvc_name(request)
def random_labels()
def read_pod_block_volume_data(api, pod_name, data_size, offset, device_path)
def read_volume_data(api, pod_name, filename='test')

Retrieve data from a Pod's volume.

Args

api
An instance of CoreV1API.
pod_name
The name of the Pod.

Returns

The data contained within the volume.

def recurring_job_feature_supported(client)
def reset_disks_for_all_nodes(client)
def reset_engine_image(client)
def reset_longhorn_node_zone(client)
def reset_node(client, core_api)
def reset_nodes_taint(client)
def reset_settings(client)
def restart_and_wait_ready_engine_count(client, ready_engine_count)

Delete/restart engine daemonset and wait ready engine image count after daemonset restart

def restore_backup_and_get_data_checksum(client, core_api, backup, pod, file_name='', command='')

Restore the backup in a pod and get the checksum of all the files or checksum of a particular file.

Args

client
The Longhorn client to use in the request.
core_api
An instance of CoreV1API.
backup
The backup to be restored.
pod
Pod fixture.
file_name
Optional - File whose checksum to be computed.
command
Optional - command to be executed in the pod.

Returns

The checksum as a dictionary as in file_name=checksum and the output of the command executed in the pod.

def rwx_statefulset(request)
def scale_up_engine_image_daemonset(client)
def scheduling_api(request)

Create a new SchedulingV1API instance.

Returns

A new CoreV1API Instance.

def set_and_wait_k8s_nodes_zone_label(core_api, node_zone_map)
def set_k8s_node_label(core_api, node_name, key, value)
def set_k8s_node_zone_label(core_api, node_name, zone_name)
def set_node_cordon(api, node_name, to_cordon)

Set a kubernetes node schedulable status

def set_node_scheduling(client, node, allowScheduling, retry=False)
def set_node_scheduling_eviction(client, node, allowScheduling, evictionRequested, retry=False)
def set_node_tags(client, node, tags=[], retry=False)

Set the tags on a node without modifying its scheduling status. Retry if "too many retries error" happened. :param client: The Longhorn client to use in the request. :param node: The Node to update. :param tags: The tags to set on the node. :return: The updated Node.

def set_tags_for_node_and_its_disks(client, node, tags)
def settings_reset()
def size_to_string(volume_size)

Convert a volume size to string format to pass into Kubernetes.

Args

volume_size
The size of the volume in bytes.

Returns

The size of the volume in gigabytes as a passable string to Kubernetes.

def statefulset(request)
def storage_class(request)
def sts_name(request)
def system_backup_feature_supported(client)
def system_backup_random_name()
def system_backup_wait_for_state(state, name, client)
def system_backups_cleanup(client)

Clean up all system backups :param client: The Longhorn client to use in the request.

def system_restore_random_name()
def system_restore_wait_for_state(state, name, client)
def system_restores_cleanup(client)

Clean up all system restores :param client: The Longhorn client to use in the request.

def umount_disk(mount_path)
def update_node_disks(client, node_name, disks, retry=False)
def update_persistent_volume_claim(core_api, name, namespace, claim)
def update_recurring_job(client, name, groups, labels, cron='', retain=0, concurrency=0)
def update_setting(client, name, value)
def update_statefulset_manifests(ss_manifest, sc_manifest, name)

Write in a new StatefulSet name and the proper StorageClass name for tests.

def volume_name(request)
def volume_read(v, start, count)
def volume_valid(dev)
def volume_write(v, start, data)
def wait_and_get_any_deployment_pod(core_api, deployment_name, is_phase='Running')

Add mechanism to wait for a stable running pod when deployment restarts its workload, since Longhorn manager could create/delete the new workload pod multiple times, it's possible that we get an unstable pod which will be deleted immediately, so add a wait mechanism to get a stable running pod. ref: https://github.com/longhorn/longhorn/issues/4814

def wait_and_get_pv_for_pvc(api, pvc_name)
def wait_delete_deployment(apps_api, deployment_name, namespace='default')
def wait_delete_dm_device(api, name)
def wait_delete_pod(api, pod_uid, namespace='default')
def wait_delete_pv(api, pv_name)
def wait_delete_pvc(api, pvc_name, retry_counts=150)
def wait_delete_volume_attachment(storage_api, volume_attachment_name)
def wait_deployment_replica_ready(apps_api, deployment_name, desired_replica_count, namespace='default')
def wait_for_all_instance_manager_running(client)
def wait_for_backing_image_delete(client, name)
def wait_for_backing_image_disk_cleanup(client, bi_name, disk_id)
def wait_for_backing_image_in_disk_fail(client, backing_img_name, disk_uuid)
def wait_for_backing_image_status(client, backing_img_name, image_status)
def wait_for_backup_backing_image_delete(client, name)
def wait_for_backup_completion(client, volume_name, snapshot_name=None, retry_count=300)
def wait_for_backup_count(backup_volume, number, retry_counts=120)
def wait_for_backup_delete(client, volume_name, backup_name)
def wait_for_backup_failed(client, volume_name, snapshot_name=None, retry_count=300)
def wait_for_backup_restore_completed(client, name, backup_name)
def wait_for_backup_state(client, volume_name, predicate, retry_count=300)
def wait_for_backup_target_available(client, available)
def wait_for_backup_to_start(client, volume_name, snapshot_name=None, retry_count=300, chk_progress=0)
def wait_for_backup_volume(client, vol_name, backing_image='')
def wait_for_backup_volume_backing_image_synced(client, volume_name, backing_image, retry_count=300)
def wait_for_backup_volume_delete(client, name)
def wait_for_cron_job_count(batch_v1_api, number, label='', retry_counts=150)
def wait_for_cron_job_create(batch_v1_api, label='', retry_counts=150)
def wait_for_cron_job_delete(batch_v1_api, label='', retry_counts=150)
def wait_for_deployed_engine_image_count(client, image_name, expected_cnt, exclude_nodes=[])
def wait_for_device_login(dest_path, name)
def wait_for_disk_conditions(client, node_name, disk_name, key, value)
def wait_for_disk_status(client, node_name, disk_name, key, value)
def wait_for_disk_storage_available(client, node_name, disk_name, disk_path)
def wait_for_disk_update(client, name, disk_num)
def wait_for_disk_uuid(client, node_name, uuid)
def wait_for_dr_volume_expansion(longhorn_api_client, volume_name, size_str)
def wait_for_engine_image_condition(client, image_name, state)

state: "True", "False"

def wait_for_engine_image_creation(client, image_name)
def wait_for_engine_image_deletion(client, core_api, engine_image_name)
def wait_for_engine_image_incompatible(client, image_name)
def wait_for_engine_image_ref_count(client, image_name, count)
def wait_for_engine_image_state(client, image_name, state)
def wait_for_expansion_error_clear(longhorn_api_client, volume_name)
def wait_for_expansion_failure(client, volume_name, last_failed_at='')
def wait_for_instance_manager_count(client, number, retry_counts=120)
def wait_for_instance_manager_desire_state(client, core_api, im_name, state, desire=True)
def wait_for_node_mountpropagation_condition(client, name)
def wait_for_node_schedulable_condition(client, name)
def wait_for_node_tag_update(client, name, tags)
def wait_for_node_update(client, name, key, value)
def wait_for_pod_annotation(core_api, label_selector, anno_key, anno_val)
def wait_for_pod_phase(core_api, pod_name, pod_phase, namespace='default')
def wait_for_pod_remount(core_api, pod_name, chk_path='/data/lost+found')
def wait_for_pod_restart(core_api, pod_name, namespace='default')
def wait_for_pods_volume_delete(client, pod_list, retry_counts=300)
def wait_for_pods_volume_state(client, pod_list, field, value, retry_counts=150)
def wait_for_pvc_phase(api, pvc_name, phase)
def wait_for_rebuild_complete(client, volume_name, retry_count=150)
def wait_for_rebuild_start(client, volume_name, retry_count=150, retry_interval=1)
def wait_for_recurring_jobs_cleanup(client)
def wait_for_replica_count(client, volume_name, replica_count)
def wait_for_replica_directory()
def wait_for_replica_failed(client, volname, replica_name, retry_cnts=150, retry_ivl=1)
def wait_for_replica_running(client, volname, replica_name)
def wait_for_replica_scheduled(client, volume_name, to_nodes, expect_success=2, expect_fail=0, anti_affinity=False, chk_vol_healthy=True, chk_replica_running=True)
def wait_for_restoration_start(client, name)
def wait_for_running_engine_image_count(image_name, engine_cnt)
def wait_for_snapshot_count(volume, number, retry_counts=120, count_removed=False)
def wait_for_snapshot_purge(client, volume_name, *snaps)
def wait_for_support_bundle_cleanup(client)
def wait_for_support_bundle_state(state, node_id, name, client)
def wait_for_tainted_node_engine_image_undeployed(client, img_name, tainted_node)
def wait_for_volume_attached(client, name)
def wait_for_volume_clone_status(client, name, key, value)
def wait_for_volume_condition_restore(client, name, key, value)
def wait_for_volume_condition_scheduled(client, name, key, value)
def wait_for_volume_condition_toomanysnapshots(client, name, key, value, expected_message=None)
def wait_for_volume_creation(client, name)
def wait_for_volume_current_image(client, name, image)
def wait_for_volume_degraded(client, name)
def wait_for_volume_delete(client, name)
def wait_for_volume_detached(client, name)
def wait_for_volume_detached_unknown(client, name)
def wait_for_volume_endpoint(client, name)
def wait_for_volume_expansion(longhorn_api_client, volume_name, expected_size='')
def wait_for_volume_faulted(client, name)
def wait_for_volume_frontend_disabled(client, volume_name, state=True)
def wait_for_volume_healthy(client, name, retry_count=150)
def wait_for_volume_healthy_no_frontend(client, name)
def wait_for_volume_migration_node(client, volume_name, node_id, expected_replica_count=-1)
def wait_for_volume_migration_ready(client, volume_name)
def wait_for_volume_option_trim_auto_removing_snapshots(client, volume_name, enabled)
def wait_for_volume_recurring_job_update(volume, jobs=[], groups=[])
def wait_for_volume_replica_auto_balance_update(client, volume_name, value)
def wait_for_volume_replica_count(client, name, count)
def wait_for_volume_replica_rebuilt_on_same_node_different_disk(client, node_name, volume_name, old_disk_name)
def wait_for_volume_replicas_mode(client, volname, mode, replica_names=None, replica_count=None)
def wait_for_volume_replicas_running_on_hosts(client, volume_name, host_ids, replica_balanced)
def wait_for_volume_restoration_completed(client, name)
def wait_for_volume_restoration_start(client, volume_name, backup_name, progress=0)
def wait_for_volume_status(client, name, key, value, retry_count=360)
def wait_longhorn_node_zone_reset(client)
def wait_pod(pod_name)
def wait_pod_attach_after_first_backup_completion(client, core_api, volume_name, label_name)
def wait_scheduling_failure(client, volume_name)

Wait and make sure no new replicas are running on the specified volume. Trigger a failed assertion of one is detected. :param client: The Longhorn client to use in the request. :param volume_name: The name of the volume.

def wait_statefulset(statefulset_manifest)
def wait_volume_kubernetes_status(client, volume_name, expect_ks)
def write_device_random_data(dev, position={})
def write_pod_block_volume_data(api, pod_name, test_data, offset, device_path)
def write_pod_volume_data(api, pod_name, test_data, filename='test')

Write data into a Pod's volume.

Args

api
An instance of CoreV1API.
pod_name
The name of the Pod.
test_data
The data to be written.
def write_pod_volume_random_data(api, pod_name, path, size_in_mb)
def write_volume_data(volume, data)
def write_volume_dev_random_mb_data(path, offset_in_mb, length_in_mb, timeout_cnt=3)
def write_volume_random_data(volume, position={})

Classes

class AssertErrorCheckThread (target, args)

This class is used for catching exception caused in threads, especially for AssertionError now.

Parameters

target : The threading function. args : Arguments of the target function.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.

Expand source code
class AssertErrorCheckThread(threading.Thread):
    """
        This class is used for catching exception caused in threads,
        especially for AssertionError now.

        Parameters:
            target  :       The threading function.
            args    :       Arguments of the target function.
    """
    def __init__(self, target, args):
        threading.Thread.__init__(self)
        self.target = target
        self.args = args
        self.asserted = None

    def run(self):
        try:
            self.target(*self.args)
        except AssertionError as e:
            self.asserted = e

    def join(self):
        threading.Thread.join(self)
        if self.asserted:
            raise self.asserted

Ancestors

  • threading.Thread

Methods

def join(self)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class timeout (seconds=1, error_message='Timeout')
Expand source code
class timeout:

    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise Exception(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)

Methods

def handle_timeout(self, signum, frame)