Module tests.test_csi
Functions
def backupstore_test(client, core_api, csi_pv, pvc, pod_make, pod_name, vol_name, backing_image, test_data)
def backupstore_test(client, core_api, csi_pv, pvc, pod_make, pod_name, vol_name, backing_image, test_data):  # NOQA
    write_pod_volume_data(core_api, pod_name, test_data)

    volume = client.by_id_volume(vol_name)
    snap = create_snapshot(client, vol_name)
    volume.snapshotBackup(name=snap.name)
    common.wait_for_backup_completion(client, vol_name, snap.name)
    bv, b = common.find_backup(client, vol_name, snap.name)

    pod2_name = 'csi-backup-test-2'
    vol2_name = create_and_wait_csi_pod(
        pod2_name, client, core_api, csi_pv, pvc, pod_make, backing_image,
        b.url)
    volume2 = client.by_id_volume(vol2_name)

    resp = read_volume_data(core_api, pod2_name)
    assert resp == test_data

    delete_backup(client, bv, b.name)
    delete_and_wait_pod(core_api, pod2_name)
    client.delete(volume2)
def create_and_verify_block_volume(client, core_api, storage_class, pvc, pod_manifest, is_rwx)
def create_and_verify_block_volume(client, core_api, storage_class, pvc, pod_manifest, is_rwx):  # NOQA
    pod_name = 'csi-block-volume-test'
    pvc_name = pod_name + "-pvc"
    device_path = "/dev/longhorn/longhorn-test-blk"

    pvc['metadata']['name'] = pvc_name
    pvc['spec']['volumeMode'] = 'Block'
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']
    if is_rwx:
        pvc['spec']['accessModes'] = ['ReadWriteMany']
    pvc['spec']['resources'] = {
        'requests': {
            'storage': size_to_string(1 * Gi)
        }
    }
    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['volumes'] = [{
        'name': 'longhorn-blk',
        'persistentVolumeClaim': {
            'claimName': pvc_name,
        },
    }]
    pod_manifest['spec']['containers'][0]['volumeMounts'] = []
    pod_manifest['spec']['containers'][0]['volumeDevices'] = [
        {'name': 'longhorn-blk', 'devicePath': device_path}
    ]

    create_pvc(pvc)
    pv_name = wait_and_get_pv_for_pvc(core_api, pvc_name).metadata.name
    create_and_wait_pod(core_api, pod_manifest)

    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    test_offset = random.randint(0, VOLUME_RWTEST_SIZE)
    write_pod_block_volume_data(
        core_api, pod_name, test_data, test_offset, device_path)
    returned_data = read_pod_block_volume_data(
        core_api, pod_name, len(test_data), test_offset, device_path
    )
    assert test_data == returned_data
    md5_sum = get_pod_data_md5sum(
        core_api, pod_name, device_path)

    delete_and_wait_pod(core_api, pod_name)
    common.wait_for_volume_detached(client, pv_name)

    pod_name_2 = 'csi-block-volume-test-reuse'
    pod_manifest['metadata']['name'] = pod_name_2
    create_and_wait_pod(core_api, pod_manifest)

    returned_data = read_pod_block_volume_data(
        core_api, pod_name_2, len(test_data), test_offset, device_path
    )
    assert test_data == returned_data
    md5_sum_2 = get_pod_data_md5sum(
        core_api, pod_name_2, device_path)
    assert md5_sum == md5_sum_2

    delete_and_wait_pod(core_api, pod_name_2)
    delete_and_wait_pvc(core_api, pvc_name)
    delete_and_wait_pv(core_api, pv_name)
    wait_delete_dm_device(core_api, pv_name)
def create_and_wait_csi_pod(pod_name, client, core_api, csi_pv, pvc, pod_make, backing_image, from_backup)
def create_and_wait_csi_pod(pod_name, client, core_api, csi_pv, pvc, pod_make, backing_image, from_backup):  # NOQA
    volume_name = generate_volume_name()
    create_and_wait_csi_pod_named_pv(volume_name, pod_name, client, core_api,
                                     csi_pv, pvc, pod_make, backing_image,
                                     from_backup)
    return volume_name
def create_and_wait_csi_pod_named_pv(pv_name, pod_name, client, core_api, csi_pv, pvc, pod_make, backing_image, from_backup)
def create_and_wait_csi_pod_named_pv(pv_name, pod_name, client, core_api, csi_pv, pvc, pod_make, backing_image, from_backup):  # NOQA
    pod = pod_make(name=pod_name)
    pod['spec']['volumes'] = [
        create_pvc_spec(pv_name)
    ]
    csi_pv['metadata']['name'] = pv_name
    csi_pv['spec']['csi']['volumeHandle'] = pv_name
    csi_pv['spec']['csi']['volumeAttributes']['fromBackup'] = from_backup
    pvc['metadata']['name'] = pv_name
    pvc['spec']['volumeName'] = pv_name
    update_storageclass_references(CSI_PV_TEST_STORAGE_NAME, csi_pv, pvc)

    create_pv_storage(core_api, client, csi_pv, pvc, backing_image,
                      from_backup)
    create_and_wait_pod(core_api, pod)
def create_pv_storage(api, cli, pv, claim, backing_image, from_backup)
def create_pv_storage(api, cli, pv, claim, backing_image, from_backup):
    """
    Manually create a new PV and PVC for testing.
    """
    cli.create_volume(
        name=pv['metadata']['name'], size=pv['spec']['capacity']['storage'],
        numberOfReplicas=int(pv['spec']['csi']['volumeAttributes']
                             ['numberOfReplicas']),
        backingImage=backing_image, fromBackup=from_backup,
        dataEngine=DATA_ENGINE)
    if from_backup:
        common.wait_for_volume_restoration_completed(cli,
                                                     pv['metadata']['name'])
    common.wait_for_volume_detached(cli, pv['metadata']['name'])

    api.create_persistent_volume(pv)

    api.create_namespaced_persistent_volume_claim(
        body=claim, namespace='default')
Manually create a new PV and PVC for testing.
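For orientation, this is roughly the shape of the PV and PVC dictionaries the helper reads; a minimal sketch with hypothetical names and sizes. A PVC submitted to the real Kubernetes API would additionally need fields (such as accessModes and resources) that this helper does not touch itself.

    # Hypothetical, trimmed-down manifests; field names match what
    # create_pv_storage reads above.
    pv = {
        'metadata': {'name': 'demo-vol'},                        # Longhorn volume name
        'spec': {
            'capacity': {'storage': '1Gi'},                      # becomes the volume size
            'csi': {'volumeAttributes': {'numberOfReplicas': '3'}},
        },
    }
    claim = {
        'metadata': {'name': 'demo-vol'},
        'spec': {'volumeName': 'demo-vol'},                      # bind the PVC to the PV above
    }

    # Creates the Longhorn volume, then the PV, then the PVC in 'default'.
    create_pv_storage(core_api, client, pv, claim, backing_image="",
                      from_backup="")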
def csi_backup_test(client, core_api, csi_pv, pvc, pod_make, backing_image='')
def csi_backup_test(client, core_api, csi_pv, pvc, pod_make, backing_image=""):  # NOQA
    pod_name = 'csi-backup-test'
    vol_name = create_and_wait_csi_pod(
        pod_name, client, core_api, csi_pv, pvc, pod_make, backing_image, "")
    test_data = generate_random_data(VOLUME_RWTEST_SIZE)

    backupstore_test(client, core_api, csi_pv, pvc, pod_make, pod_name,
                     vol_name, backing_image, test_data)

    delete_and_wait_pod(core_api, pod_name)
    delete_and_wait_pvc(core_api, vol_name)
    delete_and_wait_pv(core_api, vol_name)
def csi_io_test(client, core_api, csi_pv, pvc, pod_make, backing_image='')
def csi_io_test(client, core_api, csi_pv, pvc, pod_make, backing_image=""):  # NOQA
    pv_name = generate_volume_name()
    pod_name = 'csi-io-test'
    create_and_wait_csi_pod_named_pv(pv_name, pod_name, client, core_api,
                                     csi_pv, pvc, pod_make, backing_image, "")

    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    write_pod_volume_data(core_api, pod_name, test_data)
    delete_and_wait_pod(core_api, pod_name)
    common.wait_for_volume_detached(client, csi_pv['metadata']['name'])

    pod_name = 'csi-io-test-2'
    pod = pod_make(name=pod_name)
    pod['spec']['volumes'] = [
        create_pvc_spec(pv_name)
    ]
    csi_pv['metadata']['name'] = pv_name
    csi_pv['spec']['csi']['volumeHandle'] = pv_name
    pvc['metadata']['name'] = pv_name
    pvc['spec']['volumeName'] = pv_name
    update_storageclass_references(CSI_PV_TEST_STORAGE_NAME, csi_pv, pvc)

    create_and_wait_pod(core_api, pod)

    resp = read_volume_data(core_api, pod_name)
    assert resp == test_data

    delete_and_wait_pod(core_api, pod_name)
    delete_and_wait_pvc(core_api, pvc['metadata']['name'])
    delete_and_wait_pv(core_api, pv_name)
def csi_mount_test(client, core_api, csi_pv, pvc, pod_make, volume_size, backing_image='')
def csi_mount_test(client, core_api, csi_pv, pvc, pod_make,  # NOQA
                   volume_size, backing_image=""):  # NOQA
    pod_name = 'csi-mount-test'
    create_and_wait_csi_pod(pod_name, client, core_api, csi_pv, pvc, pod_make,
                            backing_image, "")

    volumes = client.list_volume().data
    assert len(volumes) == 1
    assert volumes[0].name == csi_pv['metadata']['name']
    assert volumes[0].size == str(volume_size)
    assert volumes[0].numberOfReplicas == \
        int(csi_pv['spec']['csi']['volumeAttributes']["numberOfReplicas"])
    assert volumes[0].state == "attached"
    assert volumes[0].backingImage == backing_image

    delete_and_wait_pod(core_api, pod_name)
    delete_and_wait_pvc(core_api, pvc['metadata']['name'])
    delete_and_wait_pv(core_api, csi_pv['metadata']['name'])
def md5sum_thread(pod_name, destination_in_pod)
def md5sum_thread(pod_name, destination_in_pod):
    '''
    For test case test_csi_block_volume_online_expansion and
    test_csi_mount_volume_online_expansion use.
    Use a new api instance in thread or when this threading is still running,
    execute other k8s command in main thread will hit error
    Handshake status 200 OK
    '''
    k8s_api = k8sclient.CoreV1Api()
    command = ["md5sum", destination_in_pod]
    resp = stream(k8s_api.connect_get_namespaced_pod_exec,
                  pod_name,
                  namespace="default",
                  command=command,
                  stderr=True, stdin=False, stdout=True, tty=False)
    return resp.strip().split(" ")[0]
Computes the md5sum of a file inside a pod; intended to run in a worker thread. Used by test_csi_block_volume_online_expansion and test_csi_mount_volume_online_expansion.
A new API client instance must be created inside the thread: if the main thread's client is reused while this thread is still running, other Kubernetes calls in the main thread fail with "Handshake status 200 OK".
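A condensed sketch of how the two online-expansion tests below drive this helper; the pod name, file path, and timeout are placeholders taken from those tests.

    from concurrent.futures import ThreadPoolExecutor, TimeoutError

    executor = ThreadPoolExecutor(max_workers=1)
    # Kick off the checksum in a worker thread so the main thread can keep
    # issuing other Kubernetes/Longhorn calls (e.g. a PVC expansion).
    future = executor.submit(md5sum_thread,
                             "csi-mount-volume-online-expansion-test",
                             "/data/file")
    # ... expand the PVC and wait for the volume expansion here ...
    try:
        md5 = future.result(timeout=120)
    except TimeoutError:
        md5 = None  # the exec stream did not return in time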
def test_allow_volume_creation_with_degraded_availability_csi(client, core_api, apps_api, make_deployment_with_pvc)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.coretest  # NOQA
def test_allow_volume_creation_with_degraded_availability_csi(
        client, core_api, apps_api, make_deployment_with_pvc):  # NOQA
    """
    Test Allow Volume Creation with Degraded Availability (CSI)

    Requirement:
    1. Set `allow-volume-creation-with-degraded-availability` to true.
    2. Set `node-level-soft-anti-affinity` to false.

    Steps:
    1. Disable scheduling for node 3.
    2. Create a Deployment Pod with a volume and 3 replicas.
        1. After the volume is attached, scheduling error should be seen.
    3. Write data to the Pod.
    4. Scale down the deployment to 0 to detach the volume.
        1. Scheduled condition should become true.
    5. Scale up the deployment back to 1 and verify the data.
        1. Scheduled condition should become false.
    6. Enable the scheduling for node 3.
        1. Volume should start rebuilding on the node 3 soon.
        2. Once the rebuilding starts, the scheduled condition should become
           true.
    7. Once rebuild finished, scale down and back the deployment to verify
       the data.
    """
    setting = client.by_id_setting(common.SETTING_DEGRADED_AVAILABILITY)
    client.update(setting, value="true")

    setting = client.by_id_setting(SETTING_REPLICA_NODE_SOFT_ANTI_AFFINITY)
    client.update(setting, value="false")

    nodes = client.list_node()
    node3 = nodes[2]
    client.update(node3, allowScheduling=False)

    vol = common.create_and_check_volume(client, generate_volume_name(),
                                         size=str(500 * Mi))

    pv_name = vol.name + "-pv"
    common.create_pv_for_volume(client, core_api, vol, pv_name)

    pvc_name = vol.name + "-pvc"
    common.create_pvc_for_volume(client, core_api, vol, pvc_name)

    deployment_name = vol.name + "-dep"
    deployment = make_deployment_with_pvc(deployment_name, pvc_name)
    deployment["spec"]["replicas"] = 3
    apps_api.create_namespaced_deployment(body=deployment,
                                          namespace='default')
    common.wait_for_volume_status(client, vol.name,
                                  common.VOLUME_FIELD_STATE,
                                  common.VOLUME_STATE_ATTACHED)
    common.wait_scheduling_failure(client, vol.name)

    data_path = "/data/test"
    pod = common.wait_and_get_any_deployment_pod(core_api, deployment_name)
    common.write_pod_volume_random_data(core_api, pod.metadata.name,
                                        data_path, common.DATA_SIZE_IN_MB_2)
    created_md5sum = get_pod_data_md5sum(core_api, pod.metadata.name,
                                         data_path)

    deployment['spec']['replicas'] = 0
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment_name)
    vol = common.wait_for_volume_detached(client, vol.name)
    # The volume condition cannot change in the same reconcile loop as the
    # volume state changes to detached. We need to wait for the condition
    # change instead of just checking it once directly.
    common.wait_for_volume_condition_scheduled(client, vol.name, "status",
                                               common.CONDITION_STATUS_TRUE)

    deployment['spec']['replicas'] = 1
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment_name)
    common.wait_for_volume_status(client, vol.name,
                                  common.VOLUME_FIELD_STATE,
                                  common.VOLUME_STATE_ATTACHED)
    common.wait_for_volume_condition_scheduled(client, vol.name, "status",
                                               common.CONDITION_STATUS_FALSE)

    # This is a workaround, this flaky step is because sometimes, pod
    # just changed to running state can not get data
    # https://github.com/longhorn/longhorn-tests/pull/999#discussion_r902125882 # NOQA
    for i in range(common.RETRY_COMMAND_COUNT):
        try:
            pod = common.wait_and_get_any_deployment_pod(core_api,
                                                         deployment_name)
            pod_md5 = get_pod_data_md5sum(core_api, pod.metadata.name,
                                          data_path)
        except Exception as e:
            print(e)
            time.sleep(common.RETRY_INTERVAL)
            continue
    assert created_md5sum == pod_md5

    client.update(node3, allowScheduling=True)
    vol = client.by_id_volume(vol.name)
    common.wait_for_rebuild_complete(client, vol.name)

    deployment['spec']['replicas'] = 0
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment_name)
    common.wait_for_volume_detached(client, vol.name)

    deployment['spec']['replicas'] = 1
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment_name)
    common.wait_for_volume_status(client, vol.name,
                                  common.VOLUME_FIELD_ROBUSTNESS,
                                  common.VOLUME_ROBUSTNESS_HEALTHY)

    pod = common.wait_and_get_any_deployment_pod(core_api, deployment_name)
    assert created_md5sum == get_pod_data_md5sum(core_api,
                                                 pod.metadata.name,
                                                 data_path)
Test Allow Volume Creation with Degraded Availability (CSI)

Requirement:
1. Set `allow-volume-creation-with-degraded-availability` to true.
2. Set `node-level-soft-anti-affinity` to false.

Steps:
1. Disable scheduling for node 3.
2. Create a Deployment Pod with a volume and 3 replicas.
   1. After the volume is attached, a scheduling error should be seen.
3. Write data to the Pod.
4. Scale down the deployment to 0 to detach the volume.
   1. The Scheduled condition should become true.
5. Scale the deployment back up to 1 and verify the data.
   1. The Scheduled condition should become false.
6. Enable scheduling for node 3.
   1. The volume should start rebuilding on node 3 soon.
   2. Once the rebuilding starts, the Scheduled condition should become true.
7. Once the rebuild is finished, scale the deployment down and back up to verify the data.
def test_csi_backup(set_random_backupstore, client, core_api, csi_pv, pvc, pod_make)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_backup(set_random_backupstore, client, core_api, csi_pv, pvc, pod_make):  # NOQA
    """
    Test that backup/restore works with volumes created by CSI driver.

    Run the test for all the backupstores

    1. Create PV/PVC/Pod using dynamic provisioned volume
    2. Write data and create snapshot using Longhorn API
    3. Verify the existence of backup
    4. Create another Pod using restored backup
    5. Verify the data in the new Pod
    """
    csi_backup_test(client, core_api, csi_pv, pvc, pod_make)
Test that backup/restore works with volumes created by CSI driver.
Run the test for all the backupstores.
- Create PV/PVC/Pod using a dynamically provisioned volume
- Write data and create a snapshot using the Longhorn API
- Verify the existence of the backup
- Create another Pod using the restored backup
- Verify the data in the new Pod
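The snapshot and backup in steps 2-3 go through the Longhorn client, as in `backupstore_test` above; a condensed sketch, with `vol_name` standing in for the volume behind the PVC.

    volume = client.by_id_volume(vol_name)
    snap = create_snapshot(client, vol_name)            # step 2: take a snapshot
    volume.snapshotBackup(name=snap.name)               # step 2: back the snapshot up
    common.wait_for_backup_completion(client, vol_name, snap.name)
    bv, b = common.find_backup(client, vol_name, snap.name)   # step 3: backup exists
    # b.url is then used as `fromBackup` when provisioning the restore volume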
def test_csi_block_volume(client, core_api, storage_class, pvc, pod_manifest)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_block_volume(client, core_api, storage_class, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: raw block volume

    1. Create a PVC with `volumeMode = Block`
    2. Create a pod using the PVC to dynamic provision a volume
    3. Verify the pod creation
    4. Generate `test_data` and write to the block volume directly in the pod
    5. Read the data back for validation
    6. Delete the pod and create `pod2` to use the same volume
    7. Validate the data in `pod2` is consistent with `test_data`
    """
    storage_class['reclaimPolicy'] = 'Retain'
    create_storage_class(storage_class)

    create_and_verify_block_volume(client, core_api, storage_class, pvc,
                                   pod_manifest, False)
Test CSI feature: raw block volume
- Create a PVC with `volumeMode = Block`
- Create a pod using the PVC to dynamically provision a volume
- Verify the pod creation
- Generate `test_data` and write to the block volume directly in the pod
- Read the data back for validation
- Delete the pod and create `pod2` to use the same volume
- Validate the data in `pod2` is consistent with `test_data`
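The PVC and pod wiring behind these steps is the one set up by `create_and_verify_block_volume` above, reduced here to the fields that matter for a raw block volume; the device path and volume name are the ones that helper uses.

    pvc['spec']['volumeMode'] = 'Block'   # raw block device instead of a mounted filesystem
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']

    pod_manifest['spec']['containers'][0]['volumeMounts'] = []
    pod_manifest['spec']['containers'][0]['volumeDevices'] = [
        # the volume shows up inside the container as a device node
        {'name': 'longhorn-blk', 'devicePath': '/dev/longhorn/longhorn-test-blk'}
    ]
    pod_manifest['spec']['volumes'] = [{
        'name': 'longhorn-blk',
        'persistentVolumeClaim': {'claimName': pvc['metadata']['name']},
    }]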
def test_csi_block_volume_online_expansion(client, core_api, storage_class, pvc, pod_manifest)
@pytest.mark.csi  # NOQA
@pytest.mark.csi_expansion  # NOQA
def test_csi_block_volume_online_expansion(client, core_api, storage_class, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: online expansion for block volume

    1. Create a new `storage_class` with `allowVolumeExpansion` set
    2. Create PVC with access mode "block" and Pod with the new StorageClass
    3. Use `dd` command copy data into volume block device.
    4. During the copy, update pvc.spec.resources to expand the volume
    5. Verify the volume expansion done using Longhorn API and Check the PVC
       & PV size
    6. Wait for the copy complete.
    7. Calculate the checksum for the copied data inside the block volume
       asynchronously.
    8. During the calculation, update pvc.spec.resources to expand the volume
       again.
    9. Wait for the calculation complete, then compare the checksum.
    10. Do cleanup: Remove the original `test_data`as well as the pod and PVC.
    """
    create_storage_class(storage_class)

    pod_dev_volume_path = "/dev/longhorn/testblk"
    pod_name = "csi-block-volume-online-expansion-test"
    pvc_name = pod_name + "-pvc"
    pvc['metadata']['name'] = pvc_name
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']
    pvc['spec']['volumeMode'] = "Block"
    create_pvc(pvc)

    del pod_manifest['spec']['containers'][0]['volumeMounts']
    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['containers'][0]['volumeDevices'] = [{
        'devicePath': pod_dev_volume_path, 'name': 'block-vol'
    }]
    pod_manifest['spec']['volumes'] = [{
        "name": "block-vol",
        "persistentVolumeClaim": {
            "claimName": pvc_name
        }
    }]
    pod_name = pod_manifest['metadata']['name']
    pod_manifest['spec']['nodeName'] = get_self_host_id()
    create_and_wait_pod(core_api, pod_manifest)

    volume_name = get_volume_name(core_api, pvc_name)
    dev_volume = "/dev/longhorn/{}".format(volume_name)

    cmd = ['dd', 'if=/dev/urandom', 'of=' + dev_volume, 'bs=1M',
           'count=2500']
    process = subprocess.Popen(cmd)

    expand_and_wait_for_pvc(core_api, pvc, EXPANDED_VOLUME_SIZE*Gi)
    wait_for_volume_expansion(client, volume_name)

    process.wait()
    if process.returncode == 0:
        write_data_complete = True
    else:
        write_data_complete = False
    assert write_data_complete

    subprocess.check_call(["sync"])

    executor = ThreadPoolExecutor(max_workers=5)
    future = executor.submit(md5sum_thread, pod_name, pod_dev_volume_path)

    expand_and_wait_for_pvc(core_api, pvc, (EXPANDED_VOLUME_SIZE+1)*Gi)
    wait_for_volume_expansion(client, volume_name)

    thread_timeout = 120
    try:
        pod_dev_md5 = future.result(timeout=thread_timeout)
    except TimeoutError:
        print("md5 thread exceed timeout ({})s".format(thread_timeout))

    volume_dev_md5 = subprocess.check_output(["md5sum", dev_volume])\
        .strip().decode('utf-8').split(" ")[0]

    assert pod_dev_md5 == volume_dev_md5
Test CSI feature: online expansion for block volume
- Create a new `storage_class` with `allowVolumeExpansion` set
- Create a PVC with access mode "block" and a Pod with the new StorageClass
- Use the `dd` command to copy data into the volume block device.
- During the copy, update pvc.spec.resources to expand the volume
- Verify the volume expansion is done using the Longhorn API and check the PVC & PV size
- Wait for the copy to complete.
- Calculate the checksum for the copied data inside the block volume asynchronously.
- During the calculation, update pvc.spec.resources to expand the volume again.
- Wait for the calculation to complete, then compare the checksums.
- Do cleanup: remove the original `test_data` as well as the pod and PVC.
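The expansion in steps 4 and 8 is driven through `common.expand_and_wait_for_pvc`, whose implementation is not shown on this page. Independent of that helper, a PVC expansion boils down to patching the storage request, roughly as sketched below; the PVC name and target size are illustrative.

    from kubernetes import client as k8sclient

    core_v1 = k8sclient.CoreV1Api()
    # Raise spec.resources.requests.storage; the CSI resizer and Longhorn
    # then grow the volume while it stays attached.
    patch = {'spec': {'resources': {'requests': {'storage': '3Gi'}}}}
    core_v1.patch_namespaced_persistent_volume_claim(
        name='csi-block-volume-online-expansion-test-pvc',
        namespace='default',
        body=patch)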
def test_csi_encrypted_block_volume(client, core_api, storage_class, crypto_secret, pvc, pod_manifest)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_encrypted_block_volume(client, core_api, storage_class, crypto_secret, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: encrypted block volume

    1. Create a PVC with encrypted `volumeMode = Block`
    2. Create a pod using the PVC to dynamic provision a volume
    3. Verify the pod creation
    4. Generate `test_data` and write to the block volume directly in the pod
    5. Read the data back for validation
    6. Delete the pod and create `pod2` to use the same volume
    7. Validate the data in `pod2` is consistent with `test_data`
    """
    secret = crypto_secret(LONGHORN_NAMESPACE)
    create_crypto_secret(secret)

    storage_class['reclaimPolicy'] = 'Retain'
    storage_class['parameters']['csi.storage.k8s.io/provisioner-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/provisioner-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-publish-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-publish-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-stage-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-stage-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['encrypted'] = 'true'
    storage_class['parameters']['dataEngine'] = DATA_ENGINE
    create_storage_class(storage_class)

    create_and_verify_block_volume(client, core_api, storage_class, pvc,
                                   pod_manifest, False)
Test CSI feature: encrypted block volume
- Create a PVC with encrypted `volumeMode = Block`
- Create a pod using the PVC to dynamically provision a volume
- Verify the pod creation
- Generate `test_data` and write to the block volume directly in the pod
- Read the data back for validation
- Delete the pod and create `pod2` to use the same volume
- Validate the data in `pod2` is consistent with `test_data`
def test_csi_encrypted_migratable_block_volume(client, core_api, storage_class, crypto_secret, pvc, pod_manifest)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_encrypted_migratable_block_volume(client, core_api, storage_class, crypto_secret, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: encrypted migratable block volume

    Issue: https://github.com/longhorn/longhorn/issues/7678

    1. Create a PVC with encrypted `volumeMode = Block` and
       `migratable = true`
    2. Create a pod using the PVC to dynamic provision a volume
    3. Verify the pod creation
    4. Generate `test_data` and write to the block volume directly in the pod
    5. Read the data back for validation
    6. Delete the pod and create `pod2` to use the same volume
    7. Validate the data in `pod2` is consistent with `test_data`
    """
    secret = crypto_secret(LONGHORN_NAMESPACE)
    create_crypto_secret(secret)

    storage_class['reclaimPolicy'] = 'Retain'
    storage_class['parameters']['csi.storage.k8s.io/provisioner-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/provisioner-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-publish-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-publish-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-stage-secret-name'] = 'longhorn-crypto'  # NOQA
    storage_class['parameters']['csi.storage.k8s.io/node-stage-secret-namespace'] = LONGHORN_NAMESPACE  # NOQA
    storage_class['parameters']['migratable'] = 'true'
    storage_class['parameters']['encrypted'] = 'true'
    storage_class['parameters']['dataEngine'] = DATA_ENGINE
    create_storage_class(storage_class)

    create_and_verify_block_volume(client, core_api, storage_class, pvc,
                                   pod_manifest, True)
Test CSI feature: encrypted migratable block volume
Issue: https://github.com/longhorn/longhorn/issues/7678
- Create a PVC with encrypted `volumeMode = Block` and `migratable = true`
- Create a pod using the PVC to dynamically provision a volume
- Verify the pod creation
- Generate `test_data` and write to the block volume directly in the pod
- Read the data back for validation
- Delete the pod and create `pod2` to use the same volume
- Validate the data in `pod2` is consistent with `test_data`
def test_csi_expansion_with_replica_failure(client, core_api, storage_class, pvc, pod_manifest)
@pytest.mark.coretest  # NOQA
def test_csi_expansion_with_replica_failure(client, core_api, storage_class, pvc, pod_manifest):  # NOQA
    """
    Test expansion success but with one replica expansion failure

    1. Create a new `storage_class` with `allowVolumeExpansion` set
    2. Create PVC and Pod with dynamic provisioned volume from the
       StorageClass
    3. Create an empty directory with expansion snapshot tmp meta file path
       for one replica so that the replica expansion will fail
    4. Generate `test_data` and write to the pod
    5. Update pvc.spec.resources to expand the volume
    6. Check expansion result using Longhorn API. There will be expansion
       error caused by the failed replica but overall the expansion should
       succeed.
    7. Check if the volume will reuse the failed replica during rebuilding.
    8. Validate the volume content, then check if data writing looks fine
    """
    replenish_wait_setting = \
        client.by_id_setting(SETTING_REPLICA_REPLENISHMENT_WAIT_INTERVAL)
    client.update(replenish_wait_setting, value="600")

    create_storage_class(storage_class)

    pod_name = 'csi-expansion-with-replica-failure-test'
    pvc_name = pod_name + "-pvc"
    pvc['metadata']['name'] = pvc_name
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']
    create_pvc(pvc)

    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['volumes'] = [{
        'name':
            pod_manifest['spec']['containers'][0]['volumeMounts'][0]['name'],
        'persistentVolumeClaim': {'claimName': pvc_name},
    }]
    create_and_wait_pod(core_api, pod_manifest)

    expand_size = str(EXPANDED_VOLUME_SIZE*Gi)
    pv = wait_and_get_pv_for_pvc(core_api, pvc_name)
    assert pv.status.phase == "Bound"
    volume_name = pv.spec.csi.volume_handle
    volume = client.by_id_volume(volume_name)
    failed_replica = volume.replicas[0]
    fail_replica_expansion(client, core_api,
                           volume_name, expand_size, [failed_replica])

    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    write_pod_volume_data(core_api, pod_name, test_data)

    # There will be replica expansion error info
    # but the expansion should succeed.
    # Will not try to capture the transient error info.
    expand_and_wait_for_pvc(core_api, pvc, EXPANDED_VOLUME_SIZE*Gi)
    wait_for_volume_expansion(client, volume_name)
    volume = client.by_id_volume(volume_name)
    assert volume.size == expand_size

    # May not be able to capture the transient state. Best effort here.
    for r in volume.replicas:
        if r.failedAt != "":
            assert r.name == failed_replica.name

    # Check if the failed replica will be reused during rebuilding,
    # and if the volume still works fine.
    volume = wait_for_volume_healthy(client, volume_name)
    for r in volume.replicas:
        assert r.mode == "RW"

    resp = read_volume_data(core_api, pod_name)
    assert resp == test_data
    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    write_pod_volume_data(core_api, pod_name, test_data)
    resp = read_volume_data(core_api, pod_name)
    assert resp == test_data
Test expansion success but with one replica expansion failure
- Create a new `storage_class` with `allowVolumeExpansion` set
- Create a PVC and Pod with a dynamically provisioned volume from the StorageClass
- Create an empty directory at the expansion snapshot temporary metadata file path for one replica so that the replica expansion will fail
- Generate `test_data` and write it to the pod
- Update pvc.spec.resources to expand the volume
- Check the expansion result using the Longhorn API. There will be an expansion error caused by the failed replica, but overall the expansion should succeed.
- Check if the volume will reuse the failed replica during rebuilding.
- Validate the volume content, then check if data writing looks fine
def test_csi_io(client, core_api, csi_pv, pvc, pod_make)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_io(client, core_api, csi_pv, pvc, pod_make):  # NOQA
    """
    Test that input and output on a statically defined CSI volume works as
    expected.

    Note: Fixtures are torn down here in reverse order that they are
    specified as a parameter. Take caution when reordering test fixtures.

    1. Create PV/PVC/Pod with dynamic positioned Longhorn volume
    2. Generate `test_data` and write it to volume using the equivalent of
       `kubectl exec`
    3. Delete the Pod
    4. Create another pod with the same PV
    5. Check the previous created `test_data` in the new Pod
    """
    csi_io_test(client, core_api, csi_pv, pvc, pod_make)
Test that input and output on a statically defined CSI volume works as expected.
Note: Fixtures are torn down here in reverse order that they are specified as a parameter. Take caution when reordering test fixtures.
- Create PV/PVC/Pod with dynamic positioned Longhorn volume
- Generate `test_data` and write it to the volume using the equivalent of `kubectl exec`
- Delete the Pod
- Create another pod with the same PV
- Check the previously created `test_data` in the new Pod
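"The equivalent of `kubectl exec`" in step 2 is a streamed exec call through the Kubernetes Python client, which is what the `write_pod_volume_data` helper does internally; a minimal sketch, with the pod name, path, and payload as placeholders.

    from kubernetes import client as k8sclient
    from kubernetes.stream import stream

    api = k8sclient.CoreV1Api()
    # Write a small payload to a file under the volume mount inside the pod.
    command = ['/bin/sh', '-c', 'echo -n "some test data" > /data/test']
    stream(api.connect_get_namespaced_pod_exec, 'csi-io-test',
           namespace='default', command=command,
           stderr=True, stdin=False, stdout=True, tty=False)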
def test_csi_minimal_volume_size(client, core_api, csi_pv, pvc, pod_make)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_minimal_volume_size(
        client, core_api, csi_pv, pvc, pod_make):  # NOQA
    """
    Test CSI Minimal Volume Size

    1. Create a PVC requesting size 5MiB. Check the PVC requested size is
       5MiB and capacity size get is 10MiB.
    2. Remove the PVC.
    3. Create a PVC requesting size 10MiB. Check the PVC requested size and
       capacity size get are both 10MiB.
    4. Create a pod to use this PVC.
    5. Write some data to the volume and read it back to compare.
    """
    vol_name = generate_volume_name()
    create_and_check_volume(client, vol_name, size=str(100*Mi))

    low_storage = str(5*Mi)
    min_storage = str(10*Mi)

    pv_name = vol_name + "-pv"
    csi_pv['metadata']['name'] = pv_name
    csi_pv['spec']['csi']['volumeHandle'] = vol_name
    csi_pv['spec']['capacity']['storage'] = min_storage
    csi_pv['spec']['persistentVolumeReclaimPolicy'] = 'Retain'
    core_api.create_persistent_volume(csi_pv)

    pvc_name = vol_name + "-pvc"
    pvc['metadata']['name'] = pvc_name
    pvc['spec']['volumeName'] = pv_name
    pvc['spec']['resources']['requests']['storage'] = low_storage
    pvc['spec']['storageClassName'] = ''
    core_api.create_namespaced_persistent_volume_claim(body=pvc,
                                                       namespace='default')

    claim = common.wait_for_pvc_phase(core_api, pvc_name, "Bound")
    assert claim.spec.resources.requests['storage'] == low_storage
    assert claim.status.capacity['storage'] == min_storage

    common.delete_and_wait_pvc(core_api, pvc_name)
    common.delete_and_wait_pv(core_api, pv_name)
    wait_for_volume_detached(client, vol_name)

    core_api.create_persistent_volume(csi_pv)

    pvc['spec']['resources']['requests']['storage'] = min_storage
    core_api.create_namespaced_persistent_volume_claim(body=pvc,
                                                       namespace='default')

    claim = common.wait_for_pvc_phase(core_api, pvc_name, "Bound")
    assert claim.spec.resources.requests['storage'] == min_storage
    assert claim.status.capacity['storage'] == min_storage

    pod_name = vol_name + '-pod'
    pod = pod_make(name=pod_name)
    pod['spec']['volumes'] = [create_pvc_spec(pvc_name)]
    create_and_wait_pod(core_api, pod)

    test_data = "longhorn-integration-test"
    test_file = "test"
    write_pod_volume_data(core_api, pod_name, test_data, test_file)
    read_data = read_volume_data(core_api, pod_name, test_file)
    assert read_data == test_data
Test CSI Minimal Volume Size
- Create a PVC requesting size 5MiB. Check that the PVC's requested size is 5MiB and the reported capacity is 10MiB.
- Remove the PVC.
- Create a PVC requesting size 10MiB. Check that the PVC's requested size and reported capacity are both 10MiB.
- Create a pod to use this PVC.
- Write some data to the volume and read it back to compare.
def test_csi_mount(client, core_api, csi_pv, pvc, pod_make)
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.coretest  # NOQA
@pytest.mark.csi  # NOQA
def test_csi_mount(client, core_api, csi_pv, pvc, pod_make):  # NOQA
    """
    Test that a statically defined CSI volume can be created, mounted,
    unmounted, and deleted properly on the Kubernetes cluster.

    Note: Fixtures are torn down here in reverse order that they are
    specified as a parameter. Take caution when reordering test fixtures.

    1. Create a PV/PVC/Pod with pre-created Longhorn volume
        1. Using Kubernetes manifest instead of Longhorn PV/PVC creation API
    2. Make sure the pod is running
    3. Verify the volume status
    """
    volume_size = DEFAULT_VOLUME_SIZE * Gi
    csi_mount_test(client, core_api, csi_pv, pvc, pod_make, volume_size)
Test that a statically defined CSI volume can be created, mounted, unmounted, and deleted properly on the Kubernetes cluster.
Note: Fixtures are torn down here in reverse order that they are specified as a parameter. Take caution when reordering test fixtures.
- Create a PV/PVC/Pod with pre-created Longhorn volume
  - Using Kubernetes manifest instead of Longhorn PV/PVC creation API
- Make sure the pod is running
- Verify the volume status
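The "Kubernetes manifest" in step 1 is the statically defined PV from the `csi_pv` fixture; roughly the shape sketched below, with illustrative values (the exact set of `volumeAttributes` in the fixture may differ).

    csi_pv_example = {
        'apiVersion': 'v1',
        'kind': 'PersistentVolume',
        'metadata': {'name': 'existing-longhorn-volume'},   # hypothetical name
        'spec': {
            'capacity': {'storage': '2Gi'},
            'accessModes': ['ReadWriteOnce'],
            'storageClassName': CSI_PV_TEST_STORAGE_NAME,
            'csi': {
                'driver': 'driver.longhorn.io',              # the Longhorn CSI driver
                'volumeHandle': 'existing-longhorn-volume',  # pre-created Longhorn volume name
                'volumeAttributes': {'numberOfReplicas': '3'},
            },
        },
    }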
def test_csi_mount_volume_online_expansion(client, core_api, storage_class, pvc, pod_manifest)
@pytest.mark.csi  # NOQA
@pytest.mark.csi_expansion  # NOQA
def test_csi_mount_volume_online_expansion(client, core_api, storage_class, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: online expansion for mount volume

    1. Create a new `storage_class` with `allowVolumeExpansion` set
    2. Create PVC and Pod with the new StorageClass
    3. Use `dd` command copy data into volume mount point.
    4. During the copy, update pvc.spec.resources to expand the volume
    5. Verify the volume expansion done using Longhorn API and Check the PVC
       & PV size
    6. Wait for the copy complete.
    7. Calculate the checksum for the copied data inside volume mount point.
    8. Update pvc.spec.resources to expand the volume and get data checksum
       again.
    9. Wait for the calculation complete, then compare the checksum.
    10. Do cleanup: Remove the original `test_data`as well as the pod and PVC.
    """
    create_storage_class(storage_class)

    volume_data_path = "/data/file"
    pod_name = "csi-mount-volume-online-expansion-test"
    pvc_name = pod_name + "-pvc"
    pvc['metadata']['name'] = pvc_name
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']
    create_pvc(pvc)

    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['volumes'] = [create_pvc_spec(pvc_name)]
    pod_manifest['spec']['nodeName'] = get_self_host_id()
    create_and_wait_pod(core_api, pod_manifest)

    volume_name = get_volume_name(core_api, pvc_name)

    # Use new process to do copy data and expand at the same time
    cmd = ["kubectl", "exec", "-it", pod_name, "--", "/bin/sh", "-c",
           "dd if=/dev/urandom of={} bs=1M count=2500".
           format(volume_data_path)]
    process = subprocess.Popen(cmd)

    expand_and_wait_for_pvc(core_api, pvc, (EXPANDED_VOLUME_SIZE)*Gi)
    wait_for_volume_expansion(client, volume_name)

    process.wait()
    if process.returncode == 0:
        write_data_complete = True
    else:
        write_data_complete = False
    assert write_data_complete

    subprocess.check_call(["sync"])

    command = 'md5sum {}'.format(volume_data_path)
    md5_before_expanding = \
        exec_command_in_pod(core_api, command, pod_name, 'default').\
        strip().split(" ")[0]

    executor = ThreadPoolExecutor(max_workers=5)
    future = executor.submit(md5sum_thread, pod_name, volume_data_path)

    expand_and_wait_for_pvc(core_api, pvc, (EXPANDED_VOLUME_SIZE+1)*Gi)
    wait_for_volume_expansion(client, volume_name)

    thread_timeout = 60
    try:
        md5_after_expanding = future.result(timeout=thread_timeout)
    except TimeoutError:
        print("md5 thread exceed timeout ({})s".format(thread_timeout))

    assert md5_after_expanding == md5_before_expanding
Test CSI feature: online expansion for mount volume
- Create a new `storage_class` with `allowVolumeExpansion` set
- Create a PVC and Pod with the new StorageClass
- Use the `dd` command to copy data into the volume mount point.
- During the copy, update pvc.spec.resources to expand the volume
- Verify the volume expansion is done using the Longhorn API and check the PVC & PV size
- Wait for the copy to complete.
- Calculate the checksum for the copied data inside the volume mount point.
- Update pvc.spec.resources to expand the volume and get the data checksum again.
- Wait for the calculation to complete, then compare the checksums.
- Do cleanup: remove the original `test_data` as well as the pod and PVC.
def test_csi_offline_expansion(client, core_api, storage_class, pvc, pod_manifest)
@pytest.mark.coretest  # NOQA
@pytest.mark.csi  # NOQA
@pytest.mark.csi_expansion  # NOQA
def test_csi_offline_expansion(client, core_api, storage_class, pvc, pod_manifest):  # NOQA
    """
    Test CSI feature: offline expansion

    1. Create a new `storage_class` with `allowVolumeExpansion` set
    2. Create PVC and Pod with dynamic provisioned volume from the
       StorageClass
    3. Generate `test_data` and write to the pod
    4. Delete the pod
    5. Update pvc.spec.resources to expand the volume
    6. Verify the volume expansion done using Longhorn API
    7. Create a new pod and validate the volume content
    """
    create_storage_class(storage_class)

    pod_name = 'csi-offline-expand-volume-test'
    pvc_name = pod_name + "-pvc"
    pvc['metadata']['name'] = pvc_name
    pvc['spec']['storageClassName'] = storage_class['metadata']['name']
    create_pvc(pvc)

    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['volumes'] = [{
        'name':
            pod_manifest['spec']['containers'][0]['volumeMounts'][0]['name'],
        'persistentVolumeClaim': {'claimName': pvc_name},
    }]
    create_and_wait_pod(core_api, pod_manifest)
    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    write_pod_volume_data(core_api, pod_name, test_data)
    delete_and_wait_pod(core_api, pod_name)

    pv = wait_and_get_pv_for_pvc(core_api, pvc_name)
    assert pv.status.phase == "Bound"
    volume_name = pv.spec.csi.volume_handle
    wait_for_volume_detached(client, volume_name)

    expand_and_wait_for_pvc(core_api, pvc, EXPANDED_VOLUME_SIZE*Gi)
    wait_for_volume_expansion(client, volume_name)
    wait_for_volume_detached(client, volume_name)
    volume = client.by_id_volume(volume_name)
    assert volume.state == "detached"
    assert volume.size == str(EXPANDED_VOLUME_SIZE*Gi)

    pod_manifest['metadata']['name'] = pod_name
    pod_manifest['spec']['volumes'] = [{
        'name':
            pod_manifest['spec']['containers'][0]['volumeMounts'][0]['name'],
        'persistentVolumeClaim': {'claimName': pvc_name},
    }]
    create_and_wait_pod(core_api, pod_manifest)

    resp = read_volume_data(core_api, pod_name)
    assert resp == test_data

    volume = client.by_id_volume(volume_name)
    engine = get_volume_engine(volume)
    assert volume.size == str(EXPANDED_VOLUME_SIZE*Gi)
    assert volume.size == engine.size
Test CSI feature: offline expansion
- Create a new `storage_class` with `allowVolumeExpansion` set
- Create a PVC and Pod with a dynamically provisioned volume from the StorageClass
- Generate `test_data` and write it to the pod
- Delete the pod
- Update pvc.spec.resources to expand the volume
- Verify the volume expansion is done using the Longhorn API
- Create a new pod and validate the volume content
def test_csi_storage_capacity(client, storage_class)
@pytest.mark.csi  # NOQA
def test_csi_storage_capacity(client, storage_class):  # NOQA
    """
    Test that CSIStorageCapacity objects are properly created

    1. Verify that initially there are no CSIStorageCapacity objects
    2. Create new storage class with volumeBindingMode set to
       WaitForFirstConsumer
    3. Verify that CSIStorageCapacity object is created for each node
    """
    api = k8sclient.StorageV1Api()

    csi_storage_capacities = api.list_namespaced_csi_storage_capacity(
        LONGHORN_NAMESPACE)
    assert len(csi_storage_capacities.items) == 0

    sc_name = 'longhorn-wait-for-first-consumer'
    storage_class['metadata']['name'] = sc_name
    storage_class['volumeBindingMode'] = 'WaitForFirstConsumer'
    create_storage_class(storage_class)

    nodes = client.list_node()
    for _ in range(RETRY_COUNTS_SHORT):
        csi_storage_capacities = api.list_namespaced_csi_storage_capacity(
            LONGHORN_NAMESPACE)
        if len(csi_storage_capacities.items) == len(nodes):
            break
        time.sleep(RETRY_INTERVAL_LONG)
    assert len(csi_storage_capacities.items) == len(nodes)
Test that CSIStorageCapacity objects are properly created
- Verify that initially there are no CSIStorageCapacity objects
- Create new storage class with volumeBindingMode set to WaitForFirstConsumer
- Verify that CSIStorageCapacity object is created for each node
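A small sketch of inspecting the per-node objects this test waits for; the attribute names follow the Kubernetes Python client's V1CSIStorageCapacity model, and `LONGHORN_NAMESPACE` is the same constant the test uses.

    from kubernetes import client as k8sclient

    storage_api = k8sclient.StorageV1Api()
    capacities = storage_api.list_namespaced_csi_storage_capacity(
        LONGHORN_NAMESPACE)
    for item in capacities.items:
        # one object per (storage class, node topology segment)
        print(item.storage_class_name, item.capacity, item.node_topology)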
def test_restage_volume_if_node_stage_volume_not_called()
@pytest.mark.csi
@pytest.mark.skip(reason="TODO")
def test_restage_volume_if_node_stage_volume_not_called():
    """
    Test restage volume if NodeStageVolume not called (CSI)

    1. Create a PVC with spec.volumeMode == Block.
    2. Create a Deployment with spec.replicas == 1 that uses the PVC. Set a
       spec.selector on the Deployment so it can only run Pods on one node.
    3. Hard reboot the node running the Deployment's single Pod.
    4. Before the node comes back, force delete the "running" Pod.
    5. Before the node comes back, verify there is now one pending Pod and
       one terminating Pod.
    6. After the node comes back, verify that a Pod becomes running and
       remains running. It is fine if that Pod is not the pending one from
       above. The automatic remount mechanism may cause some churn.
    7. Force delete the running Pod again.
    8. Verify that a Pod becomes running and remains running.
    """
    pass
Test restage volume if NodeStageVolume not called (CSI)
- Create a PVC with spec.volumeMode == Block.
- Create a Deployment with spec.replicas == 1 that uses the PVC. Set a spec.selector on the Deployment so it can only run Pods on one node.
- Hard reboot the node running the Deployment's single Pod.
- Before the node comes back, force delete the "running" Pod.
- Before the node comes back, verify there is now one pending Pod and one terminating Pod.
- After the node comes back, verify that a Pod becomes running and remains running. It is fine if that Pod is not the pending one from above. The automatic remount mechanism may cause some churn.
- Force delete the running Pod again.
- Verify that a Pod becomes running and remains running.
def test_xfs_pv(client, core_api, pod_manifest)
@pytest.mark.v2_volume_test  # NOQA
def test_xfs_pv(client, core_api, pod_manifest):  # NOQA
    """
    Test create PV with new XFS filesystem

    1. Create a volume
    2. Create a PV for the existing volume, specify `xfs` as filesystem
    3. Create PVC and Pod
    4. Make sure Pod is running.
    5. Write data into the pod and read back for validation.

    Note: The volume will be formatted to XFS filesystem by Kubernetes in
    this case.
    """
    volume_name = generate_volume_name()

    volume = create_and_check_volume(client, volume_name, size=XFS_MIN_SIZE)

    create_pv_for_volume(client, core_api, volume, volume_name, "xfs")
    create_pvc_for_volume(client, core_api, volume, volume_name)

    pod_manifest['spec']['volumes'] = [{
        "name": "pod-data",
        "persistentVolumeClaim": {
            "claimName": volume_name
        }
    }]
    pod_name = pod_manifest['metadata']['name']
    create_and_wait_pod(core_api, pod_manifest)

    test_data = generate_random_data(VOLUME_RWTEST_SIZE)
    write_pod_volume_data(core_api, pod_name, test_data)
    resp = read_volume_data(core_api, pod_name)
    assert resp == test_data
Test create PV with new XFS filesystem
- Create a volume
- Create a PV for the existing volume, specify `xfs` as filesystem
- Create PVC and Pod
- Make sure Pod is running.
- Write data into the pod and read back for validation.
Note: The volume will be formatted to XFS filesystem by Kubernetes in this case.
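As far as can be told from `create_pv_for_volume(client, core_api, volume, volume_name, "xfs")`, "specify `xfs` as filesystem" means the generated PV carries an `fsType` in its CSI source, and kubelet formats the empty Longhorn device with it on first use. The fragment below is illustrative, not the helper's literal output.

    pv_csi_fragment = {
        'spec': {
            'csi': {
                'driver': 'driver.longhorn.io',
                'volumeHandle': 'my-xfs-volume',   # hypothetical Longhorn volume name
                'fsType': 'xfs',                   # formatted by kubelet on first mount
            }
        }
    }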
def test_xfs_pv_existing_volume(client, core_api, pod_manifest)
@pytest.mark.v2_volume_test  # NOQA
def test_xfs_pv_existing_volume(client, core_api, pod_manifest):  # NOQA
    """
    Test create PV with existing XFS filesystem

    1. Create a volume
    2. Create PV/PVC for the existing volume, specify `xfs` as filesystem
    3. Attach the volume to the current node.
    4. Format it to `xfs`
    5. Create a POD using the volume

    FIXME: We should write data in step 4 and validate the data in step 5,
    make sure the disk won't be reformatted
    """
    volume_name = generate_volume_name()

    volume = create_and_check_volume(client, volume_name, size=XFS_MIN_SIZE)

    create_pv_for_volume(client, core_api, volume, volume_name, "xfs")
    create_pvc_for_volume(client, core_api, volume, volume_name)

    host_id = get_self_host_id()

    volume = volume.attach(hostId=host_id)
    volume = wait_for_volume_healthy(client, volume_name)

    cmd = ['mkfs.xfs', get_volume_endpoint(volume)]
    subprocess.check_call(cmd)

    volume = volume.detach()
    volume = wait_for_volume_detached(client, volume_name)

    pod_manifest['spec']['volumes'] = [{
        "name": "pod-data",
        "persistentVolumeClaim": {
            "claimName": volume_name
        }
    }]
    create_and_wait_pod(core_api, pod_manifest)
Test create PV with existing XFS filesystem
- Create a volume
- Create PV/PVC for the existing volume, specify `xfs` as filesystem
- Attach the volume to the current node.
- Format it to `xfs`
- Create a POD using the volume
FIXME: We should write data in step 4 and validate the data in step 5, make sure the disk won't be reformatted
def update_storageclass_references(name, pv, claim)
def update_storageclass_references(name, pv, claim):
    """
    Rename all references to a StorageClass to a specified name.
    """
    pv['spec']['storageClassName'] = name
    claim['spec']['storageClassName'] = name
Rename all references to a StorageClass to a specified name.
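Typical use, mirroring the helpers above: point both manifests at the test StorageClass before creating them.

    # csi_pv and pvc are the fixture dictionaries used throughout this module.
    update_storageclass_references(CSI_PV_TEST_STORAGE_NAME, csi_pv, pvc)
    assert csi_pv['spec']['storageClassName'] == pvc['spec']['storageClassName']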