Module tests.test_snapshot

Functions

def check_hashed_and_with_immediate_hash(client, volume_name, snapshot_data_integrity, volume_size=16, data_size=1)
Expand source code
def check_hashed_and_with_immediate_hash(client, volume_name, snapshot_data_integrity, volume_size=16, data_size=1):  # NOQA
    """
    Verify snapshot checksums are computed immediately after creation.

    1. Create and attach a volume
    2. Create snapshots
    3. Check snapshots' checksums are calculated and set and also
       check the checksums of the snapshot disk files are matched
       with the values from the api
    """

    # Step 1: provision the volume and attach it to this test node
    volume = create_and_check_volume(client, volume_name,
                                     num_of_replicas=3,
                                     size=str(volume_size * Mi),
                                     snapshot_data_integrity=snapshot_data_integrity)  # NOQA

    self_host = get_self_host_id()
    volume = volume.attach(hostId=self_host)
    volume = wait_for_volume_healthy(client, volume_name)
    volume = client.by_id_volume(volume_name)

    # Step 2: write data and take three snapshots
    create_snapshots(client, volume, data_size, 3)

    # Step 3: checksums must appear within the toleration delay,
    # i.e. right after snapshot creation rather than on the periodic
    # schedule
    generate_checksums_time = wait_for_snapshot_checksums_generate(volume)  # NOQA
    assert generate_checksums_time in range(0, SNAPSHOT_CHECK_TOLERATION_DELAY)  # NOQA
  1. Create and attach a volume
  2. Create snapshots
  3. Check snapshots' checksums are calculated and set and also check the checksums of the snapshot disk files are matched with the values from the api
def check_hashed_and_without_immediate_hash(client, volume_name, snapshot_data_integrity)
Expand source code
def check_hashed_and_without_immediate_hash(client, volume_name, snapshot_data_integrity): # NOQA
    """
    Verify snapshot checksums are produced only by the periodic check
    when immediate hashing is disabled.

    1. Create and attach a volume
    2. Create snapshots
    3. Sleep for 120 seconds. Check snapshots' checksums are not calculated
       and not set, because the immediate hash after snapshot creation feature
       is disabled
    4. Check snapshots' checksums are calculated and set by the periodic check
       mechanism
    """

    # Step 1: provision the volume and attach it to this test node
    volume = create_and_check_volume(client, volume_name,
                                     num_of_replicas=3,
                                     size=str(16 * Mi),
                                     snapshot_data_integrity=snapshot_data_integrity)  # NOQA

    self_host = get_self_host_id()
    volume = volume.attach(hostId=self_host)
    volume = wait_for_volume_healthy(client, volume_name)
    volume = client.by_id_volume(volume_name)

    # Step 2: write data and take three snapshots
    create_snapshots(client, volume, 1, 3)

    # Step 3: after 120 s no checksum should exist yet, since immediate
    # hashing is off
    slept = 120
    time.sleep(slept)
    assert not check_snapshot_checksums_set(volume)

    # Step 4: the periodic mechanism should fill the checksums in within
    # the remainder of the check period (plus toleration delay)
    generate_checksums_time = wait_for_snapshot_checksums_generate(volume)  # NOQA
    upper_bound = SNAPSHOT_CHECK_PERIOD - slept + SNAPSHOT_CHECK_TOLERATION_DELAY  # NOQA
    assert generate_checksums_time in range(0, upper_bound)
  1. Create and attach a volume
  2. Create snapshots
  3. Sleep for 120 seconds. Check snapshots' checksums are not calculated and not set, because the immediate hash after snapshot creation feature is disabled
  4. Check snapshots' checksums are calculated and set by the periodic check mechanism
