Module tests.common
Functions
def activate_standby_volume(client, volume_name, frontend='blockdev')
def apps_api(request)
-
Create a new AppsV1API instance.
Returns
A new AppsV1API Instance.
def assert_backup_state(b_actual, b_expected)
def assert_from_assert_error_check_threads(thrd_list)
-
Wait for all threads in thrd_list to finish and check their status.
Parameters
thrd_list: thread list created by create_assert_error_check_thread.
def backing_image_feature_supported(client)
def batch_v1_api(request)
-
Create a new BatchV1Api instance.
Returns
A new BatchV1Api Instance.
def check_all_support_bundle_managers_deleted()
def check_backing_image_disk_map_status(client, bi_name, expect_cnt, expect_disk_state)
def check_backing_image_eviction_failed(name)
def check_backing_image_single_copy_disk_eviction(client, bi_name, old_disk_id)
def check_backing_image_single_copy_node_eviction(client, bi_name, old_node)
def check_block_device_size(volume, size)
def check_csi(core_api)
def check_csi_expansion(core_api)
def check_device_data(dev, data, check_checksum=True)
def check_longhorn(core_api)
def check_pod_existence(api, pod_name, namespace='default')
def check_pv_existence(api, pv_name)
def check_pvc_existence(api, pvc_name, namespace='default')
def check_pvc_in_specific_status(api, pvc_name, status)
def check_recurring_jobs(client, recurring_jobs)
def check_statefulset_existence(api, ss_name, namespace='default')
def check_volume_data(volume, data, check_checksum=True)
def check_volume_endpoint(v)
def check_volume_existence(client, volume_name)
def check_volume_last_backup(client, volume_name, last_backup)
def check_volume_replicas(volume, spec, tag_mapping)
-
Check the replicas on the volume to ensure that they were scheduled properly.
:param volume: The Volume to check.
:param spec: The spec to validate the Tag against.
:param tag_mapping: The mapping of Nodes to the Tags they have.
:raise AssertionError: If the Volume doesn't match all the conditions.
def cleanup_all_backing_images(client)
def cleanup_all_recurring_jobs(client)
def cleanup_all_support_bundles(client)
-
Clean up all support bundles.
:param client: The Longhorn client to use in the request.
def cleanup_all_volumes(client)
-
Clean up all volumes.
:param client: The Longhorn client to use in the request.
def cleanup_client()
def cleanup_crypto_secret()
def cleanup_disks_on_node(client, node_id, *disks)
def cleanup_host_disk(vol_name)
def cleanup_host_disks(client, *args)
def cleanup_node_disks(client, node_name)
def cleanup_storage_class()
def cleanup_test_disks(client)
def cleanup_volume(client, volume)
-
Clean up the volume after the test.
:param client: The Longhorn client to use in the request.
:param volume: The volume to clean up.
def cleanup_volume_by_name(client, vol_name)
def client(request)
-
Return an individual Longhorn API client for testing.
def clients(request)
def copy_file_to_volume_dev_mb_data(src_path, dest_path, src_offset, dest_offset, size_in_mb, timeout_cnt=5)
def copy_pod_volume_data(api, pod_name, src_path, dest_path)
def core_api(request)
-
Create a new CoreV1API instance.
Returns
A new CoreV1API Instance.
def crash_engine_process_with_sigkill(client, core_api, volume_name)
def crash_replica_processes(client, api, volname, replicas=None, wait_to_fail=True)
def create_and_check_volume(client, volume_name, num_of_replicas=3, size='16777216', backing_image='', frontend='blockdev', snapshot_data_integrity='ignored', access_mode='rwo')
-
Create a new volume with the specified parameters. Assert that the new volume is detached and that all of the requested parameters match.
:param client: The Longhorn client to use in the request.
:param volume_name: The name of the volume.
:param num_of_replicas: The number of replicas the volume should have.
:param size: The size of the volume, as a string representing the number of bytes.
:param backing_image: The backing image to use for the volume.
:param frontend: The frontend to use for the volume.
:return: The volume instance created.
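A minimal usage sketch (volume name and size are illustrative; the attach call assumes the Longhorn client's volume.attach(hostId=...) API):

    # Create a small 2-replica volume and verify its initial state.
    volume = create_and_check_volume(client, "demo-vol",
                                     num_of_replicas=2,
                                     size=str(32 * 1024 * 1024))
    # The returned volume is detached; attach it before doing I/O.
    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_attached(client, "demo-vol")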
def create_and_wait_deployment(apps_api, deployment_manifest)
def create_and_wait_pod(api, pod_manifest)
-
Creates a new Pod attached to a PersistentVolumeClaim for testing.
The function will block until the Pod is online or until it times out, whichever occurs first. The volume created by the manifest passed in will be mounted to '/data'.
Args
api
- An instance of CoreV1API.
pod_manifest
- The manifest of the Pod to create.
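A minimal usage sketch (pod and PVC names are illustrative; assumes the PVC already exists and is bound to a Longhorn volume):

    # Build a pod manifest that mounts an existing PVC, then wait for the pod.
    pod_manifest = generate_pod_with_pvc_manifest("demo-pod", "demo-pvc")
    create_and_wait_pod(core_api, pod_manifest)
    # Clean up the pod once the test is done.
    delete_and_wait_pod(core_api, "demo-pod")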
def create_and_wait_statefulset(statefulset_manifest)
-
Create a new StatefulSet for testing.
This function will block until all replicas in the StatefulSet are online or it times out, whichever occurs first.
def create_assert_error_check_thread(func, *args)
-
Run func in a separate thread with the given arguments.
Parameters
func: The function to run in parallel.
args: Arguments for the function.
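A minimal sketch pairing this helper with assert_from_assert_error_check_threads (whether the helper starts the thread itself is an assumption here; the volumes and data are illustrative):

    # Check two volumes' data in parallel; any AssertionError raised inside a
    # thread is re-raised once all threads have been checked.
    threads = [
        create_assert_error_check_thread(check_volume_data, vol1, data1),
        create_assert_error_check_thread(check_volume_data, vol2, data2),
    ]
    assert_from_assert_error_check_threads(threads)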
def create_backing_image_with_matching_url(client, name, url, minNumberOfCopies=1, nodeSelector=[], diskSelector=[])
def create_backup(client, volname, data={}, labels={})
def create_backup_from_volume_attached_to_pod(client, core_api, volume_name, pod_name, data_path='/data/test', data_size=100)
-
Write data in the pod and take a backup.
Args
client
- The Longhorn client to use in the request.
core_api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
volume_name
- The volume name which is attached to the pod.
data_path
- File name appended to the mount point, e.g. /data/file.
data_size
- Size of the data to be written in the pod.
Returns
The backup volume name, the backup, and the checksum of the data written in the backup.
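A minimal usage sketch (names are illustrative; the three-value unpacking follows the Returns description above):

    # Write 50 MB into the attached pod and take a backup of the volume.
    bv_name, backup, checksum = create_backup_from_volume_attached_to_pod(
        client, core_api, "demo-vol", "demo-pod",
        data_path="/data/test", data_size=50)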
def create_crypto_secret(secret_manifest, namespace='longhorn-system')
def create_deployment_and_write_data(client, core_api, make_deployment_with_pvc, volume_name, size, replica_count, data_size, attach_node_id=None)
def create_host_disk(client, vol_name, size, node_id)
def create_pv_for_volume(client, core_api, volume, pv_name, fs_type='ext4')
def create_pvc(pvc_manifest)
def create_pvc_for_volume(client, core_api, volume, pvc_name, pvc_namespace='default')
def create_pvc_spec(name)
-
Generate a volume manifest using the given name for the PVC.
This spec is used to test dynamically provisioned PersistentVolumes (those created using a storage class).
def create_recurring_jobs(client, recurring_jobs)
def create_rwx_volume_with_storageclass(client, core_api, storage_class)
def create_snapshot(longhorn_api_client, volume_name)
def create_statefulset(statefulset_manifest)
-
Create a new StatefulSet for testing.
def create_storage_class(sc_manifest)
def create_support_bundle(client)
def create_volume(client, vol_name, size, node_id, r_num)
def create_volume_and_backup(client, vol_name, vol_size, backup_data_size)
def create_volume_and_write_data(client, volume_name, volume_size='16777216')
-
- Create and attach a volume.
- Write data to the volume.
def crypto_secret(request)
def csi_pv(request)
def csi_pv_backingimage(request)
def csi_pvc_name(request)
def delete_and_wait_deployment(apps_api, deployment_name, namespace='default')
def delete_and_wait_longhorn(client, name)
-
Delete a volume from Longhorn.
def delete_and_wait_pod(api, pod_name, namespace='default', wait=True)
-
Delete a specified Pod.
This function does not check whether the Pod exists and will throw an error if a nonexistent Pod is specified.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
def delete_and_wait_pv(api, pv_name)
def delete_and_wait_pvc(api, pvc_name, retry_counts=150)
def delete_and_wait_statefulset(api, client, statefulset)
def delete_and_wait_volume_attachment(storage_api, volume_attachment_name)
def delete_backup(client, volume_name, backup_name)
def delete_backup_backing_image(client, backing_image_name)
def delete_backup_volume(client, volume_name)
def delete_crypto_secret(namespace, name)
def delete_replica_on_test_node(client, volume_name)
def delete_replica_processes(client, api, volname)
def delete_statefulset(apps_api, statefulset)
def delete_storage_class(sc_name)
def delete_support_bundle(node_id, name, client)
def dev_read(dev, start, count)
def dev_write(dev, start, data)
def disable_auto_salvage(client)
def download_support_bundle(node_id, name, client, target_path='')
def enable_default_disk(client)
def exec_command_in_pod(api, command, pod_name, namespace, container=None)
-
Execute command in the pod.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
command
- The command to execute in the pod.
namespace
- The namespace where the pod exists.
Returns
The output of the command.
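A minimal usage sketch (pod name is illustrative; passing the command as a single shell string is an assumption):

    # List the contents of the mounted volume inside the pod; expect the
    # default file written earlier by write_pod_volume_data ('test').
    output = exec_command_in_pod(core_api, "ls /data", "demo-pod", "default")
    assert "test" in output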
def exec_instance_manager(api, im_name, cmd)
def exec_local(cmd)
def exec_nsenter(cmd, process_name=None)
def expand_and_wait_for_pvc(api, pvc, size)
def fail_replica_expansion(client, api, volname, size, replicas=None)
def find_ancestor_process_by_name(ancestor_name)
def find_backup(client, vol_name, snap_name)
-
find_backup looks for a backup on the backupstore. Note that it can only be used for completed backups, since the backup.cfg will only be written once a backup operation has completed successfully.
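A minimal usage sketch (volume and snapshot names are illustrative; the two-value return shape is an assumption based on typical usage in this suite):

    # find_backup only sees completed backups, so wait for completion first.
    wait_for_backup_completion(client, "demo-vol", snapshot_name=snap_name)
    backup_volume, backup = find_backup(client, "demo-vol", snap_name)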
def find_backup_volume(client, volume_name)
def find_dockerd_pid()
def find_process_pid(process_name)
def find_replica_for_backup(client, volume_name, backup_id)
def find_self()
def fix_replica_expansion_failure(client, api, volname, size, replicas=None)
def generate_attachment_ticket_id()
def generate_pod_with_pvc_manifest(pod_name, pvc_name)
def generate_random_data(count)
def generate_random_pos(size, used={})
def generate_random_suffix()
def generate_sts_name()
def generate_support_bundle(case_name)
-
Generate a support bundle at ./support_bundle/case_name.zip.
A support bundle will not be generated if the current support bundle count is greater than MAX_SUPPORT_BINDLE_NUMBER.
Args
case_name
- The support bundle will be named case_name.zip.
def generate_volume_name()
def get_all_support_bundle_manager_deployments(apps_api)
def get_apps_api_client()
def get_backupstore_poll_interval()
def get_backupstore_url()
def get_backupstores()
def get_client(address)
def get_clients(hosts)
def get_clone_volume_name(client, source_volume_name)
def get_compatibility_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_core_api_client()
def get_custom_object_api_client()
def get_default_engine_image(client)
def get_deployment_pod_names(core_api, deployment)
def get_device_checksum(dev)
def get_disk_uuid()
def get_engine_host_id(client, vol_name)
def get_engine_image_status_value(client, ei_name)
def get_host_disk_size(disk)
def get_host_replica_count(client, volume_name, host_id, chk_running=False)
def get_iscsi_ip(iscsi)
def get_iscsi_lun(iscsi)
def get_iscsi_port(iscsi)
def get_iscsi_target(iscsi)
def get_k8s_zone_label()
def get_liveness_probe_spec(initial_delay=5, period=5)
def get_longhorn_api_client()
def get_mgr_ips()
def get_node_by_disk_id(client, disk_id)
def get_pod_data_md5sum(api, pod_name, path)
def get_process_info(p_path)
def get_pv_manifest(request)
def get_pvc_manifest(request)
def get_random_client(clients)
def get_scheduling_api_client()
def get_self_host_id()
def get_statefulset_pod_info(api, s_set)
def get_storage_api_client()
def get_support_bundle(node_id, name, client)
def get_support_bundle_url(client)
def get_update_disks(disks)
def get_upgrade_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_version_api_client()
def get_volume_dev_mb_data_md5sum(path, offset_in_mb, length_in_mb)
def get_volume_endpoint(v)
def get_volume_engine(v)
def get_volume_name(api, pvc_name)
-
Given a PersistentVolumeClaim, return the name of the associated PV.
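A minimal usage sketch (PVC name is illustrative; client.by_id_volume is assumed to be the Longhorn client lookup used throughout these tests):

    # Find the Longhorn volume backing an existing PVC, then fetch it.
    vol_name = get_volume_name(core_api, "demo-pvc")
    volume = client.by_id_volume(vol_name)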
def get_volume_recurring_jobs_and_groups(volume)
def get_volume_running_replica_cnt(client, volume_name)
def is_backupTarget_nfs(s)
def is_backupTarget_s3(s)
def is_k8s_node_gke_cos(core_api)
def is_k8s_node_label(core_api, label_key, label_value, node_name)
def is_replica_available(r)
def iscsi_login(iscsi_ep)
def iscsi_logout(iscsi_ep)
def json_string_go_to_python(str)
def lazy_umount_disk(mount_path)
def load_k8s_config()
def make_deployment_cpu_request(request)
def make_deployment_with_pvc(request)
def monitor_restore_progress(client, volume_name)
def mount_disk(dev, mount_path)
-
Mount the given block device at the specified mount path.
:param dev: The block device to mount.
:param mount_path: The path to mount the device at.
def offline_expand_attached_volume(client, volume_name, size='33554432')
def parse_iscsi_endpoint(iscsi)
def pod(request)
def pod_make(request)
def prepare_host_disk(dev, vol_name)
def prepare_pod_with_data_in_mb(client, core_api, csi_pv, pvc, pod_make, volume_name, volume_size='1073741824', num_of_replicas=3, data_path='/data/test', data_size_in_mb=100, add_liveness_probe=True, access_mode='rwo')
def prepare_statefulset_with_data_in_mb(client, core_api, statefulset, sts_name, storage_class, data_path='/data/test', data_size_in_mb=100)
def priority_class(request)
def pvc(request)
def pvc_backingimage(request)
def pvc_name(request)
def random_labels()
def read_pod_block_volume_data(api, pod_name, data_size, offset, device_path)
def read_volume_data(api, pod_name, filename='test')
-
Retrieve data from a Pod's volume.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
Returns
The data contained within the volume.
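A round-trip sketch combining this with write_pod_volume_data (pod name and data size are illustrative):

    # Write random data to the default file ('test') and read it back.
    test_data = generate_random_data(64)
    write_pod_volume_data(core_api, "demo-pod", test_data)
    assert read_volume_data(core_api, "demo-pod") == test_data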
def recurring_job_feature_supported(client)
def reset_disks_for_all_nodes(client)
def reset_engine_image(client)
def reset_longhorn_node_zone(client)
def reset_node(client, core_api)
def reset_nodes_taint(client)
def reset_settings(client)
def restart_and_wait_ready_engine_count(client, ready_engine_count)
-
Delete/restart the engine image daemonset and wait for the ready engine image count to reach ready_engine_count after the daemonset restarts.
def restore_backup_and_get_data_checksum(client, core_api, backup, pod, file_name='', command='')
-
Restore the backup in a pod and get the checksum of all the files or checksum of a particular file.
Args
client
- The Longhorn client to use in the request.
core_api
- An instance of CoreV1API.
backup
- The backup to be restored.
pod
- Pod fixture.
file_name
- Optional - File whose checksum is to be computed.
command
- Optional - command to be executed in the pod.
Returns
A dictionary mapping file_name to its checksum, and the output of the command executed in the pod.
def rwx_statefulset(request)
def scale_up_engine_image_daemonset(client)
def scheduling_api(request)
-
Create a new SchedulingV1API instance.
Returns
A new SchedulingV1API Instance.
def set_and_wait_k8s_nodes_zone_label(core_api, node_zone_map)
def set_k8s_node_label(core_api, node_name, key, value)
def set_k8s_node_zone_label(core_api, node_name, zone_name)
def set_node_cordon(api, node_name, to_cordon)
-
Set a Kubernetes node's schedulable status.
def set_node_scheduling(client, node, allowScheduling, retry=False)
def set_node_scheduling_eviction(client, node, allowScheduling, evictionRequested, retry=False)
-
Set a node's allowScheduling and evictionRequested flags. Retry if a "too many retries" error happens.
:param client: The Longhorn client to use in the request.
:param node: The Node to update.
:param allowScheduling: Whether the node should allow replica scheduling.
:param evictionRequested: Whether replica eviction is requested for the node.
:param retry: Whether to retry the update on failure.
:return: The updated Node.
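A minimal usage sketch (assumes the Longhorn client's by_id_node lookup):

    # Disable scheduling on the current node and request replica eviction.
    node = client.by_id_node(get_self_host_id())
    set_node_scheduling_eviction(client, node,
                                 allowScheduling=False,
                                 evictionRequested=True,
                                 retry=True)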
def settings_reset()
def size_to_string(volume_size)
-
Convert a volume size to string format to pass into Kubernetes.
Args
volume_size
- The size of the volume in bytes.
Returns
The size of the volume in gigabytes, as a string that can be passed to Kubernetes.
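A minimal usage sketch (the PVC manifest structure is shown for illustration only):

    # 1 GiB volume expressed as a Kubernetes-compatible size string.
    pvc_manifest['spec']['resources']['requests']['storage'] = \
        size_to_string(1 * 1024 * 1024 * 1024)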
def statefulset(request)
def storage_class(request)
def sts_name(request)
def system_backup_feature_supported(client)
def system_backup_random_name()
def system_backup_wait_for_state(state, name, client)
def system_backups_cleanup(client)
-
Clean up all system backups.
:param client: The Longhorn client to use in the request.
def system_restore_random_name()
def system_restore_wait_for_state(state, name, client)
def system_restores_cleanup(client)
-
Clean up all system restores.
:param client: The Longhorn client to use in the request.
def umount_disk(mount_path)
def update_node_disks(client, node_name, disks, retry=False)
def update_persistent_volume_claim(core_api, name, namespace, claim)
def update_recurring_job(client, name, groups, labels, cron='', retain=0, concurrency=0)
def update_setting(client, name, value)
def update_statefulset_manifests(ss_manifest, sc_manifest, name)
-
Write in a new StatefulSet name and the proper StorageClass name for tests.
def volume_name(request)
def volume_read(v, start, count)
def volume_valid(dev)
def volume_write(v, start, data)
def wait_and_get_any_deployment_pod(core_api, deployment_name, is_phase='Running')
-
Wait for a stable running pod when a deployment restarts its workload. Since Longhorn manager could create and delete the new workload pod multiple times, the pod first observed may be unstable and deleted immediately; this wait mechanism ensures a stable running pod is returned. Ref: https://github.com/longhorn/longhorn/issues/4814
def wait_and_get_pv_for_pvc(api, pvc_name)
def wait_delete_deployment(apps_api, deployment_name, namespace='default')
def wait_delete_dm_device(api, name)
def wait_delete_pod(api, pod_uid, namespace='default')
def wait_delete_pv(api, pv_name)
def wait_delete_pvc(api, pvc_name, retry_counts=150)
def wait_delete_volume_attachment(storage_api, volume_attachment_name)
def wait_deployment_replica_ready(apps_api, deployment_name, desired_replica_count, namespace='default')
def wait_for_all_instance_manager_running(client)
def wait_for_backing_image_delete(client, name)
def wait_for_backing_image_disk_cleanup(client, bi_name, disk_id)
def wait_for_backing_image_in_disk_fail(client, backing_img_name, disk_uuid)
def wait_for_backing_image_status(client, backing_img_name, image_status)
def wait_for_backup_backing_image_delete(client, name)
def wait_for_backup_completion(client, volume_name, snapshot_name=None, retry_count=300)
def wait_for_backup_count(backup_volume, number, retry_counts=120)
def wait_for_backup_delete(client, volume_name, backup_name)
def wait_for_backup_failed(client, volume_name, snapshot_name=None, retry_count=300)
def wait_for_backup_restore_completed(client, name, backup_name)
def wait_for_backup_state(client, volume_name, predicate, retry_count=300)
def wait_for_backup_target_available(client, available)
def wait_for_backup_to_start(client, volume_name, snapshot_name=None, retry_count=300, chk_progress=0)
def wait_for_backup_volume(client, vol_name, backing_image='')
def wait_for_backup_volume_backing_image_synced(client, volume_name, backing_image, retry_count=300)
def wait_for_backup_volume_delete(client, name)
def wait_for_cron_job_count(batch_v1_api, number, label='', retry_counts=150)
def wait_for_cron_job_create(batch_v1_api, label='', retry_counts=150)
def wait_for_cron_job_delete(batch_v1_api, label='', retry_counts=150)
def wait_for_deployed_engine_image_count(client, image_name, expected_cnt, exclude_nodes=[])
def wait_for_device_login(dest_path, name)
def wait_for_disk_conditions(client, node_name, disk_name, key, value)
def wait_for_disk_status(client, node_name, disk_name, key, value)
def wait_for_disk_storage_available(client, node_name, disk_name, disk_path)
def wait_for_disk_update(client, name, disk_num)
def wait_for_disk_uuid(client, node_name, uuid)
def wait_for_dr_volume_expansion(longhorn_api_client, volume_name, size_str)
def wait_for_engine_image_condition(client, image_name, state)
-
state: "True", "False"
def wait_for_engine_image_creation(client, image_name)
def wait_for_engine_image_deletion(client, core_api, engine_image_name)
def wait_for_engine_image_incompatible(client, image_name)
def wait_for_engine_image_ref_count(client, image_name, count)
def wait_for_engine_image_state(client, image_name, state)
def wait_for_expansion_error_clear(longhorn_api_client, volume_name)
def wait_for_expansion_failure(client, volume_name, last_failed_at='')
def wait_for_instance_manager_count(client, number, retry_counts=120)
def wait_for_instance_manager_desire_state(client, core_api, im_name, state, desire=True)
def wait_for_node_mountpropagation_condition(client, name)
def wait_for_node_schedulable_condition(client, name)
def wait_for_node_tag_update(client, name, tags)
def wait_for_node_update(client, name, key, value)
def wait_for_pod_annotation(core_api, label_selector, anno_key, anno_val)
def wait_for_pod_phase(core_api, pod_name, pod_phase, namespace='default')
def wait_for_pod_remount(core_api, pod_name, chk_path='/data/lost+found')
def wait_for_pod_restart(core_api, pod_name, namespace='default')
def wait_for_pods_volume_delete(client, pod_list, retry_counts=300)
def wait_for_pods_volume_state(client, pod_list, field, value, retry_counts=150)
def wait_for_pvc_phase(api, pvc_name, phase)
def wait_for_rebuild_complete(client, volume_name, retry_count=150)
def wait_for_rebuild_start(client, volume_name, retry_count=150, retry_interval=1)
def wait_for_recurring_jobs_cleanup(client)
def wait_for_replica_count(client, volume_name, replica_count)
def wait_for_replica_directory()
def wait_for_replica_failed(client, volname, replica_name, retry_cnts=150, retry_ivl=1)
def wait_for_replica_running(client, volname, replica_name)
def wait_for_replica_scheduled(client, volume_name, to_nodes, expect_success=2, expect_fail=0, anti_affinity=False, chk_vol_healthy=True, chk_replica_running=True)
def wait_for_restoration_start(client, name)
def wait_for_running_engine_image_count(image_name, engine_cnt)
def wait_for_snapshot_count(volume, number, retry_counts=120, count_removed=False)
def wait_for_snapshot_purge(client, volume_name, *snaps)
def wait_for_support_bundle_cleanup(client)
def wait_for_support_bundle_state(state, node_id, name, client)
def wait_for_tainted_node_engine_image_undeployed(client, img_name, tainted_node)
def wait_for_volume_attached(client, name)
def wait_for_volume_clone_status(client, name, key, value)
def wait_for_volume_condition_restore(client, name, key, value)
def wait_for_volume_condition_scheduled(client, name, key, value)
def wait_for_volume_condition_toomanysnapshots(client, name, key, value, expected_message=None)
def wait_for_volume_creation(client, name)
def wait_for_volume_current_image(client, name, image)
def wait_for_volume_degraded(client, name)
def wait_for_volume_delete(client, name)
def wait_for_volume_detached(client, name)
def wait_for_volume_detached_unknown(client, name)
def wait_for_volume_endpoint(client, name)
def wait_for_volume_expansion(longhorn_api_client, volume_name, expected_size='')
def wait_for_volume_faulted(client, name)
def wait_for_volume_frontend_disabled(client, volume_name, state=True)
def wait_for_volume_healthy(client, name, retry_count=150)
def wait_for_volume_healthy_no_frontend(client, name)
def wait_for_volume_migration_node(client, volume_name, node_id, expected_replica_count=-1)
def wait_for_volume_migration_ready(client, volume_name)
def wait_for_volume_option_trim_auto_removing_snapshots(client, volume_name, enabled)
def wait_for_volume_recurring_job_update(volume, jobs=[], groups=[])
def wait_for_volume_replica_auto_balance_update(client, volume_name, value)
def wait_for_volume_replica_count(client, name, count)
def wait_for_volume_replica_rebuilt_on_same_node_different_disk(client, node_name, volume_name, old_disk_name)
def wait_for_volume_replicas_mode(client, volname, mode, replica_names=None, replica_count=None)
def wait_for_volume_replicas_running_on_hosts(client, volume_name, host_ids, replica_balanced)
def wait_for_volume_restoration_completed(client, name)
def wait_for_volume_restoration_start(client, volume_name, backup_name, progress=0)
def wait_for_volume_status(client, name, key, value, retry_count=360)
def wait_longhorn_node_zone_reset(client)
def wait_pod(pod_name)
def wait_pod_attach_after_first_backup_completion(client, core_api, volume_name, label_name)
def wait_scheduling_failure(client, volume_name)
-
Wait and make sure no new replicas are running on the specified volume. Trigger a failed assertion if one is detected.
:param client: The Longhorn client to use in the request.
:param volume_name: The name of the volume.
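A minimal sketch of one way to drive this helper (the Longhorn client calls create_volume and list_node are assumptions about the client API; names and sizes are illustrative):

    # Make scheduling impossible, create a volume, and confirm no replica
    # can be scheduled for it.
    for node in client.list_node():
        set_node_scheduling(client, node, allowScheduling=False)
    client.create_volume(name="unschedulable-vol",
                         size=str(16 * 1024 * 1024), numberOfReplicas=3)
    wait_for_volume_creation(client, "unschedulable-vol")
    wait_scheduling_failure(client, "unschedulable-vol")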
def wait_statefulset(statefulset_manifest)
def wait_volume_kubernetes_status(client, volume_name, expect_ks)
def write_device_random_data(dev, position={})
def write_pod_block_volume_data(api, pod_name, test_data, offset, device_path)
def write_pod_volume_data(api, pod_name, test_data, filename='test')
-
Write data into a Pod's volume.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
test_data
- The data to be written.
def write_pod_volume_random_data(api, pod_name, path, size_in_mb)
def write_volume_data(volume, data)
def write_volume_dev_random_mb_data(path, offset_in_mb, length_in_mb, timeout_cnt=3)
def write_volume_random_data(volume, position={})
Classes
class AssertErrorCheckThread (target, args)
-
This class is used for catching exceptions raised in threads, currently AssertionError in particular.
Parameters
target: The function to run in the thread.
args: Arguments for the target function.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
Expand source code
class AssertErrorCheckThread(threading.Thread):
    """
    This class is used for catching exception caused in threads,
    especially for AssertionError now.

    Parameters:
        target : The threading function.
        args : Arguments of the target function.
    """

    def __init__(self, target, args):
        threading.Thread.__init__(self)
        self.target = target
        self.args = args
        self.asserted = None

    def run(self):
        try:
            self.target(*self.args)
        except AssertionError as e:
            self.asserted = e

    def join(self):
        threading.Thread.join(self)
        if self.asserted:
            raise self.asserted
Ancestors
- threading.Thread
Methods
def join(self)
-
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
def run(self)
-
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
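A minimal direct-usage sketch based on the source above (the checked function is illustrative):

    # join() re-raises any AssertionError caught inside run().
    def check(value):
        assert value == "expected"

    t = AssertErrorCheckThread(target=check, args=("expected",))
    t.start()
    t.join()  # would raise here if the assertion in the thread had failed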
class timeout (seconds=1, error_message='Timeout')
-
Expand source code
class timeout:
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise Exception(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)
Methods
def handle_timeout(self, signum, frame)
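A minimal usage sketch based on the source above (the attach call is illustrative; note that the implementation relies on SIGALRM, so it only works in the main thread):

    # Abort a potentially hanging operation after 30 seconds.
    with timeout(seconds=30, error_message='volume attach timed out'):
        volume.attach(hostId=get_self_host_id())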