Module tests.common
Functions
def activate_standby_volume(client, volume_name, frontend='blockdev')
def activate_standby_volume(client, volume_name,
                            frontend=VOLUME_FRONTEND_BLOCKDEV):
    volume = client.by_id_volume(volume_name)
    assert volume.standby is True
    for i in range(RETRY_COUNTS):
        volume = client.by_id_volume(volume_name)
        engines = volume.controllers
        if len(engines) != 1 or \
                (volume.lastBackup != "" and
                 engines[0].lastRestoredBackup != volume.lastBackup):
            time.sleep(RETRY_INTERVAL)
            continue
        activated = False
        try:
            volume.activate(frontend=frontend)
            activated = True
            break
        except Exception as e:
            assert "hasn't finished incremental restored" \
                   in str(e.error.message)
            time.sleep(RETRY_INTERVAL)
        if activated:
            break
    volume = client.by_id_volume(volume_name)
    assert volume.standby is False
    assert volume.frontend == VOLUME_FRONTEND_BLOCKDEV

    wait_for_volume_detached(client, volume_name)

    volume = client.by_id_volume(volume_name)
    engine = get_volume_engine(volume)
    assert engine.lastRestoredBackup == ""
    assert engine.requestedBackupRestore == ""
def apps_api(request)
@pytest.fixture
def apps_api(request):
    """
    Create a new AppsV1API instance.

    Returns:
        A new AppsV1API Instance.
    """
    c = Configuration()
    c.assert_hostname = False
    Configuration.set_default(c)
    k8sconfig.load_incluster_config()
    apps_api = k8sclient.AppsV1Api()
    return apps_api
Create a new AppsV1API instance.
Returns
A new AppsV1API Instance.
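For illustration, a minimal sketch of how a test might consume this fixture; pytest injects it by argument name (the test name and namespace lookup below are hypothetical, not part of the suite):

# Hypothetical test using the apps_api fixture.
def test_longhorn_deployments_exist(apps_api):
    deployments = apps_api.list_namespaced_deployment('longhorn-system')
    assert len(deployments.items) > 0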
def assert_backup_state(b_actual, b_expected)
def assert_backup_state(b_actual, b_expected):
    assert b_expected.name == b_actual.name
    assert b_expected.url == b_actual.url
    assert b_expected.snapshotName == b_actual.snapshotName
    assert b_expected.snapshotCreated == b_actual.snapshotCreated
    assert b_expected.created == b_actual.created
    assert b_expected.volumeName == b_actual.volumeName
    assert b_expected.volumeSize == b_actual.volumeSize
    assert b_expected.volumeCreated == b_actual.volumeCreated
    assert b_expected.messages == b_actual.messages is None
def assert_from_assert_error_check_threads(thrd_list)
def assert_from_assert_error_check_threads(thrd_list):
    """
    Wait for all threads in thrd_list to finish and assert that none of
    them failed.

    Parameters:
        thrd_list: thread list created by create_assert_error_check_thread.
    """
    assert isinstance(thrd_list, list), "thrd_list is not a list"

    err_list = []
    for t in thrd_list:
        try:
            t.join()
        except AssertionError as e:
            print(e)
            err_list.append(e)

    if err_list:
        assert False, err_list
Wait for all threads in thrd_list to finish and assert that none of them failed.
Parameters
thrd_list: thread list created by create_assert_error_check_thread.
def backing_image_feature_supported(client)
def backing_image_feature_supported(client):
    if hasattr(client.by_id_schema("backingImage"), "id"):
        return True
    else:
        return False
def batch_v1_api(request)
@pytest.fixture
def batch_v1_api(request):
    """
    Create a new BatchV1Api instance.

    Returns:
        A new BatchV1Api Instance.
    """
    c = Configuration()
    c.assert_hostname = False
    Configuration.set_default(c)
    k8sconfig.load_incluster_config()
    api = k8sclient.BatchV1Api()
    return api
Create a new BatchV1Api instance.
Returns
A new BatchV1Api Instance.
def check_all_support_bundle_managers_deleted()
def check_all_support_bundle_managers_deleted():
    apps_api = get_apps_api_client()
    deployments = get_all_support_bundle_manager_deployments(apps_api)
    for support_bundle_manager in deployments:
        wait_delete_deployment(apps_api,
                               support_bundle_manager.metadata.name,
                               namespace=LONGHORN_NAMESPACE)
    assert len(get_all_support_bundle_manager_deployments(apps_api)) == 0
def check_backing_image_disk_map_status(client, bi_name, expect_cnt, expect_disk_state)
def check_backing_image_disk_map_status(client, bi_name, expect_cnt, expect_disk_state):  # NOQA
    # Number of expect_cnt should equal to number of disk map
    # that have expect_disk_state
    for i in range(RETRY_COUNTS):
        backing_image = client.by_id_backing_image(bi_name)

        count = 0
        for disk_id, status in iter(backing_image.diskFileStatusMap.items()):
            if status.state == expect_disk_state:
                count = count + 1

        if expect_cnt == count:
            break
        time.sleep(RETRY_INTERVAL)

    assert expect_cnt == count
def check_backing_image_eviction_failed(name)
def check_backing_image_eviction_failed(name):  # NOQA
    core_client = get_core_api_client()
    selector = "involvedObject.kind=BackingImage,involvedObject.name=" + name
    check = False
    for i in range(RETRY_COUNTS_LONG):
        events = core_client.list_namespaced_event(
            namespace=LONGHORN_NAMESPACE,
            field_selector=selector,
        ).items
        if len(events) == 0:
            continue
        for j in range(len(events)):
            if (events[j].reason == FAILED_DELETING_REASONE and
                    BACKINGIMAGE_FAILED_EVICT_MSG in events[j].message):
                check = True
                break
        if check:
            break
        time.sleep(RETRY_INTERVAL)
    assert check
def check_backing_image_single_copy_disk_eviction(client, bi_name, old_disk_id)
def check_backing_image_single_copy_disk_eviction(client, bi_name, old_disk_id):  # NOQA
    for i in range(RETRY_COUNTS):
        backing_image = client.by_id_backing_image(bi_name)
        current_disk_id = next(iter(backing_image.diskFileStatusMap))
        if current_disk_id != old_disk_id:
            break
        time.sleep(RETRY_INTERVAL)
    assert current_disk_id != old_disk_id
def check_backing_image_single_copy_node_eviction(client, bi_name, old_node)
def check_backing_image_single_copy_node_eviction(client, bi_name, old_node):  # NOQA
    for i in range(RETRY_COUNTS):
        backing_image = client.by_id_backing_image(bi_name)
        current_disk_id = next(iter(backing_image.diskFileStatusMap))
        current_node = get_node_by_disk_id(client, current_disk_id)
        if current_node.name != old_node.name:
            break
        time.sleep(RETRY_INTERVAL)
    assert current_node.name != old_node.name
def check_block_device_size(volume, size)
def check_block_device_size(volume, size):
    dev = get_volume_endpoint(volume)
    # BLKGETSIZE64, result is bytes as unsigned 64-bit integer (uint64)
    req = 0x80081272
    buf = ' ' * 8
    with open(dev) as dev:
        buf = fcntl.ioctl(dev.fileno(), req, buf)
    device_size = struct.unpack('L', buf)[0]
    assert device_size == size
def check_csi(core_api)
def check_csi(core_api): using_csi = CSI_UNKNOWN has_attacher = False has_provisioner = False has_csi_plugin = False pod_running = True try: longhorn_pod_list = core_api.list_namespaced_pod('longhorn-system') for item in longhorn_pod_list.items: if item.status.phase != "Running": pod_running = False labels = item.metadata.labels if not labels: pass elif labels.get('app', '') == 'csi-attacher': has_attacher = True elif labels.get('app', '') == 'csi-provisioner': has_provisioner = True elif labels.get('app', '') == 'longhorn-csi-plugin': has_csi_plugin = True if has_attacher and has_provisioner and has_csi_plugin and pod_running: using_csi = CSI_TRUE elif not has_attacher and not has_provisioner \ and not has_csi_plugin and not pod_running: using_csi = CSI_FALSE except ApiException as e: if (e.status == 404): using_csi = CSI_FALSE assert using_csi != CSI_UNKNOWN return True if using_csi == CSI_TRUE else False
def check_csi_expansion(core_api)
def check_csi_expansion(core_api):
    csi_expansion_enabled = False
    has_csi_resizer = False
    pod_running = True

    try:
        longhorn_pod_list = core_api.list_namespaced_pod('longhorn-system')

        for item in longhorn_pod_list.items:
            if item.status.phase != "Running":
                pod_running = False
            labels = item.metadata.labels
            if not labels:
                pass
            elif labels.get('app', '') == 'csi-resizer':
                has_csi_resizer = True

        if has_csi_resizer and pod_running:
            csi_expansion_enabled = True
    except ApiException:
        pass
    return csi_expansion_enabled
def check_device_data(dev, data, check_checksum=True)
def check_device_data(dev, data, check_checksum=True):
    r_data = dev_read(dev, data['pos'], data['len'])
    assert r_data == bytes(data['content'], encoding='utf8')
    if check_checksum:
        r_checksum = get_device_checksum(dev)
        assert r_checksum == data['checksum']
def check_longhorn(core_api)
def check_longhorn(core_api): ready = False has_engine_image = False has_driver_deployer = False has_manager = False has_ui = False has_instance_manager = False pod_running = True for i in range(RETRY_COUNTS): print(f"wait for Longhorn components ready ... ({i})") try: longhorn_pod_list = core_api.list_namespaced_pod('longhorn-system') for item in longhorn_pod_list.items: labels = item.metadata.labels if not labels: pass elif labels.get('longhorn.io/component', '') == 'engine-image'\ and item.status.phase == "Running": has_engine_image = True elif labels.get('app', '') == 'longhorn-driver-deployer' \ and item.status.phase == "Running": has_driver_deployer = True elif labels.get('app', '') == 'longhorn-manager' \ and item.status.phase == "Running": has_manager = True elif labels.get('app', '') == 'longhorn-ui' \ and item.status.phase == "Running": has_ui = True elif labels.get('longhorn.io/component', '') == \ 'instance-manager' \ and item.status.phase == "Running": has_instance_manager = True if has_engine_image and has_driver_deployer and has_manager and \ has_ui and has_instance_manager and pod_running: ready = True break else: for item in longhorn_pod_list.items: print(f"{item.metadata.name} {item.status.phase}") except ApiException as e: if (e.status == 404): ready = False time.sleep(RETRY_INTERVAL) assert ready, "Failed to wait for Longhorn components ready"
def check_pod_existence(api, pod_name, namespace='default')
def check_pod_existence(api, pod_name, namespace="default"):
    pods = api.list_namespaced_pod(namespace)
    for pod in pods.items:
        if pod.metadata.name == pod_name and \
                not pod.metadata.deletion_timestamp:
            return True
    return False
def check_pv_existence(api, pv_name)
def check_pv_existence(api, pv_name):
    pvs = api.list_persistent_volume()
    for pv in pvs.items:
        if pv.metadata.name == pv_name and not pv.metadata.deletion_timestamp:
            return True
    return False
def check_pvc_existence(api, pvc_name, namespace='default')
def check_pvc_existence(api, pvc_name, namespace="default"):
    pvcs = api.list_namespaced_persistent_volume_claim(namespace)
    for pvc in pvcs.items:
        if pvc.metadata.name == pvc_name and not \
                pvc.metadata.deletion_timestamp:
            return True
    return False
def check_pvc_in_specific_status(api, pvc_name, status)
def check_pvc_in_specific_status(api, pvc_name, status):
    for i in range(RETRY_EXEC_COUNTS):
        claim = \
            api.read_namespaced_persistent_volume_claim(name=pvc_name,
                                                        namespace='default')
        if claim.status.phase == status:
            break
        time.sleep(RETRY_INTERVAL)

    assert claim.status.phase == status
def check_recurring_jobs(client, recurring_jobs)
def check_recurring_jobs(client, recurring_jobs):
    for name, spec in recurring_jobs.items():
        recurring_job = client.by_id_recurring_job(name)
        assert recurring_job.name == name
        assert recurring_job.task == spec["task"]
        if len(spec["groups"]) > 0:
            assert recurring_job.groups == spec["groups"]
        assert recurring_job.cron == spec["cron"]

        expect_retain = spec["retain"]
        if recurring_job.task == "snapshot-cleanup" or \
                recurring_job.task == "filesystem-trim":
            expect_retain = 0
        assert recurring_job.retain == expect_retain
        assert recurring_job.concurrency == spec["concurrency"]
def check_statefulset_existence(api, ss_name, namespace='default')
def check_statefulset_existence(api, ss_name, namespace="default"):
    ss_list = api.list_namespaced_stateful_set(namespace)
    for ss in ss_list.items:
        if ss.metadata.name == ss_name and not ss.metadata.deletion_timestamp:
            return True
    return False
def check_volume_data(volume, data, check_checksum=True)
def check_volume_data(volume, data, check_checksum=True):
    dev = get_volume_endpoint(volume)
    check_device_data(dev, data, check_checksum)
def check_volume_endpoint(v)
def check_volume_endpoint(v):
    engine = get_volume_engine(v)
    endpoint = engine.endpoint
    if v.disableFrontend:
        assert endpoint == ""
    else:
        if v.frontend == VOLUME_FRONTEND_BLOCKDEV or \
                v.frontend == VOLUME_FRONTEND_UBLK:
            assert endpoint == os.path.join(DEV_PATH, v.name)
        elif v.frontend == VOLUME_FRONTEND_ISCSI:
            assert endpoint.startswith("iscsi://")
        elif v.frontend == VOLUME_FRONTEND_NVMF:
            assert endpoint.startswith("nvmf://")
        else:
            raise Exception("Unexpected volume frontend:", v.frontend)
    return endpoint
def check_volume_existence(client, volume_name)
def check_volume_existence(client, volume_name):
    volumes = client.list_volume()
    for volume in volumes:
        if volume.name == volume_name:
            return True
    return False
def check_volume_last_backup(client, volume_name, last_backup)
def check_volume_last_backup(client, volume_name, last_backup):
    for i in range(RETRY_COUNTS):
        volume = client.by_id_volume(volume_name)
        if volume.lastBackup == last_backup:
            break
        time.sleep(RETRY_INTERVAL)
    volume = client.by_id_volume(volume_name)
    assert volume.lastBackup == last_backup
def check_volume_replicas(volume, spec, tag_mapping)
def check_volume_replicas(volume, spec, tag_mapping):
    """
    Check the replicas on the volume to ensure that they were scheduled
    properly.
    :param volume: The Volume to check.
    :param spec: The spec to validate the Tag against.
    :param tag_mapping: The mapping of Nodes to the Tags they have.
    :raise AssertionError: If the Volume doesn't match all the conditions.
    """
    found_hosts = {}
    # Make sure that all the Tags the Volume requested were fulfilled.
    for replica in volume.replicas:
        found_hosts[replica.hostId] = {}
        assert not len(set(spec["disk"]) -
                       set(tag_mapping[replica.hostId]["disk"]))
        assert not len(set(spec["node"]) -
                       set(tag_mapping[replica.hostId]["node"]))

    # The Volume should have replicas on as many nodes as matched
    # the requirements (specified by "expected" in the spec variable).
    assert len(found_hosts) == spec["expected"]
Check the replicas on the volume to ensure that they were scheduled properly.
:param volume: The Volume to check.
:param spec: The spec to validate the Tag against.
:param tag_mapping: The mapping of Nodes to the Tags they have.
:raise AssertionError: If the Volume doesn't match all the conditions.
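For illustration, a hedged sketch of the expected argument shapes; the tag values and node names below are hypothetical and `volume` is assumed to be an attached Longhorn volume object:

# Hypothetical inputs: the volume requested the "ssd" disk tag and the
# "storage" node tag, and two of the three nodes carry both tags.
spec = {"disk": ["ssd"], "node": ["storage"], "expected": 2}
tag_mapping = {
    "node-1": {"disk": ["ssd", "nvme"], "node": ["storage"]},
    "node-2": {"disk": ["ssd"], "node": ["storage"]},
    "node-3": {"disk": [], "node": []},
}
check_volume_replicas(volume, spec, tag_mapping)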
def cleanup_all_backing_images(client)
def cleanup_all_backing_images(client):
    backing_images = client.list_backing_image()
    for bi in backing_images:
        try:
            client.delete(bi)
        except Exception as e:
            print("\nException when cleanup backing image ", bi)
            print(e)
    for i in range(RETRY_COUNTS):
        backing_images = client.list_backing_image()
        if len(backing_images) == 0:
            break
        time.sleep(RETRY_INTERVAL)
    assert len(client.list_backing_image()) == 0
def cleanup_all_recurring_jobs(client)
def cleanup_all_recurring_jobs(client):
    recurring_jobs = client.list_recurring_job()
    for recurring_job in recurring_jobs:
        try:
            client.delete(recurring_job)
        except Exception as e:
            print("\nException when cleanup recurring job ", recurring_job)
            print(e)
    wait_for_recurring_jobs_cleanup(client)
def cleanup_all_support_bundles(client)
def cleanup_all_support_bundles(client):
    """
    Clean up all support bundles
    :param client: The Longhorn client to use in the request.
    """
    support_bundles = client.list_support_bundle()
    for support_bundle in support_bundles:
        id = support_bundle['id']
        name = support_bundle['name']
        # ignore the error when clean up
        try:
            delete_support_bundle(id, name, client)
        except Exception as e:
            print("\nException when cleanup support_bundle ", support_bundle)
            print(e)

    ok = False
    for _ in range(RETRY_COUNTS):
        support_bundles = client.list_support_bundle()
        if len(support_bundles) == 0:
            ok = True
            break
        time.sleep(RETRY_INTERVAL)
    assert ok
Clean up all support bundles
:param client: The Longhorn client to use in the request.
def cleanup_all_volumes(client)
def cleanup_all_volumes(client):
    """
    Clean up all volumes
    :param client: The Longhorn client to use in the request.
    """
    volumes = client.list_volume()
    for v in volumes:
        # ignore the error when clean up
        try:
            client.delete(v)
            wait_for_volume_delete(client, v.name)
        except Exception as e:
            print("\nException when cleanup volume ", v)
            print(e)

    for i in range(RETRY_COUNTS):
        volumes = client.list_volume()
        if len(volumes) == 0:
            break
        time.sleep(RETRY_INTERVAL)

    volumes = client.list_volume()
    assert len(volumes) == 0
Clean up all volumes
:param client: The Longhorn client to use in the request.
def cleanup_client()
def cleanup_client(): core_api = k8sclient.CoreV1Api() client = get_longhorn_api_client() enable_default_disk(client) cleanup_all_volumes(client) # cleanup test disks cleanup_test_disks(client) if recurring_job_feature_supported(client): cleanup_all_recurring_jobs(client) if backing_image_feature_supported(client): cleanup_all_backing_images(client) cleanup_crypto_secret() cleanup_storage_class() if system_backup_feature_supported(client): system_restores_cleanup(client) cleanup_all_support_bundles(client) # enable nodes scheduling reset_node(client, core_api) reset_settings(client) reset_disks_for_all_nodes(client) scale_up_engine_image_daemonset(client) reset_engine_image(client) wait_for_all_instance_manager_running(client) enable_v2 = os.environ.get('RUN_V2_TEST') if enable_v2 == "true": return # check replica subdirectory of default disk path if not os.path.exists(DEFAULT_REPLICA_DIRECTORY): subprocess.check_call( ["mkdir", "-p", DEFAULT_REPLICA_DIRECTORY])
def cleanup_crypto_secret()
def cleanup_crypto_secret():
    secret_deletes = ["longhorn-crypto"]
    api = get_core_api_client()
    ret = api.list_namespaced_secret(namespace=LONGHORN_NAMESPACE)
    for sc in ret.items:
        if sc.metadata.name in secret_deletes:
            delete_crypto_secret(name=sc.metadata.name,
                                 namespace=LONGHORN_NAMESPACE)

    ok = False
    for _ in range(RETRY_COUNTS):
        ok = True
        ret = api.list_namespaced_secret(namespace=LONGHORN_NAMESPACE)
        for s in ret.items:
            if s.metadata.name in secret_deletes:
                ok = False
                break
        if ok:
            break
        time.sleep(RETRY_INTERVAL)
    assert ok
def cleanup_disks_on_node(client, node_id, *disks)
def cleanup_disks_on_node(client, node_id, *disks): # NOQA # Disable scheduling for the new disks on self node node = client.by_id_node(node_id) for name, disk in node.disks.items(): if disk.path != DEFAULT_DISK_PATH: disk.allowScheduling = False # Update disks of self node update_disks = get_update_disks(node.disks) update_node_disks(client, node.name, disks=update_disks, retry=True) node = wait_for_disk_update(client, node_id, len(update_disks)) # Remove new disks on self node and enable scheduling for the default disk default_disks = {} for name, disk in iter(node.disks.items()): if disk.path == DEFAULT_DISK_PATH: disk.allowScheduling = True default_disks[name] = disk # Update disks of self node update_disks = get_update_disks(node.disks) update_node_disks(client, node.name, disks=default_disks, retry=True) wait_for_disk_update(client, node_id, len(default_disks)) # Cleanup host disks for disk in disks: cleanup_host_disks(client, disk)
def cleanup_host_disk(vol_name)
def cleanup_host_disk(vol_name):
    mount_path = os.path.join(DIRECTORY_PATH, vol_name)
    umount_disk(mount_path)

    cmd = ['rm', '-r', mount_path]
    subprocess.check_call(cmd)
def cleanup_host_disks(client, *args)
def cleanup_host_disks(client, *args):
    # clean disk
    for vol_name in args:
        # umount disk
        cleanup_host_disk(vol_name)
        # clean volume
        cleanup_volume_by_name(client, vol_name)
def cleanup_node_disks(client, node_name)
def cleanup_node_disks(client, node_name): node = client.by_id_node(node_name) disks = node.disks for _, disk in iter(disks.items()): disk.allowScheduling = False update_disks = get_update_disks(disks) node = client.by_id_node(node_name) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node.diskUpdate(disks=update_disks) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break for name, disk in iter(disks.items()): wait_for_disk_status(client, node_name, name, "allowScheduling", False) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node.diskUpdate(disks={}) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break return wait_for_disk_update(client, node_name, 0)
def cleanup_storage_class()
def cleanup_storage_class(): # premium-rwo, standard-rwo and standard are installed in gke by default # azurefile-csi, azurefile-csi-premium, azurefile-premium, managed, # managed-csi, managed-csi-premium, managed-premium are installed # in aks by default skip_sc_deletes = ["longhorn", "local-path", "premium-rwo", "standard-rwo", "standard", "azurefile-csi", "azurefile-csi-premium", "azurefile-premium", "managed", "managed-csi", "managed-csi-premium", "managed-premium"] api = get_storage_api_client() ret = api.list_storage_class() for sc in ret.items: if sc.metadata.name in skip_sc_deletes: continue delete_storage_class(sc.metadata.name) ok = False for _ in range(RETRY_COUNTS): ok = True ret = api.list_storage_class() for sc in ret.items: if sc.metadata.name not in skip_sc_deletes: ok = False break if ok: break time.sleep(RETRY_INTERVAL) assert ok
def cleanup_test_disks(client)
def cleanup_test_disks(client): try: del_dirs = os.listdir(DIRECTORY_PATH) except FileNotFoundError: del_dirs = [] host_id = get_self_host_id() node = client.by_id_node(host_id) disks = node.disks for name, disk in iter(disks.items()): for del_dir in del_dirs: dir_path = os.path.join(DIRECTORY_PATH, del_dir) if dir_path == disk.path: disk.allowScheduling = False update_disks = get_update_disks(disks) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node = node.diskUpdate(disks=update_disks) disks = node.disks for name, disk in iter(disks.items()): for del_dir in del_dirs: dir_path = os.path.join(DIRECTORY_PATH, del_dir) if dir_path == disk.path: wait_for_disk_status(client, host_id, name, "allowScheduling", False) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print("\nException when update node disks", node) print(e) raise else: break # delete test disks disks = node.disks update_disks = {} for name, disk in iter(disks.items()): if disk.allowScheduling: update_disks[name] = disk # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node.diskUpdate(disks=update_disks) wait_for_disk_update(client, host_id, len(update_disks)) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print("\nException when delete node test disks", node) print(e) raise else: break # cleanup host disks for del_dir in del_dirs: try: cleanup_host_disk(del_dir) except Exception as e: print("\nException when cleanup host disk", del_dir) print(e) pass
def cleanup_volume(client, volume)
def cleanup_volume(client, volume):
    """
    Clean up the volume after the test.
    :param client: The Longhorn client to use in the request.
    :param volume: The volume to clean up.
    """
    volume.detach()
    volume = wait_for_volume_detached(client, volume.name)
    client.delete(volume)
    wait_for_volume_delete(client, volume.name)
    volumes = client.list_volume()
    assert len(volumes) == 0
Clean up the volume after the test.
:param client: The Longhorn client to use in the request.
:param volume: The volume to clean up.
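A minimal usage sketch, assuming a test that created and attached a volume with the helpers in this module (the volume name is hypothetical):

volume = create_and_check_volume(client, "test-cleanup-vol")
volume.attach(hostId=get_self_host_id())
volume = wait_for_volume_healthy(client, "test-cleanup-vol")
# ... exercise the volume ...
cleanup_volume(client, volume)   # detaches, deletes, and waits for removal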
def cleanup_volume_by_name(client, vol_name)
def cleanup_volume_by_name(client, vol_name):
    volume = client.by_id_volume(vol_name)
    volume.detach()
    client.delete(volume)
    wait_for_volume_delete(client, vol_name)
def client(request)
@pytest.fixture def client(request): """ Return an individual Longhorn API client for testing. """ k8sconfig.load_incluster_config() # Make sure nodes and managers are all online. ips = get_mgr_ips() api_client = None # check if longhorn manager port is open before calling get_client for ip in ips: family = socket.AF_INET6 if ':' in ip else socket.AF_INET sock = socket.socket(family, socket.SOCK_STREAM) try: if sock.connect_ex((ip, 9500)) == 0: api_client = get_client(ip + PORT) break finally: sock.close() if api_client is None: raise RuntimeError( "Failed to connect to any Longhorn manager on ports 9500") hosts = api_client.list_node() assert len(hosts) == len(ips) request.addfinalizer(lambda: cleanup_client()) if not os.path.exists(DIRECTORY_PATH): try: os.makedirs(DIRECTORY_PATH) except OSError as e: raise Exception( f"Failed to create directory {DIRECTORY_PATH}: {e}" ) cleanup_client() return api_client
Return an individual Longhorn API client for testing.
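For illustration, a hedged sketch of a test consuming this fixture; pytest injects the client by argument name (the test name is hypothetical):

# Hypothetical test using the client fixture.
def test_nodes_are_registered(client):
    nodes = client.list_node()
    assert len(nodes) > 0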
def clients(request)
@pytest.fixture
def clients(request):
    k8sconfig.load_incluster_config()
    ips = get_mgr_ips()
    client = get_client(ips[0] + PORT)
    hosts = client.list_node()
    assert len(hosts) == len(ips)
    clis = get_clients(hosts)

    def finalizer():
        cleanup_client()

    request.addfinalizer(finalizer)

    cleanup_client()

    return clis
def copy_file_to_volume_dev_mb_data(src_path, dest_path, src_offset, dest_offset, size_in_mb, timeout_cnt=5)
def copy_file_to_volume_dev_mb_data(src_path, dest_path,
                                    src_offset, dest_offset,
                                    size_in_mb, timeout_cnt=5):
    cmd = [
        '/bin/sh',
        '-c',
        'dd if=%s of=%s bs=1M count=%d skip=%d seek=%d' %
        (src_path, dest_path, size_in_mb, src_offset, dest_offset)
    ]
    with timeout(seconds=STREAM_EXEC_TIMEOUT * timeout_cnt,
                 error_message='Timeout on copying file to dev'):
        subprocess.check_call(cmd)
def copy_pod_volume_data(api, pod_name, src_path, dest_path)
def copy_pod_volume_data(api, pod_name, src_path, dest_path):
    write_cmd = [
        '/bin/sh',
        '-c',
        'dd if=' + src_path + ' of=' + dest_path
    ]
    return stream(
        api.connect_get_namespaced_pod_exec, pod_name, 'default',
        command=write_cmd, stderr=True, stdin=False, stdout=True,
        tty=False)
def core_api(request)
@pytest.fixture
def core_api(request):
    """
    Create a new CoreV1API instance.

    Returns:
        A new CoreV1API Instance.
    """
    c = Configuration()
    c.assert_hostname = False
    Configuration.set_default(c)
    k8sconfig.load_incluster_config()
    core_api = k8sclient.CoreV1Api()
    return core_api
Create a new CoreV1API instance.
Returns
A new CoreV1API Instance.
def crash_engine_process_with_sigkill(client, core_api, volume_name)
def crash_engine_process_with_sigkill(client, core_api, volume_name):
    volume = client.by_id_volume(volume_name)
    ins_mgr_name = volume.controllers[0].instanceManagerName

    kill_command = [
        '/bin/sh',
        '-c',
        "kill `pgrep -f \"controller " + volume_name + "\"`"]

    with timeout(seconds=STREAM_EXEC_TIMEOUT,
                 error_message='Timeout on executing stream read'):
        stream(core_api.connect_get_namespaced_pod_exec,
               ins_mgr_name,
               LONGHORN_NAMESPACE, command=kill_command,
               stderr=True, stdin=False, stdout=True, tty=False)
def crash_replica_processes(client, api, volname, replicas=None, wait_to_fail=True)
def crash_replica_processes(client, api, volname, replicas=None, wait_to_fail=True): threads = [] if replicas is None: volume = client.by_id_volume(volname) replicas = volume.replicas for r in replicas: assert r.instanceManagerName != "" pgrep_command = f"pgrep -f {r['dataPath']}" pid = exec_instance_manager(api, r.instanceManagerName, pgrep_command) assert pid != "" kill_command = f"kill {pid}" exec_instance_manager(api, r.instanceManagerName, kill_command) if wait_to_fail is True: thread = create_assert_error_check_thread( wait_for_replica_failed, client, volname, r['name'], RETRY_COUNTS, RETRY_INTERVAL_SHORT ) threads.append(thread) if wait_to_fail: assert_from_assert_error_check_threads(threads)
def create_and_check_volume(client,
volume_name,
num_of_replicas=3,
size='33554432',
backing_image='',
frontend='blockdev',
snapshot_data_integrity='ignored',
access_mode='rwo',
data_engine='v1')
def create_and_check_volume(client, volume_name, num_of_replicas=3, size=SIZE, backing_image="", frontend=VOLUME_FRONTEND_BLOCKDEV, snapshot_data_integrity="ignored", access_mode=ACCESS_MODE_RWO, data_engine=DATA_ENGINE): # NOQA """ Create a new volume with the specified parameters. Assert that the new volume is detached and that all of the requested parameters match. :param client: The Longhorn client to use in the request. :param volume_name: The name of the volume. :param num_of_replicas: The number of replicas the volume should have. :param size: The size of the volume, as a string representing the number of bytes. :param backing_image: The backing image to use for the volume. :param frontend: The frontend to use for the volume. :return: The volume instance created. """ if not backing_image_feature_supported(client): backing_image = None client.create_volume(name=volume_name, size=size, numberOfReplicas=num_of_replicas, backingImage=backing_image, frontend=frontend, snapshotDataIntegrity=snapshot_data_integrity, accessMode=access_mode, dataEngine=data_engine) volume = wait_for_volume_detached(client, volume_name) assert volume.name == volume_name assert volume.size == size assert volume.numberOfReplicas == num_of_replicas assert volume.state == "detached" if backing_image_feature_supported(client): assert volume.backingImage == backing_image assert volume.frontend == frontend assert volume.created != "" return volume
Create a new volume with the specified parameters. Assert that the new volume is detached and that all of the requested parameters match.
:param client: The Longhorn client to use in the request.
:param volume_name: The name of the volume.
:param num_of_replicas: The number of replicas the volume should have.
:param size: The size of the volume, as a string representing the number of bytes.
:param backing_image: The backing image to use for the volume.
:param frontend: The frontend to use for the volume.
:return: The volume instance created.
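A minimal usage sketch; the volume name and size are hypothetical values chosen for illustration:

volume = create_and_check_volume(client,
                                 "demo-volume",              # hypothetical
                                 num_of_replicas=2,
                                 size=str(512 * 1024 * 1024))
assert volume.state == "detached"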
def create_and_wait_deployment(apps_api, deployment_manifest)
def create_and_wait_deployment(apps_api, deployment_manifest):
    apps_api.create_namespaced_deployment(
        body=deployment_manifest,
        namespace='default')

    deployment_name = deployment_manifest["metadata"]["name"]
    desired_replica_count = deployment_manifest["spec"]["replicas"]

    wait_deployment_replica_ready(
        apps_api,
        deployment_name,
        desired_replica_count
    )
def create_and_wait_pod(api, pod_manifest)
def create_and_wait_pod(api, pod_manifest):
    """
    Creates a new Pod attached to a PersistentVolumeClaim for testing.

    The function will block until the Pod is online or until it times out,
    whichever occurs first. The volume created by the manifest passed in
    will be mounted to '/data'.

    Args:
        api: An instance of CoreV1API.
        pod_manifest: The manifest of the Pod to create; its metadata.name
            is used as the Pod name.
    """
    api.create_namespaced_pod(
        body=pod_manifest,
        namespace='default')

    pod_name = pod_manifest['metadata']['name']

    wait_pod(pod_name)
Creates a new Pod attached to a PersistentVolumeClaim for testing.
The function will block until the Pod is online or until it times out, whichever occurs first. The volume created by the manifest passed in will be mounted to '/data'.
Args
api
- An instance of CoreV1API.
pod_manifest
- The manifest of the Pod to create; its metadata.name is used as the Pod name.
def create_and_wait_statefulset(statefulset_manifest)
def create_and_wait_statefulset(statefulset_manifest):
    """
    Create a new StatefulSet for testing.

    This function will block until all replicas in the StatefulSet are online
    or it times out, whichever occurs first.
    """
    create_statefulset(statefulset_manifest)
    wait_statefulset(statefulset_manifest)
Create a new StatefulSet for testing.
This function will block until all replicas in the StatefulSet are online or it times out, whichever occurs first.
def create_assert_error_check_thread(func, *args)
def create_assert_error_check_thread(func, *args):
    """
    Run func in a thread with the given arguments.

    Parameters:
        func: the function to run in parallel.
        args: arguments passed to func.
    """
    assert isinstance(func, types.FunctionType), \
        "First arg is not a function."
    t = AssertErrorCheckThread(target=func, args=args)
    t.start()
    return t
Run func in a thread with the given arguments.
Parameters
func: the function to run in parallel.
args: arguments passed to func.
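For illustration, the pattern used elsewhere in this module (for example in crash_replica_processes): start one checker thread per replica, then join them and surface any assertion errors. The volume object is assumed to come from an earlier step in the test:

# Wait for several replicas to fail in parallel and collect errors.
threads = []
for r in volume.replicas:
    t = create_assert_error_check_thread(
        wait_for_replica_failed, client, volume.name, r.name,
        RETRY_COUNTS, RETRY_INTERVAL_SHORT)
    threads.append(t)
assert_from_assert_error_check_threads(threads)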
def create_backing_image_with_matching_url(client,
name,
url,
minNumberOfCopies=1,
nodeSelector=[],
diskSelector=[],
dataEngine='v1')
def create_backing_image_with_matching_url(client, name, url, minNumberOfCopies=1, nodeSelector=[], diskSelector=[], dataEngine=DATA_ENGINE): backing_images = client.list_backing_image() found = False for bi in backing_images: if bi.name == name: found = True break if found: if bi.sourceType != BACKING_IMAGE_SOURCE_TYPE_DOWNLOAD or \ bi.parameters["url"] != url: client.delete(bi) bi = client.by_id_backing_image(name=name) if bi is None or bi.deletionTimestamp != "": wait_for_backing_image_delete(client, name) found = False if not found: expected_checksum = "" # Only the following 2 URLs will be used in the integration tests # for now. if url == BACKING_IMAGE_RAW_URL: expected_checksum = BACKING_IMAGE_RAW_CHECKSUM elif url == BACKING_IMAGE_QCOW2_URL: if dataEngine == "v2": expected_checksum = BACKING_IMAGE_RAW_CHECKSUM else: expected_checksum = BACKING_IMAGE_QCOW2_CHECKSUM bi = client.create_backing_image( name=name, sourceType=BACKING_IMAGE_SOURCE_TYPE_DOWNLOAD, parameters={"url": url}, expectedChecksum=expected_checksum, minNumberOfCopies=minNumberOfCopies, nodeSelector=nodeSelector, diskSelector=diskSelector, dataEngine=dataEngine) assert bi is_ready = False for i in range(RETRY_COUNTS): bi = client.by_id_backing_image(name) if (len(bi.diskFileStatusMap) == minNumberOfCopies and bi.currentChecksum != ""): for disk, status in iter(bi.diskFileStatusMap.items()): if status.state == "ready": is_ready = True break if is_ready: break time.sleep(RETRY_INTERVAL) return bi
def create_backup(client, volname, data={}, labels={})
def create_backup(client, volname, data={}, labels={}): volume = client.by_id_volume(volname) create_snapshot(client, volname) if not data: data = write_volume_random_data(volume) else: data = write_volume_data(volume, data) snap = create_snapshot(client, volname) create_snapshot(client, volname) # after backup request we need to wait for completion of the backup # since the backup.cfg will only be available once the backup operation # has been completed volume.snapshotBackup(name=snap.name, labels=labels) wait_for_backup_completion(client, volname, snap.name) verified = False for i in range(RETRY_COMMAND_COUNT): bv, b = find_backup(client, volname, snap.name) new_b = bv.backupGet(name=b.name) if new_b.name == b.name and \ new_b.url == b.url and \ new_b.snapshotName == b.snapshotName and \ new_b.snapshotCreated == b.snapshotCreated and \ new_b.created == b.created and \ new_b.volumeName == b.volumeName and \ new_b.volumeSize == b.volumeSize and \ new_b.volumeCreated == b.volumeCreated: verified = True break time.sleep(RETRY_INTERVAL) assert verified # Don't directly compare the Label dictionaries, since the server could # have added extra Labels. for key, val in iter(labels.items()): assert new_b.labels.get(key) == val volume = wait_for_volume_status(client, volname, "lastBackup", b.name) assert volume.lastBackupAt != "" return bv, b, snap, data
def create_backup_from_volume_attached_to_pod(client, core_api, volume_name, pod_name, data_path='/data/test', data_size=100)
def create_backup_from_volume_attached_to_pod(client, core_api, volume_name, pod_name, data_path='/data/test', data_size=DATA_SIZE_IN_MB_1): """ Write data in the pod and take a backup. Args: client: The Longhorn client to use in the request. core_api: An instance of CoreV1API. pod_name: The name of the Pod. volume_name: The volume name which is attached to the pod. data_path: File name suffixed to the mount point. e.g /data/file data_size: Size of the data to be written in the pod. Returns: The backup volume name, backup, checksum of data written in the backup """ write_pod_volume_random_data(core_api, pod_name, data_path, data_size) data_checksum = get_pod_data_md5sum(core_api, pod_name, data_path) snap = create_snapshot(client, volume_name) volume = client.by_id_volume(volume_name) volume.snapshotBackup(name=snap.name) wait_for_backup_completion(client, volume_name, snap.name) backup_volume, backup = find_backup(client, volume_name, snap.name) return backup_volume, backup, data_checksum
Write data in the pod and take a backup.
Args
client
- The Longhorn client to use in the request.
core_api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
volume_name
- The volume name which is attached to the pod.
data_path
- File name suffixed to the mount point. e.g /data/file
data_size
- Size of the data to be written in the pod.
Returns
The backup volume name, backup, checksum of data written in the backup
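A minimal usage sketch; the volume and pod names are hypothetical and are assumed to be already created and attached by earlier steps of the test:

backup_volume, backup, checksum = create_backup_from_volume_attached_to_pod(
    client, core_api, "demo-volume", "demo-pod")
# Later, restore the backup and compare the restored file's md5sum
# against `checksum`.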
def create_crypto_secret(secret_manifest, namespace='longhorn-system')
def create_crypto_secret(secret_manifest, namespace=LONGHORN_NAMESPACE):
    api = get_core_api_client()
    api.create_namespaced_secret(namespace, body=secret_manifest)
def create_deployment_and_write_data(client,
core_api,
make_deployment_with_pvc,
volume_name,
size,
replica_count,
data_size,
attach_node_id=None)
def create_deployment_and_write_data(client, # NOQA core_api, # NOQA make_deployment_with_pvc, # NOQA volume_name, # NOQA size, # NOQA replica_count, # NOQA data_size, # NOQA attach_node_id=None): # NOQA apps_api = get_apps_api_client() volume = client.create_volume(name=volume_name, size=size, numberOfReplicas=replica_count, dataEngine=DATA_ENGINE) volume = wait_for_volume_detached(client, volume_name) pvc_name = volume_name + "-pvc" create_pv_for_volume(client, core_api, volume, volume_name) create_pvc_for_volume(client, core_api, volume, pvc_name) deployment_name = volume_name + "-dep" deployment = make_deployment_with_pvc(deployment_name, pvc_name) if attach_node_id: deployment["spec"]["template"]["spec"]["nodeSelector"] \ = {"kubernetes.io/hostname": attach_node_id} create_and_wait_deployment(apps_api, deployment) data_path = '/data/test' deployment_pod_names = get_deployment_pod_names(core_api, deployment) write_pod_volume_random_data(core_api, deployment_pod_names[0], data_path, data_size) checksum = get_pod_data_md5sum(core_api, deployment_pod_names[0], data_path) volume = client.by_id_volume(volume_name) return volume, deployment_pod_names[0], checksum, deployment
def create_host_disk(client, vol_name, size, node_id)
def create_host_disk(client, vol_name, size, node_id):
    # create a single replica volume and attach it to node
    volume = create_volume(client, vol_name, size, node_id, 1)

    # prepare the disk in the host filesystem
    disk_path = prepare_host_disk(get_volume_endpoint(volume), volume.name)
    return disk_path
def create_pv_for_volume(client, core_api, volume, pv_name, fs_type='ext4')
def create_pv_for_volume(client, core_api, volume, pv_name, fs_type="ext4"):
    volume.pvCreate(pvName=pv_name, fsType=fs_type)
    for i in range(RETRY_COUNTS):
        if check_pv_existence(core_api, pv_name):
            break
        time.sleep(RETRY_INTERVAL)
    assert check_pv_existence(core_api, pv_name)

    ks = {
        'pvName': pv_name,
        'pvStatus': 'Available',
        'namespace': '',
        'lastPVCRefAt': '',
        'lastPodRefAt': '',
    }
    wait_volume_kubernetes_status(client, volume.name, ks)
def create_pvc(pvc_manifest)
def create_pvc(pvc_manifest):
    api = get_core_api_client()
    api.create_namespaced_persistent_volume_claim(
        'default', pvc_manifest)
def create_pvc_for_volume(client, core_api, volume, pvc_name, pvc_namespace='default')
def create_pvc_for_volume(client, core_api, volume, pvc_name, pvc_namespace="default"):  # NOQA
    volume.pvcCreate(namespace=pvc_namespace, pvcName=pvc_name)
    for i in range(RETRY_COUNTS):
        if check_pvc_existence(core_api, pvc_name, pvc_namespace):
            break
        time.sleep(RETRY_INTERVAL)
    assert check_pvc_existence(core_api, pvc_name, pvc_namespace)

    ks = {
        'pvStatus': 'Bound',
        'namespace': pvc_namespace,
        'lastPVCRefAt': '',
    }
    wait_volume_kubernetes_status(client, volume.name, ks)
def create_pvc_spec(name)
def create_pvc_spec(name):
    # type: (str) -> dict
    """
    Generate a volume manifest using the given name for the PVC.

    This spec is used to test dynamically provisioned PersistentVolumes
    (those created using a storage class).
    """
    return {
        'name': 'pod-data',
        'persistentVolumeClaim': {
            'claimName': name,
            'readOnly': False
        }
    }
Generate a volume manifest using the given name for the PVC.
This spec is used to test dynamically provisioned PersistentVolumes (those created using a storage class).
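For illustration, a hedged sketch of plugging the generated spec into a Pod manifest; the PVC name and the pod_manifest variable are hypothetical:

# Attach the dynamically provisioned PVC to a Pod manifest.
pod_manifest['spec']['volumes'] = [create_pvc_spec("demo-pvc")]
# The returned dict looks like:
# {'name': 'pod-data',
#  'persistentVolumeClaim': {'claimName': 'demo-pvc', 'readOnly': False}}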
def create_recurring_jobs(client, recurring_jobs)
def create_recurring_jobs(client, recurring_jobs):
    time.sleep(60 - datetime.utcnow().second)
    for name, spec in recurring_jobs.items():
        client.create_recurring_job(Name=name,
                                    Task=spec["task"],
                                    Groups=spec["groups"],
                                    Cron=spec["cron"],
                                    Retain=spec["retain"],
                                    Concurrency=spec["concurrency"],
                                    Labels=spec["labels"])
def create_rwx_volume_with_storageclass(client, core_api, storage_class)
def create_rwx_volume_with_storageclass(client, core_api, storage_class): VOLUME_SIZE = str(DEFAULT_VOLUME_SIZE * Gi) pvc_name = generate_volume_name() pvc_spec = { "apiVersion": "v1", "kind": "PersistentVolumeClaim", "metadata": { "name": pvc_name, }, "spec": { "accessModes": [ "ReadWriteMany" ], "storageClassName": storage_class['metadata']['name'], "resources": { "requests": { "storage": VOLUME_SIZE } } } } core_api.create_namespaced_persistent_volume_claim( 'default', pvc_spec ) check_pvc_in_specific_status(core_api, pvc_name, 'Bound') volume_name = get_volume_name(core_api, pvc_name) wait_for_volume_creation(client, volume_name) if storage_class['parameters']['fromBackup'] != "": wait_for_volume_restoration_completed(client, volume_name) wait_for_volume_detached(client, volume_name) return volume_name
def create_snapshot(longhorn_api_client, volume_name)
def create_snapshot(longhorn_api_client, volume_name):
    volume = longhorn_api_client.by_id_volume(volume_name)
    snap = volume.snapshotCRCreate()
    snap_name = snap.name

    snapshot_created = False
    for i in range(RETRY_COUNTS):
        snapshots = volume.snapshotList(volume=volume_name)
        for vs in snapshots.data:
            if vs.name == snap_name:
                snapshot_created = True
                snap = vs
                break
        if snapshot_created is True:
            break
        time.sleep(RETRY_INTERVAL)

    assert snapshot_created
    return snap
def create_statefulset(statefulset_manifest)
def create_statefulset(statefulset_manifest):
    """
    Create a new StatefulSet for testing.
    """
    api = get_apps_api_client()
    api.create_namespaced_stateful_set(
        body=statefulset_manifest,
        namespace='default')
Create a new StatefulSet for testing.
def create_storage_class(sc_manifest, data_engine='v1')
def create_storage_class(sc_manifest, data_engine=DATA_ENGINE):
    api = get_storage_api_client()
    sc_manifest['parameters']['dataEngine'] = data_engine
    api.create_storage_class(
        body=sc_manifest)

    sc_name = sc_manifest['metadata']['name']
    for i in range(RETRY_COUNTS):
        try:
            sc = api.read_storage_class(sc_name)
            return sc
        except ApiException as e:
            if e.status != 404:
                raise
        time.sleep(RETRY_INTERVAL)
    assert False, f"Failed to wait for sc {sc_name} to be created"
def create_support_bundle(client)
def create_support_bundle(client):  # NOQA
    data = {'description': 'Test', 'issueURL': ""}
    return requests.post(get_support_bundle_url(client), json=data).json()
def create_volume(client, vol_name, size, node_id, r_num)
def create_volume(client, vol_name, size, node_id, r_num):
    volume = client.create_volume(name=vol_name, size=size,
                                  numberOfReplicas=r_num,
                                  dataEngine=DATA_ENGINE)
    assert volume.numberOfReplicas == r_num
    assert volume.frontend == VOLUME_FRONTEND_BLOCKDEV

    volume = wait_for_volume_detached(client, vol_name)
    assert len(volume.replicas) == r_num

    assert volume.state == "detached"
    assert volume.created != ""

    volumeByName = client.by_id_volume(vol_name)
    assert volumeByName.name == volume.name
    assert volumeByName.size == volume.size
    assert volumeByName.numberOfReplicas == volume.numberOfReplicas
    assert volumeByName.state == volume.state
    assert volumeByName.created == volume.created

    volume.attach(hostId=node_id)
    volume = wait_for_volume_healthy(client, vol_name)

    return volume
def create_volume_and_backup(client, vol_name, vol_size, backup_data_size)
def create_volume_and_backup(client, vol_name, vol_size, backup_data_size):
    client.create_volume(name=vol_name,
                         numberOfReplicas=1,
                         size=str(vol_size),
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, vol_name)
    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, vol_name)

    data = {'pos': 0,
            'len': backup_data_size,
            'content': generate_random_data(backup_data_size)}
    _, backup, _, _ = create_backup(client, vol_name, data)

    return volume, backup
def create_volume_and_write_data(client, volume_name, volume_size='33554432', data_engine='v1')
def create_volume_and_write_data(client, volume_name, volume_size=SIZE,
                                 data_engine=DATA_ENGINE):
    """
    1. Create and attach a volume
    2. Write the data to volume
    """
    # Step 1
    volume = create_and_check_volume(client, volume_name,
                                     size=volume_size,
                                     data_engine=data_engine)
    volume = volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name)

    # Step 2
    volume_data = write_volume_random_data(volume)

    return volume, volume_data
1. Create and attach a volume
2. Write the data to volume
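A minimal usage sketch; the volume name is hypothetical:

volume, data = create_volume_and_write_data(client, "demo-volume")
check_volume_data(volume, data)   # verify what was just written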
def crypto_secret(request)
@pytest.fixture def crypto_secret(request): core_api = get_core_api_client() def get_crypto_secret(namespace=LONGHORN_NAMESPACE): crypto_secret.manifest = { 'apiVersion': 'v1', 'kind': 'Secret', 'metadata': { 'name': 'longhorn-crypto', 'namespace': namespace, }, 'stringData': { 'CRYPTO_KEY_VALUE': 'simple', 'CRYPTO_KEY_PROVIDER': 'secret' } } if is_k8s_node_gke_cos(core_api): # GKE COS's cryptsetup does not natively support "argon2i" and # "argon2id". # https://github.com/longhorn/longhorn/issues/10049 crypto_secret.manifest['stringData']['CRYPTO_PBKDF'] = 'pbkdf2' return crypto_secret.manifest def finalizer(): try: core_api.delete_namespaced_secret( name=crypto_secret.manifest['metadata']['name'], namespace=crypto_secret.manifest['metadata']['namespace']) except ApiException as e: assert e.status == 404 request.addfinalizer(finalizer) return get_crypto_secret
def csi_pv(request)
@pytest.fixture
def csi_pv(request):
    return get_pv_manifest(request)
def csi_pv_backingimage(request)
@pytest.fixture
def csi_pv_backingimage(request):
    pv_manifest = get_pv_manifest(request)
    pv_manifest['spec']['capacity']['storage'] = \
        size_to_string(BACKING_IMAGE_EXT4_SIZE)
    pv_manifest['spec']['csi']['volumeAttributes']['backingImage'] = \
        BACKING_IMAGE_NAME

    def finalizer():
        api = get_core_api_client()
        delete_and_wait_pv(api, pv_manifest['metadata']['name'])

        client = get_longhorn_api_client()
        delete_and_wait_longhorn(client, pv_manifest['metadata']['name'])

    request.addfinalizer(finalizer)

    return pv_manifest
def csi_pvc_name(request)
@pytest.fixture
def csi_pvc_name(request):
    return generate_volume_name()
def delete_and_wait_deployment(apps_api, deployment_name, namespace='default')
def delete_and_wait_deployment(apps_api, deployment_name,
                               namespace='default'):
    try:
        apps_api.delete_namespaced_deployment(
            name=deployment_name,
            namespace=namespace
        )
    except ApiException as e:
        assert e.status == 404

    wait_delete_deployment(apps_api, deployment_name, namespace)
def delete_and_wait_longhorn(client, name)
def delete_and_wait_longhorn(client, name):
    """
    Delete a volume from Longhorn.
    """
    try:
        v = client.by_id_volume(name)
        client.delete(v)
    except ApiException as ex:
        assert ex.status == 404
    except longhorn.ApiError as err:
        # for deleting a non-existing volume,
        # the status_code is 404.
        assert err.error.code == 404

    wait_for_volume_delete(client, name)
Delete a volume from Longhorn.
def delete_and_wait_pod(api, pod_name, namespace='default', wait=True)
def delete_and_wait_pod(api, pod_name, namespace='default', wait=True): """ Delete a specified Pod. This function does not check if the Pod does exist and will throw an error if a nonexistent Pod is specified. Args: api: An instance of CoreV1API. pod_name: The name of the Pod. """ target_pod = None try: target_pod = api.read_namespaced_pod(name=pod_name, namespace=namespace) except ApiException as e: assert e.status == 404 return try: api.delete_namespaced_pod( name=pod_name, namespace=namespace, body=k8sclient.V1DeleteOptions()) except ApiException as e: assert e.status == 404 return if wait: wait_delete_pod(api, target_pod.metadata.uid, namespace=namespace)
Delete a specified Pod.
This function does not check whether the Pod exists and will throw an error if a nonexistent Pod is specified.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
def delete_and_wait_pv(api, pv_name)
def delete_and_wait_pv(api, pv_name):
    try:
        api.delete_persistent_volume(
            name=pv_name, body=k8sclient.V1DeleteOptions())
    except ApiException as e:
        assert e.status == 404

    wait_delete_pv(api, pv_name)
def delete_and_wait_pvc(api, pvc_name, retry_counts=150)
def delete_and_wait_pvc(api, pvc_name, retry_counts=RETRY_COUNTS):
    try:
        api.delete_namespaced_persistent_volume_claim(
            name=pvc_name, namespace='default',
            body=k8sclient.V1DeleteOptions())
    except ApiException as e:
        assert e.status == 404

    wait_delete_pvc(api, pvc_name, retry_counts=retry_counts)
def delete_and_wait_statefulset(api, client, statefulset)
def delete_and_wait_statefulset(api, client, statefulset): apps_api = get_apps_api_client() if not check_statefulset_existence(apps_api, statefulset['metadata']['name']): return # We need to generate the names for the PVCs on our own so we can # delete them. pod_data = get_statefulset_pod_info(api, statefulset) try: apps_api.delete_namespaced_stateful_set( name=statefulset['metadata']['name'], namespace='default', body=k8sclient.V1DeleteOptions()) except ApiException as e: assert e.status == 404 for i in range(DEFAULT_POD_TIMEOUT): ret = apps_api.list_namespaced_stateful_set(namespace='default') found = False for item in ret.items: if item.metadata.name == statefulset['metadata']['name']: found = True break if not found: break time.sleep(DEFAULT_POD_INTERVAL) assert not found client = get_longhorn_api_client() for pod in pod_data: # Wait on Pods too, we apparently had timeout issues with them. wait_delete_pod(api, pod['pod_uid']) delete_and_wait_pvc(api, pod['pvc_name']) # The StatefulSet tests involve both StorageClass provisioned volumes # and our manually created PVs. This checks the status of our PV once # the PVC is deleted. If it is Failed, we know it is a PV and we must # delete it manually. If it is removed from the system, we can just # wait for deletion. for i in range(DEFAULT_POD_TIMEOUT): ret = api.list_persistent_volume() found = False for item in ret.items: if item.metadata.name == pod['pv_name']: if item.status.phase in ('Failed', 'Released'): delete_and_wait_pv(api, pod['pv_name']) delete_and_wait_longhorn(client, pod['pv_name']) else: found = True break if not found: break time.sleep(DEFAULT_POD_INTERVAL) assert not found wait_for_volume_delete(client, pod['pv_name'])
def delete_and_wait_volume_attachment(storage_api, volume_attachment_name)
def delete_and_wait_volume_attachment(storage_api, volume_attachment_name):
    try:
        storage_api.delete_volume_attachment(
            name=volume_attachment_name
        )
    except ApiException as e:
        assert e.status == 404

    wait_delete_volume_attachment(storage_api, volume_attachment_name)
def delete_backup(client, bv, backup_name)
def delete_backup(client, bv, backup_name):
    bv.backupDelete(name=backup_name)
    wait_for_backup_delete(client, bv.volumeName, backup_name)
def delete_backup_backing_image(client, backing_image_name)
def delete_backup_backing_image(client, backing_image_name):
    bbi = client.by_id_backupBackingImage(backing_image_name)
    client.delete(bbi)
    wait_for_backup_backing_image_delete(client, backing_image_name)
def delete_backup_volume(client, backup_volume_name)
def delete_backup_volume(client, backup_volume_name):
    bv = client.by_id_backupVolume(backup_volume_name)
    client.delete(bv)
    wait_for_backup_volume_delete(client, backup_volume_name)
def delete_crypto_secret(namespace, name)
def delete_crypto_secret(namespace, name):
    api = get_core_api_client()
    try:
        api.delete_namespaced_secret(namespace=namespace, name=name)
    except ApiException as e:
        assert e.status == 404
def delete_replica_on_test_node(client, volume_name)
def delete_replica_on_test_node(client, volume_name):  # NOQA
    lht_host_id = get_self_host_id()
    volume = client.by_id_volume(volume_name)
    for replica in volume.replicas:
        if replica.hostId == lht_host_id:
            replica_name = replica.name
    volume.replicaRemove(name=replica_name)
    wait_for_volume_degraded(client, volume_name)
def delete_replica_processes(client, api, volname)
def delete_replica_processes(client, api, volname):
    replica_map = {}
    volume = client.by_id_volume(volname)
    for r in volume.replicas:
        replica_map[r.instanceManagerName] = r.name

    for rm_name, r_name in replica_map.items():
        delete_command = 'longhorn-instance-manager process delete ' + \
                         '--name ' + r_name
        exec_instance_manager(api, rm_name, delete_command)
def delete_statefulset(apps_api, statefulset)
def delete_statefulset(apps_api, statefulset):
    ss_name = statefulset['metadata']['name']
    ss_namespace = statefulset['metadata']['namespace']

    apps_api.delete_namespaced_stateful_set(
        name=ss_name, namespace=ss_namespace,
        body=k8sclient.V1DeleteOptions()
    )

    for _ in range(DEFAULT_POD_TIMEOUT):
        ret = apps_api.list_namespaced_stateful_set(namespace=ss_namespace)
        found = False
        for item in ret.items:
            if item.metadata.name == ss_name:
                found = True
                break
        if not found:
            break
        time.sleep(DEFAULT_POD_INTERVAL)
    assert not found
def delete_storage_class(sc_name)
def delete_storage_class(sc_name):
    api = get_storage_api_client()
    try:
        api.delete_storage_class(sc_name, body=k8sclient.V1DeleteOptions())
    except ApiException as e:
        assert e.status == 404
def delete_support_bundle(node_id, name, client)
def delete_support_bundle(node_id, name, client):
    url = get_support_bundle_url(client)
    support_bundle_url = '{}/{}/{}'.format(url, node_id, name)
    return requests.delete(support_bundle_url)
def dev_read(dev, start, count)
def dev_read(dev, start, count):
    r_data = ""
    fdev = open(dev, 'rb')
    if fdev is not None:
        fdev.seek(start)
        r_data = fdev.read(count)
        fdev.close()
    return r_data
def dev_write(dev, start, data)
def dev_write(dev, start, data):
    data = bytes(data, encoding='utf-8')
    w_length = 0
    fdev = open(dev, 'rb+')
    if fdev is not None:
        fdev.seek(start)
        fdev.write(data)
        fdev.close()
        w_length = len(data)
    return w_length
def disable_auto_salvage(client)
@pytest.fixture
def disable_auto_salvage(client):
    auto_salvage_setting = client.by_id_setting(SETTING_AUTO_SALVAGE)
    setting = client.update(auto_salvage_setting, value="false")

    assert setting.name == SETTING_AUTO_SALVAGE
    assert setting.value == "false"

    yield

    auto_salvage_setting = client.by_id_setting(SETTING_AUTO_SALVAGE)
    setting = client.update(auto_salvage_setting, value="true")

    assert setting.name == SETTING_AUTO_SALVAGE
    assert setting.value == "true"
def download_support_bundle(node_id, name, client, target_path='')
def download_support_bundle(node_id, name, client, target_path=""):  # NOQA
    url = get_support_bundle_url(client)
    support_bundle_url = '{}/{}/{}'.format(url, node_id, name)
    download_url = '{}/download'.format(support_bundle_url)
    r = requests.get(download_url, allow_redirects=True, timeout=300)
    r.raise_for_status()

    if target_path != "":
        with open(target_path, 'wb') as f:
            f.write(r.content)
def enable_default_disk(client)
def enable_default_disk(client):
    lht_hostId = get_self_host_id()
    node = client.by_id_node(lht_hostId)
    disks = get_update_disks(node.disks)
    for disk in disks.values():
        if disk["path"] == DEFAULT_DISK_PATH:
            disk.allowScheduling = True
            disk.evictionRequested = False

    update_node_disks(client, node.name, disks=disks, retry=True)
def exec_command_in_pod(api, command, pod_name, namespace, container=None)
-
Expand source code
def exec_command_in_pod(api, command, pod_name, namespace, container=None): """ Execute command in the pod. Args: api: An instance of CoreV1API. pod_name: The name of the Pod. command: The command to execute in the pod. namespace: The namespace where the pod exists. Returns: The output of the command. """ exec_command = [ '/bin/sh', '-c', command ] with timeout(seconds=STREAM_EXEC_TIMEOUT, error_message='Timeout on executing stream read/write'): return stream( api.connect_get_namespaced_pod_exec, pod_name, namespace, command=exec_command, stderr=True, stdin=False, stdout=True, container=container, tty=False)
Execute command in the pod.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
command
- The command to execute in the pod.
namespace
- The namespace where the pod exists.
Returns
The output of the command.
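A usage sketch, assuming a running pod named test-pod in the default namespace:

from tests.common import exec_command_in_pod, get_core_api_client

core_api = get_core_api_client()
# List the mounted volume's contents inside the pod and capture the output.
output = exec_command_in_pod(core_api, "ls -l /data", "test-pod", "default")
print(output)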
def exec_instance_manager(api, im_name, cmd)
-
Expand source code
def exec_instance_manager(api, im_name, cmd): exec_cmd = ['/bin/sh', '-c', cmd] with timeout(seconds=STREAM_EXEC_TIMEOUT, error_message='Timeout on executing stream read'): output = stream(api.connect_get_namespaced_pod_exec, im_name, LONGHORN_NAMESPACE, command=exec_cmd, stderr=True, stdin=False, stdout=True, tty=False) return output
def exec_local(cmd)
-
Expand source code
def exec_local(cmd): exec_cmd = cmd.split() return subprocess.check_output(exec_cmd)
def exec_nsenter(cmd, process_name=None)
-
Expand source code
def exec_nsenter(cmd, process_name=None): if process_name: proc_pid = find_process_pid(process_name) cmd_parts = cmd.split() else: proc_pid = find_dockerd_pid() or "1" cmd_parts = ["bash", "-c", cmd] exec_cmd = ["nsenter", "--mount=/host/proc/{}/ns/mnt".format(proc_pid), "--net=/host/proc/{}/ns/net".format(proc_pid)] exec_cmd.extend(cmd_parts) return subprocess.check_output(exec_cmd)
def expand_and_wait_for_pvc(api, pvc, size)
-
Expand source code
def expand_and_wait_for_pvc(api, pvc, size): pvc['spec']['resources'] = { 'requests': { 'storage': size_to_string(size) } } pvc_name = pvc['metadata']['name'] api.patch_namespaced_persistent_volume_claim( pvc_name, 'default', pvc) complete = False for i in range(RETRY_COUNTS_LONG): claim = api.read_namespaced_persistent_volume_claim( name=pvc_name, namespace='default') if claim.spec.resources.requests['storage'] ==\ claim.status.capacity['storage']: complete = True break time.sleep(RETRY_INTERVAL_LONG) assert complete return claim
def fail_replica_expansion(client, api, volname, size, replicas=None)
-
Expand source code
def fail_replica_expansion(client, api, volname, size, replicas=None): if replicas is None: volume = client.by_id_volume(volname) replicas = volume.replicas for r in replicas: tmp_meta_file_name = \ EXPANSION_SNAP_TMP_META_NAME_PATTERN % size # os.path.join() cannot deal with the path containing "/" cmd = [ '/bin/sh', '-c', 'mkdir %s && sync' % (INSTANCE_MANAGER_HOST_PATH_PREFIX + r.dataPath + "/" + tmp_meta_file_name) ] if not r.instanceManagerName: raise Exception( "Should use replica objects in the running volume," "otherwise the field r.instanceManagerName is empty") stream(api.connect_get_namespaced_pod_exec, r.instanceManagerName, LONGHORN_NAMESPACE, command=cmd, stderr=True, stdin=False, stdout=True, tty=False)
def find_ancestor_process_by_name(ancestor_name)
-
Expand source code
def find_ancestor_process_by_name(ancestor_name): p = find_self() while True: if not p or p["Pid"] == "1": break if p["Name"] == ancestor_name: return p["Pid"] p = get_process_info("/host/proc/{}/status".format(p["PPid"])) return
def find_backup(client, vol_name, snap_name)
-
Expand source code
def find_backup(client, vol_name, snap_name): """ find_backup will look for a backup on the backupstore it's important to note, that this can only be used for completed backups since the backup.cfg will only be written once a backup operation has been completed successfully """ bv = None for i in range(120): if bv is None: bv = find_backup_volume(client, vol_name) if bv is not None: backups = bv.backupList().data for b in backups: if b.snapshotName == snap_name: return bv, b time.sleep(RETRY_BACKUP_INTERVAL) assert False, "failed to find backup for snapshot " + snap_name + \ " for volume " + vol_name
find_backup looks for a backup on the backupstore. Note that this can only be used for completed backups, since the backup.cfg is only written once a backup operation has completed successfully.
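A usage sketch, assuming a backup of the hypothetical snapshot snap-example on volume vol-example has already completed:

from tests.common import find_backup, get_longhorn_api_client

client = get_longhorn_api_client()
bv, backup = find_backup(client, "vol-example", "snap-example")
# The returned backup object carries the URL needed for a restore.
print(backup.url)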
def find_backup_volume(client, volume_name, retry=1)
-
Expand source code
def find_backup_volume(client, volume_name, retry=1): for _ in range(retry): bvs = client.list_backupVolume() for bv in bvs: volumeName = getattr(bv, 'volumeName', bv.name) if volumeName == volume_name and bv.created != "": return bv time.sleep(RETRY_BACKUP_INTERVAL) return None
def find_dockerd_pid()
-
Expand source code
def find_dockerd_pid(): return find_ancestor_process_by_name("dockerd")
def find_process_pid(process_name)
-
Expand source code
def find_process_pid(process_name): for file in os.listdir(HOST_PROC_DIR): if not os.path.isdir(os.path.join(HOST_PROC_DIR, file)): continue # Check if file name is an integer if not file.isdigit(): continue with open(os.path.join(HOST_PROC_DIR, file, 'status'), 'r') as file: status_content = file.readlines() proc_status_content = None name_pattern = re.compile(r'^Name:\s+(.+)$') for line in status_content: name_match = name_pattern.match(line) if name_match and name_match.group(1) == process_name: proc_status_content = status_content break if proc_status_content is None: continue pid_pattern = re.compile(r'^Pid:\s+(\d+)$') for line in proc_status_content: pid_match = pid_pattern.match(line) if pid_match: return int(pid_match.group(1)) raise Exception(f"Failed to find the {process_name} PID")
def find_replica_for_backup(client, volume_name, backup_id)
-
Expand source code
def find_replica_for_backup(client, volume_name, backup_id): replica_name = None for _ in range(RETRY_EXEC_COUNTS): volume = client.by_id_volume(volume_name) for status in volume.backupStatus: if status.id == backup_id: replica_name = status.replica if replica_name: return replica_name else: time.sleep(RETRY_BACKUP_INTERVAL) assert replica_name
def find_self()
-
Expand source code
def find_self(): return get_process_info("/host/proc/self/status")
def fix_replica_expansion_failure(client, api, volname, size, replicas=None)
-
Expand source code
def fix_replica_expansion_failure(client, api, volname, size, replicas=None): if replicas is None: volume = client.by_id_volume(volname) replicas = volume.replicas for r in replicas: if not r.instanceManagerName: raise Exception( "Should use replica objects in the running volume," "otherwise the field r.instanceManagerName is empty") tmp_meta_file_name = \ EXPANSION_SNAP_TMP_META_NAME_PATTERN % size tmp_meta_file_path = \ INSTANCE_MANAGER_HOST_PATH_PREFIX + \ r.dataPath + "/" + tmp_meta_file_name removed = False for i in range(RETRY_COMMAND_COUNT): # os.path.join() cannot deal with the path containing "/" cmd = [ '/bin/sh', '-c', 'rm -rf %s && sync' % tmp_meta_file_path ] stream(api.connect_get_namespaced_pod_exec, r.instanceManagerName, LONGHORN_NAMESPACE, command=cmd, stderr=True, stdin=False, stdout=True, tty=False) cmd = ['/bin/sh', '-c', 'ls %s' % tmp_meta_file_path] output = stream( api.connect_get_namespaced_pod_exec, r.instanceManagerName, LONGHORN_NAMESPACE, command=cmd, stderr=True, stdin=False, stdout=True, tty=False) if "No such file or directory" in output: removed = True break time.sleep(RETRY_INTERVAL_LONG) assert removed
def generate_attachment_ticket_id()
-
Expand source code
def generate_attachment_ticket_id(): return ATTACHMENT_TICKET_ID_PREFIX + "-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def generate_pod_with_pvc_manifest(pod_name, pvc_name)
-
Expand source code
def generate_pod_with_pvc_manifest(pod_name, pvc_name): pod_manifest = { "apiVersion": "v1", "kind": "Pod", "metadata": { "name": pod_name, "namespace": "default" }, "spec": { "containers": [ { "name": "volume-test", "image": "nginx:stable-alpine", "imagePullPolicy": "IfNotPresent", "volumeMounts": [ { "name": "volv", "mountPath": "/data" } ], "ports": [ { "containerPort": 80 } ] } ], "volumes": [ { "name": "volv", "persistentVolumeClaim": { "claimName": pvc_name } } ] } } return pod_manifest
def generate_random_data(count)
-
Expand source code
def generate_random_data(count): return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(count))
def generate_random_pos(size, used={})
-
Expand source code
def generate_random_pos(size, used={}): for i in range(RETRY_COUNTS): pos = 0 if int(SIZE) != size: pos = random.randrange(0, int(SIZE)-size, 1) collided = False # it's [start, end) vs [pos, pos + size) for start, end in used.items(): if pos + size <= start or pos >= end: continue collided = True break if not collided: break assert not collided used[pos] = pos + size return pos
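generate_random_pos records each returned range in the used mapping (start position to end position) so that later calls don't collide. Because the default used={} argument is shared between calls, callers should pass their own dictionary, as in this minimal sketch:

from tests.common import generate_random_pos

used = {}
# Two 256-byte ranges inside the default volume size; [pos, pos + 256)
# is guaranteed not to overlap an earlier range recorded in `used`.
first = generate_random_pos(256, used)
second = generate_random_pos(256, used)
assert first != second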
def generate_random_suffix()
-
Expand source code
def generate_random_suffix(): return "-" + ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def generate_sts_name()
-
Expand source code
def generate_sts_name(): return STATEFULSET_NAME + "-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def generate_support_bundle(case_name)
-
Expand source code
def generate_support_bundle(case_name): # NOQA """ Generate support bundle into folder ./support_bundle/case_name.zip Won't generate support bundle if current support bundle count greater than MAX_SUPPORT_BINDLE_NUMBER. Args: case_name: support bundle will named case_name.zip """ os.makedirs("support_bundle", exist_ok=True) file_cnt = len(os.listdir("support_bundle")) if file_cnt >= MAX_SUPPORT_BINDLE_NUMBER: warnings.warn("Ignoring the bundle download because of \ avoiding overwhelming the disk usage.") return # Use API gen support bundle client = get_longhorn_api_client() cleanup_all_support_bundles(client) url = client._url.replace('schemas', 'supportbundles') data = {'description': case_name, 'issueURL': case_name} try: res_raw = requests.post(url, json=data) res_raw.raise_for_status() res = res_raw.json() except Exception as e: warnings.warn(f"Error while generating support bundle: {e}") return id = res['id'] name = res['name'] support_bundle_url = '{}/{}/{}'.format(url, id, name) for i in range(RETRY_EXEC_COUNTS): res = requests.get(support_bundle_url).json() if res['progressPercentage'] == 100: break else: time.sleep(RETRY_INTERVAL_LONG) if res['progressPercentage'] != 100: warnings.warn("Timeout to wait support bundle ready, skip download") return # Download support bundle download_url = '{}/download'.format(support_bundle_url) try: r = requests.get(download_url, allow_redirects=True, timeout=300) r.raise_for_status() with open('./support_bundle/{0}.zip'.format(case_name), 'wb') as f: f.write(r.content) except Exception as e: warnings.warn("Error occurred when downloading support bundle {}.zip\n\ The error was {}".format(case_name, e))
Generate a support bundle at ./support_bundle/case_name.zip.
Won't generate a support bundle if the current support bundle count is greater than MAX_SUPPORT_BINDLE_NUMBER.
Args
case_name
- the support bundle will be named case_name.zip
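A usage sketch; this is typically invoked from test teardown when a case fails, with the case name used for the archive:

from tests.common import generate_support_bundle

# Produces ./support_bundle/test_volume_basic.zip unless the bundle cap
# (MAX_SUPPORT_BINDLE_NUMBER) has been reached.
generate_support_bundle("test_volume_basic")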
def generate_volume_name()
-
Expand source code
def generate_volume_name(): return VOLUME_NAME + "-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def get_all_support_bundle_manager_deployments(apps_api)
-
Expand source code
def get_all_support_bundle_manager_deployments(apps_api): # NOQA name_prefix = 'longhorn-support-bundle-manager' support_bundle_managers = [] deployments = apps_api.list_namespaced_deployment(LONGHORN_NAMESPACE) for deployment in deployments.items: if deployment.metadata.name.startswith(name_prefix): support_bundle_managers.append(deployment) return support_bundle_managers
def get_apps_api_client()
-
Expand source code
def get_apps_api_client(): load_k8s_config() return k8sclient.AppsV1Api()
def get_backupstore_poll_interval()
-
Expand source code
def get_backupstore_poll_interval(): poll_interval = os.environ.get("LONGHORN_BACKUPSTORE_POLL_INTERVAL", "") assert len(poll_interval) != 0 return poll_interval
def get_backupstore_url()
-
Expand source code
def get_backupstore_url(): backupstore = os.environ.get("LONGHORN_BACKUPSTORES", "") backupstore = backupstore.replace(" ", "") backupstores = backupstore.split(",") assert len(backupstores) != 0 return backupstores
def get_backupstores()
-
Expand source code
def get_backupstores(): backupstore = os.environ.get("LONGHORN_BACKUPSTORES", "") try: backupstore = backupstore.replace(" ", "") backupstores = backupstore.split(",") for i in range(len(backupstores)): backupstores[i] = backupstores[i].split(":")[0] except ValueError: backupstores = backupstore.split(":")[0] return backupstores
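Note that get_backupstores returns only the scheme names of the configured backup targets. A sketch with a hypothetical LONGHORN_BACKUPSTORES value:

import os

from tests.common import get_backupstores

os.environ["LONGHORN_BACKUPSTORES"] = \
    "s3://backupbucket@us-east-1/backupstore, nfs://nfs-server:/opt/backupstore"
assert get_backupstores() == ["s3", "nfs"]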
def get_client(address_with_port)
-
Expand source code
def get_client(address_with_port): # Split IP and port if address_with_port.count(':') > 1: ip_part, port = address_with_port.rsplit(':', 1) ip = ipaddress.ip_address(ip_part) if ip.version == 6: formatted_address = f"[{ip_part}]:{port}" else: formatted_address = f"{ip_part}:{port}" else: formatted_address = address_with_port url = f'http://{formatted_address}/v1/schemas' c = longhorn.from_env(url=url) return c
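A usage sketch showing both address families; the addresses are hypothetical and a Longhorn manager must be listening there:

from tests.common import get_client

c = get_client("10.42.0.5:9500")   # IPv4, used as-is
# An IPv6 address gets bracketed: http://[fd00::5]:9500/v1/schemas
c6 = get_client("fd00::5:9500")
volumes = c.list_volume()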
def get_clients(hosts)
-
Expand source code
def get_clients(hosts): clients = {} for host in hosts: assert host.name is not None assert host.address is not None clients[host.name] = get_client(host.address + PORT) return clients
def get_clone_volume_name(client, source_volume_name)
-
Expand source code
def get_clone_volume_name(client, source_volume_name): for _ in range(RETRY_EXEC_COUNTS): volumes = client.list_volume() for volume in volumes: if volume['cloneStatus']['sourceVolume'] == \ source_volume_name: return volume.name time.sleep(RETRY_INTERVAL_LONG) return None
def get_compatibility_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
-
Expand source code
def get_compatibility_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv): return "%s.%d-%d.%d-%d.%d-%d" % (COMPATIBILTY_TEST_IMAGE_PREFIX, cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_core_api_client()
-
Expand source code
def get_core_api_client(): load_k8s_config() return k8sclient.CoreV1Api()
def get_custom_object_api_client()
-
Expand source code
def get_custom_object_api_client(): load_k8s_config() return k8sclient.CustomObjectsApi()
def get_default_engine_image(client)
-
Expand source code
def get_default_engine_image(client): images = client.list_engine_image() for img in images: if img.default: return img assert False
def get_deployment_pod_names(core_api, deployment)
-
Expand source code
def get_deployment_pod_names(core_api, deployment): label_selector = \ "name=" + deployment["metadata"]["labels"]["name"] deployment_pod_list = \ core_api.list_namespaced_pod(namespace="default", label_selector=label_selector) pod_names = [] for pod in deployment_pod_list.items: pod_names.append(pod.metadata.name) return pod_names
def get_device_checksum(dev)
-
Expand source code
def get_device_checksum(dev): hash = hashlib.sha512() with open(dev, 'rb') as fdev: if fdev is not None: for chunk in iter(lambda: fdev.read(4096), b""): hash.update(chunk) return hash.hexdigest()
def get_disk_uuid()
-
Expand source code
def get_disk_uuid(): f = open(os.path.join(DEFAULT_DISK_PATH, 'longhorn-disk.cfg')) data = json.load(f) return data["diskUUID"]
def get_engine_host_id(client, vol_name)
-
Expand source code
def get_engine_host_id(client, vol_name): volume = client.by_id_volume(vol_name) engines = volume.controllers if len(engines) != 1: return return engines[0].hostId
def get_engine_image_status_value(client, ei_name)
-
Expand source code
def get_engine_image_status_value(client, ei_name): if hasattr(client.by_id_engine_image(ei_name), "nodeDeploymentMap"): return "deployed" else: return "ready"
def get_host_disk_size(disk)
-
Expand source code
def get_host_disk_size(disk): cmd = ['stat', '-fc', '{"path":"%n","fsid":"%i","type":"%T","freeBlock":%f,' '"totalBlock":%b,"blockSize":%S}', disk] # As the disk available storage is rounded off to 100Mb truncate_to = 100 * 1024 * 1024 output = subprocess.check_output(cmd) disk_info = json.loads(output) block_size = disk_info["blockSize"] free_blk = disk_info["freeBlock"] total_blk = disk_info["totalBlock"] free = int((free_blk * block_size) / truncate_to) * truncate_to total = (total_blk * block_size) return free, total
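A usage sketch; the path is assumed to be a mounted disk such as the default Longhorn data path:

from tests.common import get_host_disk_size

free, total = get_host_disk_size("/var/lib/longhorn")
# `free` is truncated down to a 100 MiB boundary; `total` is exact bytes.
assert free <= total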
def get_host_replica_count(client, volume_name, host_id, chk_running=False)
-
Expand source code
def get_host_replica_count(client, volume_name, host_id, chk_running=False): volume = client.by_id_volume(volume_name) replica_count = 0 for replica in volume.replicas: if chk_running and not replica.running: continue if replica.hostId == host_id: replica_count += 1 return replica_count
def get_instance_manager_names(client, data_engine='v1')
-
Expand source code
def get_instance_manager_names(client, data_engine=DATA_ENGINE): ims = client.list_instance_manager() result = [] for im in ims: if im.dataEngine == data_engine: result.append(im.name) return result
def get_iscsi_ip(iscsi)
-
Expand source code
def get_iscsi_ip(iscsi): parsed = urlparse(iscsi) return parsed.hostname
def get_iscsi_lun(iscsi)
-
Expand source code
def get_iscsi_lun(iscsi): iscsi_endpoint = parse_iscsi_endpoint(iscsi) return iscsi_endpoint[2]
def get_iscsi_port(iscsi)
-
Expand source code
def get_iscsi_port(iscsi): parsed = urlparse(iscsi) return parsed.port
def get_iscsi_target(iscsi)
-
Expand source code
def get_iscsi_target(iscsi): iscsi_endpoint = parse_iscsi_endpoint(iscsi) return iscsi_endpoint[1]
def get_k8s_zone_label()
-
Expand source code
def get_k8s_zone_label(): ver_api = get_version_api_client() k8s_ver_data = ver_api.get_code() k8s_ver_major = k8s_ver_data.major assert k8s_ver_major == '1' k8s_ver_minor = k8s_ver_data.minor # k8s_ver_minor no needs to be an int # it could be "24+" in eks if int(re.sub('\\D', '', k8s_ver_minor)) >= 17: k8s_zone_label = K8S_ZONE_LABEL else: k8s_zone_label = DEPRECATED_K8S_ZONE_LABEL return k8s_zone_label
def get_liveness_probe_spec(initial_delay=5, period=5)
-
Expand source code
def get_liveness_probe_spec(initial_delay=5, period=5): pod_liveness_probe_spec = { "exec": { "command": [ "ls", "/data/lost+found" ] }, "initialDelaySeconds": initial_delay, "periodSeconds": period } return pod_liveness_probe_spec
def get_longhorn_api_client()
-
Expand source code
def get_longhorn_api_client(): for _ in range(RETRY_COUNTS): try: k8sconfig.load_incluster_config() ips = get_mgr_ips() # check if longhorn manager port is open before calling get_client for ip in ips: # Determine if IP is IPv6 family = socket.AF_INET6 if ':' in ip else socket.AF_INET sock = socket.socket(family, socket.SOCK_STREAM) sock.settimeout(RETRY_COUNTS_SHORT) try: if sock.connect_ex((ip, 9500)) == 0: return get_client(ip + PORT) finally: sock.close() except Exception: time.sleep(RETRY_INTERVAL) raise Exception("Failed to get Longhorn API client after retries")
def get_mgr_ips()
-
Expand source code
def get_mgr_ips(): ret = k8sclient.CoreV1Api().list_pod_for_all_namespaces( label_selector="app=longhorn-manager", watch=False) mgr_ips = [] for i in ret.items: mgr_ips.append(i.status.pod_ip) return mgr_ips
def get_node_by_disk_id(client, disk_id)
-
Expand source code
def get_node_by_disk_id(client, disk_id): # NOQA nodes = client.list_node() for node in nodes: disks = node.disks for name, disk in iter(disks.items()): if disk.diskUUID == disk_id: return node # should handle empty result in caller return ""
def get_nvmf_ip(nvmf)
-
Expand source code
def get_nvmf_ip(nvmf): nvmf_endpoint = parse_nvmf_endpoint(nvmf) return nvmf_endpoint[0].split(':')[0]
def get_nvmf_nqn(nvmf)
-
Expand source code
def get_nvmf_nqn(nvmf): nvmf_endpoint = parse_nvmf_endpoint(nvmf) return nvmf_endpoint[1]
def get_nvmf_port(nvmf)
-
Expand source code
def get_nvmf_port(nvmf): nvmf_endpoint = parse_nvmf_endpoint(nvmf) return nvmf_endpoint[0].split(':')[1]
def get_pod_data_md5sum(api, pod_name, path)
-
Expand source code
def get_pod_data_md5sum(api, pod_name, path): md5sum_command = [ '/bin/sh', '-c', 'md5sum ' + path + " | awk '{print $1}'" ] with timeout(seconds=STREAM_EXEC_TIMEOUT * 3, error_message='Timeout on executing stream md5sum'): return stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', command=md5sum_command, stderr=True, stdin=False, stdout=True, tty=False)
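A sketch pairing this helper with write_pod_volume_random_data to checksum freshly written data; the pod name is hypothetical:

from tests.common import get_core_api_client, get_pod_data_md5sum, \
    write_pod_volume_random_data

core_api = get_core_api_client()
# Write 10 MB of random data into the pod's volume, then checksum it.
write_pod_volume_random_data(core_api, "test-pod", "/data/test", 10)
md5sum = get_pod_data_md5sum(core_api, "test-pod", "/data/test")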
def get_process_info(p_path)
-
Expand source code
def get_process_info(p_path): info = {} with open(p_path) as file: for line in file.readlines(): if 'Name:\t' == line[0:len('Name:\t')]: info["Name"] = line[len("Name:"):].strip() if 'Pid:\t' == line[0:len('Pid:\t')]: info["Pid"] = line[len("Pid:"):].strip() if 'PPid:\t' == line[0:len('PPid:\t')]: info["PPid"] = line[len("PPid:"):].strip() if "Name" not in info or "Pid" not in info or "PPid" not in info: return return info
def get_pv_manifest(request)
-
Expand source code
def get_pv_manifest(request): volume_name = generate_volume_name() pv_manifest = { 'apiVersion': 'v1', 'kind': 'PersistentVolume', 'metadata': { 'name': volume_name }, 'spec': { 'capacity': { 'storage': size_to_string(DEFAULT_VOLUME_SIZE * Gi) }, 'volumeMode': 'Filesystem', 'accessModes': ['ReadWriteOnce'], 'persistentVolumeReclaimPolicy': 'Delete', 'csi': { 'driver': 'driver.longhorn.io', 'fsType': 'ext4', 'volumeAttributes': { 'numberOfReplicas': DEFAULT_LONGHORN_PARAMS['numberOfReplicas'], 'staleReplicaTimeout': DEFAULT_LONGHORN_PARAMS['staleReplicaTimeout'] }, 'volumeHandle': volume_name } } } def finalizer(): api = get_core_api_client() delete_and_wait_pv(api, pv_manifest['metadata']['name']) client = get_longhorn_api_client() delete_and_wait_longhorn(client, pv_manifest['metadata']['name']) request.addfinalizer(finalizer) return pv_manifest
def get_pvc_manifest(request)
-
Expand source code
def get_pvc_manifest(request): pvc_manifest = { 'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim', 'metadata': { 'name': generate_volume_name() }, 'spec': { 'accessModes': [ 'ReadWriteOnce' ], 'resources': { 'requests': { 'storage': size_to_string(DEFAULT_VOLUME_SIZE * Gi) } } } } def finalizer(): api = k8sclient.CoreV1Api() if not check_pvc_existence(api, pvc_manifest['metadata']['name']): return claim = api.read_namespaced_persistent_volume_claim( name=pvc_manifest['metadata']['name'], namespace='default') volume_name = claim.spec.volume_name api = get_core_api_client() delete_and_wait_pvc(api, pvc_manifest['metadata']['name']) # Working around line break issue. key = 'volume.beta.kubernetes.io/storage-provisioner' # If not using StorageClass (such as in CSI test), the Longhorn volume # will not be automatically deleted, causing this to throw an error. if (key in claim.metadata.annotations): client = get_longhorn_api_client() wait_for_volume_delete(client, volume_name) request.addfinalizer(finalizer) return pvc_manifest
def get_random_client(clients)
-
Expand source code
def get_random_client(clients): for _, client in iter(clients.items()): break return client
def get_scheduling_api_client()
-
Expand source code
def get_scheduling_api_client(): load_k8s_config() return k8sclient.SchedulingV1Api()
def get_self_host_id()
-
Expand source code
def get_self_host_id(): return os.environ.get("NODE_NAME")
def get_statefulset_pod_info(api, s_set)
-
Expand source code
def get_statefulset_pod_info(api, s_set): pod_info = [] for i in range(s_set['spec']['replicas']): pod_name = s_set['metadata']['name'] + '-' + str(i) pod = api.read_namespaced_pod(name=pod_name, namespace='default') pvc_name = pod.spec.volumes[0].persistent_volume_claim.claim_name pv_name = get_volume_name(api, pvc_name) pod_info.append({ 'pod_name': pod_name, 'pod_uid': pod.metadata.uid, 'pv_name': pv_name, 'pvc_name': pvc_name, }) return pod_info
def get_storage_api_client()
-
Expand source code
def get_storage_api_client(): load_k8s_config() return k8sclient.StorageV1Api()
def get_support_bundle(node_id, name, client)
-
Expand source code
def get_support_bundle(node_id, name, client): # NOQA url = get_support_bundle_url(client) resp = requests.get('{}/{}/{}'.format(url, node_id, name)) assert resp.status_code == 200 return resp.json()
def get_support_bundle_url(client)
-
Expand source code
def get_support_bundle_url(client): # NOQA return client._url.replace('schemas', 'supportbundles')
def get_update_disks(disks)
-
Expand source code
def get_update_disks(disks): update_disk = {} for key, disk in iter(disks.items()): update_disk[key] = disk return update_disk
def get_upgrade_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
-
Expand source code
def get_upgrade_test_image(cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv): return "%s.%d-%d.%d-%d.%d-%d" % (UPGRADE_TEST_IMAGE_PREFIX, cli_v, cli_minv, ctl_v, ctl_minv, data_v, data_minv)
def get_version_api_client()
-
Expand source code
def get_version_api_client(): load_k8s_config() return k8sclient.VersionApi()
def get_volume_dev_mb_data_md5sum(path, offset_in_mb, length_in_mb)
-
Expand source code
def get_volume_dev_mb_data_md5sum(path, offset_in_mb, length_in_mb): md5sum_command = [ '/bin/sh', '-c', 'dd if=%s bs=1M skip=%d count=%d | md5sum' % (path, offset_in_mb, length_in_mb) ] with timeout(seconds=STREAM_EXEC_TIMEOUT * 5, error_message='Timeout on computing dev md5sum'): output = subprocess.check_output( md5sum_command).strip().decode('utf-8') return output.split(" ")[0]
def get_volume_endpoint(v)
-
Expand source code
def get_volume_endpoint(v): endpoint = check_volume_endpoint(v) return endpoint
def get_volume_engine(v)
-
Expand source code
def get_volume_engine(v): engines = v.controllers assert len(engines) != 0 return engines[0]
def get_volume_name(api, pvc_name)
-
Expand source code
def get_volume_name(api, pvc_name): # type: (dict) -> str """ Given a PersistentVolumeClaim, return the name of the associated PV. """ claim = api.read_namespaced_persistent_volume_claim( name=pvc_name, namespace='default') return claim.spec.volume_name
Given a PersistentVolumeClaim, return the name of the associated PV.
def get_volume_recurring_jobs_and_groups(volume)
-
Expand source code
def get_volume_recurring_jobs_and_groups(volume): volumeJobs = volume.recurringJobList() jobs = [] groups = [] for volumeJob in volumeJobs: if volumeJob['isGroup']: groups.append(volumeJob['name']) else: jobs.append(volumeJob['name']) return jobs, groups
def get_volume_running_replica_cnt(client, volume_name)
-
Expand source code
def get_volume_running_replica_cnt(client, volume_name): # NOQA nodes = client.list_node() cnt = 0 for node in nodes: cnt = cnt + get_host_replica_count( client, volume_name, node.name, chk_running=True) return cnt
def is_backupTarget_azurite(s)
-
Expand source code
def is_backupTarget_azurite(s): return s.startswith("azblob://")
def is_backupTarget_cifs(s)
-
Expand source code
def is_backupTarget_cifs(s): return s.startswith("cifs://")
def is_backupTarget_nfs(s)
-
Expand source code
def is_backupTarget_nfs(s): return s.startswith("nfs://")
def is_backupTarget_s3(s)
-
Expand source code
def is_backupTarget_s3(s): return s.startswith("s3://")
def is_k8s_node_gke_cos(core_api)
-
Expand source code
def is_k8s_node_gke_cos(core_api): return is_k8s_node_label(core_api, K8S_GKE_OS_DISTRO_LABEL, K8S_GKE_OS_DISTRO_COS, get_self_host_id())
def is_k8s_node_label(core_api, label_key, label_value, node_name)
-
Expand source code
def is_k8s_node_label(core_api, label_key, label_value, node_name): node = core_api.read_node(node_name) if label_key in node.metadata.labels: if node.metadata.labels[label_key] == label_value: return True return False
def is_replica_available(r)
-
Expand source code
def is_replica_available(r): return r is not None and r.running and not \ r.failedAt and r.mode == 'RW'
def iscsi_login(iscsi_ep)
-
Expand source code
def iscsi_login(iscsi_ep): ip = get_iscsi_ip(iscsi_ep) port = get_iscsi_port(iscsi_ep) target = get_iscsi_target(iscsi_ep) lun = get_iscsi_lun(iscsi_ep) # discovery cmd_discovery = "iscsiadm -m discovery -t st -p " + ip exec_nsenter(cmd_discovery, ISCSI_PROCESS) # login cmd_login = "iscsiadm -m node -T " + target + " -p " + ip + " --login" exec_nsenter(cmd_login, ISCSI_PROCESS) blk_name = "ip-%s:%s-iscsi-%s-lun-%s" % (ip, port, target, lun) wait_for_device_login(ISCSI_DEV_PATH, blk_name) dev = os.path.realpath(ISCSI_DEV_PATH + "/" + blk_name) return dev
def iscsi_logout(iscsi_ep)
-
Expand source code
def iscsi_logout(iscsi_ep): ip = get_iscsi_ip(iscsi_ep) target = get_iscsi_target(iscsi_ep) cmd_logout = "iscsiadm -m node -T " + target + " -p " + ip + " --logout" exec_nsenter(cmd_logout, ISCSI_PROCESS) cmd_rm_discovery = "iscsiadm -m discovery -p " + ip + " -o delete" exec_nsenter(cmd_rm_discovery, ISCSI_PROCESS)
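A sketch of the login/logout pair, assuming an attached volume with the iSCSI frontend; the volume name is hypothetical:

from tests.common import (get_longhorn_api_client, get_volume_endpoint,
                          iscsi_login, iscsi_logout)

client = get_longhorn_api_client()
volume = client.by_id_volume("vol-example")
iscsi_ep = get_volume_endpoint(volume)  # e.g. iscsi://10.42.0.5:3260/iqn...
dev = iscsi_login(iscsi_ep)             # local block device path
try:
    pass  # read/write `dev` here
finally:
    iscsi_logout(iscsi_ep)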
def json_string_go_to_python(str)
-
Expand source code
def json_string_go_to_python(str): return str.replace("u\'", "\"").replace("\'", "\""). \ replace("True", "true").replace("False", "false")
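A sketch showing the conversion of a Go/Python-repr style string into parseable JSON:

import json

from tests.common import json_string_go_to_python

raw = "{u'name': u'vol', 'ready': True}"
assert json.loads(json_string_go_to_python(raw)) == \
    {"name": "vol", "ready": True}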
def lazy_umount_disk(mount_path)
-
Expand source code
def lazy_umount_disk(mount_path): cmd = ['umount', '-l', mount_path] subprocess.check_call(cmd)
def load_k8s_config()
-
Expand source code
def load_k8s_config(): c = Configuration() c.assert_hostname = False Configuration.set_default(c) k8sconfig.load_incluster_config()
def make_deployment_cpu_request(request)
-
Expand source code
@pytest.fixture def make_deployment_cpu_request(request): def _generate_deployment_cpu_request_manifest(deployment_name, cpu_request, replicas=1): # NOQA make_deployment_cpu_request.deployment_manifest = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": { "name": deployment_name, "labels": { "name": deployment_name } }, "spec": { "replicas": replicas, "selector": { "matchLabels": { "name": deployment_name } }, "template": { "metadata": { "labels": { "name": deployment_name } }, "spec": { "containers": [ { "name": deployment_name, "image": "nginx:stable-alpine", "resources": { "limits": { "cpu": str(cpu_request * 2)+"m", "memory": "30Mi", }, "requests": { "cpu": str(cpu_request)+"m", "memory": "15Mi", } } } ], } } } } return make_deployment_cpu_request.deployment_manifest def finalizer(): apps_api = get_apps_api_client() deployment_name = \ make_deployment_cpu_request.deployment_manifest["metadata"]["name"] delete_and_wait_deployment( apps_api, deployment_name ) request.addfinalizer(finalizer) return _generate_deployment_cpu_request_manifest
def make_deployment_with_pvc(request)
-
Expand source code
@pytest.fixture def make_deployment_with_pvc(request): def _generate_deployment_with_pvc_manifest(deployment_name, pvc_name, replicas=1): # NOQA if not hasattr(make_deployment_with_pvc, 'deployment_manifests'): make_deployment_with_pvc.deployment_manifests = [] make_deployment_with_pvc.deployment_manifests.append({ "apiVersion": "apps/v1", "kind": "Deployment", "metadata": { "name": deployment_name, "labels": { "name": deployment_name } }, "spec": { "replicas": replicas, "selector": { "matchLabels": { "name": deployment_name } }, "template": { "metadata": { "labels": { "name": deployment_name } }, "spec": { "containers": [ { "name": deployment_name, "image": "nginx:stable-alpine", "volumeMounts": [ { "name": "volv", "mountPath": "/data" } ] } ], "volumes": [ { "name": "volv", "persistentVolumeClaim": { "claimName": pvc_name } } ] } } } }) return make_deployment_with_pvc.deployment_manifests[-1] def finalizer(): apps_api = get_apps_api_client() if not hasattr(make_deployment_with_pvc, 'deployment_manifests'): return for deployment_manifest in \ make_deployment_with_pvc.deployment_manifests: deployment_name = deployment_manifest["metadata"]["name"] delete_and_wait_deployment( apps_api, deployment_name ) request.addfinalizer(finalizer) return _generate_deployment_with_pvc_manifest
def monitor_restore_progress(client, volume_name)
-
Expand source code
def monitor_restore_progress(client, volume_name): completed = 0 rs = {} for i in range(RETRY_COUNTS_LONG): completed = 0 v = client.by_id_volume(volume_name) rs = v.restoreStatus for r in rs: assert r.error == "" if r.state == "complete": assert r.progress == 100 completed += 1 if completed == len(rs): break time.sleep(RETRY_INTERVAL) assert completed == len(rs) return v
def mount_disk(dev, mount_path)
-
Expand source code
def mount_disk(dev, mount_path): # create directory before mount cmd = ['mkdir', '-p', mount_path] subprocess.check_call(cmd) cmd = ['mount', dev, mount_path] subprocess.check_call(cmd)
def node_default_tags()
-
Expand source code
@pytest.yield_fixture def node_default_tags(): """ Assign the Tags under DEFAULT_TAGS to the Longhorn client's Nodes to provide a base set of Tags to work with in the tests. :return: A dictionary mapping a Node's ID to the Tags it has. """ client = get_longhorn_api_client() # NOQA nodes = client.list_node() assert len(nodes) == 3 tag_mappings = {} for tags, node in zip(DEFAULT_TAGS, nodes): if DATA_ENGINE == "v2": # if the v2 data engine is enabled, both a file system disk # and a block disk will coexist. This is because a v2 backing image # requires a file system disk to function. assert len(node.disks) == 2 else: assert len(node.disks) == 1 update_disks = get_update_disks(node.disks) update_disks[list(update_disks)[0]].tags = tags["disk"] new_node = update_node_disks(client, node.name, disks=update_disks, retry=True) disks = get_update_disks(new_node.disks) assert disks[list(new_node.disks)[0]].tags == tags["disk"] new_node = set_node_tags(client, node, tags["node"]) assert new_node.tags == tags["node"] tag_mappings[node.id] = tags yield tag_mappings client = get_longhorn_api_client() # NOQA nodes = client.list_node() for node in nodes: update_disks = get_update_disks(node.disks) update_disks[list(update_disks)[0]].tags = [] new_node = update_node_disks(client, node.name, disks=update_disks, retry=True) disks = get_update_disks(new_node.disks) assert len(disks[list(new_node.disks)[0]].tags) == 0, \ f" disk = {disks}" new_node = set_node_tags(client, node) assert len(new_node.tags) == 0, f" Node = {new_node}"
Assign the Tags under DEFAULT_TAGS to the Longhorn client's Nodes to provide a base set of Tags to work with in the tests.
Returns
A dictionary mapping a Node's ID to the Tags it has.
def nvmf_login(nvmf)
-
Expand source code
def nvmf_login(nvmf): # Related commands are documented at: # https://github.com/longhorn/longhorn-tests/wiki/Connect-to-the-NVMf-frontend-volume # NOQA ip = get_nvmf_ip(nvmf) port = get_nvmf_port(nvmf) # NVMe Qualified Name nqn = get_nvmf_nqn(nvmf) cmd_connect = f"nvme connect -t tcp -a {ip} -s {port} -n {nqn}" subprocess.check_output(cmd_connect.split()) return wait_for_nvme_device()
def nvmf_logout(nvmf)
-
Expand source code
def nvmf_logout(nvmf): nqn = get_nvmf_nqn(nvmf) try: subprocess.check_call(["nvme", "disconnect", "-n", nqn]) print(f"Disconnected from {nqn}") except subprocess.CalledProcessError as e: print(f"Failed to disconnect from {nqn}: {e}")
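The NVMf counterpart follows the same pattern as the iSCSI helpers above; the volume name is hypothetical and the volume is assumed to use the NVMf frontend:

from tests.common import (get_longhorn_api_client, get_volume_endpoint,
                          nvmf_login, nvmf_logout)

client = get_longhorn_api_client()
volume = client.by_id_volume("vol-example")
nvmf_ep = get_volume_endpoint(volume)
dev = nvmf_login(nvmf_ep)   # local NVMe device path
nvmf_logout(nvmf_ep)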
def offline_expand_attached_volume(client, volume_name, size='67108864')
-
Expand source code
def offline_expand_attached_volume(client, volume_name, size=EXPAND_SIZE): volume = wait_for_volume_healthy(client, volume_name) engine = get_volume_engine(volume) volume.detach() volume = wait_for_volume_detached(client, volume.name) volume.expand(size=size) wait_for_volume_expansion(client, volume.name) volume = wait_for_volume_detached(client, volume.name) volume.attach(hostId=engine.hostId, disableFrontend=False) wait_for_volume_healthy(client, volume_name)
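A usage sketch; the volume is assumed to be attached and healthy, and the size is a string of bytes as elsewhere in this module:

from tests.common import get_longhorn_api_client, offline_expand_attached_volume

client = get_longhorn_api_client()
# Detaches, expands to 128 MiB, then re-attaches to the same node.
offline_expand_attached_volume(client, "vol-example",
                               size=str(128 * 1024 * 1024))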
def parse_iscsi_endpoint(iscsi)
-
Expand source code
def parse_iscsi_endpoint(iscsi): iscsi_endpoint = iscsi[8:] return iscsi_endpoint.split('/')
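A sketch of the parsing convention behind the iSCSI getters above: the 8-character iscsi:// prefix is stripped and the remainder split on '/'. The endpoint below is hypothetical.

from tests.common import parse_iscsi_endpoint

parts = parse_iscsi_endpoint(
    "iscsi://10.42.0.5:3260/iqn.2019-10.io.longhorn:vol-example/1")
# indices: [0] address, [1] target (get_iscsi_target), [2] LUN (get_iscsi_lun)
assert parts == ["10.42.0.5:3260",
                 "iqn.2019-10.io.longhorn:vol-example", "1"]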
def parse_nvmf_endpoint(nvmf)
-
Expand source code
def parse_nvmf_endpoint(nvmf): return nvmf[7:].split('/')
def pod(request)
-
Expand source code
@pytest.fixture def pod(request): pod_manifest = { 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': { 'name': 'test-pod' }, 'spec': { 'containers': [{ 'image': 'busybox:1.34.0', 'imagePullPolicy': 'IfNotPresent', 'name': 'sleep', "args": [ "/bin/sh", "-c", "while true;do date;sleep 5; done" ], "volumeMounts": [{ 'name': 'pod-data', 'mountPath': '/data' }], }], 'volumes': [] } } def finalizer(): api = get_core_api_client() delete_and_wait_pod(api, pod_manifest['metadata']['name']) request.addfinalizer(finalizer) return pod_manifest
def pod_make(request)
-
Expand source code
@pytest.fixture def pod_make(request): def make_pod(name='test-pod'): pod_manifest = { 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': { 'name': name }, 'spec': { 'containers': [{ 'image': 'busybox:1.34.0', 'imagePullPolicy': 'IfNotPresent', 'name': 'sleep', "args": [ "/bin/sh", "-c", "while true; do date; sleep 5; done" ], "volumeMounts": [{ 'name': 'pod-data', 'mountPath': '/data' }], }], 'volumes': [] } } def finalizer(): api = get_core_api_client() try: pod_name = pod_manifest['metadata']['name'] delete_and_wait_pod(api, pod_name) except Exception as e: print("\nException when waiting for pod deletion") print(e) return try: volume_details = pod_manifest['spec']['volumes'][0] pvc_name = volume_details['persistentVolumeClaim']['claimName'] delete_and_wait_pvc(api, pvc_name) except Exception as e: print("\nException when waiting for PVC deletion") print(e) try: found = False pvs = api.list_persistent_volume() for item in pvs.items: if item.spec.claim_ref.name == pvc_name: pv = item found = True break if found: pv_name = pv.metadata.name delete_and_wait_pv(api, pv_name) except Exception as e: print("\nException when waiting for PV deletion") print(e) request.addfinalizer(finalizer) return pod_manifest return make_pod
def prepare_host_disk(dev, vol_name)
-
Expand source code
def prepare_host_disk(dev, vol_name): cmd = ['mkfs.ext4', dev] subprocess.check_call(cmd) mount_path = os.path.join(DIRECTORY_PATH, vol_name) mount_disk(dev, mount_path) return mount_path
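A usage sketch with a hypothetical spare block device:

from tests.common import lazy_umount_disk, prepare_host_disk

# Formats the device as ext4 and mounts it under DIRECTORY_PATH/<vol_name>.
mount_path = prepare_host_disk("/dev/xvdf", "extra-disk")
# ... register it as a Longhorn disk and run the test ...
lazy_umount_disk(mount_path)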
def prepare_pod_with_data_in_mb(client,
core_api,
csi_pv,
pvc,
pod_make,
volume_name,
volume_size='1073741824',
num_of_replicas=3,
data_path='/data/test',
data_size_in_mb=100,
add_liveness_probe=True,
access_mode='rwo',
data_engine='v1')
-
Expand source code
def prepare_pod_with_data_in_mb( client, core_api, csi_pv, pvc, pod_make, volume_name, volume_size=str(1*Gi), num_of_replicas=3, data_path="/data/test", data_size_in_mb=DATA_SIZE_IN_MB_1, add_liveness_probe=True, access_mode=ACCESS_MODE_RWO, data_engine=DATA_ENGINE):# NOQA: pod_name = volume_name + "-pod" pv_name = volume_name pvc_name = volume_name + "-pvc" pod = pod_make(name=pod_name) csi_pv['metadata']['name'] = pv_name csi_pv['spec']['csi']['volumeHandle'] = volume_name csi_pv['spec']['capacity']['storage'] = volume_size pvc['metadata']['name'] = pvc_name pvc['spec']['volumeName'] = pv_name pvc['spec']['resources']['requests']['storage'] = volume_size pvc['spec']['storageClassName'] = '' pod['spec']['volumes'] = [create_pvc_spec(pvc_name)] if add_liveness_probe is True: pod_liveness_probe_spec = \ get_liveness_probe_spec(initial_delay=1, period=1) pod['spec']['containers'][0]['livenessProbe'] = \ pod_liveness_probe_spec create_and_check_volume(client, volume_name, num_of_replicas=num_of_replicas, size=volume_size, access_mode=access_mode, data_engine=data_engine) core_api.create_persistent_volume(csi_pv) core_api.create_namespaced_persistent_volume_claim( body=pvc, namespace='default') create_and_wait_pod(core_api, pod) write_pod_volume_random_data(core_api, pod_name, data_path, data_size_in_mb) md5sum = get_pod_data_md5sum(core_api, pod_name, data_path) stream(core_api.connect_get_namespaced_pod_exec, pod_name, 'default', command=["sync"], stderr=True, stdin=False, stdout=True, tty=False) return pod_name, pv_name, pvc_name, md5sum
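A sketch of how a test consumes this helper together with the csi_pv, pvc and pod_make fixtures; the returned md5sum is typically compared again after the scenario under test:

from tests.common import generate_volume_name, prepare_pod_with_data_in_mb

def test_example(client, core_api, csi_pv, pvc, pod_make):  # NOQA
    volume_name = generate_volume_name()
    pod_name, pv_name, pvc_name, md5sum = prepare_pod_with_data_in_mb(
        client, core_api, csi_pv, pvc, pod_make, volume_name,
        data_size_in_mb=100)
    # ... disrupt the volume, recover it, then re-checksum and compare ...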
def prepare_statefulset_with_data_in_mb(client,
core_api,
statefulset,
sts_name,
storage_class,
data_path='/data/test',
data_size_in_mb=100)
-
Expand source code
def prepare_statefulset_with_data_in_mb( client, core_api, statefulset, sts_name, storage_class, data_path="/data/test", data_size_in_mb=DATA_SIZE_IN_MB_1): update_statefulset_manifests(statefulset, storage_class, sts_name) statefulset['spec']['replicas'] = 1 create_storage_class(storage_class) create_and_wait_statefulset(statefulset) pod_info = get_statefulset_pod_info(core_api, statefulset) volumes = client.list_volume() assert len(volumes) == statefulset['spec']['replicas'] vol_name = None pod_name = None md5sum = None for v in volumes: info = pod_info[0] if v.name == info['pv_name']: write_pod_volume_random_data(core_api, info['pod_name'], data_path, data_size_in_mb) md5sum = get_pod_data_md5sum(core_api, info['pod_name'], data_path) stream(core_api.connect_get_namespaced_pod_exec, info['pod_name'], 'default', command=["sync"], stderr=True, stdin=False, stdout=True, tty=False) vol_name = v.name pod_name = info['pod_name'] break assert vol_name is not None assert pod_name is not None assert md5sum is not None return vol_name, pod_name, md5sum
def priority_class(request)
-
Expand source code
@pytest.fixture def priority_class(request): priority_class = { 'apiVersion': 'scheduling.k8s.io/v1', 'kind': 'PriorityClass', 'metadata': { 'name': PRIORITY_CLASS_NAME + "-" + ''.join( random.choice(string.ascii_lowercase + string.digits) for _ in range(6)) }, 'value': random.randrange(PRIORITY_CLASS_MIN, PRIORITY_CLASS_MAX) } def finalizer(): # ensure that the priority class gets unset for longhorn # before deleting the class client = get_longhorn_api_client() setting = client.by_id_setting(SETTING_PRIORITY_CLASS) setting = client.update(setting, value='') assert setting.value == '' api = get_scheduling_api_client() try: api.delete_priority_class(name=priority_class['metadata']['name'], body=k8sclient.V1DeleteOptions()) except ApiException as e: assert e.status == 404 request.addfinalizer(finalizer) return priority_class
def pvc(request)
-
Expand source code
@pytest.fixture def pvc(request): return get_pvc_manifest(request)
def pvc_backingimage(request)
-
Expand source code
@pytest.fixture def pvc_backingimage(request): pvc_manifest = get_pvc_manifest(request) pvc_manifest['spec']['resources']['requests']['storage'] = \ size_to_string(BACKING_IMAGE_EXT4_SIZE) return pvc_manifest
def pvc_name(request)
-
Expand source code
@pytest.fixture def pvc_name(request): return generate_volume_name()
def random_labels()
-
Expand source code
@pytest.fixture def random_labels(): labels = {} i = 0 while i < 3: key = "label/" + "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6)) if not labels.get(key): labels[key] = generate_random_data(VOLUME_RWTEST_SIZE) i += 1 return labels
def read_pod_block_volume_data(api, pod_name, data_size, offset, device_path)
-
Expand source code
def read_pod_block_volume_data(api, pod_name, data_size, offset, device_path): read_command = [ '/bin/sh', '-c', 'dd if=' + device_path + ' status=none bs=' + str(data_size) + ' count=1 skip=' + str(offset) ] with timeout(seconds=STREAM_EXEC_TIMEOUT, error_message='Timeout on executing stream read'): return stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', command=read_command, stderr=True, stdin=False, stdout=True, tty=False)
def read_volume_data(api, pod_name, filename='test')
-
Expand source code
def read_volume_data(api, pod_name, filename='test'): """ Retrieve data from a Pod's volume. Args: api: An instance of CoreV1API. pod_name: The name of the Pod. Returns: The data contained within the volume. """ read_command = [ '/bin/sh', '-c', 'cat /data/' + filename ] with timeout(seconds=STREAM_EXEC_TIMEOUT, error_message='Timeout on executing stream read'): return stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', command=read_command, stderr=True, stdin=False, stdout=True, tty=False)
Retrieve data from a Pod's volume.
Args
api
- An instance of CoreV1API.
pod_name
- The name of the Pod.
filename
- Optional - the name of the file under /data to read. Defaults to 'test'.
Returns
The data contained within the volume.
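A sketch pairing exec_command_in_pod with read_volume_data; the pod name is hypothetical:

from tests.common import (exec_command_in_pod, get_core_api_client,
                          read_volume_data)

core_api = get_core_api_client()
exec_command_in_pod(core_api, "echo -n hello > /data/test",
                    "test-pod", "default")
assert read_volume_data(core_api, "test-pod") == "hello"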
def recurring_job_feature_supported(client)
-
Expand source code
def recurring_job_feature_supported(client): if hasattr(client.by_id_schema("volumeRecurringJob"), "id"): return True else: return False
def remount_volume_read_only(client, core_api, volume_name)
-
Expand source code
def remount_volume_read_only(client, core_api, volume_name): volume_name_hash = hashlib.sha256(volume_name.encode()).hexdigest() volume = client.by_id_volume(volume_name) instance_manager_name = volume.controllers[0].instanceManagerName print(f"Remounting volume {volume_name} as read-only: {volume_name_hash}") command = [ '/bin/sh', '-c', f"mount -o remount,ro /host/var/lib/kubelet/plugins/kubernetes.io/csi/driver.longhorn.io/{volume_name_hash}/globalmount" # NOQA ] with timeout(seconds=STREAM_EXEC_TIMEOUT, error_message='Timeout on executing stream read'): stream(core_api.connect_get_namespaced_pod_exec, instance_manager_name, LONGHORN_NAMESPACE, command=command, stderr=True, stdin=False, stdout=True, tty=False)
def reset_disks_for_all_nodes(client, add_block_disks=False)
-
Expand source code
def reset_disks_for_all_nodes(client, add_block_disks=False): # NOQA default_disks = { "v1": { "path": DEFAULT_DISK_PATH, "type": "filesystem", "default": True, } } if v2_data_engine_cr_supported(client): enable_v2 = os.environ.get('RUN_V2_TEST') if enable_v2 == "true" or add_block_disks is True: default_disks["v2"] = { "path": BLOCK_DEV_PATH, "type": "block", "default": True, } default_disks["v1"]["default"] = False nodes = client.list_node() for n in nodes: node = n # Captures the correct value of n in the closure. # Reset default disk if default disks are not the only disks # on the node. cleanup_required = False if len(node.disks) != len(default_disks): cleanup_required = True for name, disk in iter(node.disks.items()): if cleanup_required: break if disk.path not in [v["path"] for v in default_disks.values()]: cleanup_required = True break if name == "default-disk": for data_engine, disk in default_disks.items(): if not disk["default"]: continue if disk["path"] != node.disks[name].path: cleanup_required = True break if cleanup_required: update_disks = get_update_disks(node.disks) for disk_name, disk in iter(update_disks.items()): disk.allowScheduling = False update_disks[disk_name] = disk node = update_node_disks(client, node.name, disks=update_disks, retry=True) update_disks = {} node = update_node_disks(client, node.name, disks=update_disks, retry=True) node = wait_for_disk_update(client, node.name, 0) if len(node.disks) != len(default_disks): update_disks = {} for data_engine, disk in default_disks.items(): disk_name = data_engine if disk["default"]: disk_name = "default-disk" update_disks[disk_name] = { "path": disk["path"], "diskType": disk["type"], "allowScheduling": True } node = update_node_disks(client, node.name, disks=update_disks, retry=True) node = wait_for_disk_update(client, node.name, len(default_disks)) assert len(node.disks) == len(default_disks) # wait for node controller to update disk status disks = node.disks update_disks = {} for name, disk in iter(disks.items()): update_disk = disk update_disk.allowScheduling = True if disk.diskType == "filesystem": reserved_storage = int(update_disk.storageMaximum * 30 / 100) else: reserved_storage = 0 update_disk.storageReserved = reserved_storage update_disk.tags = [] update_disks[name] = update_disk node = update_node_disks(client, node.name, disks=update_disks, retry=True) for name, disk in iter(node.disks.items()): # wait for node controller update disk status wait_for_disk_status(client, node.name, name, "allowScheduling", True) wait_for_disk_status(client, node.name, name, "storageScheduled", 0) expected_reserved_storage = 0 if disk.diskType == "filesystem": expected_reserved_storage = reserved_storage wait_for_disk_status(client, node.name, name, "storageReserved", expected_reserved_storage)
def reset_engine_image(client)
-
Expand source code
def reset_engine_image(client): core_api = get_core_api_client() ready = False for i in range(RETRY_COUNTS): ready = True ei_list = client.list_engine_image().data for ei in ei_list: if ei.default: if ei.state != get_engine_image_status_value(client, ei.name): ready = False else: wait_for_engine_image_ref_count(client, ei.name, 0) client.delete(ei) wait_for_engine_image_deletion(client, core_api, ei.name) if ready: break time.sleep(RETRY_INTERVAL) assert ready
def reset_longhorn_node_zone(client)
-
Expand source code
def reset_longhorn_node_zone(client): core_api = get_core_api_client() # No need to reset zone label for GKE COS node as the node zone label is # periodically updated with the actual GCP zone. # https://github.com/longhorn/longhorn-tests/pull/1819 if is_k8s_node_gke_cos(core_api): return nodes = client.list_node() for n in nodes: set_k8s_node_zone_label(core_api, n.name, None) wait_longhorn_node_zone_reset(client)
def reset_node(client, core_api)
-
Expand source code
def reset_node(client, core_api): # remove nodes taint reset_nodes_taint(client) nodes = client.list_node() for node in nodes: try: set_node_cordon(core_api, node.id, False) node = client.by_id_node(node.id) node = set_node_tags(client, node, tags=[]) node = wait_for_node_tag_update(client, node.id, []) node = set_node_scheduling(client, node, allowScheduling=True) wait_for_node_update(client, node.id, "allowScheduling", True) except Exception as e: print("\nException when reset node scheduling and tags", node) print(e) managed_k8s_cluster = os.getenv("MANAGED_K8S_CLUSTER").lower() == 'true' if not managed_k8s_cluster: reset_longhorn_node_zone(client)
def reset_nodes_taint(client)
-
Expand source code
def reset_nodes_taint(client): core_api = get_core_api_client() nodes = client.list_node() for node in nodes: core_api.patch_node(node.id, { "spec": {"taints": []} })
def reset_settings(client)
-
Expand source code
def reset_settings(client): for setting in client.list_setting(): setting_name = setting.name setting_default_value = setting.definition.default setting_readonly = setting.definition.readOnly # We don't provide the setup for the storage network, hence there is no # default value. We need to skip here to avoid test failure when # resetting this to an empty default value. if setting_name == "storage-network": continue # The test CI deploys Longhorn with the setting value longhorn-critical # for the setting priority-class. Don't reset it to empty (which is # the default value defined in longhorn-manager code) because this will # restart Longhorn managed components and fail the test cases. # https://github.com/longhorn/longhorn/issues/7413#issuecomment-1881707958 if setting.name == SETTING_PRIORITY_CLASS: continue # The version of the support bundle kit will be specified by a command # option when starting the manager. And setting requires a value. # # Longhorn has a default version for each release provided to the # manager when starting. Meaning this setting doesn't have a default # value. # # The design grants the ability to update later by cases for # troubleshooting purposes. Meaning this setting is editable. # # So we need to skip here to avoid test failure when resetting this to # an empty default value. if setting_name == "support-bundle-manager-image": continue if setting_name == "registry-secret": continue if setting_name == "v2-data-engine": if v2_data_engine_cr_supported(client): setting = client.by_id_setting(SETTING_V2_DATA_ENGINE) try: client.update(setting, value="true") except Exception as e: print(f"\nException setting {setting_name} to true") print(e) continue s = client.by_id_setting(setting_name) if s.value != setting_default_value and not setting_readonly: try: client.update(s, value=setting_default_value) except Exception as e: print("\nException when resetting ", setting_name, " to value: ", setting_default_value) print(s) print(e)
def restart_and_wait_ready_engine_count(client, ready_engine_count)
-
Expand source code
def restart_and_wait_ready_engine_count(client, ready_engine_count): # NOQA """ Delete/restart engine daemonset and wait ready engine image count after daemonset restart """ apps_api = get_apps_api_client() default_img = get_default_engine_image(client) ds_name = "engine-image-" + default_img.name apps_api.delete_namespaced_daemon_set(ds_name, LONGHORN_NAMESPACE) wait_for_engine_image_condition(client, default_img.name, "False") wait_for_engine_image_state(client, default_img.name, "deploying") wait_for_running_engine_image_count(default_img.name, ready_engine_count)
Delete/restart the engine image daemonset and wait for the expected number of ready engine images after the daemonset restarts.
def restore_backup_and_get_data_checksum(client, core_api, backup, pod, file_name='', command='')
-
Expand source code
def restore_backup_and_get_data_checksum(client, core_api, backup, pod, file_name='', command=''): """ Restore the backup in a pod and get the checksum of all the files or checksum of a particular file. Args: client: The Longhorn client to use in the request. core_api: An instance of CoreV1API. backup: The backup to be restored. pod: Pod fixture. file_name: Optional - File whose checksum to be computed. command: Optional - command to be executed in the pod. Returns: The checksum as a dictionary as in file_name=checksum and the output of the command executed in the pod. """ restore_volume_name = generate_volume_name() + "-restore" restore_pod_name = restore_volume_name + "-pod" restore_pv_name = restore_volume_name + "-pv" restore_pvc_name = restore_volume_name + "-pvc" data_checksum = {} client.create_volume(name=restore_volume_name, size=str(1 * Gi), fromBackup=backup.url, dataEngine=DATA_ENGINE) volume = wait_for_volume_detached(client, restore_volume_name) create_pv_for_volume(client, core_api, volume, restore_pv_name) create_pvc_for_volume(client, core_api, volume, restore_pvc_name) pod['metadata']['name'] = restore_pod_name pod['spec']['volumes'] = [{ 'name': pod['spec']['containers'][0]['volumeMounts'][0]['name'], 'persistentVolumeClaim': { 'claimName': restore_pvc_name, }, }] create_and_wait_pod(core_api, pod) restore_volume = client.by_id_volume(restore_volume_name) assert restore_volume[VOLUME_FIELD_ROBUSTNESS] == VOLUME_ROBUSTNESS_HEALTHY if file_name == '': file_list = exec_command_in_pod(core_api, 'ls /data', restore_pod_name, 'default') file_list = file_list.strip() file_list = file_list.split('\n') if len(file_list) > 0: for file_name in file_list: data_path = '/data/' + file_name data_checksum[file_name] = \ get_pod_data_md5sum(core_api, restore_pod_name, data_path) else: data_path = '/data/' + file_name data_checksum[file_name] = get_pod_data_md5sum(core_api, restore_pod_name, data_path) # This is optional, if you want to execute any command and get the output output = '' if command != '': output = exec_command_in_pod(core_api, command, restore_pod_name, 'default') return data_checksum, output, restore_pod_name
Restore the backup in a pod and get the checksums of all files, or the checksum of a particular file.
Args
client
- The Longhorn client to use in the request.
core_api
- An instance of CoreV1API.
backup
- The backup to be restored.
pod
- Pod fixture.
file_name
- Optional - File whose checksum to be computed.
command
- Optional - command to be executed in the pod.
Returns
A dictionary mapping each file_name to its checksum, and the output of the command executed in the pod.
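A usage sketch; client, core_api, backup and pod are assumed to come from fixtures and an earlier backup step, and original_checksum from the data written before the backup:

data_checksum, output, restore_pod_name = \
    restore_backup_and_get_data_checksum(client, core_api, backup, pod)
# With no file_name given, every file under /data is checksummed.
assert data_checksum == original_checksum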
def rwx_statefulset(request)
-
Expand source code
@pytest.fixture def rwx_statefulset(request): statefulset_manifest = { 'apiVersion': 'apps/v1', 'kind': 'StatefulSet', 'metadata': { 'name': 'rwx-test-statefulset', 'namespace': 'default', }, 'spec': { 'selector': { 'matchLabels': { 'app': 'rwx-test-statefulset' } }, 'serviceName': 'rwx-test-statefulset', 'replicas': 1, 'template': { 'metadata': { 'labels': { 'app': 'rwx-test-statefulset' } }, 'spec': { 'terminationGracePeriodSeconds': 10, 'containers': [{ 'image': 'busybox:1.34.0', 'imagePullPolicy': 'IfNotPresent', 'name': 'sleep', 'args': [ '/bin/sh', '-c', 'while true;do date;sleep 5; done' ], 'volumeMounts': [{ 'name': 'pod-data', 'mountPath': '/data' }] }] } }, 'volumeClaimTemplates': [{ 'metadata': { 'name': 'pod-data' }, 'spec': { 'accessModes': [ 'ReadWriteMany' ], 'storageClassName': 'longhorn', 'resources': { 'requests': { 'storage': size_to_string( DEFAULT_VOLUME_SIZE * Gi) } } } }] } } def finalizer(): api = get_core_api_client() client = get_longhorn_api_client() delete_and_wait_statefulset(api, client, statefulset_manifest) request.addfinalizer(finalizer) return statefulset_manifest
def scale_up_engine_image_daemonset(client)
-
Expand source code
def scale_up_engine_image_daemonset(client): apps_api = get_apps_api_client() default_img = get_default_engine_image(client) ds_name = "engine-image-" + default_img.name body = [{ "op": "replace", "path": "/spec/template/spec/nodeSelector", "value": None }] try: apps_api.patch_namespaced_daemon_set( name=ds_name, namespace='longhorn-system', body=body) except ApiException as e: # for scaling up a running daemon set, # the status_code is 422 server error. assert e.status == 422 # make sure the default engine image is deployed and ready wait_for_deployed_engine_image_count(client, default_img.name, 3)
def scheduling_api(request)
-
Expand source code
@pytest.fixture def scheduling_api(request): """ Create a new SchedulingV1API instance. Returns: A new SchedulingV1API Instance. """ c = Configuration() c.assert_hostname = False Configuration.set_default(c) k8sconfig.load_incluster_config() scheduling_api = k8sclient.SchedulingV1Api() return scheduling_api
Create a new SchedulingV1API instance.
Returns
A new SchedulingV1API Instance.
def set_and_wait_k8s_nodes_zone_label(core_api, node_zone_map)
-
Expand source code
def set_and_wait_k8s_nodes_zone_label(core_api, node_zone_map): k8s_zone_label = get_k8s_zone_label() for _ in range(RETRY_COUNTS): for node_name, zone_name in node_zone_map.items(): set_k8s_node_label(core_api, node_name, k8s_zone_label, zone_name) is_updated = False for node_name, zone_name in node_zone_map.items(): is_updated = \ is_k8s_node_label(core_api, k8s_zone_label, zone_name, node_name) if not is_updated: break if is_updated: break time.sleep(RETRY_INTERVAL) assert is_updated, \ f"Timeout while waiting for nodes zone label to be updated\n" \ f"Expected: {node_zone_map}"
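A short usage sketch (the zone name is illustrative; the clients come from the module's helper functions):
core_api = get_core_api_client()
client = get_longhorn_api_client()
node_zone_map = {node.name: "lh-zone-1" for node in client.list_node()}
set_and_wait_k8s_nodes_zone_label(core_api, node_zone_map)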
def set_k8s_node_label(core_api, node_name, key, value)
-
Expand source code
def set_k8s_node_label(core_api, node_name, key, value): payload = { "metadata": { "labels": { key: value} } } core_api.patch_node(node_name, body=payload)
def set_k8s_node_zone_label(core_api, node_name, zone_name)
-
Expand source code
def set_k8s_node_zone_label(core_api, node_name, zone_name): if is_k8s_node_label(core_api, K8S_ZONE_LABEL, zone_name, node_name): return k8s_zone_label = get_k8s_zone_label() set_k8s_node_label(core_api, node_name, k8s_zone_label, zone_name)
def set_node_cordon(api, node_name, to_cordon)
-
Expand source code
def set_node_cordon(api, node_name, to_cordon): """ Set a kubernetes node schedulable status """ payload = { "spec": { "unschedulable": to_cordon } } api.patch_node(node_name, payload)
Set a Kubernetes node's schedulable status (cordon or uncordon).
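A minimal sketch of cordoning and later uncordoning a node (node_name is illustrative):
core_api = get_core_api_client()
set_node_cordon(core_api, node_name, True)   # mark the node unschedulable
# ... run the scenario that relies on the cordoned node ...
set_node_cordon(core_api, node_name, False)  # make the node schedulable again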
def set_node_scheduling(client, node, allowScheduling, retry=False)
-
Expand source code
def set_node_scheduling(client, node, allowScheduling, retry=False): if node.tags is None: node.tags = [] if not retry: return client.update(node, allowScheduling=allowScheduling, tags=node.tags) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node = client.update(node, allowScheduling=allowScheduling, tags=node.tags) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break return node
def set_node_scheduling_eviction(client, node, allowScheduling, evictionRequested, retry=False)
-
Expand source code
def set_node_scheduling_eviction(client, node, allowScheduling, evictionRequested, retry=False): # NOQA if node.tags is None: node.tags = [] if not retry: node = client.update(node, allowScheduling=allowScheduling, evictionRequested=evictionRequested) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node = client.update(node, allowScheduling=allowScheduling, evictionRequested=evictionRequested) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break return node
def set_node_tags(client, node, tags=[], retry=False)
-
Expand source code
def set_node_tags(client, node, tags=[], retry=False): # NOQA """ Set the tags on a node without modifying its scheduling status. Retry if "too many retries error" happened. :param client: The Longhorn client to use in the request. :param node: The Node to update. :param tags: The tags to set on the node. :return: The updated Node. """ if not retry: return client.update(node, allowScheduling=node.allowScheduling, tags=tags) for _ in range(NODE_UPDATE_RETRY_COUNT): try: node = client.update(node, allowScheduling=node.allowScheduling, tags=tags) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break return node
Set the tags on a node without modifying its scheduling status. Retries if a "too many retries" error happens.
Parameters
client
- The Longhorn client to use in the request.
node
- The Node to update.
tags
- The tags to set on the node.
Returns
The updated Node.
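A minimal sketch of tagging a node and waiting for the tags to be applied (node_name and the tag values are illustrative):
node = client.by_id_node(node_name)
node = set_node_tags(client, node, tags=["fast", "ssd"], retry=True)
wait_for_node_tag_update(client, node.name, ["fast", "ssd"])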
def set_tags_for_node_and_its_disks(client, node, tags)
-
Expand source code
def set_tags_for_node_and_its_disks(client, node, tags): # NOQA if len(tags) == 0: expected_tags = [] else: expected_tags = list(tags) for disk_name in node.disks.keys(): node.disks[disk_name].tags = tags node = update_node_disks(client, node.name, disks=node.disks) for disk_name in node.disks.keys(): assert node.disks[disk_name].tags == expected_tags node = set_node_tags(client, node, tags) assert node.tags == expected_tags return node
def settings_reset()
-
Expand source code
@pytest.fixture def settings_reset(): yield client = get_longhorn_api_client() reset_settings(client)
def size_to_string(volume_size)
-
Expand source code
def size_to_string(volume_size): # type: (int) -> str """ Convert a volume size to string format to pass into Kubernetes. Args: volume_size: The size of the volume in bytes. Returns: The size of the volume as a string with a binary unit suffix (Gi, Mi, or Ki) that can be passed to Kubernetes. """ if volume_size >= Gi: return str(volume_size >> 30) + 'Gi' elif volume_size >= Mi: return str(volume_size >> 20) + 'Mi' else: return str(volume_size >> 10) + 'Ki'
Convert a volume size to string format to pass into Kubernetes.
Args
volume_size
- The size of the volume in bytes.
Returns
The size of the volume as a string with a binary unit suffix (Gi, Mi, or Ki) that can be passed to Kubernetes.
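For instance, using the module's Gi and Mi constants:
size_to_string(2 * Gi)     # returns '2Gi'
size_to_string(512 * Mi)   # returns '512Mi'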
def statefulset(request)
-
Expand source code
@pytest.fixture def statefulset(request): statefulset_manifest = { 'apiVersion': 'apps/v1', 'kind': 'StatefulSet', 'metadata': { 'name': 'test-statefulset', 'namespace': 'default', }, 'spec': { 'selector': { 'matchLabels': { 'app': 'test-statefulset' } }, 'serviceName': 'test-statefulset', 'replicas': 2, 'template': { 'metadata': { 'labels': { 'app': 'test-statefulset' } }, 'spec': { 'terminationGracePeriodSeconds': 10, 'containers': [{ 'image': 'busybox:1.34.0', 'imagePullPolicy': 'IfNotPresent', 'name': 'sleep', 'args': [ '/bin/sh', '-c', 'while true;do date;sleep 5; done' ], 'volumeMounts': [{ 'name': 'pod-data', 'mountPath': '/data' }] }] } }, 'volumeClaimTemplates': [{ 'metadata': { 'name': 'pod-data' }, 'spec': { 'accessModes': [ 'ReadWriteOnce' ], 'storageClassName': DEFAULT_STORAGECLASS_NAME, 'resources': { 'requests': { 'storage': size_to_string( DEFAULT_VOLUME_SIZE * Gi) } } } }] } } def finalizer(): api = get_core_api_client() client = get_longhorn_api_client() delete_and_wait_statefulset(api, client, statefulset_manifest) request.addfinalizer(finalizer) return statefulset_manifest
def storage_class(request)
-
Expand source code
@pytest.fixture def storage_class(request): sc_manifest = { 'apiVersion': 'storage.k8s.io/v1', 'kind': 'StorageClass', 'metadata': { 'name': DEFAULT_STORAGECLASS_NAME }, 'provisioner': 'driver.longhorn.io', 'allowVolumeExpansion': True, 'parameters': { 'numberOfReplicas': DEFAULT_LONGHORN_PARAMS['numberOfReplicas'], 'staleReplicaTimeout': DEFAULT_LONGHORN_PARAMS['staleReplicaTimeout'] }, 'reclaimPolicy': 'Delete' } def finalizer(): api = get_storage_api_client() try: api.delete_storage_class(name=sc_manifest['metadata']['name'], body=k8sclient.V1DeleteOptions()) except ApiException as e: assert e.status == 404 request.addfinalizer(finalizer) return sc_manifest
def sts_name(request)
-
Expand source code
@pytest.fixture def sts_name(request): return generate_sts_name()
def system_backup_feature_supported(client)
-
Expand source code
def system_backup_feature_supported(client): if hasattr(client.by_id_schema("systemBackup"), "id"): return True else: return False
def system_backup_random_name()
-
Expand source code
def system_backup_random_name(): return "test-system-backup-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def system_backup_wait_for_state(state, name, client)
-
Expand source code
def system_backup_wait_for_state(state, name, client): # NOQA ok = False for _ in range(RETRY_COUNTS): try: system_backup = client.by_id_system_backup(name) assert system_backup.state == state ok = True break except Exception: time.sleep(RETRY_INTERVAL) assert ok
def system_backups_cleanup(client)
-
Expand source code
def system_backups_cleanup(client): """ Clean up all system backups :param client: The Longhorn client to use in the request. """ system_backups = client.list_system_backup() for system_backup in system_backups: # ignore the error when clean up try: client.delete(system_backup) except Exception as e: name = system_backup['name'] print("\nException when cleanup system backup ", name) print(e) ok = False for _ in range(RETRY_COUNTS): system_backups = client.list_system_backup() if len(system_backups) == 0: ok = True break time.sleep(RETRY_INTERVAL) assert ok
Clean up all system backups.
Parameters
client
- The Longhorn client to use in the request.
def system_restore_random_name()
-
Expand source code
def system_restore_random_name(): return "test-system-restore-" + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
def system_restore_wait_for_state(state, name, client)
-
Expand source code
def system_restore_wait_for_state(state, name, client): # NOQA ok = False for _ in range(RETRY_COUNTS): system_restore = client.by_id_system_restore(name) try: system_restore = client.by_id_system_restore(name) assert system_restore.state == state ok = True break except Exception: time.sleep(RETRY_INTERVAL_LONG) assert ok, \ f" Expected state {state}, " \ f" but got {system_restore.state} after {RETRY_COUNTS} attempts"
def system_restores_cleanup(client)
-
Expand source code
def system_restores_cleanup(client): """ Clean up all system restores :param client: The Longhorn client to use in the request. """ system_restores = client.list_system_restore() for system_restore in system_restores: # ignore the error when clean up try: client.delete(system_restore) except Exception as e: name = system_restore['name'] print("\nException when cleanup system restore ", name) print(e) ok = False for _ in range(RETRY_COUNTS): system_restores = client.list_system_restore() if len(system_restores) == 0: ok = True break time.sleep(RETRY_INTERVAL) assert ok
Clean up all system restores.
Parameters
client
- The Longhorn client to use in the request.
def umount_disk(mount_path)
-
Expand source code
def umount_disk(mount_path): cmd = ['umount', mount_path] subprocess.check_call(cmd)
def update_node_disks(client, node_name, disks, retry=False)
-
Expand source code
def update_node_disks(client, node_name, disks, retry=False): node = client.by_id_node(node_name) if not retry: return node.diskUpdate(disks=disks) # Retry if "too many retries error" happened. for _ in range(NODE_UPDATE_RETRY_COUNT): try: node = node.diskUpdate(disks=disks) except Exception as e: if disk_being_syncing in str(e.error.message): time.sleep(NODE_UPDATE_RETRY_INTERVAL) continue print(e) raise else: break return node
def update_persistent_volume_claim(core_api, name, namespace, claim)
-
Expand source code
def update_persistent_volume_claim(core_api, name, namespace, claim): for _ in range(RETRY_COUNTS): try: core_api.replace_namespaced_persistent_volume_claim( name, namespace, claim ) break except Exception as e: print(e) time.sleep(RETRY_INTERVAL)
def update_recurring_job(client, name, groups, labels, cron='', retain=0, concurrency=0)
-
Expand source code
def update_recurring_job(client, name, groups, labels, # NOQA cron="", retain=0, concurrency=0): recurringJob = client.by_id_recurring_job(name) update_groups = groups update_labels = labels update_cron = cron if len(cron) != 0 else recurringJob["cron"] update_retain = retain if retain != 0 else recurringJob["retain"] update_concurrency = \ concurrency if concurrency != 0 else recurringJob["concurrency"] client.update(recurringJob, groups=update_groups, task=recurringJob["task"], cron=update_cron, retain=update_retain, concurrency=update_concurrency, labels=update_labels)
def update_setting(client, name, value)
-
Expand source code
def update_setting(client, name, value): for _ in range(RETRY_COUNTS): try: setting = client.by_id_setting(name) setting = client.update(setting, value=value) break except Exception as e: print(e) time.sleep(RETRY_INTERVAL) value = "" if value is None else value assert setting.value == value, \ f"expect update setting {name} to be {value}, but it's {setting.value}"
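A hedged usage sketch (the setting name and value are illustrative); update_setting retries the update and asserts that the new value took effect:
update_setting(client, "replica-soft-anti-affinity", "true")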
def update_statefulset_manifests(ss_manifest, sc_manifest, name)
-
Expand source code
def update_statefulset_manifests(ss_manifest, sc_manifest, name): """ Write in a new StatefulSet name and the proper StorageClass name for tests. """ ss_manifest['metadata']['name'] = \ ss_manifest['spec']['selector']['matchLabels']['app'] = \ ss_manifest['spec']['serviceName'] = \ ss_manifest['spec']['template']['metadata']['labels']['app'] = \ name ss_manifest['spec']['volumeClaimTemplates'][0]['spec']['storageClassName']\ = DEFAULT_STORAGECLASS_NAME sc_manifest['metadata']['name'] = DEFAULT_STORAGECLASS_NAME
Write in a new StatefulSet name and the proper StorageClass name for tests.
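A minimal sketch, assuming statefulset and storage_class are the manifests returned by the statefulset and storage_class fixtures above:
name = generate_sts_name()
update_statefulset_manifests(statefulset, storage_class, name)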
def v2_data_engine_cr_supported(client)
-
Expand source code
def v2_data_engine_cr_supported(client): longhorn_version = client.by_id_setting('current-longhorn-version').value versions_not_supporting_v2_cr = ['v1.5', 'v1.6', 'v1.7'] if any(_version in longhorn_version for _version in versions_not_supporting_v2_cr): print(f'{longhorn_version} doesn\'t support v2 cr for test') return False else: return True
def volume_name(request)
-
Expand source code
@pytest.fixture def volume_name(request): return generate_volume_name()
def volume_read(v, start, count)
-
Expand source code
def volume_read(v, start, count): dev = get_volume_endpoint(v) return dev_read(dev, start, count)
def volume_valid(dev)
-
Expand source code
def volume_valid(dev): return stat.S_ISBLK(os.stat(dev).st_mode)
def volume_write(v, start, data)
-
Expand source code
def volume_write(v, start, data): dev = get_volume_endpoint(v) return dev_write(dev, start, data)
def wait_and_get_any_deployment_pod(core_api, deployment_name, is_phase='Running')
-
Expand source code
def wait_and_get_any_deployment_pod(core_api, deployment_name, is_phase="Running"): """ Add mechanism to wait for a stable running pod when deployment restarts its workload, since Longhorn manager could create/delete the new workload pod multiple times, it's possible that we get an unstable pod which will be deleted immediately, so add a wait mechanism to get a stable running pod. ref: https://github.com/longhorn/longhorn/issues/4814 """ stable_pod = None wait_for_stable_retry = 0 for _ in range(DEFAULT_DEPLOYMENT_TIMEOUT): label_selector = "name=" + deployment_name pods = core_api.list_namespaced_pod(namespace="default", label_selector=label_selector) for pod in pods.items: if pod.status.phase == is_phase: if stable_pod is None or \ stable_pod.status.start_time != pod.status.start_time: stable_pod = pod wait_for_stable_retry = 0 break else: wait_for_stable_retry += 1 if wait_for_stable_retry == WAIT_FOR_POD_STABLE_MAX_RETRY: return stable_pod time.sleep(DEFAULT_DEPLOYMENT_INTERVAL) assert False
Wait for a stable running pod after a deployment restarts its workload. Since the Longhorn manager may create and delete the new workload pod several times, the first pod observed can be unstable and deleted right away, so retry until a stable running pod is found. ref: https://github.com/longhorn/longhorn/issues/4814
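A short sketch (deployment_name and the data path are illustrative):
pod = wait_and_get_any_deployment_pod(core_api, deployment_name)
checksum = get_pod_data_md5sum(core_api, pod.metadata.name, "/data/test")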
def wait_and_get_pv_for_pvc(api, pvc_name)
-
Expand source code
def wait_and_get_pv_for_pvc(api, pvc_name): found = False for i in range(RETRY_COUNTS): pvs = api.list_persistent_volume() for item in pvs.items: if item.spec.claim_ref.name == pvc_name: found = True pv = item break if found: break time.sleep(RETRY_INTERVAL) assert found return pv
def wait_delete_deployment(apps_api, deployment_name, namespace='default')
-
Expand source code
def wait_delete_deployment(apps_api, deployment_name, namespace='default'): for i in range(DEFAULT_DEPLOYMENT_TIMEOUT): ret = apps_api.list_namespaced_deployment(namespace=namespace) found = False for item in ret.items: if item.metadata.name == deployment_name: found = True break if not found: break time.sleep(DEFAULT_DEPLOYMENT_INTERVAL) assert not found
def wait_delete_dm_device(api, name)
-
Expand source code
def wait_delete_dm_device(api, name): path = os.path.join("/dev/mapper", name) for i in range(RETRY_COUNTS): found = os.path.exists(path) if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_delete_pod(api, pod_uid, namespace='default')
-
Expand source code
def wait_delete_pod(api, pod_uid, namespace='default'): for i in range(POD_DELETION_TIMEOUT): ret = api.list_namespaced_pod(namespace=namespace) found = False for item in ret.items: if item.metadata.uid == pod_uid: found = True break if not found: break time.sleep(DEFAULT_POD_INTERVAL) assert not found
def wait_delete_pv(api, pv_name)
-
Expand source code
def wait_delete_pv(api, pv_name): for i in range(RETRY_COUNTS): found = False pvs = api.list_persistent_volume() for item in pvs.items: if item.metadata.name == pv_name: if item.status.phase == 'Failed': try: api.delete_persistent_volume( name=pv_name, body=k8sclient.V1DeleteOptions()) except ApiException as e: assert e.status == 404 else: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_delete_pvc(api, pvc_name, retry_counts=150)
-
Expand source code
def wait_delete_pvc(api, pvc_name, retry_counts=RETRY_COUNTS): for _ in range(retry_counts): found = False ret = api.list_namespaced_persistent_volume_claim(namespace='default') for item in ret.items: if item.metadata.name == pvc_name: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_delete_volume_attachment(storage_api, volume_attachment_name)
-
Expand source code
def wait_delete_volume_attachment(storage_api, volume_attachment_name): for i in range(RETRY_COUNTS): found = False ret = storage_api.list_volume_attachment() for item in ret.items: if item.metadata.name == volume_attachment_name: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_deployment_replica_ready(apps_api, deployment_name, desired_replica_count, namespace='default')
-
Expand source code
def wait_deployment_replica_ready(apps_api, deployment_name, desired_replica_count, namespace='default'): # NOQA ok = False for i in range(DEFAULT_DEPLOYMENT_TIMEOUT): deployment = apps_api.read_namespaced_deployment( name=deployment_name, namespace=namespace) # deployment is none if deployment is not yet created if deployment is not None and \ deployment.status.ready_replicas == desired_replica_count: ok = True break time.sleep(DEFAULT_DEPLOYMENT_INTERVAL) assert ok
def wait_for_all_instance_manager_running(client)
-
Expand source code
def wait_for_all_instance_manager_running(client): core_api = get_core_api_client() nodes = client.list_node() for _ in range(RETRY_COUNTS): instance_managers = client.list_instance_manager() node_to_instance_manager_map = {} try: for im in instance_managers: if im.managerType == "aio": node_to_instance_manager_map[im.nodeID] = im else: print("\nFound unknown instance manager:", im) if len(node_to_instance_manager_map) != len(nodes): time.sleep(RETRY_INTERVAL) continue for _, im in node_to_instance_manager_map.items(): wait_for_instance_manager_desire_state(client, core_api, im.name, "Running", True) break except Exception: continue
def wait_for_backing_image_delete(client, name)
-
Expand source code
def wait_for_backing_image_delete(client, name): found = False for i in range(RETRY_COUNTS): bi_list = client.list_backing_image() found = False for bi in bi_list: if bi.name == name: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_for_backing_image_disk_cleanup(client, bi_name, disk_id)
-
Expand source code
def wait_for_backing_image_disk_cleanup(client, bi_name, disk_id): found = False for i in range(RETRY_COUNTS): found = False bi = client.by_id_backing_image(bi_name) for disk, status in iter(bi.diskFileStatusMap.items()): if disk == disk_id: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found return bi
def wait_for_backing_image_in_disk_fail(client, backing_img_name, disk_uuid)
-
Expand source code
def wait_for_backing_image_in_disk_fail(client, backing_img_name, disk_uuid): failed = False for i in range(RETRY_BACKUP_COUNTS): if failed is False: backing_image = client.by_id_backing_image(backing_img_name) for uuid, status in iter(backing_image.diskFileStatusMap.items()): if uuid == disk_uuid and status.state == "failed": failed = True if failed is True: break time.sleep(0.1) assert failed is True
def wait_for_backing_image_status(client, backing_img_name, image_status)
-
Expand source code
def wait_for_backing_image_status(client, backing_img_name, image_status): status_matched = False for _ in range(RETRY_EXEC_COUNTS): if status_matched: break backing_image = client.by_id_backing_image(backing_img_name) try: if backing_image.diskFileStatusMap.items(): for _, status in iter(backing_image.diskFileStatusMap.items()): if status.state == image_status: status_matched = True except Exception as e: print(e) time.sleep(RETRY_EXEC_INTERVAL) assert status_matched is True
def wait_for_backup_backing_image_delete(client, name)
-
Expand source code
def wait_for_backup_backing_image_delete(client, name): for _ in range(RETRY_COUNTS): bbis = client.list_backupBackingImage() found = False for bbi in bbis: if bbi.name == name: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_for_backup_completion(client, volume_name, snapshot_name=None, retry_count=300)
-
Expand source code
def wait_for_backup_completion(client, volume_name, snapshot_name=None, retry_count=RETRY_BACKUP_COUNTS): completed = False for _ in range(retry_count): v = client.by_id_volume(volume_name) for b in v.backupStatus: if snapshot_name is not None and b.snapshot != snapshot_name: continue if b.state == "Completed": assert b.progress == 100 assert b.error == "" completed = True break if completed: break time.sleep(RETRY_BACKUP_INTERVAL) assert completed is True, f" Backup status = {b.state}," \ f" Backup Progress = {b.progress}, Volume = {v}" return v
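A hedged sketch of the usual pairing with wait_for_backup_to_start, assuming a backup of the snapshot snap_name (illustrative) has already been requested on the volume:
wait_for_backup_to_start(client, volume_name, snapshot_name=snap_name)
volume = wait_for_backup_completion(client, volume_name, snapshot_name=snap_name)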
def wait_for_backup_count(backup_volume, number, retry_counts=120)
-
Expand source code
def wait_for_backup_count(backup_volume, number, retry_counts=120): ok = False for _ in range(retry_counts): complete_backup_cnt = 0 for single_backup in backup_volume.backupList(): if single_backup.state == "Completed" and \ int(single_backup.volumeSize) > 0: complete_backup_cnt = complete_backup_cnt + 1 if complete_backup_cnt == number: ok = True break time.sleep(RETRY_BACKUP_INTERVAL) assert ok
def wait_for_backup_delete(client, volume_name, backup_name)
-
Expand source code
def wait_for_backup_delete(client, volume_name, backup_name): def backup_exists(): bv = find_backup_volume(client, volume_name) if bv is not None: backups = bv.backupList() for b in backups: if b.name == backup_name: return True return False for i in range(RETRY_BACKUP_COUNTS): if backup_exists() is False: return time.sleep(RETRY_BACKUP_INTERVAL) assert False, "deleted backup " + backup_name + " for volume " \ + volume_name + " is still present"
def wait_for_backup_failed(client, volume_name, snapshot_name=None, retry_count=300)
-
Expand source code
def wait_for_backup_failed(client, volume_name, snapshot_name=None, retry_count=RETRY_BACKUP_COUNTS): failed = False for _ in range(retry_count): v = client.by_id_volume(volume_name) for b in v.backupStatus: if b.state == "Error": assert b.progress == 0 assert b.error != "" failed = True break if failed: break time.sleep(RETRY_BACKUP_INTERVAL) assert failed is True return v
def wait_for_backup_restore_completed(client, name, backup_name)
-
Expand source code
def wait_for_backup_restore_completed(client, name, backup_name): complete = False for i in range(RETRY_COUNTS): v = client.by_id_volume(name) if v.controllers and len(v.controllers) != 0 and \ v.controllers[0].lastRestoredBackup == backup_name: complete = True break time.sleep(RETRY_INTERVAL_LONG) assert complete
def wait_for_backup_state(client, volume_name, predicate, retry_count=300)
-
Expand source code
def wait_for_backup_state(client, volume_name, predicate, retry_count=RETRY_BACKUP_COUNTS): completed = False for i in range(retry_count): v = client.by_id_volume(volume_name) for b in v.backupStatus: if predicate(b): completed = True break if completed: break time.sleep(RETRY_BACKUP_INTERVAL) assert completed is True return v
def wait_for_backup_target_available(client, available)
-
Expand source code
def wait_for_backup_target_available(client, available): def find_backup_target_default(client): bt = client.list_backup_target() assert bt is not None return bt.data[0] for _ in range(RETRY_COUNTS): bt = find_backup_target_default(client) if bt.available == available: break time.sleep(RETRY_INTERVAL) if bt.available != available: raise Exception( 'BackupTarget status.available should be {}'.format(available))
def wait_for_backup_to_start(client, volume_name, snapshot_name=None, retry_count=300, chk_progress=0)
-
Expand source code
def wait_for_backup_to_start(client, volume_name, snapshot_name=None, retry_count=RETRY_BACKUP_COUNTS, chk_progress=0): in_progress = False for _ in range(retry_count): v = client.by_id_volume(volume_name) for b in v.backupStatus: if snapshot_name is not None and b.snapshot != snapshot_name: continue if b.state == "InProgress" and b.progress > chk_progress: assert b.error == "" in_progress = True break if in_progress: break time.sleep(RETRY_BACKUP_INTERVAL) assert in_progress is True return v
def wait_for_backup_volume(client, bv_name, backing_image='')
-
Expand source code
def wait_for_backup_volume(client, bv_name, backing_image=""): for _ in range(RETRY_BACKUP_COUNTS): bv = client.by_id_backupVolume(bv_name) if bv is not None: if backing_image == "": break if bv.backingImageName == backing_image \ and bv.backingImageChecksum != "": break time.sleep(RETRY_BACKUP_INTERVAL) assert bv is not None, "failed to find backup volume " + bv_name
def wait_for_backup_volume_backing_image_synced(client, volume_name, backing_image, retry_count=300)
-
Expand source code
def wait_for_backup_volume_backing_image_synced( client, volume_name, backing_image, retry_count=RETRY_BACKUP_COUNTS): completed = False for _ in range(retry_count): bv = find_backup_volume(client, volume_name) if bv is not None: if bv.backingImageName == backing_image: completed = True break time.sleep(RETRY_BACKUP_INTERVAL) assert completed is True, f" Backup Volume = {bv}," \ f" Backing Image = {backing_image}," \ f" Volume = {volume_name}" return bv
def wait_for_backup_volume_delete(client, name)
-
Expand source code
def wait_for_backup_volume_delete(client, name): for _ in range(RETRY_BACKUP_COUNTS): bvs = client.list_backupVolume() found = False for bv in bvs: if bv.name == name: found = True break if not found: break time.sleep(RETRY_BACKUP_INTERVAL) assert not found
def wait_for_cron_job_count(batch_v1_api, number, label='', retry_counts=150)
-
Expand source code
def wait_for_cron_job_count(batch_v1_api, number, label="", retry_counts=RETRY_COUNTS): ok = False for _ in range(retry_counts): jobs = batch_v1_api.list_namespaced_cron_job('longhorn-system', label_selector=label) if len(jobs.items) == number: ok = True break time.sleep(RETRY_INTERVAL) assert ok
def wait_for_cron_job_create(batch_v1_api, label='', retry_counts=150)
-
Expand source code
def wait_for_cron_job_create(batch_v1_api, label="", retry_counts=RETRY_COUNTS): exist = False for _ in range(retry_counts): job = batch_v1_api.list_namespaced_cron_job('longhorn-system', label_selector=label) if len(job.items) != 0: exist = True break time.sleep(RETRY_INTERVAL) assert exist
def wait_for_cron_job_delete(batch_v1_api, label='', retry_counts=150)
-
Expand source code
def wait_for_cron_job_delete(batch_v1_api, label="", retry_counts=RETRY_COUNTS): exist = True for _ in range(retry_counts): job = batch_v1_api.list_namespaced_cron_job('longhorn-system', label_selector=label) if len(job.items) == 0: exist = False break time.sleep(RETRY_INTERVAL) assert not exist
def wait_for_deployed_engine_image_count(client, image_name, expected_cnt, exclude_nodes=[])
-
Expand source code
def wait_for_deployed_engine_image_count(client, image_name, expected_cnt, exclude_nodes=[]): for i in range(RETRY_COUNTS): time.sleep(RETRY_INTERVAL) image = client.by_id_engine_image(image_name) deployed_cnt = 0 if image.nodeDeploymentMap is None: continue for node_name in image.nodeDeploymentMap: if node_name in exclude_nodes: continue if image.nodeDeploymentMap[node_name] is True: deployed_cnt = deployed_cnt + 1 if deployed_cnt == expected_cnt: break assert deployed_cnt == expected_cnt, f"image = {image}"
def wait_for_device_login(dest_path, name)
-
Expand source code
def wait_for_device_login(dest_path, name): dev = "" for i in range(RETRY_COUNTS): for j in range(RETRY_COMMAND_COUNT): files = [] try: files = os.listdir(dest_path) break except Exception: time.sleep(1) assert files if name in files: dev = name break time.sleep(RETRY_INTERVAL) assert dev == name return dev
def wait_for_disk_conditions(client, node_name, disk_name, key, value)
-
Expand source code
def wait_for_disk_conditions(client, node_name, disk_name, key, value): for i in range(RETRY_COUNTS): node = client.by_id_node(node_name) disks = node.disks disk = disks[disk_name] conditions = disk.conditions if conditions[key]["status"] == value: break time.sleep(RETRY_INTERVAL) assert conditions[key]["status"] == value return node
def wait_for_disk_status(client, node_name, disk_name, key, value)
-
Expand source code
def wait_for_disk_status(client, node_name, disk_name, key, value): # use wait_for_disk_storage_available to check storageAvailable assert key != "storageAvailable" for i in range(RETRY_COUNTS): node = client.by_id_node(node_name) disks = node.disks if len(disks) > 0 and \ disk_name in disks and \ disks[disk_name][key] == value: break time.sleep(RETRY_INTERVAL) assert len(disks) != 0 assert disk_name in disks assert disks[disk_name][key] == value, \ f"Wrong disk({disk_name}) {key} status.\n" \ f"Expect={value}\n" \ f"Got={disks[disk_name][key]}\n" \ f"node={client.by_id_node(node_name)}\n" \ f"volumes={client.list_volume()}\n" return node
def wait_for_disk_storage_available(client, node_name, disk_name, disk_path)
-
Expand source code
def wait_for_disk_storage_available(client, node_name, disk_name, disk_path): for i in range(RETRY_COUNTS): node = client.by_id_node(node_name) disks = node.disks if len(disks) > 0 and disk_name in disks: free, _ = get_host_disk_size(disk_path) if disks[disk_name]["storageAvailable"] == free: break time.sleep(RETRY_INTERVAL) assert len(disks) != 0 assert disk_name in disks assert disks[disk_name]["storageAvailable"] == free return node
def wait_for_disk_update(client, name, disk_num)
-
Expand source code
def wait_for_disk_update(client, name, disk_num): for i in range(RETRY_COUNTS): node = client.by_id_node(name) if len(node.disks) == disk_num: allUpdated = True disks = node.disks for d in disks: if disks[d]["diskUUID"] == "": allUpdated = False break if allUpdated: break time.sleep(RETRY_INTERVAL) assert len(node.disks) == disk_num return node
def wait_for_disk_uuid(client, node_name, uuid)
-
Expand source code
def wait_for_disk_uuid(client, node_name, uuid): found = False for i in range(RETRY_COUNTS): node = client.by_id_node(node_name) disks = node.disks for name in disks: if disks[name]["diskUUID"] == uuid: found = True break if found: break time.sleep(RETRY_INTERVAL) assert found return node
def wait_for_dr_volume_expansion(longhorn_api_client, volume_name, size_str)
-
Expand source code
def wait_for_dr_volume_expansion(longhorn_api_client, volume_name, size_str): complete = False for i in range(RETRY_COUNTS): volume = longhorn_api_client.by_id_volume(volume_name) if volume.size == size_str: engine = get_volume_engine(volume) if engine.size == volume.size: complete = True break time.sleep(RETRY_INTERVAL_LONG) assert complete
def wait_for_engine_image_condition(client, image_name, state)
-
Expand source code
def wait_for_engine_image_condition(client, image_name, state): """ state: "True", "False" """ # Indicate many times we want to see the ENGINE_NAME in the STATE. # This helps to prevent the flaky test case in which the ENGINE_NAME # is flapping between ready and not ready a few times before settling # down to the ready state # https://github.com/longhorn/longhorn/issues/7438 state_count = 1 if state == "True": state_count = 60 c = 0 for i in range(RETRY_COUNTS): wait_for_engine_image_creation(client, image_name) image = client.by_id_engine_image(image_name) if image['conditions'][0]['status'] == state: c += 1 if c >= state_count: break time.sleep(RETRY_INTERVAL_SHORT) assert image['conditions'][0]['status'] == state return image
state: "True", "False"
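For example, waiting until the default engine image reports a ready condition (a minimal sketch):
default_img = get_default_engine_image(client)
wait_for_engine_image_condition(client, default_img.name, "True")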
def wait_for_engine_image_creation(client, image_name)
-
Expand source code
def wait_for_engine_image_creation(client, image_name): for i in range(RETRY_COUNTS): images = client.list_engine_image() found = False for img in images: if img.name == image_name: found = True break if found: break time.sleep(RETRY_INTERVAL_SHORT) assert found
def wait_for_engine_image_deletion(client, core_api, engine_image_name)
-
Expand source code
def wait_for_engine_image_deletion(client, core_api, engine_image_name): deleted = False for i in range(RETRY_COUNTS): time.sleep(RETRY_INTERVAL) deleted = True ei_list = client.list_engine_image().data for ei in ei_list: if ei.name == engine_image_name: deleted = False break if not deleted: continue labels = "longhorn.io/component=engine-image," \ "longhorn.io/engine-image="+engine_image_name ei_pod_list = core_api.list_namespaced_pod( LONGHORN_NAMESPACE, label_selector=labels).items if len(ei_pod_list) != 0: deleted = False continue if deleted: break assert deleted
def wait_for_engine_image_incompatible(client, image_name)
-
Expand source code
def wait_for_engine_image_incompatible(client, image_name): wait_for_engine_image_creation(client, image_name) for i in range(RETRY_COUNTS): image = client.by_id_engine_image(image_name) if image.incompatible: break time.sleep(RETRY_INTERVAL) assert image.incompatible return image
def wait_for_engine_image_ref_count(client, image_name, count)
-
Expand source code
def wait_for_engine_image_ref_count(client, image_name, count): wait_for_engine_image_creation(client, image_name) for i in range(RETRY_COUNTS): image = client.by_id_engine_image(image_name) if image.refCount == count: break time.sleep(RETRY_INTERVAL) assert image.refCount == count, f"image = {image}" if count == 0: assert image.noRefSince != "" return image
def wait_for_engine_image_state(client, image_name, state)
-
Expand source code
def wait_for_engine_image_state(client, image_name, state): wait_for_engine_image_creation(client, image_name) for i in range(RETRY_COUNTS): image = client.by_id_engine_image(image_name) if image.state == state: break time.sleep(RETRY_INTERVAL) assert image.state == state return image
def wait_for_expansion_error_clear(longhorn_api_client, volume_name)
-
Expand source code
def wait_for_expansion_error_clear(longhorn_api_client, volume_name): complete = False for i in range(RETRY_COUNTS): volume = longhorn_api_client.by_id_volume(volume_name) engine = get_volume_engine(volume) if engine.lastExpansionFailedAt == "" and \ engine.lastExpansionError == "": complete = True break time.sleep(RETRY_INTERVAL) assert complete
def wait_for_expansion_failure(client, volume_name, last_failed_at='')
-
Expand source code
def wait_for_expansion_failure(client, volume_name, last_failed_at=""): failed = False for i in range(30): volume = client.by_id_volume(volume_name) engine = get_volume_engine(volume) if engine.lastExpansionFailedAt != last_failed_at: failed = True break time.sleep(RETRY_INTERVAL) assert failed
def wait_for_instance_manager_count(client, number, retry_counts=120)
-
Expand source code
def wait_for_instance_manager_count(client, number, retry_counts=120): for _ in range(retry_counts): im_counts = 0 ims = client.list_instance_manager() for im in ims: if im.dataEngine == DATA_ENGINE: im_counts = im_counts + 1 if im_counts == number: break time.sleep(RETRY_INTERVAL_LONG) return im_counts
def wait_for_instance_manager_desire_state(client, core_api, im_name, state, desire=True)
-
Expand source code
def wait_for_instance_manager_desire_state(client, core_api, im_name, state, desire=True): for i in range(RETRY_COUNTS_LONG): im = client.by_id_instance_manager(im_name) try: pod = core_api.read_namespaced_pod(name=im_name, namespace=LONGHORN_NAMESPACE) except Exception as e: # Continue with pod restarted case if e.reason == EXCEPTION_ERROR_REASON_NOT_FOUND: time.sleep(RETRY_INTERVAL) continue # Report any other error else: assert (not e) if desire: if im.currentState == state.lower() and pod.status.phase == state: break else: if im.currentState != state.lower() and pod.status.phase != state: break time.sleep(RETRY_INTERVAL) if desire: assert im.currentState == state.lower() assert pod.status.phase == state else: assert im.currentState != state.lower() assert pod.status.phase != state return im
def wait_for_node_mountpropagation_condition(client, name)
-
Expand source code
def wait_for_node_mountpropagation_condition(client, name): for i in range(RETRY_COUNTS): node = client.by_id_node(name) conditions = {} if "conditions" in node.keys(): conditions = node.conditions if NODE_CONDITION_MOUNTPROPAGATION in \ conditions.keys() and \ "status" in \ conditions[NODE_CONDITION_MOUNTPROPAGATION].keys() \ and conditions[NODE_CONDITION_MOUNTPROPAGATION]["status"] != \ CONDITION_STATUS_UNKNOWN: break time.sleep(RETRY_INTERVAL) return node
def wait_for_node_schedulable_condition(client, name)
-
Expand source code
def wait_for_node_schedulable_condition(client, name): for i in range(RETRY_COUNTS): node = client.by_id_node(name) conditions = {} if "conditions" in node.keys(): conditions = node.conditions if NODE_CONDITION_SCHEDULABLE in \ conditions.keys() and \ "status" in \ conditions[NODE_CONDITION_SCHEDULABLE].keys() \ and conditions[NODE_CONDITION_SCHEDULABLE]["status"] != \ CONDITION_STATUS_UNKNOWN: break time.sleep(RETRY_INTERVAL) return node
def wait_for_node_tag_update(client, name, tags)
-
Expand source code
def wait_for_node_tag_update(client, name, tags): updated = False for i in range(RETRY_COUNTS): node = client.by_id_node(name) if not tags and not node.tags: updated = True break elif node.tags is not None and set(node.tags) == set(tags): updated = True break time.sleep(RETRY_INTERVAL) assert updated return node
def wait_for_node_update(client, name, key, value)
-
Expand source code
def wait_for_node_update(client, name, key, value): for i in range(RETRY_COUNTS): node = client.by_id_node(name) if str(node[key]) == str(value): break time.sleep(RETRY_INTERVAL) assert str(node[key]) == str(value) return node
def wait_for_nvme_device()
-
Expand source code
def wait_for_nvme_device(): for _ in range(RETRY_COUNTS): try: output = subprocess.check_output(["nvme", "list"], text=True) print(f"nvme list output =\n {output}") for line in output.splitlines(): if "SPDK bdev Controller" in line: dev_path = line.split()[0] return dev_path except subprocess.CalledProcessError as e: print(f"nvme list failed: {e.output}") time.sleep(RETRY_INTERVAL) raise Exception("NVMe device not found after retries")
def wait_for_pod_annotation(core_api, label_selector, anno_key, anno_val)
-
Expand source code
def wait_for_pod_annotation(core_api, label_selector, anno_key, anno_val): matches = False for _ in range(RETRY_COUNTS): pods = core_api.list_namespaced_pod( namespace='longhorn-system', label_selector=label_selector) if anno_val is None: if any(pod.metadata.annotations is None or pod.metadata.annotations.get(anno_key, None) is None for pod in pods.items): matches = True break else: if any(pod.metadata.annotations is not None and pod.metadata.annotations.get(anno_key, None) == anno_val for pod in pods.items): matches = True break time.sleep(RETRY_INTERVAL) assert matches is True
def wait_for_pod_phase(core_api, pod_name, pod_phase, namespace='default')
-
Expand source code
def wait_for_pod_phase(core_api, pod_name, pod_phase, namespace="default"): is_phase = False for _ in range(RETRY_COUNTS): try: pod = core_api.read_namespaced_pod(name=pod_name, namespace=namespace) if pod.status.phase == pod_phase: is_phase = True break except Exception as e: print(f"Waiting for pod {pod_name} {pod_phase} failed: {e}") time.sleep(RETRY_INTERVAL_LONG) assert is_phase
def wait_for_pod_remount(core_api, pod_name, chk_path='/data/lost+found')
-
Expand source code
def wait_for_pod_remount(core_api, pod_name, chk_path="/data/lost+found"): check_command = [ '/bin/sh', '-c', 'ls ' + chk_path ] ready = False for i in range(RETRY_EXEC_COUNTS): try: output = stream(core_api.connect_get_namespaced_pod_exec, pod_name, 'default', command=check_command, stderr=True, stdin=False, stdout=True, tty=False) if "Input/output error" not in output: ready = True break except Exception: pass if ready: break time.sleep(RETRY_EXEC_INTERVAL) assert ready
def wait_for_pod_restart(core_api, pod_name, namespace='default')
-
Expand source code
def wait_for_pod_restart(core_api, pod_name, namespace="default"): pod = core_api.read_namespaced_pod(name=pod_name, namespace=namespace) restart_count = pod.status.container_statuses[0].restart_count pod_restarted = False for i in range(RETRY_COUNTS): pod = core_api.read_namespaced_pod(name=pod_name, namespace=namespace) count = pod.status.container_statuses[0].restart_count if count > restart_count: pod_restarted = True break time.sleep(RETRY_INTERVAL) assert pod_restarted
def wait_for_pods_volume_delete(client, pod_list, retry_counts=300)
-
Expand source code
def wait_for_pods_volume_delete(client, pod_list, # NOQA retry_counts=RETRY_BACKUP_COUNTS): volume_deleted = False for _ in range(retry_counts): volume_deleted = True volumes = client.list_volume() for v in volumes: for p in pod_list: if v.name == p['pv_name']: volume_deleted = False break time.sleep(RETRY_INTERVAL) assert volume_deleted is True
def wait_for_pods_volume_state(client, pod_list, field, value, retry_counts=150)
-
Expand source code
def wait_for_pods_volume_state(client, pod_list, field, value, # NOQA retry_counts=RETRY_COUNTS): for _ in range(retry_counts): volume_names = [] volumes = client.list_volume() for v in volumes: for p in pod_list: if v.name == p['pv_name'] and v[field] == value: volume_names.append(v.name) break time.sleep(RETRY_INTERVAL) return len(volume_names) == len(pod_list)
def wait_for_pvc_phase(api, pvc_name, phase)
-
Expand source code
def wait_for_pvc_phase(api, pvc_name, phase): complete = False for _ in range(RETRY_COUNTS): pvc = api.read_namespaced_persistent_volume_claim( name=pvc_name, namespace='default') try: assert pvc.status.phase == phase complete = True break except AssertionError: pass time.sleep(RETRY_INTERVAL) assert complete return pvc
def wait_for_rebuild_complete(client, volume_name, retry_count=150)
-
Expand source code
def wait_for_rebuild_complete(client, volume_name, retry_count=RETRY_COUNTS): completed = 0 rebuild_statuses = {} for i in range(retry_count): completed = 0 v = client.by_id_volume(volume_name) rebuild_statuses = v.rebuildStatus for status in rebuild_statuses: if status.state == "complete": assert status.progress == 100, f"status = {status}" assert not status.error assert not status.isRebuilding completed += 1 elif status.state == "": assert not status.error assert not status.isRebuilding completed += 1 elif status.state == "in_progress": assert status.isRebuilding else: assert status.state == "error" assert status.error != "" assert not status.isRebuilding if completed == len(rebuild_statuses): break time.sleep(RETRY_INTERVAL) assert completed == len(rebuild_statuses)
def wait_for_rebuild_start(client, volume_name, retry_count=150, retry_interval=1)
-
Expand source code
def wait_for_rebuild_start(client, volume_name, retry_count=RETRY_COUNTS, retry_interval=RETRY_INTERVAL): started = False for i in range(retry_count): v = client.by_id_volume(volume_name) rebuild_statuses = v.rebuildStatus for status in rebuild_statuses: if status.state == "in_progress": started = True break if started: break time.sleep(retry_interval) assert started return status.fromReplica, status.replica
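A hedged sketch of the usual pairing with wait_for_rebuild_complete, assuming a replica rebuild has been triggered on volume_name (illustrative):
from_replica, rebuilt_replica = wait_for_rebuild_start(client, volume_name)
wait_for_rebuild_complete(client, volume_name)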
def wait_for_recurring_jobs_cleanup(client)
-
Expand source code
def wait_for_recurring_jobs_cleanup(client): for _ in range(RETRY_COUNTS): policies = client.list_recurring_job() if len(policies) == 0: break time.sleep(RETRY_INTERVAL) assert len(client.list_recurring_job()) == 0
def wait_for_replica_count(client, volume_name, replica_count)
-
Expand source code
def wait_for_replica_count(client, volume_name, replica_count): for i in range(RETRY_COUNTS): volume = client.by_id_volume(volume_name) if len(volume.replicas) == replica_count: break time.sleep(RETRY_INTERVAL) volume = client.by_id_volume(volume_name) assert len(volume.replicas) == replica_count
def wait_for_replica_directory()
-
Expand source code
def wait_for_replica_directory(): found = False for i in range(RETRY_COUNTS): if os.path.exists(DEFAULT_REPLICA_DIRECTORY): found = True break time.sleep(RETRY_INTERVAL) assert found
def wait_for_replica_failed(client, volname, replica_name, retry_cnts=150, retry_ivl=1)
-
Expand source code
def wait_for_replica_failed(client, volname, replica_name, retry_cnts=RETRY_COUNTS, retry_ivl=RETRY_INTERVAL): failed = True debug_replica_not_failed = None debug_replica_in_im = None for i in range(retry_cnts): failed = True debug_replica_not_failed = None debug_replica_in_im = None volume = client.by_id_volume(volname) for r in volume.replicas: if r['name'] != replica_name: continue if r['running'] or r['failedAt'] == "": failed = False debug_replica_not_failed = r break if r['instanceManagerName'] != "": im = client.by_id_instance_manager(r['instanceManagerName']) instance_dict = {} # We still check the 'instances' for backward compatibility # with older versions (<v1.5.x). if im['instances'] is not None: instance_dict.update(im['instances']) if im['instanceReplicas'] is not None: instance_dict.update(im['instanceReplicas']) if r['name'] in instance_dict: failed = False debug_replica_in_im = im break if failed: break time.sleep(retry_ivl) err_msg = "Vol({}), Replica({}): {}, Instance_Manager: {}".format( volname, replica_name, debug_replica_not_failed, debug_replica_in_im ) assert failed, err_msg
def wait_for_replica_running(client, volname, replica_name)
-
Expand source code
def wait_for_replica_running(client, volname, replica_name): is_running = False for i in range(RETRY_COUNTS): volume = client.by_id_volume(volname) for r in volume.replicas: if r['name'] != replica_name: continue if r['running'] and r['instanceManagerName'] != "": im = client.by_id_instance_manager( r['instanceManagerName']) instance_dict = {} # We still check the 'instances' for backward compatibility # with older versions (<v1.5.x). if im['instances'] is not None: instance_dict.update(im['instances']) if im['instanceReplicas'] is not None: instance_dict.update(im['instanceReplicas']) if r['name'] in instance_dict: is_running = True break if is_running: break time.sleep(RETRY_INTERVAL) assert is_running
def wait_for_replica_scheduled(client, volume_name, to_nodes, expect_success=2, expect_fail=0, anti_affinity=False, chk_vol_healthy=True, chk_replica_running=True)
-
Expand source code
def wait_for_replica_scheduled(client, volume_name, to_nodes, expect_success=2, expect_fail=0, anti_affinity=False, chk_vol_healthy=True, chk_replica_running=True): for _ in range(RETRY_COUNTS): volume = client.by_id_volume(volume_name) if chk_vol_healthy: assert volume.robustness == VOLUME_ROBUSTNESS_HEALTHY scheduled = 0 unexpect_fail = max(0, expect_fail) expect_nodes = set(to_nodes) for r in volume.replicas: try: assert r.hostId in expect_nodes if chk_replica_running: assert r.running is True assert r.mode == "RW" if not anti_affinity: expect_nodes.remove(r.hostId) scheduled += 1 except AssertionError: if expect_fail >= 0: unexpect_fail -= 1 if scheduled == expect_success and unexpect_fail == 0: break time.sleep(RETRY_INTERVAL) assert scheduled == expect_success, f" Volume = {volume}" assert unexpect_fail == 0, f"Got {unexpect_fail} unexpected fail" if expect_fail >= 0: assert len(volume.replicas) == expect_success + expect_fail, \ f" Volume = {volume}" return volume
def wait_for_restoration_start(client, name)
-
Expand source code
def wait_for_restoration_start(client, name): return wait_for_volume_status(client, name, VOLUME_FIELD_RESTOREINITIATED, True)
def wait_for_running_engine_image_count(image_name, engine_cnt)
-
Expand source code
def wait_for_running_engine_image_count(image_name, engine_cnt): core_api = get_core_api_client() for i in range(RETRY_COUNTS): exist_engine_cnt = 0 longhorn_pod_list = core_api.list_namespaced_pod('longhorn-system') for pod in longhorn_pod_list.items: if "engine-image-" + image_name in pod.metadata.name and \ pod.status.phase == "Running": exist_engine_cnt += 1 if exist_engine_cnt == engine_cnt: break time.sleep(RETRY_INTERVAL) assert exist_engine_cnt == engine_cnt
def wait_for_snapshot_count(volume, number, retry_counts=120, count_removed=False)
-
Expand source code
def wait_for_snapshot_count(volume, number, retry_counts=120, count_removed=False): for _ in range(retry_counts): count = 0 for snapshot in volume.snapshotList(): if snapshot.removed is False or count_removed: count += 1 if count == number: return time.sleep(RETRY_SNAPSHOT_INTERVAL) assert False, \ f"failed to wait for snapshot.\n" \ f"Expect count={number}\n" \ f"Got count={count}"
def wait_for_snapshot_purge(client, volume_name, *snaps)
-
Expand source code
def wait_for_snapshot_purge(client, volume_name, *snaps): completed = 0 last_purge_progress = {} purge_status = {} for i in range(RETRY_COUNTS): completed = 0 v = client.by_id_volume(volume_name) purge_status = v.purgeStatus for status in purge_status: assert status.error == "" progress = status.progress assert progress <= 100 replica = status.replica last = last_purge_progress.get(replica) assert last is None or last <= status.progress last_purge_progress[replica] = progress if status.state == "complete": assert progress == 100 completed += 1 if completed == len(purge_status): break time.sleep(RETRY_INTERVAL) assert completed == len(purge_status) # Now that the purge has been reported to be completed, the Snapshots # should be removed or "marked as removed" in the case of # the latest snapshot. found = False snapshots = v.snapshotList(volume=volume_name) for snap in snaps: for vs in snapshots.data: if snap == vs["name"]: if vs["removed"] is False: found = True break if "volume-head" not in vs["children"]: found = True break assert not found return v
def wait_for_support_bundle_cleanup(client)
-
Expand source code
def wait_for_support_bundle_cleanup(client): # NOQA ok = False for _ in range(RETRY_COUNTS): support_bundles = client.list_support_bundle() if len(support_bundles) == 0: ok = True break time.sleep(RETRY_INTERVAL) assert ok
def wait_for_support_bundle_state(state, node_id, name, client)
-
Expand source code
def wait_for_support_bundle_state(state, node_id, name, client): # NOQA ok = False for _ in range(RETRY_COUNTS): support_bundle = get_support_bundle(node_id, name, client) try: assert support_bundle['state'] == state ok = True break except Exception: time.sleep(RETRY_INTERVAL) assert ok
def wait_for_tainted_node_engine_image_undeployed(client, img_name, tainted_node)
-
Expand source code
def wait_for_tainted_node_engine_image_undeployed(client, img_name, tainted_node): for _ in range(RETRY_COUNTS): time.sleep(RETRY_INTERVAL) tainted_node_excluded = False img = client.by_id_engine_image(img_name) if img.nodeDeploymentMap is None: continue for node_name in img.nodeDeploymentMap: if node_name != tainted_node: continue if img.nodeDeploymentMap[node_name] is False: tainted_node_excluded = True break if tainted_node_excluded: break assert img.nodeDeploymentMap[tainted_node] is False
def wait_for_volume_attached(client, name)
-
Expand source code
def wait_for_volume_attached(client, name): return wait_for_volume_status(client, name, VOLUME_FIELD_STATE, VOLUME_STATE_ATTACHED)
def wait_for_volume_clone_status(client, name, key, value)
-
Expand source code
def wait_for_volume_clone_status(client, name, key, value): for _ in range(RETRY_COUNTS): volume = client.by_id_volume(name) try: if volume[VOLUME_FIELD_CLONE_STATUS][key] == value: break except Exception as e: print("\nVOLUME_FIELD_CLONE_STATUS is not ready") print(e) finally: time.sleep(RETRY_INTERVAL) assert volume[VOLUME_FIELD_CLONE_STATUS][key] == value, \ f" Expected value={value}\n. " \ f" Got volume[{VOLUME_FIELD_CLONE_STATUS}][{key}]= " \ f"{volume[VOLUME_FIELD_CLONE_STATUS][key]}\n. volume={volume}" return volume
def wait_for_volume_condition_restore(client, name, key, value)
-
Expand source code
def wait_for_volume_condition_restore(client, name, key, value): wait_for_volume_creation(client, name) for i in range(RETRY_COUNTS): volume = client.by_id_volume(name) conditions = volume.conditions if conditions is not None and \ conditions != {} and \ VOLUME_CONDITION_RESTORE in conditions and \ conditions[VOLUME_CONDITION_RESTORE][key] and \ conditions[VOLUME_CONDITION_RESTORE][key] == value: break time.sleep(RETRY_INTERVAL) conditions = volume.conditions assert conditions[VOLUME_CONDITION_RESTORE][key] == value return volume
def wait_for_volume_condition_scheduled(client, name, key, value)
-
Expand source code
def wait_for_volume_condition_scheduled(client, name, key, value): wait_for_volume_creation(client, name) for i in range(RETRY_COUNTS): volume = client.by_id_volume(name) conditions = volume.conditions if conditions is not None and \ conditions != {} and \ conditions[VOLUME_CONDITION_SCHEDULED] and \ conditions[VOLUME_CONDITION_SCHEDULED][key] and \ conditions[VOLUME_CONDITION_SCHEDULED][key] == value: break time.sleep(RETRY_INTERVAL) conditions = volume.conditions assert conditions[VOLUME_CONDITION_SCHEDULED][key] == value, \ f" Expected value = {value}, " \ f" Conditions[{VOLUME_CONDITION_SCHEDULED}][{key}] = " \ f"{conditions[VOLUME_CONDITION_SCHEDULED][key]}, Volume = {volume}" return volume
def wait_for_volume_condition_toomanysnapshots(client, name, key, value, expected_message=None)
-
Expand source code
def wait_for_volume_condition_toomanysnapshots(client, name, key, value, expected_message=None): wait_for_volume_creation(client, name) for _ in range(RETRY_COUNTS): volume = client.by_id_volume(name) conditions = volume.conditions if conditions is not None and \ conditions != {} and \ VOLUME_CONDITION_TOOMANYSNAPSHOTS in conditions and \ conditions[VOLUME_CONDITION_TOOMANYSNAPSHOTS][key] and \ conditions[VOLUME_CONDITION_TOOMANYSNAPSHOTS][key] == value: if expected_message is not None: current_message = \ conditions[VOLUME_CONDITION_TOOMANYSNAPSHOTS]['message'] if current_message == expected_message: break else: break time.sleep(RETRY_INTERVAL) conditions = volume.conditions assert conditions[VOLUME_CONDITION_TOOMANYSNAPSHOTS][key] == value if expected_message is not None: current_message = \ conditions[VOLUME_CONDITION_TOOMANYSNAPSHOTS]['message'] assert current_message == expected_message, \ f"Expected message = {expected_message},\n" \ f"but get '{current_message}'\n" return volume
def wait_for_volume_creation(client, name)
-
Expand source code
def wait_for_volume_creation(client, name): for i in range(RETRY_COUNTS): volumes = client.list_volume() found = False for volume in volumes: if volume.name == name: found = True break if found: break time.sleep(RETRY_INTERVAL) assert found
def wait_for_volume_current_image(client, name, image)
-
Expand source code
def wait_for_volume_current_image(client, name, image): wait_for_volume_creation(client, name) for i in range(RETRY_COUNTS): volume = client.by_id_volume(name) if volume.currentImage == image: break time.sleep(RETRY_INTERVAL) assert volume.currentImage == image return volume
def wait_for_volume_degraded(client, name)
-
Expand source code
def wait_for_volume_degraded(client, name): wait_for_volume_status(client, name, VOLUME_FIELD_STATE, VOLUME_STATE_ATTACHED) return wait_for_volume_status(client, name, VOLUME_FIELD_ROBUSTNESS, VOLUME_ROBUSTNESS_DEGRADED)
def wait_for_volume_delete(client, name)
-
Expand source code
def wait_for_volume_delete(client, name): for i in range(RETRY_COUNTS): volumes = client.list_volume() found = False for volume in volumes: if volume.name == name: found = True break if not found: break time.sleep(RETRY_INTERVAL) assert not found
def wait_for_volume_detached(client, name)
-
Expand source code
def wait_for_volume_detached(client, name):
    return wait_for_volume_status(client, name,
                                  VOLUME_FIELD_STATE,
                                  VOLUME_STATE_DETACHED)
def wait_for_volume_detached_unknown(client, name)
-
Expand source code
def wait_for_volume_detached_unknown(client, name):
    wait_for_volume_status(client, name,
                           VOLUME_FIELD_ROBUSTNESS,
                           VOLUME_ROBUSTNESS_UNKNOWN)
    return wait_for_volume_detached(client, name)
def wait_for_volume_endpoint(client, name)
-
Expand source code
def wait_for_volume_endpoint(client, name):
    for i in range(RETRY_COUNTS):
        v = client.by_id_volume(name)
        engine = get_volume_engine(v)
        if engine.endpoint != "":
            break
        time.sleep(RETRY_INTERVAL)
    check_volume_endpoint(v)
    return v
def wait_for_volume_expansion(longhorn_api_client, volume_name, expected_size='')
-
Expand source code
def wait_for_volume_expansion(longhorn_api_client, volume_name,
                              expected_size=""):
    complete = False
    for i in range(RETRY_COUNTS):
        volume = longhorn_api_client.by_id_volume(volume_name)
        engine = get_volume_engine(volume)
        if expected_size != "" and engine.size != expected_size:
            time.sleep(RETRY_INTERVAL)
            continue
        if engine.size == volume.size:
            complete = True
            break
        time.sleep(RETRY_INTERVAL)
    assert complete
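Example (a hedged sketch): the volume.expand(size=...) call and the Gi byte constant are assumptions about the rest of the test suite and are not defined in this section.

volume = client.by_id_volume("vol-1")
volume.expand(size=str(2 * Gi))            # request a 2 GiB volume
wait_for_volume_expansion(client, "vol-1", expected_size=str(2 * Gi))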
def wait_for_volume_faulted(client, name)
-
Expand source code
def wait_for_volume_faulted(client, name):
    # The detach status check is commented out because the status
    # transitions have recently been too fast to observe reliably.
    # wait_for_volume_status(client, name,
    #                        VOLUME_FIELD_STATE,
    #                        VOLUME_STATE_DETACHED)
    return wait_for_volume_status(client, name,
                                  VOLUME_FIELD_ROBUSTNESS,
                                  VOLUME_ROBUSTNESS_FAULTED)
def wait_for_volume_frontend_disabled(client, volume_name, state=True)
-
Expand source code
def wait_for_volume_frontend_disabled(client, volume_name, state=True):
    for _ in range(RETRY_COUNTS):
        vol = client.by_id_volume(volume_name)
        try:
            assert vol.disableFrontend is state
            break
        except AssertionError:
            time.sleep(RETRY_INTERVAL)
def wait_for_volume_healthy(client, name, retry_count=150)
-
Expand source code
def wait_for_volume_healthy(client, name, retry_count=RETRY_COUNTS):
    wait_for_volume_status(client, name,
                           VOLUME_FIELD_STATE,
                           VOLUME_STATE_ATTACHED,
                           retry_count)
    wait_for_volume_status(client, name,
                           VOLUME_FIELD_ROBUSTNESS,
                           VOLUME_ROBUSTNESS_HEALTHY,
                           retry_count)
    return wait_for_volume_endpoint(client, name)
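A typical attach-and-wait flow, sketched under the assumption that volume.attach(hostId=...) and get_self_host_id() behave as they do elsewhere in this suite.

volume = client.by_id_volume("vol-1")
volume.attach(hostId=get_self_host_id())   # attach to the current node
volume = wait_for_volume_healthy(client, "vol-1")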
def wait_for_volume_healthy_no_frontend(client, name)
-
Expand source code
def wait_for_volume_healthy_no_frontend(client, name):
    wait_for_volume_status(client, name,
                           VOLUME_FIELD_STATE,
                           VOLUME_STATE_ATTACHED)
    return wait_for_volume_status(client, name,
                                  VOLUME_FIELD_ROBUSTNESS,
                                  VOLUME_ROBUSTNESS_HEALTHY)
def wait_for_volume_migration_node(client, volume_name, node_id, expected_replica_count=-1)
-
Expand source code
def wait_for_volume_migration_node(client, volume_name, node_id,
                                   expected_replica_count=-1):
    ready = False
    for i in range(RETRY_COUNTS):
        v = client.by_id_volume(volume_name)
        if expected_replica_count == -1:
            expected_replica_count = v.numberOfReplicas
        assert expected_replica_count >= 0
        engines = v.controllers
        replicas = v.replicas
        if len(engines) == 1 and len(replicas) == expected_replica_count:
            e = engines[0]
            if e.endpoint != "":
                assert e.hostId == node_id
                ready = True
                break
        time.sleep(RETRY_INTERVAL)
    assert ready
    return v
def wait_for_volume_migration_ready(client, volume_name)
-
Expand source code
def wait_for_volume_migration_ready(client, volume_name):
    ready = False
    for i in range(RETRY_COUNTS):
        v = client.by_id_volume(volume_name)
        engines = v.controllers
        ready = len(engines) == 2
        for e in engines:
            ready = ready and e.endpoint != ""
        if ready:
            break
        time.sleep(RETRY_INTERVAL)
    assert ready
    return v
def wait_for_volume_option_trim_auto_removing_snapshots(client, volume_name, enabled)
-
Expand source code
def wait_for_volume_option_trim_auto_removing_snapshots(client, volume_name, enabled):  # NOQA
    for i in range(RETRY_COUNTS_SHORT):
        volume = client.by_id_volume(volume_name)
        if volume.controllers[0].unmapMarkSnapChainRemovedEnabled == enabled:
            break
        time.sleep(RETRY_INTERVAL)
    assert volume.controllers[0].unmapMarkSnapChainRemovedEnabled == enabled
    return volume
def wait_for_volume_recurring_job_update(volume, jobs=[], groups=[])
-
Expand source code
def wait_for_volume_recurring_job_update(volume, jobs=[], groups=[]):
    ok = False
    for _ in range(RETRY_COUNTS):
        volumeJobs, volumeGroups = get_volume_recurring_jobs_and_groups(volume)
        try:
            assert len(volumeGroups) == len(groups)
            for group in groups:
                assert group in volumeGroups

            assert len(volumeJobs) == len(jobs)
            for job in jobs:
                assert job in volumeJobs

            ok = True
            break
        except AssertionError:
            time.sleep(RETRY_INTERVAL)
    assert ok
def wait_for_volume_replica_auto_balance_update(client, volume_name, value)
-
Expand source code
def wait_for_volume_replica_auto_balance_update(client, volume_name, value):
    wait_for_volume_creation(client, volume_name)
    for i in range(RETRY_COUNTS):
        volume = client.by_id_volume(volume_name)
        if volume.replicaAutoBalance == value:
            break
        time.sleep(RETRY_INTERVAL)
    assert volume.replicaAutoBalance == value
    return volume
def wait_for_volume_replica_count(client, name, count)
-
Expand source code
def wait_for_volume_replica_count(client, name, count):
    wait_for_volume_creation(client, name)
    for i in range(RETRY_COUNTS):
        volume = client.by_id_volume(name)
        if len(volume.replicas) == count:
            break
        time.sleep(RETRY_INTERVAL)
    assert len(volume.replicas) == count
    return volume
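A short sketch, assuming the Longhorn volume object exposes updateReplicaCount as used elsewhere in the suite (an assumption, not defined in this section).

volume = client.by_id_volume("vol-1")
volume.updateReplicaCount(replicaCount=3)  # scale up to three replicas
wait_for_volume_replica_count(client, "vol-1", 3)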
def wait_for_volume_replica_rebuilt_on_same_node_different_disk(client, node_name, volume_name, old_disk_name)
-
Expand source code
def wait_for_volume_replica_rebuilt_on_same_node_different_disk(client, node_name, volume_name, old_disk_name):  # NOQA
    new_disk_name = ""
    for _ in range(RETRY_COUNTS_SHORT):
        time.sleep(RETRY_INTERVAL_LONG)
        node = client.by_id_node(node_name)
        disks = node.disks
        new_disk_name = ""
        for name, disk in disks.items():
            # if scheduledReplica has prefix of volume-name
            for scheduledReplica, _ in disk.scheduledReplica.items():
                if scheduledReplica.startswith(volume_name):
                    new_disk_name = name
                    break
        if new_disk_name != old_disk_name:
            break
    assert new_disk_name != old_disk_name, \
        "Failed to rebuild replica disk to another disk"
def wait_for_volume_replicas_mode(client, volname, mode, replica_names=None, replica_count=None)
-
Expand source code
def wait_for_volume_replicas_mode(client, volname, mode,
                                  replica_names=None, replica_count=None):
    verified = False
    for _ in range(RETRY_COUNTS):
        replicas = []
        volume = client.by_id_volume(volname)
        if replica_names is None:
            replicas = volume.replicas
        else:
            for r_name in replica_names:
                found = False
                for r in volume.replicas:
                    if r.name == r_name:
                        replicas.append(r)
                        found = True
                assert found

        count = 0
        wo_count = 0
        for r in replicas:
            if r.mode == mode:
                count += 1
            if r.mode == 'WO':
                wo_count += 1
        assert wo_count <= VOLUME_REPLICA_WO_LIMIT

        r_count = len(replicas) if replica_count is None else replica_count
        if count == r_count:
            verified = True
            break

        time.sleep(RETRY_INTERVAL)
    assert verified
    return volume
def wait_for_volume_replicas_running_on_hosts(client, volume_name, host_ids, replica_balanced)
-
Expand source code
def wait_for_volume_replicas_running_on_hosts(client, volume_name, host_ids,
                                              replica_balanced):
    hosts = list(host_ids)
    for i in range(RETRY_COUNTS):
        hosts = list(host_ids)
        num_running = 0
        volume = client.by_id_volume(volume_name)
        for replica in volume.replicas:
            if not replica.running:
                continue

            if replica.hostId not in hosts:
                continue

            if replica_balanced:
                hosts.remove(replica.hostId)

            num_running += 1
        if num_running == volume.numberOfReplicas:
            break
        time.sleep(RETRY_INTERVAL)
    assert num_running == volume.numberOfReplicas
    return volume
def wait_for_volume_restoration_completed(client, name)
-
Expand source code
def wait_for_volume_restoration_completed(client, name):
    wait_for_volume_creation(client, name)
    wait_for_restoration_start(client, name)
    monitor_restore_progress(client, name)
    return wait_for_volume_status(client, name,
                                  VOLUME_FIELD_RESTOREREQUIRED,
                                  False)
def wait_for_volume_restoration_start(client, volume_name, backup_name, progress=0)
-
Expand source code
def wait_for_volume_restoration_start(client, volume_name, backup_name,
                                      progress=0):
    wait_for_volume_status(client, volume_name,
                           VOLUME_FIELD_STATE,
                           VOLUME_STATE_ATTACHED)
    started = False
    for i in range(RETRY_COUNTS):
        volume = client.by_id_volume(volume_name)
        for status in volume.restoreStatus:
            if status.state == "in_progress" and \
                    status.progress > progress:
                started = True
                break
        # Sometimes the restore is so fast that the test cannot catch
        # the intermediate "in_progress" status, so also accept the
        # already-restored backup as a start signal.
        if volume.controllers[0].lastRestoredBackup == backup_name:
            started = True
        if started:
            break
        time.sleep(RETRY_INTERVAL_SHORT)
    assert started
    return status.replica
def wait_for_volume_status(client, name, key, value, retry_count=360)
-
Expand source code
def wait_for_volume_status(client, name, key, value,
                           retry_count=RETRY_COUNTS_LONG):
    wait_for_volume_creation(client, name)
    for i in range(retry_count):
        volume = client.by_id_volume(name)
        if volume[key] == value:
            break
        time.sleep(RETRY_INTERVAL)
    assert volume[key] == value, f" value={value}\n. \
        volume[key]={volume[key]}\n. volume={volume}"
    return volume
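The field and value constants used by the other wait helpers above can be passed directly. A minimal sketch; the client.delete call is an assumption about the Longhorn client and is not shown in this section.

wait_for_volume_status(client, "vol-1",
                       VOLUME_FIELD_STATE, VOLUME_STATE_DETACHED)
client.delete(client.by_id_volume("vol-1"))
wait_for_volume_delete(client, "vol-1")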
def wait_longhorn_node_zone_reset(client)
-
Expand source code
def wait_longhorn_node_zone_reset(client):
    lh_nodes = client.list_node()
    node_names = map(lambda node: node.name, lh_nodes)

    for node_name in node_names:
        for j in range(RETRY_COUNTS):
            lh_node = client.by_id_node(node_name)
            if lh_node.zone == '':
                break
            time.sleep(RETRY_INTERVAL)

        assert lh_node.zone == ''
def wait_pod(pod_name)
-
Expand source code
def wait_pod(pod_name):
    api = get_core_api_client()

    pod = None
    for i in range(DEFAULT_POD_TIMEOUT):
        try:
            pod = api.read_namespaced_pod(
                name=pod_name, namespace='default')
            if pod is not None and pod.status.phase != 'Pending':
                break
        except Exception as e:
            print(f"Waiting for pod {pod_name} failed: {e}")
        time.sleep(DEFAULT_POD_INTERVAL)
    assert pod is not None and pod.status.phase == 'Running'
def wait_pod_attach_after_first_backup_completion(client, core_api, volume_name, label_name)
-
Expand source code
def wait_pod_attach_after_first_backup_completion(
        client, core_api, volume_name, label_name):
    completed = False
    for _ in range(RETRY_BACKUP_COUNTS):
        vol = client.by_id_volume(volume_name)
        for b in vol.backupStatus:
            if b.state == 'Completed':
                assert b.progress == 100
                assert b.error == ''
                completed = True
                break

        if completed:
            wait_for_volume_frontend_disabled(client, vol.name, False)
            wait_for_volume_attached(client, vol.name)
            break

        label_selector = "name=" + label_name
        pods = core_api.list_namespaced_pod(namespace="default",
                                            label_selector=label_selector)
        for pod in pods.items:
            assert pod.status.phase != 'Running'
        assert vol.disableFrontend is True

        time.sleep(RETRY_BACKUP_INTERVAL)
    assert completed is True
    return vol
def wait_scheduling_failure(client, volume_name)
-
Expand source code
def wait_scheduling_failure(client, volume_name):
    """
    Wait for replica scheduling on the specified volume to fail.

    Trigger a failed assertion if the volume's Scheduled condition does
    not report a ReplicaSchedulingFailure within the retry limit.

    Parameters:
        client: The Longhorn client to use in the request.
        volume_name: The name of the volume.
    """
    scheduling_failure = False
    for i in range(RETRY_COUNTS):
        v = client.by_id_volume(volume_name)
        if v.conditions.Scheduled.status == "False" and \
                v.conditions.Scheduled.reason == \
                "ReplicaSchedulingFailure":
            scheduling_failure = True
        if scheduling_failure:
            break
        time.sleep(RETRY_INTERVAL)
    assert scheduling_failure, f" Scheduled Status = " \
        f"{v.conditions.Scheduled.status}, Scheduled reason = " \
        f"{v.conditions.Scheduled.reason}, volume = {v}"
Wait for replica scheduling on the specified volume to fail. Trigger a failed assertion if the volume's Scheduled condition does not report a ReplicaSchedulingFailure within the retry limit.
Parameters
client: The Longhorn client to use in the request.
volume_name: The name of the volume.
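A hedged example of how a test might provoke and then wait for a scheduling failure; the client.create_volume arguments and the Gi constant are assumptions based on the rest of the suite.

# Requesting more replicas than there are schedulable nodes should
# fail scheduling.
client.create_volume(name="vol-1", size=str(Gi), numberOfReplicas=6)
wait_scheduling_failure(client, "vol-1")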
def wait_statefulset(statefulset_manifest)
-
Expand source code
def wait_statefulset(statefulset_manifest):
    api = get_apps_api_client()
    replicas = statefulset_manifest['spec']['replicas']
    for i in range(DEFAULT_STATEFULSET_TIMEOUT):
        s_set = api.read_namespaced_stateful_set(
            name=statefulset_manifest['metadata']['name'],
            namespace='default')
        # s_set is None if the statefulset is not yet created
        if s_set is not None and s_set.status.ready_replicas == replicas:
            break
        time.sleep(DEFAULT_STATEFULSET_INTERVAL)
    assert s_set.status.ready_replicas == replicas
def wait_volume_kubernetes_status(client, volume_name, expect_ks)
-
Expand source code
def wait_volume_kubernetes_status(client, volume_name, expect_ks):
    for i in range(RETRY_COUNTS):
        expected = True
        volume = client.by_id_volume(volume_name)
        ks = volume.kubernetesStatus
        ks = json.loads(json.dumps(ks, default=lambda o: o.__dict__))

        for k, v in expect_ks.items():
            if k in ('lastPVCRefAt', 'lastPodRefAt'):
                if (v != '' and ks[k] == '') or \
                        (v == '' and ks[k] != ''):
                    expected = False
                    break
            else:
                if ks[k] != v:
                    expected = False
                    break
        if expected:
            break
        time.sleep(RETRY_INTERVAL)
    assert expected
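A sketch of a typical expect_ks dictionary; the field names mirror the volume's kubernetesStatus object and the values here are hypothetical.

expect_ks = {
    'pvName': "vol-1-pv",        # hypothetical PV name
    'pvStatus': 'Bound',
    'namespace': 'default',
    'lastPVCRefAt': '',          # only checked for empty vs. non-empty
    'lastPodRefAt': '',
}
wait_volume_kubernetes_status(client, "vol-1", expect_ks)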
def write_device_random_data(dev, position={})
-
Expand source code
def write_device_random_data(dev, position={}):
    data = generate_random_data(VOLUME_RWTEST_SIZE)
    data_pos = generate_random_pos(VOLUME_RWTEST_SIZE, position)
    data_len = dev_write(dev, data_pos, data)
    checksum = get_device_checksum(dev)

    return {
        'content': data,
        'pos': data_pos,
        'len': data_len,
        'checksum': checksum
    }
def write_pod_block_volume_data(api, pod_name, test_data, offset, device_path)
-
Expand source code
def write_pod_block_volume_data(api, pod_name, test_data, offset, device_path):
    tmp_file = '/var/test_data'
    pre_write_cmd = [
        '/bin/sh',
        '-c',
        'echo -ne ' + test_data + ' > ' + tmp_file
    ]
    write_cmd = [
        '/bin/sh',
        '-c',
        'dd if=' + tmp_file + ' of=' + device_path +
        ' bs=' + str(len(test_data)) +
        ' count=1 seek=' + str(offset)
    ]
    with timeout(seconds=STREAM_EXEC_TIMEOUT,
                 error_message='Timeout on executing stream write'):
        stream(api.connect_get_namespaced_pod_exec, pod_name, 'default',
               command=pre_write_cmd, stderr=True, stdin=False,
               stdout=True, tty=False)
        return stream(
            api.connect_get_namespaced_pod_exec, pod_name, 'default',
            command=write_cmd, stderr=True, stdin=False, stdout=True,
            tty=False)
def write_pod_volume_data(api, pod_name, test_data, filename='test')
-
Expand source code
def write_pod_volume_data(api, pod_name, test_data, filename='test'):
    """
    Write data into a Pod's volume.

    Args:
        api: An instance of CoreV1API.
        pod_name: The name of the Pod.
        test_data: The data to be written.
        filename: The name of the file under /data to write to
            (default 'test').
    """
    write_command = [
        '/bin/sh',
        '-c',
        'echo -ne ' + test_data + ' > /data/' + filename
    ]
    with timeout(seconds=STREAM_EXEC_TIMEOUT,
                 error_message='Timeout on executing stream write'):
        return stream(
            api.connect_get_namespaced_pod_exec, pod_name, 'default',
            command=write_command, stderr=True, stdin=False, stdout=True,
            tty=False)
Write data into a Pod's volume.
Args
api : An instance of CoreV1API.
pod_name : The name of the Pod.
test_data : The data to be written.
filename : The name of the file under /data to write to (default 'test').
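A minimal usage sketch, assuming a pod named "test-pod" that already mounts a Longhorn volume at /data; get_core_api_client comes from this module.

core_api = get_core_api_client()
write_pod_volume_data(core_api, "test-pod", "longhorn-test-data",
                      filename="test")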
def write_pod_volume_random_data(api, pod_name, path, size_in_mb)
-
Expand source code
def write_pod_volume_random_data(api, pod_name, path, size_in_mb):
    write_cmd = [
        '/bin/sh',
        '-c',
        'dd if=/dev/urandom of=' + path +
        ' bs=1M' + ' count=' + str(size_in_mb) +
        '; sync'
    ]
    return stream(
        api.connect_get_namespaced_pod_exec, pod_name, 'default',
        command=write_cmd, stderr=True, stdin=False, stdout=True,
        tty=False)
def write_volume_data(volume, data)
-
Expand source code
def write_volume_data(volume, data):
    dev = get_volume_endpoint(volume)
    data_len = dev_write(dev, data['pos'], data['content'])
    checksum = get_device_checksum(dev)

    return {
        'content': data['content'],
        'pos': data['pos'],
        'len': data_len,
        'checksum': checksum
    }
def write_volume_dev_random_mb_data(path, offset_in_mb, length_in_mb, timeout_cnt=3)
-
Expand source code
def write_volume_dev_random_mb_data(path, offset_in_mb, length_in_mb,
                                    timeout_cnt=3):
    write_cmd = [
        '/bin/sh',
        '-c',
        'dd if=/dev/urandom of=%s bs=1M seek=%d count=%d'
        % (path, offset_in_mb, length_in_mb)
    ]
    with timeout(seconds=STREAM_EXEC_TIMEOUT * timeout_cnt,
                 error_message='Timeout on writing dev'):
        subprocess.check_call(write_cmd)
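A short sketch; the device path is hypothetical and must point at an attached Longhorn block device on the node running the test.

# Write 10 MiB of random data starting 1 MiB into the device.
write_volume_dev_random_mb_data("/dev/longhorn/vol-1", 1, 10)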
def write_volume_random_data(volume, position={})
-
Expand source code
def write_volume_random_data(volume, position={}):
    dev = get_volume_endpoint(volume)
    return write_device_random_data(dev, position=position)
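A sketch of the usual write-then-verify pattern, using get_volume_endpoint and get_device_checksum exactly as the helpers above do; volume is assumed to be an attached volume object.

data = write_volume_random_data(volume)
# Later, confirm the device content is unchanged.
dev = get_volume_endpoint(volume)
assert get_device_checksum(dev) == data['checksum']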
Classes
class AssertErrorCheckThread (target, args)
-
Expand source code
class AssertErrorCheckThread(threading.Thread):
    """
    This class is used for catching exceptions raised in threads,
    currently AssertionError in particular.

    Parameters:
        target : The function to run in the thread.
        args : Arguments of the target function.
    """
    def __init__(self, target, args):
        threading.Thread.__init__(self)
        self.target = target
        self.args = args
        self.asserted = None

    def run(self):
        try:
            self.target(*self.args)
        except AssertionError as e:
            self.asserted = e

    def join(self):
        threading.Thread.join(self)
        if self.asserted:
            raise self.asserted
This class is used for catching exceptions raised in threads, currently AssertionError in particular.
Parameters
target : The function to run in the thread.
args : Arguments of the target function.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
Ancestors
- threading.Thread
Methods
def join(self)
-
Expand source code
def join(self):
    threading.Thread.join(self)
    if self.asserted:
        raise self.asserted
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
def run(self)
-
Expand source code
def run(self):
    try:
        self.target(*self.args)
    except AssertionError as e:
        self.asserted = e
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
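A usage sketch: run a helper from this module concurrently for several hypothetical volumes and let join() re-raise any AssertionError caught inside the thread.

threads = []
for vol_name in ("vol-1", "vol-2"):
    t = AssertErrorCheckThread(target=wait_for_volume_healthy,
                               args=(client, vol_name))
    t.start()
    threads.append(t)
for t in threads:
    t.join()   # re-raises an AssertionError captured in run()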
class timeout (seconds=1, error_message='Timeout')
-
Expand source code
class timeout:
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise Exception(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)
Methods
def handle_timeout(self, signum, frame)
-
Expand source code
def handle_timeout(self, signum, frame):
    raise Exception(self.error_message)
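A usage sketch; because the class relies on SIGALRM, it only works in the main thread of a POSIX process.

with timeout(seconds=30, error_message='Timeout while syncing'):
    subprocess.check_call(['sync'])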