def check_per_volume_hash_disable(client, volume_name, snapshot_data_integrity)
Expand source code
def check_per_volume_hash_disable(client, volume_name, snapshot_data_integrity):  # NOQA
    """
    1. Create and attach a volume
    2. Create snapshots
    3. Sleep for SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY seconds.
       Check snapshots' checksums are NOT calculated and NOT set, because
       snapshot hashing is effectively disabled for this volume. Sleeping
       past a full periodic-check cycle proves the periodic verification
       mechanism skipped the volume as well.
    """
    # NOTE: the previous docstring claimed the checksums ARE calculated,
    # which contradicted the assertion below; this helper verifies the
    # disabled case.

    # Step 1
    volume = create_and_check_volume(client, volume_name,
                                     num_of_replicas=3,
                                     size=str(16 * Mi),
                                     snapshot_data_integrity=snapshot_data_integrity)  # NOQA

    lht_hostId = get_self_host_id()
    volume = volume.attach(hostId=lht_hostId)
    volume = wait_for_volume_healthy(client, volume_name)

    volume = client.by_id_volume(volume_name)

    # Step 2
    create_snapshots(client, volume, 1, 3)

    # Step 3: wait out a full periodic-check window, then confirm no
    # checksum was set
    time.sleep(SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY)
    assert not check_snapshot_checksums_set(volume)
  1. Create and attach a volume
  2. Create snapshots
  3. Sleep for SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY seconds. Check snapshots' checksums are NOT calculated and NOT set, confirming that both the immediate-hash and the periodic verification mechanisms skip the volume when snapshot hashing is disabled for it
def check_snapshot_checksums_and_change_timestamps(volume)
Expand source code
def check_snapshot_checksums_and_change_timestamps(volume):
    """
    Verify every user-created snapshot of *volume* on the local host's
    replica:
    - the checksum in the snapshot resource matches the value recorded
      in the on-disk ".checksum" file,
    - the checksum in the snapshot resource matches a freshly
      recalculated checksum of the snapshot disk file, and
    - the change-time recorded in the ".checksum" file matches the disk
      file's current ctime.

    Retries (up to RETRY_COUNTS, sleeping RETRY_INTERVAL between passes)
    while any user-created snapshot still has an empty checksum.
    Returns True once a full pass completes, False if the retry budget
    is exhausted.
    """
    data_path = get_local_host_replica_data_path(volume)
    assert data_path != ""

    for i in range(RETRY_COUNTS):
        value = volume.snapshotList()
        snapshots = value.data
        for s in snapshots:
            # Only user-created snapshots are hashed; skip volume-head.
            if s.name != "volume-head" and s.usercreated is True:
                if s.checksum == "":
                    # Checksum not generated yet: abandon this pass and
                    # retry after sleeping.
                    break

                # Check checksums in snapshot resource and in checksum file
                # are matched
                disk_path = os.path.join(data_path,
                                         "volume-snap-" + s.name + ".img")
                checksum = get_checksum_in_checksum_file(disk_path)
                print(f'snapshot {s.name}: '
                      f'checksum in resource={s.checksum}, '
                      f'checksum recalculated={checksum}')
                assert checksum == s.checksum

                # Check checksums in snapshot resource and the calculated value
                # are matched
                checksum = get_checksum_from_snapshot_disk_file(data_path,
                                                                s.name)
                print(f'snapshot {s.name}: '
                      f'checksum in resource={s.checksum}, '
                      f'checksum recorded={checksum}')
                assert checksum == s.checksum

                # Check ctime in checksum file and from stat are matched
                ctime_recorded = get_ctime_in_checksum_file(disk_path)
                ctime = get_ctime_from_snapshot_disk_file(data_path, s.name)

                print(f'snapshot {s.name}: '
                      f'ctime recorded={ctime_recorded}, '
                      f'ctime={ctime}')

                # Normalize both timestamp strings (dropping the literal
                # " +0000" offset) so formatting differences don't cause
                # a false mismatch.
                df = pd.DataFrame({'string': [ctime_recorded, ctime]})
                df['timestamp'] = pd.to_datetime(df['string'].str.replace(" +0000", "", regex=False))  # NOQA
                assert str(df['timestamp'][0]) == str(df['timestamp'][1])

            # Reached the last snapshot of this pass without bailing out:
            # every user-created snapshot has been verified.
            if snapshots.index(s) == len(snapshots)-1:
                return True

        time.sleep(RETRY_INTERVAL)

    # Retry budget exhausted without a complete clean pass.
    return False
def check_snapshot_checksums_set(volume)
Expand source code
def check_snapshot_checksums_set(volume):
    """
    Return True when every snapshot other than volume-head has a
    non-empty checksum (also printing each checksum); return False as
    soon as any checksum is missing.
    """
    snapshots = volume.snapshotList().data

    for snap in snapshots:
        if snap.name == "volume-head":
            continue
        if snap.checksum == "":
            return False

    for snap in snapshots:
        print(f'Checksum of snapshot {snap.name} is {snap.checksum}')
    return True
