Module tests.test_recurring_job
Functions
def add_recurring_job_source_to_pvc(name, core_api)
-
Expand source code
def add_recurring_job_source_to_pvc(name, core_api):  # NOQA
    """
    Label a PVC as a recurring-job source.

    Reads the PVC `name` in the 'default' namespace, sets the label
    `recurring-job.longhorn.io/source=enabled` on it, and writes the
    updated claim back via `update_persistent_volume_claim`.
    """
    pvc = core_api.read_namespaced_persistent_volume_claim(
        name=name, namespace='default')
    labels = pvc.metadata.labels
    if labels is None:
        # A fresh PVC may carry no labels at all; start from an empty map.
        labels = {}
        pvc.metadata.labels = labels
    labels["recurring-job.longhorn.io/source"] = "enabled"
    update_persistent_volume_claim(core_api, name, 'default', pvc)
def recurring_job_labels_test(client, labels, volume_name, size='33554432', backing_image='')
-
Expand source code
def recurring_job_labels_test(client, labels, volume_name, size=SIZE, backing_image=""):  # NOQA
    """
    Shared helper: run a 1-minute backup recurring job carrying `labels`
    and verify the labels propagate to the created backup.

    NOTE: this helper mutates the caller's `labels` dict in place (it adds
    the key "we-added-this-label") before updating the recurring job.

    Parameters:
        client:        Longhorn API client fixture.
        labels:        dict of labels to attach to the recurring job.
        volume_name:   name of the volume to create and back up.
        size:          volume size (defaults to module-level SIZE).
        backing_image: optional backing image for the volume.
    """
    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: labels,
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    client.create_volume(name=volume_name, size=size, numberOfReplicas=2,
                         backingImage=backing_image,
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)
    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name)
    write_volume_random_data(volume)
    # Sleep past the first cron tick so the first backup runs before the
    # label update below.
    time.sleep(75 - WRITE_DATA_INTERVAL)  # 1 minute 15 second
    labels["we-added-this-label"] = "definitely"
    update_recurring_job(client, RECURRING_JOB_NAME,
                         recurring_jobs[RECURRING_JOB_NAME][GROUPS],
                         labels)
    write_volume_random_data(volume)
    # Allow at least two more cron ticks for the updated job to run.
    time.sleep(135)  # 2 minute 15 second

    # 1 from Backup, 1 from Volume Head.
    wait_for_snapshot_count(volume, 2)

    # Verify the Labels on the actual Backup.
    bv = find_backup_volume(client, volume_name)
    wait_for_backup_count(bv, 1)
    backups = bv.backupList().data
    b = bv.backupGet(name=backups[0].name)
    for key, val in iter(labels.items()):
        assert b.labels.get(key) == val
    assert b.labels.get(RECURRING_JOB_LABEL) == RECURRING_JOB_NAME
    # One extra Label from RecurringJob and another one below
    # Longhorn will automatically add a label `longhorn.io/volume-access-mode`
    # to a newly created backup
    assert len(b.labels) == len(labels) + 2
    wait_for_backup_volume(client, bv.name, backing_image)
def remove_recurring_job_source_to_pvc(name, core_api)
-
Expand source code
def remove_recurring_job_source_to_pvc(name, core_api):  # NOQA
    """
    Remove the `recurring-job.longhorn.io/source` label from a PVC.

    Reads the PVC `name` in the 'default' namespace, drops the source
    label if present, and writes the claim back.

    Fix: the original unconditionally `del`eted the label key, which
    raised KeyError whenever the PVC had no labels (the code had just set
    `labels = {}`) or the label was already absent.  `dict.pop` with a
    default makes removal idempotent while preserving behavior when the
    label exists.
    """
    claim = core_api.read_namespaced_persistent_volume_claim(
        name=name, namespace='default')
    labels = claim.metadata.labels
    if labels is None:
        labels = {}
        claim.metadata.labels = labels
    # pop() with a default is a no-op when the label is absent, unlike
    # `del`, which raised KeyError on an empty/missing label map.
    labels.pop("recurring-job.longhorn.io/source", None)
    update_persistent_volume_claim(core_api, name, 'default', claim)
def test_recurring_job(set_random_backupstore, client, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job(set_random_backupstore, client, volume_name):  # NOQA
    """
    Scenario : test recurring job (S3/NFS)

    Given `snapshot1` recurring job created and cron at 1 min and retain 2.
          `backup1` recurring job created and cron at 2 min and retain 1.
          `backup2` recurring job created and cron at 1 min and retain 2.
    And a volume created and attached.

    When label volume with recurring job `snapshot1`.
         label volume with recurring job `backup1`.
    And wait until the 20th second since the beginning of an even minute.
    And write data to volume. wait for 2 minutes.
    And write data to volume. wait for 2 minutes.
    Then volume have 4 snapshots.
         (2 from `snapshot1`, 1 from `backup1`, 1 from `volume-head`)

    When label volume with recurring job `backup2`
    And write data to volume. wait for 2 minutes.
    And write data to volume. wait for 2 minutes.
    Then volume have 5 snapshots.
         (2 from `snapshot1`, 1 from `backup1`, 1 from `backup2`,
          1 from `volume-head`)

    When wait until backups complete.
    Then `backup1` completed 1 backups.
         `backup2` completed 2 backups.
    """
    '''
    The timeline looks like this:
    0   1   2   3   4   5   6   7   8   9   10  (minute)
    |  W |   |  W |   |   | W |   | W |   |     (write data)
    |  S  |  S  |     |  S  |  S  |             (snapshot1)
    |     B     |  B  |  B  |  B  |  B          (backup1)
    |           |     |  B  B  B  B  B          (backup2)
    '''
    snap1 = SNAPSHOT + "1"
    back1 = BACKUP + "1"
    back2 = BACKUP + "2"
    recurring_jobs = {
        snap1: {
            TASK: SNAPSHOT,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 1,
            LABELS: {},
        },
        back1: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
        back2: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    volume = client.create_volume(name=volume_name, size=SIZE,
                                  numberOfReplicas=2,
                                  dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)
    volume = volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name)

    volume.recurringJobAdd(name=snap1, isGroup=False)
    volume.recurringJobAdd(name=back1, isGroup=False)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[snap1, back1],
                                         groups=[DEFAULT])

    wait_until_begin_of_an_even_minute()
    # wait until the 30th second of an even minute
    # make sure that snapshot job happens before the backup job
    # NOTE(review): the docstring says "20th second" but the code sleeps
    # 30s — the docstring looks stale; confirm intended offset.
    time.sleep(30)

    write_volume_random_data(volume)
    time.sleep(60 * 2)
    write_volume_random_data(volume)
    time.sleep(60 * 2)

    # 2 from job_snap, 1 from job_backup, 1 volume-head
    wait_for_snapshot_count(volume, 4)

    volume.recurringJobAdd(name=back2, isGroup=False)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[snap1, back1, back2],
                                         groups=[DEFAULT])

    write_volume_random_data(volume)
    time.sleep(60 * 2)
    write_volume_random_data(volume)
    time.sleep(60 * 2)

    # 2 from job_snap, 1 from job_backup, 1 from job_backup2, 1 volume-head
    wait_for_snapshot_count(volume, 5)

    # Count completed (non-deleting) backups per job by the snapshot-name
    # prefix each recurring job uses.
    complete_backup_1_count = 0
    complete_backup_2_count = 0
    volume = wait_for_backup_completion(client, volume_name)
    for b in volume.backupStatus:
        if "backup1-" in b.snapshot and b.state != "Deleting":
            complete_backup_1_count += 1
        elif "backup2-" in b.snapshot and b.state != "Deleting":
            complete_backup_2_count += 1

    # 1 completed backups from backup1
    # 2 completed backups from backup2
    # NOTE: NFS backup can be slow sometimes and error prone
    assert complete_backup_1_count == 1, \
        f"backupStatus = {client.by_id_volume(volume_name).backupStatus}"
    assert complete_backup_2_count == 2, \
        f"backupStatus = {client.by_id_volume(volume_name).backupStatus}"
