Module tests.test_stress

Functions

def backup_create_and_record_md5sum(client, core_api, volume_name, pod_name, snapshots_md5sum)
Expand source code
def backup_create_and_record_md5sum(client, core_api, volume_name, pod_name, snapshots_md5sum): # NOQA
    volume = client.by_id_volume(volume_name)

    data_md5sum = read_data_md5sum(core_api, pod_name)

    snap_name = snapshot_create_and_record_md5sum(client,
                                                  core_api,
                                                  volume_name,
                                                  pod_name,
                                                  snapshots_md5sum)
    snap = snapshot_data(snap_name)

    snapshots_md5sum[snap_name] = snap

    volume.snapshotBackup(name=snap_name)

    global WAIT_BACKUP_COMPLETE
    if WAIT_BACKUP_COMPLETE is None:
        WAIT_BACKUP_COMPLETE = bool(random.getrandbits(1))

    if WAIT_BACKUP_COMPLETE is True:
        # wait for volume backupStatus to be updated with new backup record
        for i in range(RETRY_COUNTS):
            volume = client.by_id_volume(volume_name)
            for b in volume.backupStatus:
                if b.snapshot == snap_name:
                    if b.error == "":
                        wait_for_backup_completion(client,
                                                   volume_name,
                                                   snap_name)
                        break
                    else:
                        print("...aborting backup " + b.error)
                        return
            time.sleep(RETRY_INTERVAL)

    _, b = find_backup(client, volume_name, snap_name)

    snap.set_backup_name(b["name"])
    snap.set_backup_url(b["url"])
    snap.set_data_md5sum(data_md5sum)
def check_and_set_backupstore(client)
Expand source code
def check_and_set_backupstore(client):
    setting = client.by_id_setting(SETTING_BACKUP_TARGET)

    if setting["value"] == "":
        set_random_backupstore(client)
def clean_volume_backups(client, volume_name)
Expand source code
def clean_volume_backups(client, volume_name):
    print("cleaning all volume backups ", end='... ')
    bvs = client.list_backupVolume().data

    bv = None
    for i in bvs:
        if i.name == volume_name:
            bv = i
            break

    if bv is not None:
        backups = bv.backupList()
        for b in backups:
            bv.backupDelete(name=b.name)

        print("done!")

    else:
        print("no backups found!")
def create_recurring_jobs(client, volume_name)
Expand source code
def create_recurring_jobs(client, volume_name):
    volume = client.by_id_volume(volume_name)

    recurring_jobs = get_recurring_jobs()

    volume.recurringUpdate(jobs=recurring_jobs)
def delete_data(k8s_api_client, pod_name)
Expand source code
def delete_data(k8s_api_client, pod_name):
    file_name = 'data-' + pod_name + '.bin'
    test_data = generate_random_data(0)

    write_pod_volume_data(k8s_api_client,
                          pod_name,
                          test_data,
                          filename=file_name)

    volume_data = read_volume_data(k8s_api_client,
                                   pod_name,
                                   filename=file_name)

    assert volume_data == ""
def delete_random_snapshot(client, volume_name, snapshots_md5sum)
Expand source code
def delete_random_snapshot(client, volume_name, snapshots_md5sum):
    volume = client.by_id_volume(volume_name)

    # wait for volume healthy if rebuilding deleted replica
    if len(volume.robustness) != VOLUME_ROBUSTNESS_HEALTHY:
        wait_for_volume_healthy(client, volume_name)

    volume = client.by_id_volume(volume_name)

    snapshot = get_random_snapshot(snapshots_md5sum)

    if snapshot is None:
        print("skipped, no recorded snapshot found", end=" ")
        return

    volume.snapshotDelete(name=snapshot)

    snapshots_md5sum[snapshot].mark_as_removed()

    global PURGE_DELETED_SNAPSHOT
    if PURGE_DELETED_SNAPSHOT is None:
        PURGE_DELETED_SNAPSHOT = bool(random.getrandbits(1))

    if PURGE_DELETED_SNAPSHOT is True:
        purge_random_snapshot(client, volume_name, snapshot)