def corrupt_snapshot_on_local_host(volume, snapshot_name)
Expand source code
def corrupt_snapshot_on_local_host(volume, snapshot_name):
    """
    Overwrite the first 10 bytes of the named snapshot's disk file on
    the local host's replica with random data. Returns True on success,
    False when the dd command fails.
    """
    data_path = get_local_host_replica_data_path(volume)
    assert data_path != ""

    disk_path = os.path.join(data_path,
                             "volume-snap-" + snapshot_name + ".img")
    cmd = ["dd", "if=/dev/urandom", "of=" + disk_path,
           "bs=1", "count=10", "conv=notrunc"]
    try:
        subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        print(err.output)
        return False
    return True
def create_snapshots(client, volume, data_size, num_snapshots)
Expand source code
def create_snapshots(client, volume, data_size, num_snapshots):  # NOQA
    """
    Write random data and take *num_snapshots* snapshots, then verify
    each created snapshot is listed, not removed, and chained to its
    predecessor.
    """
    dev = get_volume_endpoint(volume)

    created = []
    for _ in range(num_snapshots):
        assert write_device_random_data(dev, data_size)
        created.append(create_snapshot(client, volume.name))

    # Index the listed snapshots by name for the assertions below.
    listed = {snap.name: snap for snap in volume.snapshotList()}

    for idx, snap in enumerate(created):
        entry = listed[snap.name]
        assert entry.name == snap.name
        assert entry.removed is False
        if idx > 0:
            # Each snapshot's parent must be the one created just before.
            assert entry.parent == created[idx - 1].name
def detect_and_repair_corrupted_replica(client, volume_name, data_integrity_mode, retry_count=SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY)
Expand source code
def detect_and_repair_corrupted_replica(client, volume_name, data_integrity_mode, retry_count=SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY):  # NOQA
    """
    1. Create and attach a volume
    2. Create snapshots
    3. Check snapshots' checksums are calculated and set and also
       check the checksums of the snapshot disk files are matched
       with the values from the api
    4. Corrupt the snapshot of replica on the local host
    5. Check the replica rebuild is ran correctly
    6. Check snapshots are repaired
    """

    # Step 1: 2 GiB volume with 3 replicas, attached on this test node
    volume = create_and_check_volume(client, volume_name,
                                     num_of_replicas=3,
                                     size=str(2 * Gi))

    self_host = get_self_host_id()
    volume = volume.attach(hostId=self_host)
    volume = wait_for_volume_healthy(client, volume_name)
    volume = client.by_id_volume(volume_name)

    # Step 2: three snapshots, 1536 MiB of data each
    create_snapshots(client, volume, 1536, 3)

    # Step 3: checksums must match the on-disk files
    assert check_snapshot_checksums_and_change_timestamps(volume)

    # Step 4: damage one user-created snapshot on the local replica
    snapshot_name = get_available_snapshot(volume)
    assert snapshot_name != ""
    assert corrupt_snapshot_on_local_host(volume, snapshot_name)

    # Step 5: corruption detection should trigger a rebuild of exactly
    # one replica, leaving the replica count at 3
    wait_for_rebuild_start(client, volume_name, retry_count, 1)

    volume = client.by_id_volume(volume_name)
    assert len(volume.replicas) == 3

    wait_for_rebuild_complete(client, volume_name)

    # Step 6: after a full periodic-check cycle, the checksum files
    # should be consistent again
    time.sleep(SNAPSHOT_CHECK_PERIOD+SNAPSHOT_CHECK_TOLERATION_DELAY)
    assert check_snapshot_checksums_and_change_timestamps(volume)
  1. Create and attach a volume
  2. Create snapshots
  3. Check snapshots' checksums are calculated and set and also check the checksums of the snapshot disk files are matched with the values from the api
  4. Corrupt the snapshot of replica on the local host
  5. Check the replica rebuild is ran correctly
  6. Check snapshots are repaired
def get_available_snapshot(volume)
Expand source code
def get_available_snapshot(volume):
    """
    Return the name of the first user-created snapshot of the volume,
    or "" when none exists. Also asserts this host holds a replica of
    the volume.
    """
    assert get_local_host_replica_data_path(volume) != ""

    for snap in volume.snapshotList().data:
        if snap.name != "volume-head" and snap.usercreated is True:
            return snap.name

    return ""