Scenario : test recurring job (S3/NFS)
Given
snapshot1
recurring job created and cron at 1 min and retain 2.backup1
recurring job created and cron at 2 min and retain 1.backup2
recurring job created and cron at 1 min and retain 2. And a volume created and attached.When label volume with recurring job
snapshot1
. label volume with recurring jobbackup1
. And wait until the 20th second since the beginning of an even minute. And write data to volume. wait for 2 minutes. And write data to volume. wait for 2 minutes. Then volume have 4 snapshots. (2 fromsnapshot1
, 1 frombackup1
, 1 fromvolume-head
)When label volume with recurring job
backup2
And write data to volume. wait for 2 minutes. And write data to volume. wait for 2 minutes. Then volume have 5 snapshots. (2 fromsnapshot1
, 1 frombackup1
, 1 frombackup2
, 1 fromvolume-head
)When wait until backups complete. Then
backup1
completed 1 backups.backup2
completed 2 backups. def test_recurring_job_backup(set_random_backupstore, client, batch_v1_api)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_backup(set_random_backupstore, client, batch_v1_api):  # NOQA
    """
    Scenario: test recurring job backup (S3/NFS)

    Given volume `test-job-1` created, attached, and healthy.
          volume `test-job-2` created, attached, and healthy.

    When create a recurring job with `default` in groups.
    Then should have 1 cron job.

    When write some data to volume `test-job-1`.
         write some data to volume `test-job-2`.
    And wait for `backup1` cron job scheduled time.
    Then volume `test-job-1` should have 1 backups.
         volume `test-job-2` should have 1 backups.

    When write some data to volume `test-job-1`.
         write some data to volume `test-job-2`.
    And wait for `backup1` cron job scheduled time.
    Then volume `test-job-1` should have 2 backups.
         volume `test-job-2` should have 2 backups.
    """
    volume1_name = "test-job-1"
    volume2_name = "test-job-2"
    client.create_volume(name=volume1_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    client.create_volume(name=volume2_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume1 = wait_for_volume_detached(client, volume1_name)
    volume2 = wait_for_volume_detached(client, volume2_name)

    self_host = get_self_host_id()
    volume1.attach(hostId=self_host)
    volume2.attach(hostId=self_host)
    volume1 = wait_for_volume_healthy(client, volume1_name)
    volume2 = wait_for_volume_healthy(client, volume2_name)

    # One backup job in the `default` group covers both volumes.
    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    # 1st job
    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    # Subtract the time already spent writing so the total wait is ~1 cron
    # interval.
    time.sleep(60 - WRITE_DATA_INTERVAL)
    wait_for_backup_count(
        find_backup_volume(client, volume1_name, retry=RETRY_COUNTS_SHORT),
        1)
    wait_for_backup_count(
        find_backup_volume(client, volume2_name, retry=RETRY_COUNTS_SHORT),
        1)

    # 2nd job
    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(60 - WRITE_DATA_INTERVAL)
    wait_for_backup_count(
        find_backup_volume(client, volume1_name, retry=RETRY_COUNTS_SHORT),
        2)
    wait_for_backup_count(
        find_backup_volume(client, volume2_name, retry=RETRY_COUNTS_SHORT),
        2)
Scenario: test recurring job backup (S3/NFS)
Given volume
test-job-1
created, attached, and healthy. volumetest-job-2
created, attached, and healthy.When create a recurring job with
default
in groups. Then should have 1 cron job.When write some data to volume
test-job-1
. write some data to volumetest-job-2
. And wait forbackup1
cron job scheduled time. Then volumetest-job-1
should have 1 backups. volumetest-job-2
should have 1 backups.When write some data to volume
test-job-1
. write some data to volumetest-job-2
. And wait forbackup1
cron job scheduled time. Then volumetest-job-1
should have 2 backups. volumetest-job-2
should have 2 backups. def test_recurring_job_default(client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_default(client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test recurring job set with default in groups

    Given 1 volume created, attached, and healthy.

    Adding a job label must not strip the implicit defaults:
    When set `snapshot` recurring job in volume label.
    Then the volume labels contain both the `default` job-group
         and the `snapshot` job.

    The default group label itself is removable:
    When delete recurring job-group `default` in volume label.
    Then the volume keeps the `snapshot` job label but not the
         `default` group label.

    Removing every recurring job label restores the default:
    When delete all recurring jobs in volume label.
    Then the volume has no `snapshot` job label but regains the
         `default` group label.
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    vol = wait_for_volume_detached(client, volume_name)
    vol.attach(hostId=get_self_host_id())
    vol = wait_for_volume_healthy(client, volume_name)

    # Adding a job label should leave the default group label in place.
    vol.recurringJobAdd(name=SNAPSHOT, isGroup=False)
    wait_for_volume_recurring_job_update(vol,
                                         jobs=[SNAPSHOT],
                                         groups=[DEFAULT])

    # The default group label can be removed explicitly.
    vol.recurringJobDelete(name=DEFAULT, isGroup=True)
    wait_for_volume_recurring_job_update(vol,
                                         jobs=[SNAPSHOT],
                                         groups=[])

    # Once no job labels remain, the default group label comes back.
    vol.recurringJobDelete(name=SNAPSHOT, isGroup=False)
    wait_for_volume_recurring_job_update(vol,
                                         jobs=[],
                                         groups=[DEFAULT])
Scenario: test recurring job set with default in groups
Given 1 volume created, attached, and healthy.
Setting recurring job in volume label should not remove the defaults.
When set
snapshot
recurring job in volume label. Then should containdefault
job-group in volume labels. should containsnapshot
job in volume labels.Should be able to remove the default label.
When delete recurring job-group
default
in volume label. Then volume should havesnapshot
job in job label. volume should not havedefault
group in job label.Remove all volume recurring job labels should bring in default
When delete all recurring jobs in volume label. Then volume should not have
snapshot
job in job label. volume should havedefault
group in job label. def test_recurring_job_delete(client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_delete(client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test delete recurring job

    Given 1 volume created, attached, and healthy.

    When create `snapshot1` recurring job with `default, group-1` in groups.
         create `snapshot2` recurring job with `default` in groups.
         create `snapshot3` recurring job with `` in groups.
         create `backup1` recurring job with `default, group-1` in groups.
         create `backup2` recurring job with `default` in groups.
         create `backup3` recurring job with `` in groups.
    Then default `snapshot1` cron job should exist.
         default `snapshot2` cron job should exist.
         `snapshot3` cron job should exist.
         default `backup1` cron job should exist.
         default `backup2` cron job should exist.
         `backup3` cron job should exist.

    # Delete `snapshot2` recurring job should delete the cron job
    When delete `snapshot-2` recurring job.
    Then default `snapshot1` cron job should exist.
         default `snapshot2` cron job should not exist.
         `snapshot3` cron job should exist.
         default `backup1` cron job should exist.
         default `backup2` cron job should exist.
         `backup3` cron job should exist.

    # Delete multiple recurring jobs should reflect on the cron jobs.
    When delete `backup-1` recurring job.
         delete `backup-2` recurring job.
         delete `backup-3` recurring job.
    Then default `snapshot1` cron job should exist.
         default `snapshot2` cron job should not exist.
         `snapshot3` cron job should exist.
         default `backup1` cron job should not exist.
         default `backup2` cron job should not exist.
         `backup3` cron job should not exist.
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)
    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name)

    snap1 = SNAPSHOT + "1"
    snap2 = SNAPSHOT + "2"
    snap3 = SNAPSHOT + "3"
    back1 = BACKUP + "1"
    back2 = BACKUP + "2"
    back3 = BACKUP + "3"
    group1 = "group-1"
    # Six jobs: three snapshot + three backup, with varying group
    # membership (default+group-1 / default only / no group).
    recurring_jobs = {
        snap1: {
            TASK: SNAPSHOT,
            GROUPS: [DEFAULT, group1],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        snap2: {
            TASK: SNAPSHOT,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        snap3: {
            TASK: SNAPSHOT,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        back1: {
            TASK: BACKUP,
            GROUPS: [DEFAULT, group1],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        back2: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        back3: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 6)
    # snapshot
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap1)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap3)
    # backup
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back1)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back3)

    # Delete `snapshot2` recurring job should delete the cron job
    snap2_recurring_job = client.by_id_recurring_job(snap2)
    client.delete(snap2_recurring_job)
    wait_for_cron_job_count(batch_v1_api, 5)
    # snapshot
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap1)
    wait_for_cron_job_delete(batch_v1_api, JOB_LABEL+"="+snap2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap3)
    # backup
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back1)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back3)

    # Delete multiple recurring jobs should reflect on the cron jobs.
    back1_recurring_job = client.by_id_recurring_job(back1)
    back2_recurring_job = client.by_id_recurring_job(back2)
    back3_recurring_job = client.by_id_recurring_job(back3)
    client.delete(back1_recurring_job)
    client.delete(back2_recurring_job)
    client.delete(back3_recurring_job)
    wait_for_cron_job_count(batch_v1_api, 2)
    # snapshot
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap1)
    wait_for_cron_job_delete(batch_v1_api, JOB_LABEL+"="+snap2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+snap3)
    # backup
    wait_for_cron_job_delete(batch_v1_api, JOB_LABEL+"="+back1)
    wait_for_cron_job_delete(batch_v1_api, JOB_LABEL+"="+back2)
    wait_for_cron_job_delete(batch_v1_api, JOB_LABEL+"="+back3)
Scenario: test delete recurring job
Given 1 volume created, attached, and healthy.
When create
snapshot1
recurring job withdefault, group-1
in groups. createsnapshot2
recurring job withdefault
in groups. createsnapshot3
recurring job with `` in groups. create `backup1` recurring job with `default, group-1` in groups. create `backup2` recurring job with `default` in groups. create `backup3` recurring job with
in groups. Then defaultsnapshot1
cron job should exist. defaultsnapshot2
cron job should exist.snapshot3
cron job should exist. defaultbackup1
cron job should exist. defaultbackup2
cron job should exist.backup3
cron job should exist.Delete
snapshot2
recurring job should delete the cron jobWhen delete
snapshot-2
recurring job. Then defaultsnapshot1
cron job should exist. defaultsnapshot2
cron job should not exist.snapshot3
cron job should exist. defaultbackup1
cron job should exist. defaultbackup2
cron job should exist.backup3
cron job should exist.Delete multiple recurring jobs should reflect on the cron jobs.
When delete
backup-1
recurring job. deletebackup-2
recurring job. deletebackup-3
recurring job. Then defaultsnapshot1
cron job should exist. defaultsnapshot2
cron job should not exist.snapshot3
cron job should exist. defaultbackup1
cron job should not exist. defaultbackup2
cron job should not exist.backup3
cron job should not exist. def test_recurring_job_delete_should_remove_volume_label(client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_delete_should_remove_volume_label(client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test delete recurring job should remove volume labels

    Given 1 volume created.
    And create `snapshot1` recurring job.
        create `backup1` recurring job.
    And `snapshot1` job applied to the volume.
        `backup1` job applied to the volume.
    And `snapshot1` job exist in volume recurring job label.
        `backup1` job exist in volume recurring job label.

    When delete `snapshot1` recurring job.
    Then `snapshot1` job not exist in volume recurring job label.
         `backup1` job exist in volume recurring job label.

    When delete `backup1` recurring job.
    Then `snapshot1` job not exist in volume recurring job label.
         `backup1` job not exist in volume recurring job label.

    Given create `snapshot1` recurring job with `group-1` in groups.
    And create `snapshot2` recurring job with `group-1` in groups.
    And create `backup1` recurring job with `group-1` in groups.
    And create `backup2` recurring job with `default` in groups.
    // The default job-group automatically applies to the volumes
    // with no recurring job. We want to keep the test focused on the
    // behavior so removing the default job-group assignment first.
    And remove volume recurring job label `default` job-group.
    And `group-1` job-group applied to the volume.
    And `group-1` job-group exist in volume recurring job label.
        `default` job-group not exist in volume recurring job label.

    When delete `snapshot1` recurring job.
    Then `group-1` job-group exist in volume recurring job label.

    When delete `snapshot2` recurring job.
    Then `group-1` job-group exist in volume recurring job label.

    When delete `back1` recurring job.
    Then `group-1` job-group not exist in volume recurring job label.
    And `default` job-group exist in volume recurring job label.

    When delete `back2` recurring job.
    Then should not remove `default` job-group in volume.
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)

    snap1 = SNAPSHOT + "1"
    snap2 = SNAPSHOT + "2"
    back1 = BACKUP + "1"
    back2 = BACKUP + "2"
    group1 = "group-1"
    # Job specs reused below; both belong to `group-1`.
    snap_job = {
        TASK: SNAPSHOT,
        GROUPS: [group1],
        CRON: SCHEDULE_1MIN,
        RETAIN: 1,
        CONCURRENCY: 2,
        LABELS: {},
    }
    back_job = {
        TASK: BACKUP,
        GROUPS: [group1],
        CRON: SCHEDULE_1MIN,
        RETAIN: 1,
        CONCURRENCY: 2,
        LABELS: {},
    }
    recurring_jobs = {
        snap1: snap_job,
        back1: back_job,
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    # Delete recurring job should remove volume label
    volume.recurringJobAdd(name=snap1, isGroup=False)
    volume.recurringJobAdd(name=back1, isGroup=False)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[snap1, back1],
                                         groups=[DEFAULT])
    snap1_recurring_job = client.by_id_recurring_job(snap1)
    back1_recurring_job = client.by_id_recurring_job(back1)
    client.delete(snap1_recurring_job)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[back1],
                                         groups=[DEFAULT])
    client.delete(back1_recurring_job)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[DEFAULT])

    # Delete recurring job-group in-use by other recurring-job
    # should not remove the volume label
    recurring_jobs = {
        snap1: snap_job,
        snap2: snap_job,
        back1: back_job,
        back2: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    volume.recurringJobAdd(name=group1, isGroup=True)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[group1, DEFAULT])
    volume.recurringJobDelete(name=DEFAULT, isGroup=True)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[group1])
    snap1_recurring_job = client.by_id_recurring_job(snap1)
    client.delete(snap1_recurring_job)
    # Brief pause to let the controller reconcile before re-checking labels.
    time.sleep(5)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[group1])
    snap2_recurring_job = client.by_id_recurring_job(snap2)
    client.delete(snap2_recurring_job)
    time.sleep(5)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[group1])

    # Delete last recurring job of the group would clean up that job-group
    # for all volumes.
    back1_recurring_job = client.by_id_recurring_job(back1)
    client.delete(back1_recurring_job)
    time.sleep(5)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[DEFAULT])

    # Delete recurring job in default job-group should not effect on the
    # default job-group auto-assignment
    back2_recurring_job = client.by_id_recurring_job(back2)
    client.delete(back2_recurring_job)
    time.sleep(5)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[],
                                         groups=[DEFAULT])
