Module tests.test_statefulset
Functions
def create_and_test_backups(api, cli, pod_info)
Source code
def create_and_test_backups(api, cli, pod_info):
    """
    Create backups for all Pods in a StatefulSet and test that all the
    backups have the correct attributes.

    Args:
        api: An instance of CoreV1Api.
        cli: A Longhorn client instance.
        pod_info: A List of Pods with names and volume information. This
            List can be generated using the get_statefulset_pod_info
            function located in common.py.
    """
    for pod in pod_info:
        pod['data'] = generate_random_data(VOLUME_RWTEST_SIZE)
        pod['backup_snapshot'] = ''

        # Create backup.
        volume_name = pod['pv_name']
        volume = cli.by_id_volume(pod['pv_name'])
        create_snapshot(cli, volume_name)
        write_pod_volume_data(api, pod['pod_name'], pod['data'])
        pod['backup_snapshot'] = create_snapshot(cli, volume_name)
        create_snapshot(cli, volume_name)

        volume.snapshotBackup(name=pod['backup_snapshot']['name'])

        # Wait for backup to appear.
        found = False
        for i in range(DEFAULT_BACKUP_TIMEOUT):
            backup_volumes = cli.list_backupVolume()
            for bv in backup_volumes:
                if bv.volumeName == pod['pv_name']:
                    found = True
                    break
            if found:
                break
            time.sleep(DEFAULT_POD_INTERVAL)
        assert found

        found = False
        for i in range(DEFAULT_BACKUP_TIMEOUT):
            backups = bv.backupList().data
            for b in backups:
                if b['snapshotName'] == pod['backup_snapshot']['name'] \
                        and b['state'] == 'Completed':
                    found = True
                    break
            if found:
                break
            time.sleep(DEFAULT_POD_INTERVAL)
        assert found

        # Make sure backup has the correct attributes.
        new_b = bv.backupGet(name=b.name)
        assert new_b.name == b.name
        assert new_b.url == b.url
        assert new_b.snapshotName == b.snapshotName
        assert new_b.snapshotCreated == b.snapshotCreated
        assert new_b.created == b.created
        assert new_b.volumeName == b.volumeName
        assert new_b.volumeSize == b.volumeSize
        assert new_b.volumeCreated == b.volumeCreated

        # This backup has the url attribute we need to restore from backup.
        pod['backup_snapshot'] = b
Create backups for all Pods in a StatefulSet and test that all the backups have the correct attributes.
Args
api
- An instance of CoreV1Api.
cli
- A Longhorn client instance.
pod_info
- A List of Pods with names and volume information. This List can be generated using the get_statefulset_pod_info function located in common.py.
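A minimal usage sketch, taken from how the tests in this module invoke the helper (it assumes the core_api, client, and statefulset fixtures used throughout this module):

    # Gather name/volume info for each Pod in a running StatefulSet,
    # then back up every volume and verify the backup attributes.
    pod_info = get_statefulset_pod_info(core_api, statefulset)
    create_and_test_backups(core_api, client, pod_info)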
def test_statefulset_backup(set_random_backupstore, client, core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
def test_statefulset_backup(set_random_backupstore, client, core_api, storage_class, statefulset):  # NOQA
    """
    Test that backups on StatefulSet volumes work properly.

    1. Create a StatefulSet with VolumeClaimTemplate and Longhorn.
    2. Wait for pods to run.

    Then create a backup using the following steps for each pod:

    1. Create a snapshot
    2. Write some data into it
    3. Create another snapshot `backup_snapshot`
    4. Create a third snapshot
    5. Back up the snapshot `backup_snapshot`
    6. Wait for the backup to show up
    7. Verify the backup information
    """
    statefulset_name = 'statefulset-backup-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    pod_info = get_statefulset_pod_info(core_api, statefulset)
    create_and_test_backups(core_api, client, pod_info)
Test that backups on StatefulSet volumes work properly.
- Create a StatefulSet with VolumeClaimTemplate and Longhorn.
- Wait for pods to run.
Then create a backup using the following steps for each pod:
- Create a snapshot
- Write some data into it
- Create another snapshot `backup_snapshot`
- Create a third snapshot
- Back up the snapshot `backup_snapshot`
- Wait for the backup to show up
- Verify the backup information
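For a single pod, the sequence above reduces to the following sketch (calls lifted from the create_and_test_backups source; volume_name, pod_name, and data stand in for the per-pod values):

    volume = client.by_id_volume(volume_name)
    create_snapshot(client, volume_name)                    # snapshot 1
    write_pod_volume_data(core_api, pod_name, data)
    backup_snapshot = create_snapshot(client, volume_name)  # snapshot 2
    create_snapshot(client, volume_name)                    # snapshot 3
    # Back up the middle snapshot, which holds the written data.
    volume.snapshotBackup(name=backup_snapshot['name'])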
def test_statefulset_mount(client, core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
def test_statefulset_mount(client, core_api, storage_class, statefulset):  # NOQA
    """
    Tests that volumes provisioned for a StatefulSet can be properly created,
    mounted, unmounted, and deleted on the Kubernetes cluster.

    1. Create a StatefulSet using a dynamically provisioned Longhorn volume.
    2. Wait for pods to become running.
    3. Check volume properties are consistent with the StorageClass.
    """
    statefulset_name = 'statefulset-mount-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    pod_info = get_statefulset_pod_info(core_api, statefulset)

    volumes = client.list_volume()
    assert len(volumes) == statefulset['spec']['replicas']
    for v in volumes:
        # Workaround for checking volume name since they differ per pod.
        found = False
        for pod in pod_info:
            if v.name == pod['pv_name']:
                found = True
                break
        assert found
        pod_info.remove(pod)

        assert v.size == str(DEFAULT_VOLUME_SIZE * Gi)
        assert v.numberOfReplicas == \
            int(storage_class['parameters']['numberOfReplicas'])
        assert v.state == 'attached'
    # Confirm that we've iterated through all the volumes.
    assert len(pod_info) == 0
Tests that volumes provisioned for a StatefulSet can be properly created, mounted, unmounted, and deleted on the Kubernetes cluster.
- Create a StatefulSet using a dynamically provisioned Longhorn volume.
- Wait for pods to become running.
- Check volume properties are consistent with the StorageClass.
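The check in the last step compares each Longhorn volume against the StorageClass parameters; the core assertions from the test body are:

    for v in client.list_volume():
        # Size, replica count, and state must match the provisioning request.
        assert v.size == str(DEFAULT_VOLUME_SIZE * Gi)
        assert v.numberOfReplicas == \
            int(storage_class['parameters']['numberOfReplicas'])
        assert v.state == 'attached'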
def test_statefulset_pod_deletion(core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
def test_statefulset_pod_deletion(core_api, storage_class, statefulset):  # NOQA
    """
    Test that a StatefulSet can spin up a new Pod with the same data after
    a previous Pod has been deleted.

    1. Create a StatefulSet with VolumeClaimTemplate and Longhorn.
    2. Wait for pods to run.
    3. Write some data to one of the pods.
    4. Delete that pod.
    5. Wait for the StatefulSet to recreate the pod.
    6. Verify the data in the pod.
    """
    statefulset_name = 'statefulset-pod-deletion-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)
    test_pod_name = statefulset_name + '-' + \
        str(randrange(statefulset['spec']['replicas']))
    test_data = generate_random_data(VOLUME_RWTEST_SIZE)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    write_pod_volume_data(core_api, test_pod_name, test_data)
    # Not using delete_and_wait_pod here because there is the small chance the
    # StatefulSet recreates the Pod quickly enough where the function won't
    # detect that the Pod was deleted, which will time out and throw an error.
    core_api.delete_namespaced_pod(name=test_pod_name, namespace='default',
                                   body=k8sclient.V1DeleteOptions())
    wait_statefulset(statefulset)
    resp = read_volume_data(core_api, test_pod_name)

    assert resp == test_data
Test that a StatefulSet can spin up a new Pod with the same data after a previous Pod has been deleted.
- Create a StatefulSet with VolumeClaimTemplate and Longhorn.
- Wait for pods to run.
- Write some data to one of the pods.
- Delete that pod.
- Wait for the StatefulSet to recreate the pod.
- Verify the data in the pod.
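Note that the test deletes the Pod directly through the Kubernetes API rather than with delete_and_wait_pod, because the StatefulSet can recreate the Pod before the wait detects the deletion:

    core_api.delete_namespaced_pod(name=test_pod_name, namespace='default',
                                   body=k8sclient.V1DeleteOptions())
    wait_statefulset(statefulset)
    # The recreated Pod must come back with the original volume data.
    assert read_volume_data(core_api, test_pod_name) == test_data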
def test_statefulset_recurring_backup(set_random_backupstore, client, core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_statefulset_recurring_backup(set_random_backupstore, client, core_api, storage_class, statefulset):  # NOQA
    """
    Scenario: test recurring backups on StatefulSets

    Given 1 default backup recurring job is created.

    When a statefulset is created.
    And data is written to every statefulset pod.
    And 5 minutes have passed.

    Then 2 snapshots are created for every statefulset pod.
    """

    # backup every minute
    recurring_jobs = {
        "backup": {
            "task": "backup",
            "groups": ["default"],
            "cron": "* * * * *",
            "retain": 2,
            "concurrency": 2,
            "labels": {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    statefulset_name = 'statefulset-backup-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    pod_data = get_statefulset_pod_info(core_api, statefulset)

    for pod in pod_data:
        pod['data'] = generate_random_data(VOLUME_RWTEST_SIZE)
        pod['backup_snapshot'] = ''

    for pod in pod_data:
        volume = client.by_id_volume(pod['pv_name'])
        write_pod_volume_data(core_api, pod['pod_name'], pod['data'])

    time.sleep(150)

    for pod in pod_data:
        volume = client.by_id_volume(pod['pv_name'])
        write_pod_volume_data(core_api, pod['pod_name'], pod['data'])

    time.sleep(150)

    for pod in pod_data:
        volume = client.by_id_volume(pod['pv_name'])
        snapshots = volume.snapshotList()
        count = 0
        for snapshot in snapshots:
            if snapshot.removed is False:
                count += 1

        # one backup + volume-head
        assert count == 2
Scenario: test recurring backups on StatefulSets
Given 1 default backup recurring job is created.
When a statefulset is created.
And data is written to every statefulset pod.
And 5 minutes have passed.
Then 2 snapshots are created for every statefulset pod.
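The "backup every minute" recurring job is configured with the following spec (values copied from the test body):

    # Recurring backup job: run every minute, retain 2 backups.
    recurring_jobs = {
        "backup": {
            "task": "backup",
            "groups": ["default"],
            "cron": "* * * * *",   # every minute
            "retain": 2,
            "concurrency": 2,
            "labels": {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)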
def test_statefulset_restore(set_random_backupstore, client, core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
def test_statefulset_restore(set_random_backupstore, client, core_api, storage_class, statefulset):  # NOQA
    """
    Test that data can be restored into volumes usable by a StatefulSet.

    1. Create a StatefulSet with VolumeClaimTemplate and Longhorn.
    2. Wait for pods to run.
    3. Create a backup for each pod.
    4. Delete the StatefulSet, including the Longhorn volumes.
    5. Create volumes and PV/PVC using previous backups from each Pod.
        1. PVs will be created using the previous names.
        2. PVCs will be created using the previous name + "-2" because the
           StatefulSet has a naming policy for its PVCs.
    6. Create a new StatefulSet using the previous name + "-2".
    7. Wait for pods to be up.
    8. Verify the pods contain the previously backed up data.
    """
    statefulset_name = 'statefulset-restore-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    pod_info = get_statefulset_pod_info(core_api, statefulset)
    create_and_test_backups(core_api, client, pod_info)

    delete_and_wait_statefulset(core_api, client, statefulset)

    csi = check_csi(core_api)

    # StatefulSet fixture already cleans these up, use the manifests instead
    # of the fixtures to avoid issues during teardown.
    pv = {
        'apiVersion': 'v1',
        'kind': 'PersistentVolume',
        'metadata': {
            'name': ''
        },
        'spec': {
            'capacity': {
                'storage': size_to_string(DEFAULT_VOLUME_SIZE * Gi)
            },
            'volumeMode': 'Filesystem',
            'accessModes': ['ReadWriteOnce'],
            'persistentVolumeReclaimPolicy': 'Delete',
            'storageClassName': DEFAULT_STORAGECLASS_NAME
        }
    }

    pvc = {
        'apiVersion': 'v1',
        'kind': 'PersistentVolumeClaim',
        'metadata': {
            'name': ''
        },
        'spec': {
            'accessModes': [
                'ReadWriteOnce'
            ],
            'resources': {
                'requests': {
                    'storage': size_to_string(DEFAULT_VOLUME_SIZE * Gi)
                }
            },
            'storageClassName': DEFAULT_STORAGECLASS_NAME
        }
    }

    assert csi
    pv['spec']['csi'] = {
        'driver': 'driver.longhorn.io',
        'fsType': 'ext4',
        'volumeAttributes': {
            'numberOfReplicas':
                storage_class['parameters']['numberOfReplicas'],
            'staleReplicaTimeout':
                storage_class['parameters']['staleReplicaTimeout']
        },
        'volumeHandle': ''
    }

    # Make sure that volumes still work even if the Pod and StatefulSet names
    # are different.
    for pod in pod_info:
        pod['pod_name'] = pod['pod_name'].replace(
            'statefulset-restore-test', 'statefulset-restore-test-2')
        pod['pvc_name'] = pod['pvc_name'].replace(
            'statefulset-restore-test', 'statefulset-restore-test-2')

        pv['metadata']['name'] = pod['pvc_name']
        client.create_volume(
            name=pod['pvc_name'],
            size=size_to_string(DEFAULT_VOLUME_SIZE * Gi),
            numberOfReplicas=int(
                storage_class['parameters']['numberOfReplicas']),
            fromBackup=pod['backup_snapshot']['url'],
            dataEngine=DATA_ENGINE)
        wait_for_volume_detached(client, pod['pvc_name'])

        pv['spec']['csi']['volumeHandle'] = pod['pvc_name']
        core_api.create_persistent_volume(pv)

        pvc['metadata']['name'] = pod['pvc_name']
        pvc['spec']['volumeName'] = pod['pvc_name']
        core_api.create_namespaced_persistent_volume_claim(
            body=pvc, namespace='default')

    statefulset_name = 'statefulset-restore-test-2'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)
    create_and_wait_statefulset(statefulset)

    for pod in pod_info:
        resp = read_volume_data(core_api, pod['pod_name'])
        assert resp == pod['data']
Test that data can be restored into volumes usable by a StatefulSet.
- Create a StatefulSet with VolumeClaimTemplate and Longhorn.
- Wait for pods to run.
- Create a backup for each pod.
- Delete the StatefulSet, including the Longhorn volumes.
- Create volumes and PV/PVC using previous backups from each Pod.
- PVs will be created using the previous names.
- PVCs will be created using the previous name + "-2" because the StatefulSet has a naming policy for its PVCs.
- Create a new StatefulSet using the previous name + "-2"
- Wait for pods to be up.
- Verify the pods contain the previously backed up data.
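Restoring a single volume from its backup looks like this in the test body, where pod is one entry of pod_info and its backup URL was saved by create_and_test_backups:

    # Create a Longhorn volume pre-populated from the backup, then wait
    # for it to settle before wiring up its PV and PVC.
    client.create_volume(
        name=pod['pvc_name'],
        size=size_to_string(DEFAULT_VOLUME_SIZE * Gi),
        numberOfReplicas=int(
            storage_class['parameters']['numberOfReplicas']),
        fromBackup=pod['backup_snapshot']['url'],
        dataEngine=DATA_ENGINE)
    wait_for_volume_detached(client, pod['pvc_name'])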
def test_statefulset_scaling(client, core_api, storage_class, statefulset)
Source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.coretest  # NOQA
def test_statefulset_scaling(client, core_api, storage_class, statefulset):  # NOQA
    """
    Test that scaling up a StatefulSet successfully provisions new volumes.

    1. Create a StatefulSet with VolumeClaimTemplate and Longhorn.
    2. Wait for pods to run.
    3. Verify the properties of volumes.
    4. Scale the StatefulSet to 3 replicas.
    5. Wait for the new pod to become ready.
    6. Verify the new volume properties.
    """
    statefulset_name = 'statefulset-scaling-test'
    update_statefulset_manifests(statefulset, storage_class, statefulset_name)

    create_storage_class(storage_class)
    create_and_wait_statefulset(statefulset)

    pod_info = get_statefulset_pod_info(core_api, statefulset)

    volumes = client.list_volume()
    assert len(volumes) == statefulset['spec']['replicas']
    for v in volumes:
        found = False
        for pod in pod_info:
            if v.name == pod['pv_name']:
                found = True
                break
        assert found
        pod_info.remove(pod)

        assert v.size == str(DEFAULT_VOLUME_SIZE * Gi)
        assert v.numberOfReplicas == \
            int(storage_class['parameters']['numberOfReplicas'])
        assert v.state == 'attached'
    assert len(pod_info) == 0

    statefulset['spec']['replicas'] = replicas = 3
    apps_api = get_apps_api_client()
    apps_api.patch_namespaced_stateful_set(
        name=statefulset_name,
        namespace='default',
        body={
            'spec': {
                'replicas': replicas
            }
        })

    for i in range(DEFAULT_POD_TIMEOUT):
        s_set = apps_api.read_namespaced_stateful_set(
            name=statefulset_name,
            namespace='default')
        # s_set is none if statefulset is not yet created
        if s_set is not None and s_set.status.ready_replicas == replicas:
            break
        time.sleep(DEFAULT_POD_INTERVAL)
    assert s_set.status.ready_replicas == replicas

    pod_info = get_statefulset_pod_info(core_api, statefulset)

    volumes = client.list_volume()
    assert len(volumes) == replicas
    for v in volumes:
        found = False
        for pod in pod_info:
            if v.name == pod['pv_name']:
                found = True
                break
        assert found
        pod_info.remove(pod)

        assert v.size == str(DEFAULT_VOLUME_SIZE * Gi)
        assert v.numberOfReplicas == \
            int(storage_class['parameters']['numberOfReplicas'])
        assert v.state == 'attached'
    assert len(pod_info) == 0
Test that scaling up a StatefulSet successfully provisions new volumes.
- Create a StatefulSet with VolumeClaimTemplate and Longhorn.
- Wait for pods to run.
- Verify the properties of volumes.
- Scale the StatefulSet to 3 replicas.
- Wait for the new pod to become ready.
- Verify the new volume properties.
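The scale-up step patches the StatefulSet's replica count through the Kubernetes apps API, as in the test body:

    # Scale the StatefulSet to 3 replicas; the test then polls
    # read_namespaced_stateful_set until ready_replicas matches.
    apps_api = get_apps_api_client()
    apps_api.patch_namespaced_stateful_set(
        name=statefulset_name,
        namespace='default',
        body={'spec': {'replicas': 3}})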