def get_available_snapshots(volume)
Expand source code
def get_available_snapshots(volume):
    """
    Return all snapshots of the volume except volume-head. Also asserts
    this host holds a replica of the volume.
    """
    assert get_local_host_replica_data_path(volume) != ""

    return [snap for snap in volume.snapshotList().data
            if snap.name != "volume-head"]
def get_checksum_from_snapshot_disk_file(data_path, snapshot_name)
Expand source code
def get_checksum_from_snapshot_disk_file(data_path, snapshot_name):
    """
    Recalculate the crc64 (go-iso variant) checksum of the snapshot's
    disk file using jacksum. Returns "" when the command fails.
    """
    disk_file = os.path.join(data_path,
                             "volume-snap-" + snapshot_name + ".img")
    cmd = ["java", "-jar", "/jacksum.jar", "-a", "crc64_go-iso", disk_file]
    try:
        raw = subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        print(err.output)
        return ""

    # jacksum prints "<checksum> <size> <file>"; keep only the checksum.
    return str(raw, 'utf-8').split(' ')[0]
def get_checksum_in_checksum_file(disk_path)
Expand source code
def get_checksum_in_checksum_file(disk_path):
    """
    Read the "checksum" field from the JSON metadata file stored next to
    the snapshot disk file (``<disk_path>.checksum``).

    Fix: the file handle was previously opened without being closed
    (leaked on every call, and also when json.load raised); a context
    manager now guarantees it is closed.

    Raises OSError (after printing it) when the file cannot be opened.
    """
    checksum_file = disk_path + ".checksum"
    try:
        with open(checksum_file) as f:
            metadata = json.load(f)
    except OSError as e:
        print(e)
        raise

    return metadata["checksum"]
def get_ctime_from_snapshot_disk_file(data_path, snapshot_name)
Expand source code
def get_ctime_from_snapshot_disk_file(data_path, snapshot_name):
    """
    Return the change time (``stat -c %z``) of the snapshot disk file
    with a " UTC" suffix, or "" when stat fails.
    """
    disk_file = os.path.join(data_path,
                             "volume-snap-" + snapshot_name + ".img")
    try:
        raw = subprocess.check_output(["stat", "-c", "%z", disk_file])
    except subprocess.CalledProcessError as err:
        print(err.output)
        return ""

    return str(raw, 'utf-8').rstrip('\n') + " UTC"
def get_ctime_in_checksum_file(disk_path)
Expand source code
def get_ctime_in_checksum_file(disk_path):
    """
    Read the "change_time" field from the JSON metadata file stored next
    to the snapshot disk file (``<disk_path>.checksum``).

    Fix: the file handle was previously opened without being closed
    (leaked on every call, and also when json.load raised); a context
    manager now guarantees it is closed.

    Raises OSError (after printing it) when the file cannot be opened.
    """
    checksum_file = disk_path + ".checksum"
    try:
        with open(checksum_file) as f:
            metadata = json.load(f)
    except OSError as e:
        print(e)
        raise

    return metadata["change_time"]
def get_local_host_replica_data_path(volume)
Expand source code
def get_local_host_replica_data_path(volume):
    """
    Return the data path of this host's replica of *volume*, or "" when
    the volume has no replica on this host.
    """
    self_host = get_self_host_id()

    for replica in volume.replicas:
        if replica.hostId == self_host:
            return replica.dataPath

    return ""