Scenario: test delete recurring job should remove volume labels
Given 1 volume created. And create
snapshot1
recurring job. createbackup1
recurring job. Andsnapshot1
job applied to the volume.backup1
job applied to the volume. Andsnapshot1
job exist in volume recurring job label.backup1
job exist in volume recurring job label.When delete
snapshot1
recurring job. Thensnapshot1
job not exist in volume recurring job label.backup1
job exist in volume recurring job label.When delete
backup1
recurring job. Thensnapshot1
job not exist in volume recurring job label.backup1
job not exist in volume recurring job label.Given create
snapshot1
recurring job withgroup-1
in groups. And createsnapshot2
recurring job withgroup-1
in groups. And createbackup1
recurring job withgroup-1
in groups. And createbackup2
recurring job withdefault
in groups. // The default job-group automatically applies to the volumes // with no recurring job. We want to keep the test focused on the // behavior so removing the default job-group assignment first. And remove volume recurring job labeldefault
job-group. Andgroup-1
job-group applied to the volume. Andgroup-1
job-group exist in volume recurring job label.default
job-group not exist in volume recurring job label.When delete
snapshot1
recurring job. Thengroup-1
job-group exist in volume recurring job label.When delete
snapshot2
recurring job. Thengroup-1
job-group exist in volume recurring job label.When delete
back1
recurring job. Thengroup-1
job-group not exist in volume recurring job label. Anddefault
job-group exist in volume recurring job label.When delete
back2
recurring job. Then should not removedefault
job-group in volume. def test_recurring_job_detached_volume(client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_detached_volume(client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test recurring job while volume is detached

    Given a volume created, and attached.
    And write some data to the volume.
    And detach the volume.

    When create a recurring job running at 1 minute interval,
         and with `default` in groups, and with `retain` set to `2`.
    And 1 cron job should be created.
    And wait for 2 minutes.
    And attach volume and wait until healthy
    Then the volume should have 1 snapshot

    When wait for 2 minutes.
    Then the volume should have only 2 snapshots.
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    vol = wait_for_volume_detached(client, volume_name)
    host_id = get_self_host_id()
    vol.attach(hostId=host_id)
    vol = wait_for_volume_healthy(client, volume_name)
    write_volume_random_data(vol)
    vol.detach()

    # A 1-minute backup job in the default group, retaining 2.
    job_spec = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, job_spec)
    check_recurring_jobs(client, job_spec)
    wait_for_cron_job_count(batch_v1_api, 1)

    # Let the cron job fire a couple of times while detached.
    time.sleep(120)

    vol.attach(hostId=host_id)
    vol = wait_for_volume_healthy(client, volume_name)
    wait_for_snapshot_count(vol, 1)

    # Two more minutes while attached; retain=2 caps the snapshot count.
    time.sleep(120)
    wait_for_snapshot_count(vol, 2)
Scenario: test recurring job while volume is detached
Given a volume created, and attached. And write some data to the volume. And detach the volume.
When create a recurring job running at 1 minute interval, and with
default
in groups, and withretain
set to2
. And 1 cron job should be created. And wait for 2 minutes. And attach volume and wait until healthyThen the volume should have 1 snapshot
When wait for 2 minutes. Then the volume should have only 2 snapshots.
def test_recurring_job_duplicated(client)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_duplicated(client):  # NOQA
    """
    Scenario: test create duplicated recurring jobs

    Given recurring job created.
    When create same recurring job again.
    Then should fail.
    """
    job_spec = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, job_spec)
    check_recurring_jobs(client, job_spec)

    # Re-creating the identical job must be rejected by the API.
    with pytest.raises(Exception) as exc_info:
        create_recurring_jobs(client, job_spec)
    assert "already exists" in str(exc_info.value)
Scenario: test create duplicated recurring jobs
Given recurring job created. When create same recurring job again. Then should fail.
def test_recurring_job_filesystem_trim(client, core_api, batch_v1_api, volume_name, csi_pv, pvc, pod_make, access_mode)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
@pytest.mark.parametrize("access_mode", [ACCESS_MODE_RWO, ACCESS_MODE_RWX])  # NOQA
def test_recurring_job_filesystem_trim(client, core_api, batch_v1_api, volume_name, csi_pv, pvc, pod_make, access_mode):  # NOQA
    """
    Scenario: test recurring job filesystem-trim

    Given a workload (volume, pv, pvc, pod).
    And create 50mb file in volume.
    And actual size of the volume should increase.
    And delete the 50mb file in volume.
    And actual size of the volume should not decrease.

    When create a recurring job with:
         - task: filesystem-trim
         - retain: 1
    Then recurring job retain mutated to 0.

    When assign the recurring job to volume.
    And wait for the cron job scheduled time.
    Then volume actual size should decrease 50mb.

    Fix: the original placed `assert False` inside the `try` block whose
    handler was `except Exception: pass`; since AssertionError is an
    Exception, the "size must not shrink before trimming" check could
    never fail.  The failure now lives in the `else` clause, outside the
    swallowed-exception path.
    """
    pod_name, _, _, _ = \
        prepare_pod_with_data_in_mb(client, core_api, csi_pv, pvc, pod_make,
                                    volume_name,
                                    data_size_in_mb=10,
                                    access_mode=access_mode)
    volume = client.by_id_volume(volume_name)

    # Do the first trim to ensure that the final trim size only includes the
    # created file.
    volume.trimFilesystem(volume=volume_name)
    initial_size = wait_for_actual_size_change_mb(client, volume_name, 0)

    file_delete = "/data/trim.test"
    test_size = 50
    write_pod_volume_random_data(core_api, pod_name, file_delete, test_size)
    size_after_file_created = \
        wait_for_actual_size_change_mb(client, volume_name, initial_size)
    assert size_after_file_created - initial_size == test_size

    command = f'rm {file_delete}'
    exec_command_in_pod(core_api, command, pod_name, 'default')
    # Deleting the file must NOT shrink the actual size before the trim
    # job runs.  The wait helper raising means no change was observed —
    # that is the expected outcome; if it returns, the size changed and
    # the test must fail.
    try:
        wait_for_actual_size_change_mb(client, volume_name,
                                       size_after_file_created,
                                       retry_counts=10,
                                       wait_stablize=True)
    except Exception:
        pass
    else:
        raise AssertionError(
            f'expecting no change in actual size: {size_after_file_created}')

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: FILESYSTEM_TRIM,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    # After the trim job runs, the deleted file's blocks are reclaimed.
    size_after_file_deleted = \
        wait_for_actual_size_change_mb(client, volume_name,
                                       size_after_file_created)
    size_trimmed = size_after_file_created - size_after_file_deleted
    assert size_trimmed == test_size