def delete_replica(client, volume_name)
Expand source code
def delete_replica(client, volume_name):
    volume = client.by_id_volume(volume_name)

    replica_count = len(volume.replicas)

    healthy_replica_count = 0
    for replica in volume.replicas:
        if replica.running is True and replica.mode == "RW":
            healthy_replica_count += 1

    # return if there is no or only one healthy replica left
    if healthy_replica_count == 1:
        print("skipped, only one healthy replica found", end=" ")
        return

    if healthy_replica_count == 0:
        print("skipped, no healthy replicas found", end=" ")
        return

    replica_id = randrange(0, replica_count)

    replica_name = volume["replicas"][replica_id]["name"]

    volume.replicaRemove(name=replica_name)

    wait_for_volume_degraded(client, volume_name)

    global WAIT_REPLICA_REBUILD
    if WAIT_REPLICA_REBUILD is None:
        WAIT_REPLICA_REBUILD = bool(random.getrandbits(1))

    if WAIT_REPLICA_REBUILD is True:
        wait_for_volume_replica_count(client, volume_name, replica_count)
        replica_names = map(lambda replica: replica.name, volume["replicas"])
        wait_new_replica_ready(client, volume_name, replica_names)
def generate_load(request)
Expand source code
@pytest.fixture
def generate_load(request):

    index = get_random_suffix()

    longhorn_api_client = get_longhorn_api_client()
    k8s_api_client = get_core_api_client()

    check_and_set_backupstore(longhorn_api_client)

    volume_name = STRESS_VOLUME_NAME_PREFIX + index
    pv_name = STRESS_PV_NAME_PREFIX + index
    pvc_name = STRESS_PVC_NAME_PREFIX + index
    pod_name = STRESS_POD_NAME_PREFIX + index

    atexit.register(remove_datafile, pod_name)
    atexit.register(delete_and_wait_longhorn, longhorn_api_client, volume_name)
    atexit.register(delete_and_wait_pv, k8s_api_client, pv_name)
    atexit.register(delete_and_wait_pvc, k8s_api_client, pvc_name)
    atexit.register(delete_and_wait_pod, k8s_api_client, pod_name)

    longhorn_volume = create_and_check_volume(
        longhorn_api_client,
        volume_name,
        size=VOLUME_SIZE
    )

    wait_for_volume_detached(longhorn_api_client, volume_name)

    pod_manifest = generate_pod_with_pvc_manifest(pod_name, pvc_name)

    create_pv_for_volume(longhorn_api_client,
                         k8s_api_client,
                         longhorn_volume,
                         pv_name)

    create_pvc_for_volume(longhorn_api_client,
                          k8s_api_client,
                          longhorn_volume,
                          pvc_name)

    create_and_wait_pod(k8s_api_client, pod_manifest)

    snapshots_md5sum = dict()

    write_data(k8s_api_client, pod_name)
    create_recurring_jobs(longhorn_api_client, volume_name)

    global N_RANDOM_ACTIONS
    for round in range(N_RANDOM_ACTIONS):
        action = randrange(0, 8)

        if action == 0:
            print("write data started: " + time_now(), end=', ')
            write_data(k8s_api_client, pod_name)
            print("ended: " + time_now())

        elif action == 1:
            print("delete data started: " + time_now(), end=', ')
            delete_data(k8s_api_client, pod_name)
            print("ended: " + time_now())

        elif action == 2:
            print("create snapshot started: " + time_now(), end=', ')
            snapshot_create_and_record_md5sum(longhorn_api_client,
                                              k8s_api_client,
                                              volume_name,
                                              pod_name,
                                              snapshots_md5sum)
            print("ended: " + time_now())

        elif action == 3:
            print("delete random snapshot  started: " + time_now(), end=', ')
            delete_random_snapshot(longhorn_api_client,
                                   volume_name,
                                   snapshots_md5sum)
            print("ended: " + time_now())

        elif action == 4:
            print("revert random snapshot started: " + time_now(), end=', ')
            revert_random_snapshot(longhorn_api_client,
                                   k8s_api_client,
                                   volume_name,
                                   pod_manifest,
                                   snapshots_md5sum)
            print("ended: " + time_now())

        elif action == 5:
            print("create backup started: " + time_now(), end=', ')
            backup_create_and_record_md5sum(longhorn_api_client,
                                            k8s_api_client,
                                            volume_name,
                                            pod_name,
                                            snapshots_md5sum)
            print("ended: " + time_now())

        elif action == 6:
            print("delete replica started: " + time_now(), end=', ')
            delete_replica(longhorn_api_client, volume_name)
            print("ended: " + time_now())

        elif action == 7:
            print("restore random backup started: " + time_now(), end=', ')
            restore_and_check_random_backup(longhorn_api_client,
                                            k8s_api_client,
                                            volume_name,
                                            pod_name,
                                            snapshots_md5sum)

            print("ended: " + time_now())

    clean_volume_backups(longhorn_api_client, volume_name)