def prepare_settings_for_snapshot_test(client, data_integrity, immediate_check, fast_replica_rebuild, period_in_second=300)
Expand source code
def prepare_settings_for_snapshot_test(client, data_integrity, immediate_check, fast_replica_rebuild, period_in_second=SNAPSHOT_CHECK_PERIOD):  # NOQA
    """
    Configure the snapshot-integrity settings used by these tests.

    The hash cronjob is anchored to the current minute so the next
    periodic run happens at a predictable time within the test's wait
    window.
    """
    period_in_minute = period_in_second // 60

    # Fire at now, now+period, ... over the next half hour, during the
    # current hour and the next (covers crossing an hour boundary).
    now = datetime.datetime.now()
    minute_list = [(now.minute + offset) % 60
                   for offset in range(0, 30, period_in_minute)]
    minutes = ",".join(str(m) for m in minute_list)
    hours = f"{now.hour},{(now.hour + 1) % 24}"

    cronjob = f"{minutes} {hours} * * *"

    update_setting(client,
                   SETTING_SNAPSHOT_DATA_INTEGRITY,
                   data_integrity)
    update_setting(client,
                   SETTING_SNAPSHOT_DATA_INTEGRITY_IMMEDIATE_CHECK_AFTER_SNAPSHOT_CREATION,  # NOQA
                   immediate_check)
    update_setting(client,
                   SETTING_SNAPSHOT_DATA_INTEGRITY_CRONJOB,
                   cronjob)
    update_setting(client,
                   SETTING_SNAPSHOT_FAST_REPLICA_REBUILD_ENABLED,
                   fast_replica_rebuild)
def test_freeze_file_system_for_snapshot()
Expand source code
@pytest.mark.skip(reason="TODO") # NOQA
# Placeholder: the steps below describe the intended test procedure; the
# implementation has not been written yet (hence the skip marker).
def test_freeze_file_system_for_snapshot(): # NOQA
    """
    1. Record the test start time.
    2. Set freeze-filesystem-for-snapshot to true.
    3. Create a Longhorn volume 6 GiB with a replica on every Longhorn node.
    4. Create a workload pod that mounts the volume and runs
       "dd if=/dev/random of=/path/to/file/under/mount/point bs=1M count=5000".
       (Add "status=progress" if running manually to see the progress freeze.)
    5. After the workload is running, take a snapshot.
    6. Verify the snapshot succeeded.
    7. Verify the pod completed successfully.
    8. Verify the following logs appeared in the instance-manager running the
       volume engine after the test began. (Checking logs this way isn't ideal,
       but there isn't really a better way to confirm the freeze.)
       - "Freezing filesystem mounted at"
       - "Unfreezing filesystem mounted at"
    """
  1. Record the test start time.
  2. Set freeze-filesystem-for-snapshot to true.
  3. Create a Longhorn volume 6 GiB with a replica on every Longhorn node.
  4. Create a workload pod that mounts the volume and runs "dd if=/dev/random of=/path/to/file/under/mount/point bs=1M count=5000". (Add "status=progress" if running manually to see the progress freeze.)
  5. After the workload is running, take a snapshot.
  6. Verify the snapshot succeeded.
  7. Verify the pod completed successfully.
  8. Verify the following logs appeared in the instance-manager running the volume engine after the test began. (Checking logs this way isn't ideal, but there isn't really a better way to confirm the freeze.)
  9. "Freezing filesystem mounted at"
  10. "Unfreezing filesystem mounted at"
def test_snapshot_cr(client, volume_name, settings_reset)
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_snapshot_cr(client, volume_name, settings_reset):  # NOQA
    """
    GitHub ticket: https://github.com/longhorn/longhorn/issues/6298

    1. set auto-cleanup-system-generated-snapshot to true
    2. Create and attach a volume with 3 replicas
    3. Delete one of the volumes replicas.
    4. Wait for the replica to rebuild and volume become healthy
    5. Verify that there is one Longhorn snapshot CR of this volume
    6. Repeat steps 3-5 about 10 times

    Fix: removed a dead ``snapshots = volume.snapshotList()`` assignment
    before the loop — the value was always overwritten inside the retry
    loop before first use.
    """
    setting = client.by_id_setting(
        SETTING_AUTO_CLEANUP_SYSTEM_GERERATED_SNAPSHOT)
    client.update(setting, value="true")

    lht_hostId = get_self_host_id()
    volume = create_and_check_volume(client, volume_name,
                                     num_of_replicas=3,
                                     size=str(1 * Gi))
    volume = volume.attach(hostId=lht_hostId)
    wait_for_volume_healthy(client, volume_name)
    volume = client.by_id_volume(volume_name)

    repeat_time = 10
    created_time = ""
    for i in range(repeat_time):
        delete_replica_on_test_node(client, volume_name)
        wait_for_volume_degraded(client, volume_name)
        wait_for_volume_healthy(client, volume_name)

        # 2 snapshots exist: volume-head plus 1 system-generated one.
        # get_available_snapshots() excludes volume-head, so expect 1.
        for j in range(RETRY_COUNTS_SHORT):
            volume = client.by_id_volume(volume_name)
            snapshots = get_available_snapshots(volume)
            if len(snapshots) == 1:
                break
            time.sleep(RETRY_INTERVAL)

        assert len(snapshots) == 1
        # Each rebuild must produce a fresh snapshot CR rather than
        # reusing the previous one.
        assert snapshots[0].created != created_time
        created_time = snapshots[0].created