Scenario: test recurring job filesystem-trim
Given a workload (volume, pv, pvc, pod). And create 50mb file in volume. And actual size of the volume should increase. And delete the 50mb file in volume. And actual size of the volume should not decrease.
When create a recurring job with: - task: filesystem-trim - retain: 1 Then recurring job retain mutated to 0.
When assign the recurring job to volume. And wait for the cron job scheduled time. Then volume actual size should decrease 50mb.
def test_recurring_job_groups(set_random_backupstore, client, batch_v1_api)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_groups(set_random_backupstore, client, batch_v1_api):  # NOQA
    """
    Scenario: test recurring job groups (S3/NFS)

    Given volume `test-job-1` created, attached, and healthy.
          volume `test-job-2` created, attached, and healthy.
    And create `snapshot` recurring job with `group-1, group-2` in groups.
        set cron job to run every 2 minutes.
        set retain to 1.
        create `backup` recurring job with `group-1` in groups.
        set cron job to run every 3 minutes.
        set retain to 1.

    When set `group-1` recurring job group in volume `test-job-1` label.
         set `group-2` recurring job group in volume `test-job-2` label.
    And write some data to volume `test-job-1`.
        write some data to volume `test-job-2`.
    And wait for 2 minutes.
    And write some data to volume `test-job-1`.
        write some data to volume `test-job-2`.
    And wait for 1 minute.

    Then volume `test-job-1` should have 3 snapshots after scheduled time.
         volume `test-job-2` should have 2 snapshots after scheduled time.
    And volume `test-job-1` should have 1 backup after scheduled time.
        volume `test-job-2` should have 0 backup after scheduled time.
    """
    volume1_name = "test-job-1"
    volume2_name = "test-job-2"
    client.create_volume(name=volume1_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    client.create_volume(name=volume2_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume1 = wait_for_volume_detached(client, volume1_name)
    volume2 = wait_for_volume_detached(client, volume2_name)

    self_id = get_self_host_id()
    volume1.attach(hostId=self_id)
    volume2.attach(hostId=self_id)
    volume1 = wait_for_volume_healthy(client, volume1_name)
    volume2 = wait_for_volume_healthy(client, volume2_name)

    # `snapshot` covers both groups; `backup` only group-1, so only
    # volume1 (labeled group-1) should ever get a backup.
    group1 = "group-1"
    group2 = "group-2"
    recurring_jobs = {
        SNAPSHOT: {
            TASK: SNAPSHOT,
            GROUPS: [group1, group2],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        BACKUP: {
            TASK: BACKUP,
            GROUPS: [group1],
            CRON: "*/3 * * * *",
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    volume1.recurringJobAdd(name=group1, isGroup=True)
    volume2.recurringJobAdd(name=group2, isGroup=True)
    wait_for_cron_job_count(batch_v1_api, 2)

    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(60 * 2 - WRITE_DATA_INTERVAL)
    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(60)

    wait_for_snapshot_count(volume1, 3)  # volume-head,snapshot,backup-snapshot
    wait_for_snapshot_count(volume2, 2)  # volume-head,snapshot

    wait_for_backup_count(find_backup_volume(client, volume1_name,
                                             retry=RETRY_COUNTS_SHORT), 1)

    # volume2 is not in the backup job's group, so waiting for a backup
    # must fail — either by timeout (AssertionError) or because no backup
    # volume object exists at all (AttributeError).
    backup_created = True
    try:
        wait_for_backup_count(find_backup_volume(client, volume2_name), 1,
                              retry_counts=60)
    # AttributeError:
    # BackupVolume is 'NoneType' object has no attribute 'backupList'
    except (AssertionError, AttributeError):
        backup_created = False
    assert not backup_created
Scenario: test recurring job groups (S3/NFS)

Given volume `test-job-1` created, attached, and healthy.
      volume `test-job-2` created, attached, and healthy.
And create `snapshot` recurring job with `group-1, group-2` in groups;
    set its cron job to run every 2 minutes; set retain to 1.
    create `backup` recurring job with `group-1` in groups;
    set its cron job to run every 3 minutes; set retain to 1.

When set `group-1` recurring job group in volume `test-job-1` label.
     set `group-2` recurring job group in volume `test-job-2` label.
And write some data to volume `test-job-1`.
    write some data to volume `test-job-2`.
And wait for 2 minutes.
And write some data to volume `test-job-1`.
    write some data to volume `test-job-2`.
And wait for 1 minute.

Then volume `test-job-1` should have 3 snapshots after scheduled time.
     volume `test-job-2` should have 2 snapshots after scheduled time.
And volume `test-job-1` should have 1 backup after scheduled time.
    volume `test-job-2` should have 0 backups after scheduled time.

def test_recurring_job_in_storageclass(set_random_backupstore, client, core_api, storage_class, statefulset)
-
Expand source code
@pytest.mark.v2_volume_test # NOQA @pytest.mark.recurring_job # NOQA def test_recurring_job_in_storageclass(set_random_backupstore, client, core_api, storage_class, statefulset): # NOQA """ Test create volume with StorageClass contains recurring jobs 1. Create a StorageClass with recurring jobs 2. Create a StatefulSet with PVC template and StorageClass 3. Verify the recurring jobs run correctly. """ recurring_jobs = { SNAPSHOT: { TASK: SNAPSHOT, GROUPS: [], CRON: SCHEDULE_1MIN, RETAIN: 2, CONCURRENCY: 1, LABELS: {}, }, BACKUP: { TASK: BACKUP, GROUPS: [], CRON: "*/2 * * * *", RETAIN: 1, CONCURRENCY: 1, LABELS: {}, }, } create_recurring_jobs(client, recurring_jobs) check_recurring_jobs(client, recurring_jobs) recurring_job_selector = [ { NAME: SNAPSHOT, ISGROUP: False, }, { NAME: BACKUP, ISGROUP: False, }, ] storage_class["parameters"]["recurringJobSelector"] = \ json.dumps(recurring_job_selector) create_storage_class(storage_class) # wait until the beginning of an even minute wait_until_begin_of_an_even_minute() statefulset_name = 'recurring-job-in-storageclass-test' update_statefulset_manifests(statefulset, storage_class, statefulset_name) start_time = datetime.utcnow() create_and_wait_statefulset(statefulset) statefulset_creating_duration = datetime.utcnow() - start_time assert 150 > statefulset_creating_duration.seconds # We want to write data exactly at the 150th second since the start_time time.sleep(150 - statefulset_creating_duration.seconds) pod_info = get_statefulset_pod_info(core_api, statefulset) volume_info = [p['pv_name'] for p in pod_info] pod_names = [p['pod_name'] for p in pod_info] # write random data to volume to trigger recurring snapshot and backup job volume_data_path = "/data/test" for pod_name in pod_names: write_pod_volume_random_data(core_api, pod_name, volume_data_path, 2) time.sleep(150) # 2.5 minutes for volume_name in volume_info: # NOQA volume = client.by_id_volume(volume_name) wait_for_snapshot_count(volume, 4)
Test create volume with StorageClass contains recurring jobs
- Create a StorageClass with recurring jobs
- Create a StatefulSet with PVC template and StorageClass
- Verify the recurring jobs run correctly.
def test_recurring_job_in_volume_creation(client, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test # NOQA @pytest.mark.recurring_job # NOQA def test_recurring_job_in_volume_creation(client, volume_name): # NOQA """ Scenario: test create volume with recurring jobs Given 2 recurring jobs created. And volume create and a attached. When label recurring job to volume. And write data to volume. wait 2.5 minutes. And write data to volume. wait 2.5 minutes. Then volume have 4 snapshots. """ recurring_jobs = { SNAPSHOT: { TASK: SNAPSHOT, GROUPS: [], CRON: SCHEDULE_1MIN, RETAIN: 2, CONCURRENCY: 1, LABELS: {}, }, BACKUP: { TASK: BACKUP, GROUPS: [], CRON: "*/2 * * * *", RETAIN: 1, CONCURRENCY: 1, LABELS: {}, }, } create_recurring_jobs(client, recurring_jobs) check_recurring_jobs(client, recurring_jobs) client.create_volume(name=volume_name, size=SIZE, numberOfReplicas=2, dataEngine=DATA_ENGINE) volume = wait_for_volume_detached(client, volume_name) volume.attach(hostId=get_self_host_id()) volume = wait_for_volume_healthy(client, volume_name) volume.recurringJobAdd(name=SNAPSHOT, isGroup=False) volume.recurringJobAdd(name=BACKUP, isGroup=False) wait_for_volume_recurring_job_update(volume, jobs=[SNAPSHOT, BACKUP], groups=[DEFAULT]) wait_until_begin_of_an_even_minute() # wait until the 10th second of an even minute # to avoid writing data at the same time backup is taking time.sleep(10) write_volume_random_data(volume) time.sleep(150) # 2.5 minutes write_volume_random_data(volume) time.sleep(150) # 2.5 minutes wait_for_snapshot_count(volume, 4)
Scenario: test create volume with recurring jobs
Given 2 recurring jobs created. And volume created and attached.
When label recurring job to volume. And write data to volume. wait 2.5 minutes. And write data to volume. wait 2.5 minutes.
Then volume have 4 snapshots.
def test_recurring_job_kubernetes_status(set_random_backupstore, client, core_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.csi  # NOQA
@pytest.mark.recurring_job
def test_recurring_job_kubernetes_status(set_random_backupstore, client, core_api, volume_name):  # NOQA
    """
    Scenario: test recurringJob properly backs up the KubernetesStatus (S3/NFS)

    Given volume created and detached.
    And PV from volume created and verified.

    When create backup recurring job to run every 2 minutes.
    And attach volume.
    And write some data to volume.
    And wait 5 minutes.

    Then volume have 2 snapshots.
         volume have 1 backup.
    And backup have the Kubernetes Status labels.
    """
    client.create_volume(name=volume_name, size=SIZE,
                         numberOfReplicas=2, dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)

    pv_name = "pv-" + volume_name
    create_pv_for_volume(client, core_api, volume, pv_name)
    # Expected KubernetesStatus for a released PV with no PVC/pod bound.
    ks = {
        'pvName': pv_name,
        'pvStatus': 'Available',
        'namespace': '',
        'pvcName': '',
        'lastPVCRefAt': '',
        'lastPodRefAt': '',
    }
    wait_volume_kubernetes_status(client, volume_name, ks)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name)
    write_volume_random_data(volume)
    time.sleep(60 * 5)

    # 1 from Backup, 1 from Volume Head.
    wait_for_snapshot_count(volume, 2)

    # Verify the Labels on the actual Backup.
    # Wait for the backup to exist BEFORE reading the backup list: listing
    # first could capture an empty list and crash on backups[0] with an
    # IndexError (same ordering as recurring_job_labels_test).
    bv = find_backup_volume(client, volume_name)
    wait_for_backup_count(bv, 1)
    backups = bv.backupList().data
    b = bv.backupGet(name=backups[0].name)

    status = json.loads(b.labels.get(KUBERNETES_STATUS_LABEL))
    assert b.labels.get(RECURRING_JOB_LABEL) == RECURRING_JOB_NAME
    assert status == {
        'lastPodRefAt': '',
        'lastPVCRefAt': '',
        'namespace': '',
        'pvcName': '',
        'pvName': pv_name,
        'pvStatus': 'Available',
        'workloadsStatus': None
    }
    # Two Labels: KubernetesStatus and RecurringJob additional 1
    # Longhorn will automatically add a label
    # `longhorn.io/volume-access-mode` to a newly created backup
    assert len(b.labels) == 3
Scenario: test recurringJob properly backs up the KubernetesStatus (S3/NFS)
Given volume created and detached. And PV from volume created and verified.
When create backup recurring job to run every 2 minutes. And attach volume. And write some data to volume. And wait 5 minutes.
Then volume have 2 snapshots. volume have 1 backup. And backup have the Kubernetes Status labels.
def test_recurring_job_label_on_pvc(client, core_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job
def test_recurring_job_label_on_pvc(client, core_api, volume_name):  # NOQA
    """
    Scenario: test recurring job label on PVC

    Given volume created.
    And PV created.
    And PVC created.
    And PVC source recurring job label exists.
    And Volume default recurring job label exists.
    And RecurringJobs created (recurring-test-1, recurring-test-2).
    And RecurringJob (recurring-test-2) is in group (recurring-test-2)

    When add a recurring job label (recurring-test-1) on the PVC.
    And add a recurring job-group label (recurring-test-2) on the PVC.
    Then Volume has recurring job label (recurring-test-1).
    And Volume has recurring job-group label (recurring-test-2).

    When remove PVC recurring job label (recurring-test-2).
    Then Volume has recurring job label (recurring-test-1).

    When delete RecurringJobs (recurring-test-1).
    Then Volume has recurring job label (default).
    And PVC has no recurring job.
    """
    volume = create_and_check_volume(client, volume_name, size=str(2 * Gi))
    volume = wait_for_volume_detached(client, volume.name)

    pv_name = volume_name + "-pv"
    create_pv_for_volume(client, core_api, volume, pv_name)

    pvc_name = volume_name + "-pvc"
    create_pvc_for_volume(client, core_api, volume, pvc_name)

    # Mark the PVC as the source of recurring-job labels so Longhorn
    # propagates its labels to the volume.
    add_recurring_job_source_to_pvc(pvc_name, core_api)

    recurring_job_name_1 = f'{RECURRING_JOB_NAME}-1'
    recurring_job_name_2 = f'{RECURRING_JOB_NAME}-2'
    recurring_jobs = {
        recurring_job_name_1: {
            TASK: SNAPSHOT,
            GROUPS: [],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
        recurring_job_name_2: {
            TASK: SNAPSHOT,
            GROUPS: [recurring_job_name_2],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    # Label the PVC with one direct job and one job group; both should be
    # mirrored onto the volume.
    claim = core_api.read_namespaced_persistent_volume_claim(
        name=pvc_name, namespace='default'
    )
    if claim.metadata.labels is None:
        claim.metadata.labels = {}
    label_key_1 = f"recurring-job.longhorn.io/{recurring_job_name_1}"
    label_key_2 = f"recurring-job-group.longhorn.io/{recurring_job_name_2}"
    claim.metadata.labels[label_key_1] = "enabled"
    claim.metadata.labels[label_key_2] = "enabled"
    update_persistent_volume_claim(core_api, pvc_name, 'default', claim)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[recurring_job_name_1],
                                         groups=[recurring_job_name_2])

    # Removing the group label from the PVC should remove the group from
    # the volume while the direct job remains.
    claim = core_api.read_namespaced_persistent_volume_claim(
        name=pvc_name, namespace='default'
    )
    del claim.metadata.labels[label_key_2]
    update_persistent_volume_claim(core_api, pvc_name, 'default', claim)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[recurring_job_name_1],
                                         groups=[])

    # Deleting the last referenced job should fall the volume back to the
    # default group and clean the PVC's recurring-job labels.
    recurring_job_1 = client.by_id_recurring_job(recurring_job_name_1)
    client.delete(recurring_job_1)
    wait_for_volume_recurring_job_update(volume, jobs=[], groups=[DEFAULT])

    claim = core_api.read_namespaced_persistent_volume_claim(
        name=pvc_name, namespace='default'
    )
    if claim.metadata.labels is None:
        claim.metadata.labels = {}
    # Only the "source" marker label may remain on the PVC.
    unexpected_count = 0
    for label in claim.metadata.labels:
        if not label.startswith("recurring-job"):
            continue
        if label == "recurring-job.longhorn.io/source":
            continue
        unexpected_count += 1
    assert unexpected_count == 0
Scenario: test recurring job label on PVC
Given volume created. And PV created. and PVC created. and PVC source recurring job label exists. and Volume default recurring job label exists. and RecurringJobs created (recurring-test-1, recurring-test-2). and RecurringJob (recurring-test-2) is in group (recurring-test-2)
When add a recurring job label (recurring-test-1) on the PVC. And add a recurring job-group label (recurring-test-2) on the PVC. Then Volume has recurring job label (recurring-test-1). And Volume has recurring job-group label (recurring-test-2).
When remove PVC recurring job label (recurring-test-2). Then Volume has recurring job label (recurring-test-1).
When delete RecurringJobs (recurring-test-1). Then Volume has recurring job label (default). And PVC has no recurring job.
def test_recurring_job_labels(set_random_backupstore, client, random_labels, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test # NOQA @pytest.mark.recurring_job # NOQA def test_recurring_job_labels(set_random_backupstore, client, random_labels, volume_name): # NOQA """ Scenario: test a recurring job with labels (S3/NFS) Given a recurring job created, with `default` in groups, with random labels. And volume created and attached. And write data to volume. When add another label to the recurring job. And write data to volume. And wait after scheduled time. Then should have 2 snapshots. And backup should have correct labels. """ recurring_job_labels_test(client, random_labels, volume_name) # NOQA
Scenario: test a recurring job with labels (S3/NFS)

Given a recurring job created, with `default` in groups, with random labels.
And volume created and attached. And write data to volume.

When add another label to the recurring job. And write data to volume. And wait after scheduled time.

Then should have 2 snapshots. And backup should have correct labels.
def test_recurring_job_multiple_volumes(set_random_backupstore, client, batch_v1_api)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_multiple_volumes(set_random_backupstore, client, batch_v1_api):  # NOQA
    """
    Scenario: test recurring job with multiple volumes

    Given volume `test-job-1` created, attached and healthy.
    And create `backup1` recurring job with `default` in groups.
        create `backup2` recurring job with no groups.
    And `default` group exist in `test-job-1` volume recurring job label.
    And `backup1` cron job exist.
        `backup2` cron job exist.
    And write data to `test-job-1` volume.
    And 2 snapshot exist in `test-job-1` volume.
    And 1 backup exist in `test-job-1` volume.

    When create and attach volume `test-job-2`.
         wait for volume `test-job-2` to be healthy.
    And `default` group exist in `test-job-2` volume recurring job label.
    And write data to `test-job-2` volume.
    Then 2 snapshot exist in `test-job-2` volume.
         1 backup exist in `test-job-2` volume.

    When add `backup2` in `test-job-2` volume label.
    And `default` group exist in `test-job-1` volume recurring job label.
        `default` group exist in `test-job-2` volume recurring job label.
        `backup2` job exist in `test-job-2` volume recurring job label.
    And write data to `test-job-1`.
        write data to `test-job-2`.
    Then wait for schedule time.
    And 2 backup exist in `test-job-2` volume.
        1 backup exist in `test-job-1` volume.
    """
    volume1_name = "test-job-1"
    client.create_volume(name=volume1_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume1 = wait_for_volume_detached(client, volume1_name)
    volume1.attach(hostId=get_self_host_id())
    volume1 = wait_for_volume_healthy(client, volume1_name)

    # back1 runs on the default group (all unlabeled volumes); back2 has no
    # group and only runs where it is explicitly added.
    back1 = BACKUP + "1"
    back2 = BACKUP + "2"
    recurring_jobs = {
        back1: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
        back2: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_volume_recurring_job_update(volume1, jobs=[], groups=[DEFAULT])
    wait_for_cron_job_count(batch_v1_api, 2)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back1)
    wait_for_cron_job_create(batch_v1_api, JOB_LABEL+"="+back2)

    write_volume_random_data(volume1)
    wait_for_snapshot_count(volume1, 2)
    wait_for_backup_count(find_backup_volume(client, volume1_name,
                                             retry=RETRY_COUNTS_SHORT), 1)

    volume2_name = "test-job-2"
    client.create_volume(name=volume2_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume2 = wait_for_volume_detached(client, volume2_name)
    volume2.attach(hostId=get_self_host_id())
    volume2 = wait_for_volume_healthy(client, volume2_name)
    wait_for_volume_recurring_job_update(volume2, jobs=[], groups=[DEFAULT])

    write_volume_random_data(volume2)
    wait_for_snapshot_count(volume2, 2)
    wait_for_backup_count(find_backup_volume(client, volume2_name,
                                             retry=RETRY_COUNTS_SHORT), 1)

    # Add back2 explicitly to volume2 only; volume1 keeps just the default
    # group's back1 job.
    volume2.recurringJobAdd(name=back2, isGroup=False)
    wait_for_volume_recurring_job_update(volume1, jobs=[], groups=[DEFAULT])
    wait_for_volume_recurring_job_update(volume2,
                                         jobs=[back2],
                                         groups=[DEFAULT])

    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(70 - WRITE_DATA_INTERVAL)
    wait_for_backup_count(find_backup_volume(client, volume2_name), 2)
    wait_for_backup_count(find_backup_volume(client, volume1_name), 1)
Scenario: test recurring job with multiple volumes

Given volume `test-job-1` created, attached and healthy.
And create `backup1` recurring job with `default` in groups.
    create `backup2` recurring job with no groups.
And `default` group exists in `test-job-1` volume recurring job label.
And `backup1` cron job exists. `backup2` cron job exists.
And write data to `test-job-1` volume.
And 2 snapshots exist in `test-job-1` volume.
And 1 backup exists in `test-job-1` volume.

When create and attach volume `test-job-2`.
     wait for volume `test-job-2` to be healthy.
And `default` group exists in `test-job-2` volume recurring job label.
And write data to `test-job-2` volume.
Then 2 snapshots exist in `test-job-2` volume.
     1 backup exists in `test-job-2` volume.

When add `backup2` in `test-job-2` volume label.
And `default` group exists in `test-job-1` volume recurring job label.
    `default` group exists in `test-job-2` volume recurring job label.
    `backup2` job exists in `test-job-2` volume recurring job label.
And write data to `test-job-1`. write data to `test-job-2`.
Then wait for schedule time.
And 2 backups exist in `test-job-2` volume.
    1 backup exists in `test-job-1` volume.

def test_recurring_job_restored_from_backup_target(set_random_backupstore, client, batch_v1_api)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_restored_from_backup_target(set_random_backupstore, client, batch_v1_api):  # NOQA
    """
    Scenario: test recurring job backup (S3/NFS)

    1.  Create a volume and attach it to a node or a workload.
    2.  Create some recurring jobs (some are in groups)
    3.  Label the volume with created recurring jobs (some are in groups)
    4.  Create a backup or wait for a recurring job starting
    5.  Wait for backup creation completed.
    6.  Check if recurring jobs/groups information is stored in the backup
        volume configuration on the backup target
    7.  Create a volume from the backup just created.
    8.  Check the volume if it has labels of recurring jobs and groups.
    9.  Delete recurring jobs that are already stored in the backup volume
        on the backup.
    10. Create a volume from the backup just created.
    11. Check if recurring jobs have been created.
    12. Check if restoring volume has labels of recurring jobs and groups.
    """
    common.update_setting(client,
                          common.SETTING_DEGRADED_AVAILABILITY, "false")

    SCHEDULE_1WEEK = "0 0 * * 0"
    SCHEDULE_2MIN = "*/2 * * * *"
    snap1 = SNAPSHOT + "1"
    back1 = BACKUP + "1"
    back2 = BACKUP + "2"
    group1 = "group01"
    volume_name1 = "record-recur" + "-" + generate_volume_name()
    rvolume_name1 = "restore-01" + "-" + generate_volume_name()
    rvolume_name2 = "restore-02" + "-" + generate_volume_name()

    # back2's weekly schedule deliberately never fires during the test; it
    # exists only to verify group information survives the backup/restore.
    recurring_jobs = {
        back1: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_2MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
        snap1: {
            TASK: SNAPSHOT,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 1,
            LABELS: {},
        },
        back2: {
            TASK: BACKUP,
            GROUPS: [group1],
            CRON: SCHEDULE_1WEEK,
            RETAIN: 3,
            CONCURRENCY: 3,
            LABELS: {},
        },
    }

    common.update_setting(client, SETTING_RESTORE_RECURRING_JOBS, "true")
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    volume = client.create_volume(name=volume_name1, size=SIZE,
                                  numberOfReplicas=2,
                                  dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name1)
    volume = volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume_name1)

    volume.recurringJobAdd(name=snap1, isGroup=False)
    volume.recurringJobAdd(name=back1, isGroup=False)
    volume.recurringJobAdd(name=group1, isGroup=True)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[snap1, back1],
                                         groups=[DEFAULT, group1])

    write_volume_random_data(volume)
    time.sleep(120)
    # 2 from job_snap, 1 from job_backup, 1 volume-head
    wait_for_snapshot_count(volume, 4)

    # Identify the single completed backup produced by back1 and remember
    # its snapshot so we can restore from exactly that backup.
    complete_backup_1_count = 0
    restore_snapshot_name = ""
    wait_for_backup_completion(client, volume_name1)
    volume = client.by_id_volume(volume_name1)
    for b in volume.backupStatus:
        if back1+"-" in b.snapshot:
            complete_backup_1_count += 1
            restore_snapshot_name = b.snapshot
    assert complete_backup_1_count == 1
    volume.detach()

    # create a volume from the backup with recurring jobs exist
    _, backup = find_backup(client, volume_name1, restore_snapshot_name)
    client.create_volume(name=rvolume_name1, size=SIZE,
                         fromBackup=backup.url, dataEngine=DATA_ENGINE)
    rvolume1 = wait_for_volume_detached(client, rvolume_name1)
    wait_for_volume_recurring_job_update(rvolume1,
                                         jobs=[snap1, back1],
                                         groups=[DEFAULT, group1])

    # create a volume from the backup with recurring jobs do not exist
    cleanup_all_recurring_jobs(client)
    client.create_volume(name=rvolume_name2, size=SIZE,
                         fromBackup=backup.url, dataEngine=DATA_ENGINE)
    rvolume2 = wait_for_volume_detached(client, rvolume_name2)
    wait_for_volume_recurring_job_update(rvolume2,
                                         jobs=[snap1, back1],
                                         groups=[DEFAULT, group1])
Scenario: test recurring job backup (S3/NFS)
- Create a volume and attach it to a node or a workload.
- Create some recurring jobs (some are in groups)
- Label the volume with created recurring jobs (some are in groups)
- Create a backup or wait for a recurring job starting
- Wait for backup creation completed.
-
Check if recurring jobs/groups information is stored in the backup volume configuration on the backup target
-
Create a volume from the backup just created.
-
Check the volume if it has labels of recurring jobs and groups.
-
Delete recurring jobs that are already stored in the backup volume on the backup.
- Create a volume from the backup just created.
- Check if recurring jobs have been created.
- Check if restoring volume has labels of recurring jobs and groups.
def test_recurring_job_snapshot(client, batch_v1_api)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_snapshot(client, batch_v1_api):  # NOQA
    """
    Scenario: test recurring job snapshot

    Given volume `test-job-1` created, attached, and healthy.
          volume `test-job-2` created, attached, and healthy.

    When create a recurring job with `default` in groups.
    Then should have 1 cron job.
    And volume `test-job-1` should have volume-head 1 snapshot.
        volume `test-job-2` should have volume-head 1 snapshot.

    When write some data to volume `test-job-1`.
         write some data to volume `test-job-2`.
    And wait for cron job scheduled time.
    Then volume `test-job-1` should have 2 snapshots after scheduled time.
         volume `test-job-2` should have 2 snapshots after scheduled time.

    When write some data to volume `test-job-1`.
         write some data to volume `test-job-2`.
    And wait for cron job scheduled time.
    Then volume `test-job-1` should have 3 snapshots after scheduled time.
         volume `test-job-2` should have 3 snapshots after scheduled time.
    """
    volume1_name = "test-job-1"
    volume2_name = "test-job-2"
    client.create_volume(name=volume1_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    client.create_volume(name=volume2_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume1 = wait_for_volume_detached(client, volume1_name)
    volume2 = wait_for_volume_detached(client, volume2_name)

    self_host = get_self_host_id()
    volume1.attach(hostId=self_host)
    volume2.attach(hostId=self_host)
    volume1 = wait_for_volume_healthy(client, volume1_name)
    volume2 = wait_for_volume_healthy(client, volume2_name)

    # One job in the default group covers both (unlabeled) volumes.
    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: SNAPSHOT,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 2,
            CONCURRENCY: 2,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    # volume-head
    wait_for_snapshot_count(volume1, 1)
    wait_for_snapshot_count(volume2, 1)

    # 1st job
    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(60)
    wait_for_snapshot_count(volume1, 2)
    wait_for_snapshot_count(volume2, 2)

    # 2nd job
    # wait_until_begin_of_a_minute()
    write_volume_random_data(volume1)
    write_volume_random_data(volume2)
    time.sleep(60)
    wait_for_snapshot_count(volume1, 3)
    wait_for_snapshot_count(volume2, 3)
Scenario: test recurring job snapshot

Given volume `test-job-1` created, attached, and healthy.
      volume `test-job-2` created, attached, and healthy.

When create a recurring job with `default` in groups.
Then should have 1 cron job.
And volume `test-job-1` should have 1 snapshot (volume-head).
    volume `test-job-2` should have 1 snapshot (volume-head).

When write some data to volume `test-job-1`.
     write some data to volume `test-job-2`.
And wait for cron job scheduled time.
Then volume `test-job-1` should have 2 snapshots after scheduled time.
     volume `test-job-2` should have 2 snapshots after scheduled time.

When write some data to volume `test-job-1`.
     write some data to volume `test-job-2`.
And wait for cron job scheduled time.
Then volume `test-job-1` should have 3 snapshots after scheduled time.
     volume `test-job-2` should have 3 snapshots after scheduled time.

def test_recurring_job_snapshot_cleanup(set_random_backupstore, client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_snapshot_cleanup(set_random_backupstore, client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test recurring job snapshot-cleanup

    Given volume created, attached, and healthy.
    And 2 snapshot were created by system.
    And 1 snapshot were created by user.
    And volume has 4 snapshots.
        - 2 system-created
        - 1 user-created
        - 1 volume-head

    When create a recurring job with:
         - task: system-snapshot-delete
         - retain: 1
    Then recurring job retain mutated to 0.

    When assign the recurring job to volume.
    And wait for the cron job scheduled time.
    Then volume should have 2 snapshots.
        - 0 system-created
        - 1 user-created
        - 1 volume-head
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)
    self_host = get_self_host_id()
    volume.attach(hostId=self_host)
    volume = wait_for_volume_healthy(client, volume_name)

    # Each expansion produces one system-created snapshot.
    expand_size = str(64 * Mi)
    volume.expand(size=expand_size)
    wait_for_volume_expansion(client, volume_name)
    # - 1 new system-created snapshot
    # - 1 volume-head
    wait_for_snapshot_count(volume, 2)

    expand_size = str(128 * Mi)
    volume.expand(size=expand_size)
    wait_for_volume_expansion(client, volume_name)
    # - 1 new system-created snapshot
    # - 1 system-created snapshot
    # - 1 volume-head
    wait_for_snapshot_count(volume, 3)

    create_snapshot(client, volume_name)
    # - 1 new system-created snapshot
    # - 1 system-created snapshot
    # - 1 user-created snapshot
    # - 1 volume-head
    wait_for_snapshot_count(volume, 4)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: SNAPSHOT_CLEANUP,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    volume = client.by_id_volume(volume_name)
    volume.recurringJobAdd(name=RECURRING_JOB_NAME, isGroup=False)
    time.sleep(60)

    # - 0 system-created snapshot
    # - 1 user-created snapshot
    # - 1 volume-head
    wait_for_snapshot_count(volume, 2)
    # Only system-created snapshots may be removed by the cleanup task.
    system_created_count = 0
    for snapshot in volume.snapshotList():
        if not snapshot.usercreated and snapshot.name != VOLUME_HEAD_NAME:
            system_created_count += 1
    assert system_created_count == 0
Scenario: test recurring job snapshot-cleanup
Given volume created, attached, and healthy. And 2 snapshot were created by system. And 1 snapshot were created by user. And volume has 4 snapshots. - 2 system-created - 1 user-created - 1 volume-head
When create a recurring job with: - task: system-snapshot-delete - retain: 1 Then recurring job retain mutated to 0.
When assign the recurring job to volume. And wait for the cron job scheduled time. Then volume should have 2 snapshots. - 0 system-created - 1 user-created - 1 volume-head
def test_recurring_job_snapshot_delete(set_random_backupstore, client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_snapshot_delete(set_random_backupstore, client, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test recurring job snapshot-delete

    Given volume created, attached, and healthy.
    And 3 snapshots were created.
    And volume has 4 snapshots.
        - 3 user-created
        - 1 volume-head
    When create a recurring job with:
        - task: snapshot-delete
        - retain: 1
    And assign the recurring job to volume.
    And wait for the cron job scheduled time.
    Then volume should have 2 snapshots.
        - 1 snapshot retained
        - 1 volume-head
    When recurring job unassigned to volume.
    And create 5 snapshots.
    And volume has 7 snapshots.
        - 5 new snapshots
        - 1 old snapshot
        - 1 volume-head
    And update recurring job retain to 3.
    And assign the recurring job to volume.
    And wait for the cron job scheduled time.
    Then volume should have 4 snapshots
        - 3 snapshots retained
        - 1 volume-head
    """
    client.create_volume(name=volume_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume = wait_for_volume_detached(client, volume_name)
    self_host = get_self_host_id()
    volume.attach(hostId=self_host)
    volume = wait_for_volume_healthy(client, volume_name)

    num_snapshots = 3
    for _ in range(num_snapshots):
        create_snapshot(client, volume_name)
    # - 3 new snapshot
    # - 1 volume-head
    wait_for_snapshot_count(volume, 4)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: SNAPSHOT_DELETE,
            GROUPS: [],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    volume = client.by_id_volume(volume_name)
    volume.recurringJobAdd(name=RECURRING_JOB_NAME, isGroup=False)

    # Wait one full schedule interval so the 1-minute cron job has fired.
    time.sleep(60)

    # - 1 snapshot retained
    # - 1 volume-head
    # Deleted snapshots may still be visible while marked as removed, so
    # fall back to counting removed snapshots as well.
    try:
        wait_for_snapshot_count(volume, 2)
    except Exception:
        wait_for_snapshot_count(volume, 2, count_removed=True)

    volume.recurringJobDelete(name=RECURRING_JOB_NAME, isGroup=False)
    num_snapshots = 5
    for _ in range(num_snapshots):
        create_snapshot(client, volume_name)
    volume = client.by_id_volume(volume_name)
    # - 5 new snapshot
    # - 1 old snapshot
    # - 1 volume-head
    try:
        wait_for_snapshot_count(volume, 7)
    except Exception:
        wait_for_snapshot_count(volume, 7, count_removed=True)

    update_recurring_job(client, RECURRING_JOB_NAME,
                         groups=[], labels={}, retain=3)
    volume.recurringJobAdd(name=RECURRING_JOB_NAME, isGroup=False)
    time.sleep(60)
    # - 3 snapshot retained
    # - 1 volume-head
    try:
        wait_for_snapshot_count(volume, 4)
    except Exception:
        wait_for_snapshot_count(volume, 4, count_removed=True)
Scenario: test recurring job snapshot-delete
Given volume created, attached, and healthy. And 3 snapshots were created. And volume has 4 snapshots. - 3 user-created - 1 volume-head
When create a recurring job with: - task: snapshot-delete - retain: 1 And assign the recurring job to volume. And wait for the cron job scheduled time.
Then volume should have 2 snapshots. - 1 snapshot retained - 1 volume-head
When recurring job unassigned to volume. And create 5 snapshots. And volume has 7 snapshots. - 5 new snapshots - 1 old snapshot - 1 volume-head And update recurring job retain to 3. And assign the recurring job to volume. And wait for the cron job scheduled time.
Then volume should have 4 snapshots - 3 snapshots retained - 1 volume-head
def test_recurring_job_snapshot_delete_retain_0(set_random_backupstore, client, batch_v1_api, volume_name)
-
Expand source code
@pytest.mark.recurring_job # NOQA def test_recurring_job_snapshot_delete_retain_0(set_random_backupstore, client, batch_v1_api, volume_name): # NOQA """ Scenario: test recurring job snapshot-delete with retain value 0 Given volume created, attached, and healthy. And 1 snapshots were created. And volume has 2 snapshots. - 1 user-created - 1 volume-head When create a recurring job with: - task: snapshot-delete - retain: 0 And assign the recurring job to volume. And wait for the cron job scheduled time. Then volume should have 1 snapshot. - 0 snapshot retained - 1 volume-head """ client.create_volume(name=volume_name, size=SIZE, dataEngine=DATA_ENGINE) volume = wait_for_volume_detached(client, volume_name) self_host = get_self_host_id() volume.attach(hostId=self_host) volume = wait_for_volume_healthy(client, volume_name) num_snapshots = 1 for _ in range(num_snapshots): create_snapshot(client, volume_name) # - 1 new snapshot # - 1 volume-head wait_for_snapshot_count(volume, 2) recurring_jobs = { RECURRING_JOB_NAME: { TASK: SNAPSHOT_DELETE, GROUPS: [], CRON: SCHEDULE_1MIN, RETAIN: 0, CONCURRENCY: 1, LABELS: {}, }, } create_recurring_jobs(client, recurring_jobs) check_recurring_jobs(client, recurring_jobs) wait_for_cron_job_count(batch_v1_api, 1) volume = client.by_id_volume(volume_name) volume.recurringJobAdd(name=RECURRING_JOB_NAME, isGroup=False) time.sleep(60) # - 0 snapshot retained # - 1 volume-head try: wait_for_snapshot_count(volume, 1) except Exception: wait_for_snapshot_count(volume, 1, count_removed=True)
Scenario: test recurring job snapshot-delete with retain value 0
Given volume created, attached, and healthy. And 1 snapshot was created. And volume has 2 snapshots. - 1 user-created - 1 volume-head
When create a recurring job with: - task: snapshot-delete - retain: 0 And assign the recurring job to volume. And wait for the cron job scheduled time.
Then volume should have 1 snapshot. - 0 snapshot retained - 1 volume-head
def test_recurring_job_source_label(client, core_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job
def test_recurring_job_source_label(client, core_api, volume_name):  # NOQA
    """
    Scenario: test recurring job source label

    Given volume created.
    And PV created.
    And PVC created.
    And Volume default recurring job label exists.
    And RecurringJob created (recurring-test).

    When add source recurring job label on PVC.
    And add a recurring job label (recurring-test) on the PVC.
    Then Volume has recurring job label (recurring-test).

    When delete source recurring job label on PVC.
    And remove PVC recurring job label (recurring-test).
    Then wait some time for the volume controller to resync (30 seconds).
    And Volume still has recurring job label (recurring-test).

    When add source recurring job label on PVC.
    Then Volume has recurring job label (default).
    """
    volume = create_and_check_volume(client, volume_name, size=str(2 * Gi))
    volume = wait_for_volume_detached(client, volume.name)

    pv_name = volume_name + "-pv"
    create_pv_for_volume(client, core_api, volume, pv_name)
    pvc_name = volume_name + "-pvc"
    create_pvc_for_volume(client, core_api, volume, pvc_name)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: SNAPSHOT,
            GROUPS: [],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    # Mark the PVC as the recurring-job label source, then tag it with the
    # recurring job; the label should propagate to the volume.
    add_recurring_job_source_to_pvc(pvc_name, core_api)

    claim = core_api.read_namespaced_persistent_volume_claim(
        name=pvc_name, namespace='default'
    )
    label_key = f"recurring-job.longhorn.io/{RECURRING_JOB_NAME}"
    claim.metadata.labels[label_key] = "enabled"
    update_persistent_volume_claim(core_api, pvc_name, 'default', claim)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[RECURRING_JOB_NAME],
                                         groups=[])

    # Drop the source label and the job label from the PVC. Without the
    # source label the PVC no longer drives the volume's labels, so the
    # volume must keep its current recurring-job label.
    remove_recurring_job_source_to_pvc(pvc_name, core_api)
    claim = core_api.read_namespaced_persistent_volume_claim(
        name=pvc_name, namespace='default'
    )
    del claim.metadata.labels[label_key]
    update_persistent_volume_claim(core_api, pvc_name, 'default', claim)

    # The volume labels must NOT fall back to the default group here, so
    # the wait below is expected to time out. The original code raised the
    # "unexpected" sentinel inside the same try whose `except Exception`
    # swallowed it, silently hiding a real failure; use try/except/else so
    # the sentinel propagates when the wait unexpectedly succeeds.
    try:
        wait_for_volume_recurring_job_update(volume,
                                             jobs=[],
                                             groups=[DEFAULT])
    except Exception:
        # Expected: the label did not change while the source label is gone.
        wait_for_volume_recurring_job_update(volume,
                                             jobs=[RECURRING_JOB_NAME],
                                             groups=[])
    else:
        raise AssertionError("unexpected volume recurring job label update")

    # Re-adding the source label resyncs the volume to the PVC's (now
    # label-free) state, i.e. back to the default group.
    add_recurring_job_source_to_pvc(pvc_name, core_api)
    wait_for_volume_recurring_job_update(volume, jobs=[], groups=[DEFAULT])
Scenario: test recurring job source label
Given volume created. And PV created. and PVC created. and Volume default recurring job label exists. and RecurringJob created (recurring-test).
When add source recurring job label on PVC. And add a recurring job label (recurring-test) on the PVC. And Volume has recurring job label (recurring-test).
When delete source recurring job label on PVC. And remove PVC recurring job label (recurring-test). Then wait some time for the volume controller to resync (30 seconds). And Volume still has recurring job label (recurring-test).
When add source recurring job label on PVC. Then Volume has recurring job label (default).
def test_recurring_job_volume_label_when_job_and_group_use_same_name(client, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
@pytest.mark.recurring_job  # NOQA
def test_recurring_job_volume_label_when_job_and_group_use_same_name(client, volume_name):  # NOQA
    """
    Scenario: test volume label recurring job when recurring job and
              job-group uses the same name.

    Given volume-1 created.
          volume-2 created.
          volume-3 created.
    And create `snapshot1` recurring job with `snapshot-1` in groups.
        create `snapshot2` recurring job with `snapshot-1` in groups.
    And `snapshot1` job-group applied to volume-1.
        `snapshot1` job applied to volume-2.
        `snapshot2` job applied to volume-3.
    And `snapshot1` job-group exist in volume-1 recurring job label.
        `snapshot1` job exist in volume-2 recurring job label.
        `snapshot2` job exist in volume-3 recurring job label.

    When delete `snapshot1` recurring job.
    Then `snapshot1` job-group exist in volume-1 recurring job label.
         `snapshot1` job not exist in volume-2 recurring job label.
         `snapshot2` job exist in volume-3 recurring job label.

    When delete `snapshot2` recurring job.
    Then `snapshot1` job-group not exist in volume-1 recurring job label.
         `snapshot2` job not exist in volume-3 recurring job label.
    """
    volume1_name = volume_name + "-1"
    volume2_name = volume_name + "-2"
    volume3_name = volume_name + "-3"
    client.create_volume(name=volume1_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    client.create_volume(name=volume2_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    client.create_volume(name=volume3_name, size=SIZE,
                         dataEngine=DATA_ENGINE)
    volume1 = wait_for_volume_detached(client, volume1_name)
    volume2 = wait_for_volume_detached(client, volume2_name)
    volume3 = wait_for_volume_detached(client, volume3_name)

    snap1 = SNAPSHOT + "1"
    snap2 = SNAPSHOT + "2"
    # Both jobs deliberately share the same spec (and group name) so a job
    # and a job-group end up using the same identifier.
    # NOTE(review): both entries reference the *same* dict object; this
    # assumes create_recurring_jobs only reads the spec — verify if it
    # ever mutates per-job entries.
    snapshot = {
        TASK: SNAPSHOT,
        GROUPS: [snap1],
        CRON: SCHEDULE_1MIN,
        RETAIN: 1,
        CONCURRENCY: 2,
        LABELS: {},
    }
    recurring_jobs = {
        snap1: snapshot,
        snap2: snapshot,
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    # Delete recurring job should correctly remove volume label
    volume1.recurringJobAdd(name=snap1, isGroup=True)
    volume2.recurringJobAdd(name=snap1, isGroup=False)
    volume3.recurringJobAdd(name=snap2, isGroup=False)
    wait_for_volume_recurring_job_update(volume1,
                                         jobs=[],
                                         groups=[snap1, DEFAULT])
    wait_for_volume_recurring_job_update(volume2,
                                         jobs=[snap1],
                                         groups=[DEFAULT])
    wait_for_volume_recurring_job_update(volume3,
                                         jobs=[snap2],
                                         groups=[DEFAULT])

    snap1_recurring_job = client.by_id_recurring_job(snap1)
    snap2_recurring_job = client.by_id_recurring_job(snap2)

    # Deleting the snapshot1 job must only strip the *job* label from
    # volume-2; the same-named job-group label on volume-1 stays.
    client.delete(snap1_recurring_job)
    wait_for_volume_recurring_job_update(volume2,
                                         jobs=[],
                                         groups=[DEFAULT])
    wait_for_volume_recurring_job_update(volume1,
                                         jobs=[],
                                         groups=[snap1, DEFAULT])
    wait_for_volume_recurring_job_update(volume3,
                                         jobs=[snap2],
                                         groups=[DEFAULT])

    # Deleting snapshot2 removes the last member of the snap1 group, so
    # the group label on volume-1 disappears as well.
    client.delete(snap2_recurring_job)
    wait_for_volume_recurring_job_update(volume1,
                                         jobs=[],
                                         groups=[DEFAULT])
    wait_for_volume_recurring_job_update(volume3,
                                         jobs=[],
                                         groups=[DEFAULT])
Scenario: test volume label recurring job when recurring job and job-group uses the same name.
Given volume-1 created. volume-2 created. volume-3 created.
And create `snapshot1` recurring job with `snapshot-1` in groups. Create `snapshot2` recurring job with `snapshot-1` in groups.
And `snapshot1` job-group applied to volume-1. `snapshot1` job applied to volume-2. `snapshot2` job applied to volume-3.
And `snapshot1` job-group exists in volume-1 recurring job label. `snapshot1` job exists in volume-2 recurring job label. `snapshot2` job exists in volume-3 recurring job label.
When delete `snapshot1` recurring job. Then `snapshot1` job-group exists in volume-1 recurring job label. `snapshot1` job does not exist in volume-2 recurring job label. `snapshot2` job exists in volume-3 recurring job label.
When delete `snapshot2` recurring job. Then `snapshot1` job-group does not exist in volume-1 recurring job label. `snapshot2` job does not exist in volume-3 recurring job label.
def test_recurring_jobs_allow_detached_volume(set_random_backupstore,
client,
core_api,
apps_api,
volume_name,
make_deployment_with_pvc)-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_recurring_jobs_allow_detached_volume(set_random_backupstore, client, core_api, apps_api, volume_name, make_deployment_with_pvc):  # NOQA
    """
    Scenario: test recurring jobs for detached volume with
              `allow-recurring-job-while-volume-detached` set to true

    Context:
    In the current Longhorn implementation, users cannot do recurring
    backup when volumes are detached. This feature gives the users an
    option to do recurring backup even when volumes are detached.
    longhorn/longhorn#1509

    Given `allow-recurring-job-while-volume-detached` set to `true`.
    And volume created and attached.
    And 50MB data written to volume.
    And volume detached.

    When a recurring job created runs every minute.
    And wait for backup to complete.
    Then volume have 1 backup in 2 minutes retry loop.

    When delete the recurring job.
    And create a PV from volume.
    And create a PVC from volume.
    And create a deployment from PVC.
    And write 400MB data to the volume from the pod.
    And scale deployment replicas to 0. wait until the volume is detached.
    And create a recurring job runs every 2 minutes.
    And wait for backup to start.
    And scale deployment replicas to 1.
    Then volume's frontend is disabled.
    And pod cannot start.

    When wait until backup complete.
    And delete the recurring job.
    Then pod can start in 10 minutes retry loop.
    """
    common.update_setting(client,
                          SETTING_RECURRING_JOB_WHILE_VOLUME_DETACHED,
                          "true")

    volume = create_and_check_volume(client, volume_name, size=str(1 * Gi))
    volume.attach(hostId=get_self_host_id())
    volume = wait_for_volume_healthy(client, volume.name)

    data = {
        'pos': 0,
        'content': common.generate_random_data(50 * Mi),
    }
    common.write_volume_data(volume, data)
    # Give sometimes for data to flush to disk
    time.sleep(15)

    volume.detach()
    volume = wait_for_volume_detached(client, volume.name)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    wait_for_backup_completion(client, volume.name)
    # With retain=1 the backup count must stay at exactly 1; re-check four
    # times over ~2 minutes to cover at least one extra cron firing.
    for _ in range(4):
        bv = find_backup_volume(client, volume.name, retry=RETRY_COUNTS_SHORT)
        wait_for_backup_count(bv, 1)
        time.sleep(30)

    cleanup_all_recurring_jobs(client)

    pv_name = volume_name + "-pv"
    create_pv_for_volume(client, core_api, volume, pv_name)
    pvc_name = volume_name + "-pvc"
    create_pvc_for_volume(client, core_api, volume, pvc_name)
    deployment_name = volume_name + "-dep"
    deployment = make_deployment_with_pvc(deployment_name, pvc_name)
    create_and_wait_deployment(apps_api, deployment)

    size_mb = 400
    pod_names = common.get_deployment_pod_names(core_api, deployment)
    write_pod_volume_random_data(core_api, pod_names[0],
                                 "/data/test", size_mb)

    # Scale the workload down so the volume detaches before the next
    # recurring backup is scheduled.
    deployment['spec']['replicas'] = 0
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment["metadata"]["name"])
    volume = wait_for_volume_detached(client, volume.name)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    wait_for_backup_to_start(client, volume.name)

    # Scale back up while the detached-volume backup is in flight; the pod
    # may only attach after the first backup completes.
    deployment['spec']['replicas'] = 1
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment["metadata"]["name"])

    deployment_label_name = deployment["metadata"]["labels"]["name"]
    common.wait_pod_attach_after_first_backup_completion(
        client, core_api, volume.name, deployment_label_name)

    cleanup_all_recurring_jobs(client)

    pod_names = common.get_deployment_pod_names(core_api, deployment)
    common.wait_for_pod_phase(core_api, pod_names[0], pod_phase="Running")
Scenario: test recurring jobs for detached volume with
allow-recurring-job-while-volume-detached
set to trueContext: In the current Longhorn implementation, users cannot do recurring backup when volumes are detached. This feature gives the users an option to do recurring backup even when volumes are detached. longhorn/longhorn#1509
Given
allow-recurring-job-while-volume-detached
set totrue
. And volume created and attached. And 50MB data written to volume. And volume detached.When a recurring job created runs every minute. And wait for backup to complete.
Then volume have 1 backup in 2 minutes retry loop.
When delete the recurring job. And create a PV from volume. And create a PVC from volume. And create a deployment from PVC. And write 400MB data to the volume from the pod. And scale deployment replicas to 0. wait until the volume is detached. And create a recurring job runs every 2 minutes. And wait for backup to start. And scale deployment replicas to 1. Then volume's frontend is disabled. And pod cannot start.
When wait until backup complete. And delete the recurring job. Then pod can start in 10 minutes retry loop.
def test_recurring_jobs_maximum_retain(client, core_api, volume_name)
-
Expand source code
@pytest.mark.v2_volume_test # NOQA def test_recurring_jobs_maximum_retain(client, core_api, volume_name): # NOQA """ Scenario: test recurring jobs' maximum retain Given set a recurring job retain to 101. When create recurring job. Then should fail. When set recurring job retain to 100. And create recurring job. Then recurring job created with retain equals to 100. When update recurring job retain to 101. Then should fail. """ # set max total number of retain to exceed 100 recurring_jobs = { RECURRING_JOB_NAME: { TASK: BACKUP, GROUPS: [], CRON: SCHEDULE_1MIN, RETAIN: 101, CONCURRENCY: 1, LABELS: {}, }, } validator_error = "retain value should be less than or equal to 100" with pytest.raises(Exception) as e: create_recurring_jobs(client, recurring_jobs) assert validator_error.upper() in str(e.value).upper() recurring_jobs[RECURRING_JOB_NAME][RETAIN] = 100 create_recurring_jobs(client, recurring_jobs) recurring_job = client.by_id_recurring_job(RECURRING_JOB_NAME) assert recurring_job.retain == 100 with pytest.raises(Exception) as e: update_recurring_job(client, RECURRING_JOB_NAME, groups=[], labels={}, retain=101) assert validator_error.upper() in str(e.value).upper()
Scenario: test recurring jobs' maximum retain
Given set a recurring job retain to 101.
When create recurring job. Then should fail.
When set recurring job retain to 100. And create recurring job. Then recurring job created with retain equals to 100.
When update recurring job retain to 101. Then should fail.
def test_recurring_jobs_on_nodes_with_taints()
-
Expand source code
@pytest.mark.skip(reason="TODO") def test_recurring_jobs_on_nodes_with_taints(): # NOQA """ Test recurring jobs on nodes with taints Context: Test the prevention of creation of multiple pods due to recurring job's pod being rejected by Taint controller on nodes with taints Steps: 1. Set taint toleration for Longhorn components `persistence=true:NoExecute` 2. Taint `node-1` with `persistence=true:NoExecute` 3. Create a volume, vol-1. Attach vol-1 to node-1 Write some data to vol-1 4. Create a recurring backup job which: Has retain count 10 Runs every minute 5. Wait for 3 minutes. Verify that the there is 1 backup created Verify that the total number of pod in longhorn-system namespace < 50 Verify that the number of pods of the cronjob is <= 2 6. Taint all nodes with `persistence=true:NoExecute` 7. Write some data to vol-1 8. Wait for 3 minutes. Verify that the there are 2 backups created in total Verify that the total number of pod in longhorn-system namespace < 50 Verify that the number of pods of the cronjob is <= 2 9. Remove `persistence=true:NoExecute` from all nodes and Longhorn setting Clean up backups, volumes """ pass
Test recurring jobs on nodes with taints
Context:
Test the prevention of creation of multiple pods due to recurring job's pod being rejected by Taint controller on nodes with taints
Steps:
- Set taint toleration for Longhorn components
persistence=true:NoExecute
- Taint
node-1
withpersistence=true:NoExecute
- Create a volume, vol-1. Attach vol-1 to node-1 Write some data to vol-1
- Create a recurring backup job which: Has retain count 10 Runs every minute
-
Wait for 3 minutes. Verify that the there is 1 backup created Verify that the total number of pod in longhorn-system namespace < 50 Verify that the number of pods of the cronjob is <= 2
-
Taint all nodes with
persistence=true:NoExecute
- Write some data to vol-1
-
Wait for 3 minutes. Verify that the there are 2 backups created in total Verify that the total number of pod in longhorn-system namespace < 50 Verify that the number of pods of the cronjob is <= 2
-
Remove
persistence=true:NoExecute
from all nodes and Longhorn setting Clean up backups, volumes
- Set taint toleration for Longhorn components
def test_recurring_jobs_when_volume_detached_unexpectedly(set_random_backupstore,
client,
core_api,
apps_api,
volume_name,
make_deployment_with_pvc)-
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_recurring_jobs_when_volume_detached_unexpectedly(set_random_backupstore, client, core_api, apps_api, volume_name, make_deployment_with_pvc):  # NOQA
    """
    Scenario: test recurring jobs when volume detached unexpectedly

    Context:
    If the volume is automatically attached by the recurring backup job,
    make sure that workload pod eventually is able to use the volume when
    volume is detached unexpectedly during the backup process.

    Given `allow-recurring-job-while-volume-detached` set to `true`.
    And volume created and detached.
    And PV created from volume.
    And PVC created from volume.
    And deployment created from PVC.
    And 500MB data written to the volume.
    And deployment replica scaled to 0.
    And volume detached.

    When create a backup recurring job runs every 2 minutes.
    And wait for backup to start.
        wait for backup progress > 50%.
    And kill the engine process of the volume.
    Then volume is attached and healthy.

    When backup completed.
    Then volume is detached with `frontendDisabled=false`.

    When deployment replica scaled to 1.
    Then the data exist in the deployment pod.
    """
    common.update_setting(client,
                          SETTING_RECURRING_JOB_WHILE_VOLUME_DETACHED,
                          "true")

    volume = create_and_check_volume(client, volume_name, size=str(2 * Gi))
    volume = wait_for_volume_detached(client, volume.name)

    pv_name = volume_name + "-pv"
    create_pv_for_volume(client, core_api, volume, pv_name)
    pvc_name = volume_name + "-pvc"
    create_pvc_for_volume(client, core_api, volume, pvc_name)
    deployment_name = volume_name + "-dep"
    deployment = make_deployment_with_pvc(deployment_name, pvc_name)
    create_and_wait_deployment(apps_api, deployment)

    # NOTE(review): the scenario above says 500MB but 1000MB is written
    # here — confirm which is intended and align docstring/code.
    size_mb = 1000
    pod_names = common.get_deployment_pod_names(core_api, deployment)
    write_pod_volume_random_data(core_api, pod_names[0],
                                 "/data/test", size_mb)
    data = read_volume_data(core_api, pod_names[0], 'default')

    # Scale the workload down so the volume detaches before the recurring
    # backup fires.
    deployment['spec']['replicas'] = 0
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment["metadata"]["name"])
    volume = wait_for_volume_detached(client, volume_name)

    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [],
            CRON: "*/2 * * * *",
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)

    volume.recurringJobAdd(name=RECURRING_JOB_NAME, isGroup=False)
    wait_for_volume_recurring_job_update(volume,
                                         jobs=[RECURRING_JOB_NAME],
                                         groups=[DEFAULT])

    # Let the 2-minute cron fire, then wait until the backup has made
    # real progress (>50%) before killing the engine.
    time.sleep(60)
    wait_for_recurring_backup_to_start(client, core_api, volume_name,
                                       expected_snapshot_count=1,
                                       minimum_progress=50)

    crash_engine_process_with_sigkill(client, core_api, volume_name)
    time.sleep(10)
    # The recurring job re-attaches the volume without a frontend to
    # finish the interrupted backup.
    wait_for_volume_healthy_no_frontend(client, volume_name)

    # The backup state is removed right after the backup completes, which
    # can happen quickly; catching both the in-progress and complete
    # states is unreliable, so only check for the complete state.
    wait_for_backup_completion(client, volume_name)
    wait_for_volume_detached(client, volume_name)

    deployment['spec']['replicas'] = 1
    apps_api.patch_namespaced_deployment(body=deployment,
                                         namespace='default',
                                         name=deployment["metadata"]["name"])
    wait_deployment_replica_ready(apps_api,
                                  deployment["metadata"]["name"], 1)
    pod_names = common.get_deployment_pod_names(core_api, deployment)
    assert read_volume_data(core_api, pod_names[0], 'default') == data

    # The backupstore fixture cleans up the backups. Because we crashed
    # the engine that initiated the backup, its backupstore lock is still
    # present; wait for the lock to expire before the backups can be
    # deleted.
    volume.recurringJobDelete(name=RECURRING_JOB_NAME, isGroup=False)
    backupstore.backupstore_wait_for_lock_expiration()
Scenario: test recurring jobs when volume detached unexpectedly
Context: If the volume is automatically attached by the recurring backup job, make sure that workload pod eventually is able to use the volume when volume is detached unexpectedly during the backup process.
Given
allow-recurring-job-while-volume-detached
set totrue
. And volume created and detached. And PV created from volume. And PVC created from volume. And deployment created from PVC. And 500MB data written to the volume. And deployment replica scaled to 0. And volume detached.When create a backup recurring job runs every 2 minutes. And wait for backup to start. wait for backup progress > 50%. And kill the engine process of the volume. Then volume is attached and healthy.
When backup completed. Then volume is detached with
frontendDisabled=false
.When deployment replica scaled to 1. Then the data exist in the deployment pod.
def wait_for_actual_size_change_mb(client, vol_name, old_size, retry_counts=60, wait_stablize=True)
-
Expand source code
def wait_for_actual_size_change_mb(client, vol_name, old_size, # NOQA retry_counts=60, wait_stablize=True): size_change = 0 for _ in range(retry_counts): time.sleep(5) volume = client.by_id_volume(vol_name) new_size = int(int(volume.controllers[0].actualSize) / Mi) if wait_stablize and new_size != size_change: size_change = new_size continue if new_size != old_size: return new_size assert False, f'expecting actual size change: {old_size} -> {new_size}'
def wait_for_recurring_backup_to_start(client, core_api, volume_name, expected_snapshot_count, minimum_progress=0)
-
Expand source code
def wait_for_recurring_backup_to_start(client, core_api, volume_name, expected_snapshot_count, minimum_progress=0): # NOQA job_pod_name = volume_name + '-backup-c' snapshot_name = '' snapshots = [] check_pod_existence(core_api, job_pod_name, namespace=LONGHORN_NAMESPACE) # Find the snapshot which is being backed up for _ in range(RETRY_BACKUP_COUNTS): volume = client.by_id_volume(volume_name) try: snapshots = volume.snapshotList() assert len(snapshots) == expected_snapshot_count + 1 for snapshot in snapshots: if snapshot.children['volume-head']: snapshot_name = snapshot.name break if len(snapshot_name) != 0: break except (AttributeError, ApiException, AssertionError): time.sleep(RETRY_BACKUP_INTERVAL) assert len(snapshot_name) != 0 # To ensure the progress of backup wait_for_backup_to_start(client, volume_name, snapshot_name=snapshot_name, chk_progress=minimum_progress) return snapshot_name
def wait_until_begin_of_a_minute()
-
Expand source code
def wait_until_begin_of_a_minute(): while True: current_time = datetime.utcnow() if current_time.second == 0: break time.sleep(1)
def wait_until_begin_of_an_even_minute()
-
Expand source code
def wait_until_begin_of_an_even_minute(): while True: current_time = datetime.utcnow() if current_time.second == 0 and current_time.minute % 2 == 0: break time.sleep(1)