def get_data_filename(pod_name)
Expand source code
def get_data_filename(pod_name):
    return STRESS_DATAFILE_NAME_PREFIX + pod_name + STRESS_DATAFILE_NAME_SUFFIX
def get_md5sum(file_path)
Expand source code
def get_md5sum(file_path):
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def get_random_backup_snapshot_data(snapshots_md5sum)
Expand source code
def get_random_backup_snapshot_data(snapshots_md5sum):
    snapshots = list(snapshots_md5sum.keys())

    snapshots_count = len(snapshots)

    if snapshots_count == 0:
        return None

    for i in range(RETRY_COUNTS):
        snapshot_id = randrange(0, snapshots_count)
        snapshot = snapshots[snapshot_id]

        if snapshots_md5sum[snapshot].backup_name is None:
            continue
        else:
            break

    if snapshots_md5sum[snapshot].backup_name is None:
        return None
    else:
        return snapshots_md5sum[snapshot]
def get_random_snapshot(snapshots_md5sum)
Expand source code
def get_random_snapshot(snapshots_md5sum):
    snapshots = list(snapshots_md5sum.keys())

    snapshots_count = len(snapshots)

    if snapshots_count == 0:
        return None

    for i in range(RETRY_COUNTS):
        snapshot_id = randrange(0, snapshots_count)
        snapshot = snapshots[snapshot_id]

        if snapshots_md5sum[snapshot].removed is True:
            continue
        else:
            break

    if snapshots_md5sum[snapshot].removed is True:
        return None
    else:
        return snapshot
def get_random_suffix()
Expand source code
def get_random_suffix():
    return ''.join(random.choice(string.ascii_lowercase + string.digits)
                   for _ in range(6))
def get_recurring_jobs()
Expand source code
def get_recurring_jobs():
    backup_job = {"name": "backup", "cron": "*/5 * * * *",
                  "task": "backup", "retain": 3}

    snapshot_job = {"name": "snap", "cron": "*/2 * * * *",
                    "task": "snapshot", "retain": 5}

    return [backup_job, snapshot_job]
def purge_random_snapshot(longhorn_api_client, volume_name, snapshot_name)
Expand source code
def purge_random_snapshot(longhorn_api_client, volume_name, snapshot_name):

    volume = longhorn_api_client.by_id_volume(volume_name)

    volume.snapshotPurge()

    wait_for_snapshot_purge(
        longhorn_api_client,
        volume_name,
        snapshot_name
    )
def read_data_md5sum(k8s_api_client, pod_name)
Expand source code
def read_data_md5sum(k8s_api_client, pod_name):
    file_name = get_data_filename(pod_name)
    dest_file_path = os.path.join(STRESS_DEST_DIR, file_name)

    exec_command = exec_command = ['/bin/sh']
    resp = stream(k8s_api_client.connect_get_namespaced_pod_exec,
                  pod_name,
                  'default',
                  command=exec_command,
                  stderr=True, stdin=True,
                  stdout=True, tty=False,
                  _preload_content=False)

    resp.write_stdin("md5sum " + dest_file_path + "\n")
    res = resp.readline_stdout(timeout=READ_MD5SUM_TIMEOUT).split()[0]

    return res
def remove_datafile(pod_name)
Expand source code
def remove_datafile(pod_name):
    file_path = os.path.join(STRESS_RANDOM_DATA_DIR,
                             get_data_filename(pod_name))

    if os.path.exists(file_path):
        os.remove(file_path)