GitHub ticket: https://github.com/longhorn/longhorn/issues/6298

  1. set auto-cleanup-system-generated-snapshot to true
  2. Create and attach a volume with 3 replicas
  3. Delete one of the volumes replicas.
  4. Wait for the replica to rebuild and volume become healthy
  5. Verify that there is one Longhorn snapshot CR of this volume
  6. Repeat steps 3-5 about 10 times
def test_snapshot_hash_detect_corruption_in_global_enabled_mode(client, volume_name, settings_reset)
Expand source code
@pytest.mark.long_running
def test_snapshot_hash_detect_corruption_in_global_enabled_mode(client, volume_name, settings_reset):  # NOQA
    """
    Verify a corrupted snapshot is detected and the affected replica is
    rebuilt when:
    - global data-integrity is set to enabled
    - global immediate_hash is enabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_ENABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"false","v2":"false"}')
    detect_and_repair_corrupted_replica(client, volume_name,
                                        "enabled",
                                        retry_count=SNAPSHOT_CHECK_PERIOD * 2)

Check the snapshot corruption can be detected and replica is rebuilt - global data-integrity is set to enabled - global immediate_hash is enabled

def test_snapshot_hash_detect_corruption_in_global_fast_check_mode(client, volume_name, settings_reset)
Expand source code
@pytest.mark.long_running
def test_snapshot_hash_detect_corruption_in_global_fast_check_mode(client, volume_name, settings_reset):  # NOQA
    """
    Verify a corrupted snapshot is detected and the affected replica is
    rebuilt when:
    - global data-integrity is set to fast-check
    - global immediate_hash is enabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_FAST_CHECK,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"false","v2":"false"}')
    detect_and_repair_corrupted_replica(client, volume_name,
                                        "fast-check",
                                        retry_count=SNAPSHOT_CHECK_PERIOD * 2)

Check the snapshot corruption can be detected and replica is rebuilt - global data-integrity is set to fast-check - global immediate_hash is enabled

def test_snapshot_hash_global_disabled_and_per_volume_enabled_and_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_disabled_and_per_volume_enabled_and_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated immediately after
    snapshot creation when:
    - global data-integrity is set to disabled
    - global immediate_hash is enabled
    - per-volume data-integrity is set to enabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_with_immediate_hash(client, volume_name, "enabled")

Check snapshots' checksums are calculated immediately after snapshot creation when - global data-integrity is set to disabled - global immediate_hash is enabled - per-volume data-integrity is set to enabled

def test_snapshot_hash_global_disabled_and_per_volume_enabled_and_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_disabled_and_per_volume_enabled_and_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated by the periodic checksum
    check when:
    - global data-integrity is set to disabled
    - global immediate_hash is disabled
    - per-volume data-integrity is set to enabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"false","v2":"false"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_without_immediate_hash(client, volume_name, "enabled")

Check snapshots' checksums are calculated by the periodic checksum check - global data-integrity is set to disabled - global immediate_hash is disabled - per-volume data-integrity is set to enabled

def test_snapshot_hash_global_disabled_and_per_volume_fast_check_and_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_disabled_and_per_volume_fast_check_and_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated immediately after
    snapshot creation when:
    - global data-integrity is set to disabled
    - global immediate_hash is enabled
    - per-volume data-integrity is set to fast-check
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_with_immediate_hash(client, volume_name, "fast-check")

Check snapshots' checksums are calculated immediately after snapshot creation when - global data-integrity is set to disabled - global immediate_hash is enabled - per-volume data-integrity is set to fast-check

def test_snapshot_hash_global_disabled_and_per_volume_fast_check_and_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_disabled_and_per_volume_fast_check_and_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated by the periodic checksum
    check when:
    - global data-integrity is set to disabled
    - global immediate_hash is disabled
    - per-volume data-integrity is set to fast-check
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"false","v2":"false"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_without_immediate_hash(client, volume_name, "fast-check")