def restore_and_check_random_backup(client, core_api, volume_name, pod_name, snapshots_md5sum)
Expand source code
def restore_and_check_random_backup(client, core_api, volume_name, pod_name, snapshots_md5sum): # NOQA
    res_volume_name = volume_name + '-restore'

    host_id = get_self_host_id()

    snap_data = get_random_backup_snapshot_data(snapshots_md5sum)

    if snap_data is None:
        print("skipped, no recorded backup found", end=" ")
        return

    backup_url = snap_data.backup_url

    client.create_volume(name=res_volume_name,
                         size=VOLUME_SIZE,
                         fromBackup=backup_url)

    wait_for_volume_restoration_completed(client, res_volume_name)

    wait_for_volume_detached(client, res_volume_name)

    res_volume = client.by_id_volume(res_volume_name)

    res_volume.attach(hostId=host_id)

    res_volume = wait_for_volume_healthy(client, res_volume_name)

    dev = get_volume_endpoint(res_volume)

    mount_path = os.path.join(DIRECTORY_PATH, res_volume_name)

    command = ['mkdir', '-p', mount_path]
    subprocess.check_call(command)

    mount_disk(dev, mount_path)

    datafile_name = get_data_filename(pod_name)
    datafile_path = os.path.join(mount_path, datafile_name)

    command = ['md5sum', datafile_path]
    output = subprocess.check_output(command)

    bkp_data_md5sum = output.split()[0].decode('utf-8')

    bkp_checksum_ok = False
    if snap_data.data_md5sum == bkp_data_md5sum:
        bkp_checksum_ok = True

    umount_disk(mount_path)

    command = ['rmdir', mount_path]
    subprocess.check_call(command)

    res_volume = client.by_id_volume(res_volume_name)

    res_volume.detach()

    wait_for_volume_detached(client, res_volume_name)

    delete_and_wait_longhorn(client, res_volume_name)

    assert bkp_checksum_ok
def revert_random_snapshot(client, core_api, volume_name, pod_manifest, snapshots_md5sum)
Expand source code
def revert_random_snapshot(client, core_api, volume_name, pod_manifest, snapshots_md5sum): # NOQA
    volume = client.by_id_volume(volume_name)
    host_id = get_self_host_id()
    pod_name = pod_manifest["metadata"]["name"]

    # wait for volume healthy if rebuilding deleted replica
    if len(volume.robustness) != VOLUME_ROBUSTNESS_HEALTHY:
        wait_for_volume_healthy(client, volume_name)

    snapshot = get_random_snapshot(snapshots_md5sum)

    if snapshot is None:
        print("skipped, no snapshot found", end=" ")
        return

    delete_and_wait_pod(core_api, pod_name)

    wait_for_volume_detached(client, volume_name)

    volume = client.by_id_volume(volume_name)

    volume.attach(hostId=host_id, disableFrontend=True)

    volume = wait_for_volume_healthy_no_frontend(client, volume_name)

    volume.snapshotRevert(name=snapshot)

    volume = client.by_id_volume(volume_name)

    volume.detach()

    wait_for_volume_detached(client, volume_name)

    create_and_wait_pod(core_api, pod_manifest)

    current_md5sum = read_data_md5sum(core_api, pod_name)

    assert current_md5sum == snapshots_md5sum[snapshot].data_md5sum
def snapshot_create_and_record_md5sum(client, core_api, volume_name, pod_name, snapshots_md5sum)
Expand source code
def snapshot_create_and_record_md5sum(client, core_api, volume_name, pod_name, snapshots_md5sum): # NOQA
    data_md5sum = read_data_md5sum(core_api, pod_name)
    snap = create_snapshot(client, volume_name)

    snap_data = snapshot_data(snap["name"])
    snap_data.set_data_md5sum(data_md5sum)
    snapshots_md5sum[snap["name"]] = snap_data

    return snap["name"]