Check snapshots' checksums are calculated by the periodic checksum check - global data-integrity is set to disabled - global immediate_hash is disabled - per-volume data-integrity is set to fast-check

def test_snapshot_hash_global_disabled_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_snapshot_hash_global_disabled_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are NOT calculated when:
    - global data-integrity is set to disabled
    - global immediate_hash is enabled
    - per-volume data-integrity is set to disabled and follows the
      global setting
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_per_volume_hash_disable(client, volume_name, "ignored")

Check snapshots' checksums are not calculated when - global data-integrity is set to disabled - global immediate_hash is enabled - per-volume data-integrity is set to disabled and follows the global setting

def test_snapshot_hash_global_disabled_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_snapshot_hash_global_disabled_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are NOT calculated when:
    - global data-integrity is set to disabled
    - global immediate_hash is disabled
    - per-volume data-integrity is set to disabled and follows the
      global setting
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_DISABLED,
        immediate_check='{"v1":"false","v2":"false"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_per_volume_hash_disable(client, volume_name, "ignored")

Check snapshots' checksums are not calculated when - global data-integrity is set to disabled - global immediate_hash is disabled - per-volume data-integrity is set to disabled and follows the global setting

def test_snapshot_hash_global_enabled_and_per_volume_disable_and_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_enabled_and_per_volume_disable_and_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are NOT calculated when:
    - global data-integrity is set to enabled
    - global immediate_hash is enabled
    - per-volume data-integrity is set to disabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_ENABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_per_volume_hash_disable(client, volume_name, "disabled")

Check snapshots' checksums are not calculated - global data-integrity is set to enabled - global immediate_hash is enabled - per-volume data-integrity is set to disabled

def test_snapshot_hash_global_enabled_and_per_volume_disable_and_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_enabled_and_per_volume_disable_and_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are NOT calculated when:
    - global data-integrity is set to enabled
    - global immediate_hash is disabled
    - per-volume data-integrity is set to disabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_ENABLED,
        immediate_check='{"v1":"false","v2":"false"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_per_volume_hash_disable(client, volume_name, "disabled")

Check snapshots' checksums are not calculated - global data-integrity is set to enabled - global immediate_hash is disabled - per-volume data-integrity is set to disabled

def test_snapshot_hash_global_enabled_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_enabled_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated immediately after the
    snapshots are created when:
    - global data-integrity is set to enabled
    - global immediate_hash is enabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_ENABLED,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_with_immediate_hash(client, volume_name, "ignored")

Check snapshots' checksums are immediately calculated when the snapshots are created - global data-integrity is set to enabled - global immediate_hash is enabled

def test_snapshot_hash_global_enabled_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_enabled_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are calculated by the periodic checksum
    check when:
    - global data-integrity is set to enabled
    - global immediate_hash is disabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_ENABLED,
        immediate_check='{"v1":"false","v2":"false"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_hashed_and_without_immediate_hash(client, volume_name, "ignored")

Check snapshots' checksums are calculated by the periodic checksum check - global data-integrity is set to enabled - global immediate_hash is disabled

def test_snapshot_hash_global_fast_check_and_per_volume_disable_and_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_snapshot_hash_global_fast_check_and_per_volume_disable_and_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshots' checksums are NOT calculated when:
    - global data-integrity is set to fast-check
    - global immediate_hash is enabled
    - per-volume data-integrity is set to disabled
    """
    prepare_settings_for_snapshot_test(
        client,
        data_integrity=SNAPSHOT_DATA_INTEGRITY_FAST_CHECK,
        immediate_check='{"v1":"true","v2":"true"}',
        fast_replica_rebuild='{"v1":"true","v2":"true"}')
    check_per_volume_hash_disable(client, volume_name, "disabled")

Check snapshots' checksums are not calculated - global data-integrity is set to fast-check - global immediate_hash is enabled - per-volume data-integrity is set to disabled

def test_snapshot_hash_global_fast_check_and_per_volume_disable_and_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
@pytest.mark.v2_volume_test  # NOQA
def test_snapshot_hash_global_fast_check_and_per_volume_disable_and_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshot checksums are NOT calculated
    - global data-integrity is set to fast-check
    - global immediate_hash is disabled
    - per-volume data-integrity is set to disabled
    """
    # Per engine version (v1/v2): immediate hash off, the fourth setting on.
    both_versions_false = '{"v1":"false","v2":"false"}'
    both_versions_true = '{"v1":"true","v2":"true"}'
    prepare_settings_for_snapshot_test(client,
                                       SNAPSHOT_DATA_INTEGRITY_FAST_CHECK,
                                       both_versions_false,
                                       both_versions_true)
    check_per_volume_hash_disable(client, volume_name, "disabled")