def test_reset_env()
Expand source code
@pytest.mark.stress
def test_reset_env():
    k8s_api_client = get_core_api_client()
    k8s_storage_client = get_storage_api_client()
    longhorn_api_client = get_longhorn_api_client()

    pod_list = k8s_api_client.list_namespaced_pod("default")
    for pod in pod_list.items:
        if STRESS_POD_NAME_PREFIX in pod.metadata.name:
            delete_and_wait_pod(k8s_api_client, pod.metadata.name)

    pvc_list = \
        k8s_api_client.list_namespaced_persistent_volume_claim("default")
    for pvc in pvc_list.items:
        if STRESS_PVC_NAME_PREFIX in pvc.metadata.name:
            delete_and_wait_pvc(k8s_api_client, pvc.metadata.name)

    pv_list = k8s_api_client.list_persistent_volume()
    for pv in pv_list.items:
        pv_name = pv.metadata.name
        if STRESS_PV_NAME_PREFIX in pv_name:
            try:
                delete_and_wait_pv(k8s_api_client, pv_name)
            except AssertionError:
                volumeattachment_list = \
                    k8s_storage_client.list_volume_attachment()
                for volumeattachment in volumeattachment_list.items:
                    volume_attachment_name = \
                        volumeattachment.spec.source.persistent_volume_name
                    if volume_attachment_name == pv_name:
                        delete_and_wait_volume_attachment(
                            k8s_storage_client,
                            volume_attachment_name
                        )
                        delete_and_wait_pv(k8s_api_client, pv.metadata.name)

    volume_list = \
        longhorn_api_client.list_volume()
    for volume in volume_list.data:
        if STRESS_VOLUME_NAME_PREFIX in volume.name:
            delete_and_wait_longhorn(longhorn_api_client, volume.name)
def test_stress(generate_load)
Expand source code
@pytest.mark.stress
def test_stress(generate_load):
    pass
def time_now()
Expand source code
def time_now():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def write_data(k8s_api_client, pod_name)
Expand source code
def write_data(k8s_api_client, pod_name):
    src_dir_path = STRESS_RANDOM_DATA_DIR
    dest_dir_path = '/data/'
    file_name = get_data_filename(pod_name)

    src_file_path = src_dir_path + file_name
    dest_file_path = dest_dir_path + file_name

    src_file = open('%s' % src_file_path, 'wb')
    src_file.write(os.urandom(TEST_DATA_BYTES))
    src_file.close()
    src_file_md5sum = get_md5sum(src_file_path)
    command = 'kubectl cp ' + src_file_path + \
              ' ' + pod_name + ':' + dest_file_path
    subprocess.call(command, shell=True)

    exec_command = exec_command = ['/bin/sh']
    resp = stream(k8s_api_client.connect_get_namespaced_pod_exec,
                  pod_name,
                  'default',
                  command=exec_command,
                  stderr=True, stdin=True,
                  stdout=True, tty=False,
                  _preload_content=False)

    resp.write_stdin("md5sum " + dest_file_path + "\n")
    res = resp.readline_stdout(timeout=READ_MD5SUM_TIMEOUT).split()[0]

    assert res == src_file_md5sum

Classes

class snapshot_data (snapshot_name)
Expand source code
class snapshot_data:
    def __init__(self, snapshot_name):
        self.snapshot_name = snapshot_name
        self.removed = False
        self.backup_name = None
        self.backup_url = None
        self.data_md5sum = None

    def set_backup_name(self, backup_name):
        self.backup_name = backup_name

    def set_backup_url(self, backup_url):
        self.backup_url = backup_url

    def set_data_md5sum(self, data_md5sum):
        self.data_md5sum = data_md5sum

    def mark_as_removed(self):
        self.removed = True

Methods

def mark_as_removed(self)
Expand source code
def mark_as_removed(self):
    self.removed = True
def set_backup_name(self, backup_name)
Expand source code
def set_backup_name(self, backup_name):
    self.backup_name = backup_name
def set_backup_url(self, backup_url)
Expand source code
def set_backup_url(self, backup_url):
    self.backup_url = backup_url
def set_data_md5sum(self, data_md5sum)
Expand source code
def set_data_md5sum(self, data_md5sum):
    self.data_md5sum = data_md5sum