Check snapshots' checksums are not calculated - global data-integrity is set to fast-check - global immediate_hash is disabled - per-volume data-integrity is set to disabled

def test_snapshot_hash_global_fast_check_with_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_fast_check_with_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshot checksums are computed immediately at snapshot creation
    - global data-integrity is set to fast-check
    - global immediate_hash is enabled
    """
    # Both engine versions (v1/v2) get the same enabled flags.
    both_versions_true = '{"v1":"true","v2":"true"}'
    prepare_settings_for_snapshot_test(client,
                                       SNAPSHOT_DATA_INTEGRITY_FAST_CHECK,
                                       both_versions_true,
                                       both_versions_true)
    check_hashed_and_with_immediate_hash(client, volume_name, "ignored")

Check snapshots' checksums are immediately calculated when the snapshots are created - global data-integrity is set to fast-check - global immediate_hash is enabled

def test_snapshot_hash_global_fast_check_without_immediate_hash(client, volume_name, settings_reset)
Expand source code
def test_snapshot_hash_global_fast_check_without_immediate_hash(client, volume_name, settings_reset):  # NOQA
    """
    Verify snapshot checksums are produced by the periodic checksum check
    - global data-integrity is set to fast-check
    - global immediate_hash is disabled
    """
    # Per engine version (v1/v2): immediate hash off, the fourth setting on.
    both_versions_false = '{"v1":"false","v2":"false"}'
    both_versions_true = '{"v1":"true","v2":"true"}'
    prepare_settings_for_snapshot_test(client,
                                       SNAPSHOT_DATA_INTEGRITY_FAST_CHECK,
                                       both_versions_false,
                                       both_versions_true)
    check_hashed_and_without_immediate_hash(client, volume_name, "ignored")

Check snapshots' checksums are calculated by the periodic checksum check - global data-integrity is set to fast-check - global immediate_hash is disabled

def wait_for_snapshot_checksums_generate(volume_name)
Expand source code
def wait_for_snapshot_checksums_generate(volume_name):   # NOQA
    """
    Poll until all snapshot checksums of the volume are set.

    Retries up to RETRY_WAIT_CHECKSUM_COUNTS times, sleeping RETRY_INTERVAL
    seconds between attempts.

    NOTE(review): the parameter is named volume_name, but callers in this
    module pass a volume object — confirm and consider renaming.

    Returns:
        int: elapsed seconds until all checksums were observed as set.

    Raises:
        AssertionError: if the checksums are not all set within the retry
            budget (with a message stating the timeout, instead of the
            previous bare assert on a flag variable).
    """
    start_time = time.time()
    for _ in range(RETRY_WAIT_CHECKSUM_COUNTS):
        if check_snapshot_checksums_set(volume_name):
            # Return immediately on success; no flag variable needed, and
            # elapsed_time can never be referenced before assignment.
            elapsed_time = int(time.time() - start_time)
            print(f'All checksums are set in {elapsed_time} sec')
            return elapsed_time
        time.sleep(RETRY_INTERVAL)

    assert False, \
        ('snapshot checksums were not all set within '
         f'{RETRY_WAIT_CHECKSUM_COUNTS} retries '
         f'({RETRY_WAIT_CHECKSUM_COUNTS * RETRY_INTERVAL} sec)')
def write_device_random_data(dev, size)
Expand source code
def write_device_random_data(dev, size):
    """
    Fill the block device (or file) at `dev` with `size` MiB of random data
    using dd reading from /dev/urandom.

    Returns True on success; on a non-zero dd exit the captured output is
    printed for debugging and False is returned.
    """
    cmd = [
        "dd",
        "if=/dev/urandom",
        "of=" + dev,
        "bs=1M",
        "count=" + str(size),
    ]
    try:
        subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        print(err.output)
        return False
    else:
        return True