Module tests.test_metric
Functions
def check_metric(core_api, metric_name, metric_labels, expected_value=None, metric_node_id=get_self_host_id())
def check_metric(core_api, metric_name, metric_labels, expected_value=None,
                 metric_node_id=get_self_host_id()):  # NOQA
    if metric_node_id is None:
        # Populate metric data from all nodes.
        client = get_longhorn_api_client()  # NOQA
        nodes = client.list_node()
        metric_data = []
        for node in nodes:
            metric_data.extend(get_metrics(core_api, node.id))
    else:
        # Populate metric data for the specified node.
        metric_data = get_metrics(core_api, metric_node_id)

    found_metric = None
    for family in metric_data:
        found_metric = next((sample for sample in family.samples
                             if sample.name == metric_name), None)  # NOQA
        if found_metric:
            break

    assert found_metric is not None

    examine_metric_value(found_metric, metric_labels, expected_value)
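For reference, a minimal usage sketch of check_metric, assuming core_api is the kubernetes CoreV1Api client that the suite's fixtures normally provide; the metric name and expected value mirror the node-count assertion in test_node_metrics below, and the volume name is hypothetical.

from kubernetes import client as k8s_client, config as k8s_config

k8s_config.load_kube_config()  # assumes a reachable cluster kubeconfig
core_api = k8s_client.CoreV1Api()

# Verify the cluster-wide node-count gauge via the current node's manager pod.
check_metric(core_api, "longhorn_node_count_total", {}, expected_value=3.0)

# Passing metric_node_id=None collects samples from every node's manager pod.
check_metric(core_api, "longhorn_volume_state",
             {"volume": "test-vol"}, metric_node_id=None)  # hypothetical volume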
def check_metric_count_all_nodes(client, core_api, metric_name, metric_labels, expected_count)
def check_metric_count_all_nodes(client, core_api, metric_name, metric_labels,
                                 expected_count):  # NOQA
    # Find the metrics based on the given labels.
    def filter_metrics_by_labels(metrics, labels):
        filtered_metrics = []
        for metric in metrics:
            is_matched = True
            for key, value in labels.items():
                if type(value) in (float, int):
                    continue
                if metric.labels[key] != value:
                    is_matched = False
                    break
            if is_matched:
                filtered_metrics.append(metric)
        print(filtered_metrics)
        return filtered_metrics

    filtered_metrics = []
    for node in client.list_node():
        metric_data = get_metrics(core_api, node.name)
        metrics = find_metrics(metric_data, metric_name)
        if len(metrics) == 0:
            continue
        filtered_metrics.extend(
            filter_metrics_by_labels(metrics, metric_labels)
        )

    assert len(filtered_metrics) == expected_count
def check_metric_sum_on_all_nodes(client, core_api, metric_name, expected_labels, expected_value=None)
def check_metric_sum_on_all_nodes(client, core_api, metric_name,
                                  expected_labels, expected_value=None):  # NOQA
    # Initialize total_metrics to store the sum of the metric values.
    total_metrics = {"labels": defaultdict(None), "value": 0.0}

    # Initialize the total_metric_values to store the sum of the
    # metric label values.
    total_metric_values = total_metrics["labels"]

    # Find the metric based on the given labels.
    def filter_metric_by_labels(metrics, labels):
        for metric in metrics:
            is_matched = True
            for key, value in labels.items():
                if type(value) in (float, int):
                    continue
                if metric.labels[key] != value:
                    is_matched = False
                    break
            if is_matched:
                return metric
        raise AssertionError("Cannot find the metric matching the labels")

    for node in client.list_node():
        metric_data = get_metrics(core_api, node.name)
        metrics = find_metrics(metric_data, metric_name)
        if len(metrics) == 0:
            continue

        filtered_metric = filter_metric_by_labels(metrics, expected_labels)

        for key, value in expected_labels.items():
            value_type = type(value)
            if key not in total_metric_values:
                total_metric_values[key] = value_type(
                    filtered_metric.labels[key]
                )
            # Accumulate the metric label values.
            elif isinstance(value, (float, int)):
                total_metric_values[key] += value_type(
                    filtered_metric.labels[key]
                )
        # Accumulate the metric values.
        total_metrics["value"] += filtered_metric.value

    for key, value in expected_labels.items():
        assert total_metric_values[key] == value

    if expected_value is not None:
        assert total_metrics["value"] == expected_value
    else:
        assert total_metrics["value"] >= 0.0
def check_metric_with_condition(core_api, metric_name, metric_labels, expected_value=None, metric_node_id=get_self_host_id())
def check_metric_with_condition(core_api, metric_name, metric_labels,
                                expected_value=None,
                                metric_node_id=get_self_host_id()):  # NOQA
    """
    Some metrics have multiple conditions. For example, the metric
    longhorn_node_status has the conditions
    - allowScheduling
    - mountpropagation
    - ready
    - schedulable
    and the metric longhorn_disk_status has the conditions
    - ready
    - schedulable
    Use this function to check a specific condition of a metric.
    """
    metric_data = get_metrics(core_api, metric_node_id)

    found_metric = next(
        (sample for family in metric_data for sample in family.samples
         if sample.name == metric_name and
         sample.labels.get("condition") == metric_labels.get("condition")),
        None
    )

    assert found_metric is not None

    examine_metric_value(found_metric, metric_labels, expected_value)
Some metrics have multiple conditions. For example, the metric longhorn_node_status has the conditions
- allowScheduling
- mountpropagation
- ready
- schedulable
and the metric longhorn_disk_status has the conditions
- ready
- schedulable
Use this function to check a specific condition of a metric.
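A short usage sketch, mirroring the calls in test_node_metrics below; the labels dict must carry the condition key that this function matches on, and core_api is the suite's CoreV1Api fixture.

# Check that the "ready" condition of longhorn_node_status is 1.0 on the
# current node (condition_reason is empty while the node is healthy).
metric_labels = {
    "condition": "ready",
    "condition_reason": "",
    "node": get_self_host_id(),
}
check_metric_with_condition(core_api, "longhorn_node_status",
                            metric_labels, 1.0)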
def examine_metric_value(found_metric, metric_labels, expected_value=None)
def examine_metric_value(found_metric, metric_labels, expected_value=None):
    for key, value in metric_labels.items():
        assert found_metric.labels[key] == value

    if expected_value is not None:
        assert found_metric.value == expected_value
    else:
        assert found_metric.value >= 0.0
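A self-contained sketch of the check this helper performs, built on prometheus_client's Sample type (the same namedtuple the parsed families yield); the metric name and labels here are hypothetical.

from prometheus_client.samples import Sample

# A hand-built sample standing in for one parsed from /metrics output.
found = Sample("longhorn_volume_state",
               {"volume": "vol-1", "node": "node-1"}, 2.0)

# Passes: the labels match and the value equals the expectation.
examine_metric_value(found, {"volume": "vol-1"}, expected_value=2.0)

# With expected_value omitted, only non-negativity of the value is checked.
examine_metric_value(found, {"node": "node-1"})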
def find_metric(metric_data, metric_name)
def find_metric(metric_data, metric_name):
    return find_metrics(metric_data, metric_name)[0]
def find_metrics(metric_data, metric_name)
def find_metrics(metric_data, metric_name):
    metrics = []

    # Find the metric with the given name in the provided metric data.
    for family in metric_data:
        for sample in family.samples:
            if sample.name == metric_name:
                metrics.append(sample)

    return metrics
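A self-contained sketch of what find_metrics returns when fed parsed scrape output; the exporter text is hypothetical but uses the Prometheus exposition format that text_string_to_metric_families expects.

from prometheus_client.parser import text_string_to_metric_families

exporter_text = (
    '# TYPE longhorn_volume_read_iops gauge\n'
    'longhorn_volume_read_iops{volume="vol-1"} 12.0\n'
    'longhorn_volume_read_iops{volume="vol-2"} 7.0\n'
)
metric_data = text_string_to_metric_families(exporter_text)
samples = find_metrics(metric_data, "longhorn_volume_read_iops")
assert [s.labels["volume"] for s in samples] == ["vol-1", "vol-2"]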
def get_metrics(core_api, metric_node_id)
def get_metrics(core_api, metric_node_id):  # NOQA
    pods = core_api.list_namespaced_pod(namespace=LONGHORN_NAMESPACE,
                                        label_selector="app=longhorn-manager")
    manager_ip = None
    for po in pods.items:
        if po.spec.node_name == metric_node_id:
            manager_ip = po.status.pod_ip
            break

    if not manager_ip:
        raise RuntimeError(
            f"No Longhorn manager pod found on node {metric_node_id}"
        )

    # Handle IPv6 addresses.
    ip_obj = ipaddress.ip_address(manager_ip)
    if ip_obj.version == 6:
        manager_ip = f"[{manager_ip}]"

    metrics = requests.get("http://{}:9500/metrics".format(manager_ip)).content
    string_data = metrics.decode('utf-8')
    result = text_string_to_metric_families(string_data)
    return result
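A small standalone illustration of the IPv6 branch above: URLs require IPv6 literals in brackets, so the pod IP is wrapped before the scrape URL is built. The address is hypothetical.

import ipaddress

manager_ip = "fd00::5"  # hypothetical IPv6 pod IP
if ipaddress.ip_address(manager_ip).version == 6:
    manager_ip = f"[{manager_ip}]"

assert "http://{}:9500/metrics".format(manager_ip) == \
    "http://[fd00::5]:9500/metrics"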
def test_metric_longhorn_backup(set_random_backupstore, client, core_api, batch_v1_api, volume_name)
def test_metric_longhorn_backup(set_random_backupstore, client, core_api, batch_v1_api, volume_name):  # NOQA
    """
    Scenario: test metrics longhorn_backup_actual_size_bytes and
              longhorn_backup_state

    Issue: https://github.com/longhorn/longhorn/issues/9429

    Given a volume

    When a backup is created by the user
    Then has a metric longhorn_backup_actual_size_bytes value
         equal to the size of the backup,
         and the volume label is the volume name
         and the recurring_job label is empty
    And has a metric longhorn_backup_state value equal to 3 (Completed),
         and the volume label is the volume name
         and the recurring_job label is empty

    When a recurring backup job is created
    Then should have a metric longhorn_backup_actual_size_bytes value
         equal to the size of the backup,
         and the volume label is the volume name
         and the recurring_job label is the job name
    And should have a metric longhorn_backup_state value
         equal to 3 (Completed),
         and the volume label is the volume name
         and the recurring_job label is the job name
    """
    self_hostId = get_self_host_id()

    # Create a volume and attach it to a node.
    volume_size = 50 * Mi
    client.create_volume(name=volume_name,
                         numberOfReplicas=1,
                         size=str(volume_size))
    volume = wait_for_volume_detached(client, volume_name)
    volume.attach(hostId=self_hostId)
    volume = wait_for_volume_healthy(client, volume_name)

    # Create the user backup.
    data_size = 10 * Mi
    backup_data = {'pos': 0,
                   'len': data_size,
                   'content': generate_random_data(data_size)}
    write_volume_data(volume, backup_data)
    bv, _, _, _ = create_backup(client, volume_name)
    wait_for_backup_count(bv, 1)

    # Get the backup size.
    backup_size = 0
    backups = bv.backupList().data
    for backup in backups:
        if backup['snapshotName'] == "volume-head":
            continue
        backup_size = int(backup['size'])
    assert backup_size > 0

    # Assert the metric values for the user backup.
    user_backup_metric_labels = {
        "volume": volume_name,
        "recurring_job": "",
    }
    wait_for_metric_sum_on_all_nodes(client, core_api,
                                     "longhorn_backup_actual_size_bytes",
                                     user_backup_metric_labels,
                                     backup_size)
    wait_for_metric_sum_on_all_nodes(client, core_api,
                                     "longhorn_backup_state",
                                     user_backup_metric_labels,
                                     3)

    # Delete the existing backup before creating a recurring backup job.
    delete_backup_volume(client, bv.name)

    # Create a recurring backup job.
    recurring_jobs = {
        RECURRING_JOB_NAME: {
            TASK: BACKUP,
            GROUPS: [DEFAULT],
            CRON: SCHEDULE_1MIN,
            RETAIN: 1,
            CONCURRENCY: 1,
            LABELS: {},
        },
    }
    create_recurring_jobs(client, recurring_jobs)
    check_recurring_jobs(client, recurring_jobs)
    wait_for_cron_job_count(batch_v1_api, 1)

    # Wait for the recurring backup job to run.
    time.sleep(90)
    bv = find_backup_volume(client, volume_name)
    wait_for_backup_count(bv, 1)

    # Get the recurring backup size.
    recurring_backup_size = 0
    backups = bv.backupList().data
    for backup in backups:
        if backup['snapshotName'] == "volume-head":
            continue
        recurring_backup_size = int(backup['size'])
    assert recurring_backup_size > 0

    # Assert the metric values for the recurring backup.
    recurring_backup_metric_labels = {
        "volume": volume_name,
        "recurring_job": RECURRING_JOB_NAME,
    }
    wait_for_metric_sum_on_all_nodes(client, core_api,
                                     "longhorn_backup_actual_size_bytes",
                                     recurring_backup_metric_labels,
                                     recurring_backup_size)
    wait_for_metric_sum_on_all_nodes(client, core_api,
                                     "longhorn_backup_state",
                                     recurring_backup_metric_labels,
                                     3)
Scenario: test metrics longhorn_backup_actual_size_bytes and longhorn_backup_state
Issue: https://github.com/longhorn/longhorn/issues/9429
Given a volume
When a backup is created by the user
Then has a metric longhorn_backup_actual_size_bytes value equal to the size of the backup, and the volume label is the volume name and the recurring_job label is empty
And has a metric longhorn_backup_state value equal to 3 (Completed), and the volume label is the volume name and the recurring_job label is empty

When a recurring backup job is created
Then should have a metric longhorn_backup_actual_size_bytes value equal to the size of the backup, and the volume label is the volume name and the recurring_job label is the job name
And should have a metric longhorn_backup_state value equal to 3 (Completed), and the volume label is the volume name and the recurring_job label is the job name
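A condensed sketch of the assertion step in this scenario, assuming the same fixtures the test uses; 3 is the Completed backup state the docstring refers to, and an empty recurring_job label selects user-created backups.

user_backup_metric_labels = {
    "volume": volume_name,   # pytest fixture
    "recurring_job": "",     # empty for user-created backups
}
wait_for_metric_sum_on_all_nodes(client, core_api,
                                 "longhorn_backup_state",
                                 user_backup_metric_labels, 3)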
def test_metric_longhorn_snapshot_actual_size_bytes(client, core_api, volume_name)
def test_metric_longhorn_snapshot_actual_size_bytes(client, core_api, volume_name):  # NOQA
    """
    Scenario: test metric longhorn_snapshot_actual_size_bytes

    Issue: https://github.com/longhorn/longhorn/issues/5869

    Given a volume

    When 1 snapshot is created by the user
    And 1 snapshot is created by the system
    Then has a metric longhorn_snapshot_actual_size_bytes value
         equal to the size of the user-created snapshot,
         with the volume label equal to the volume name
         and the user_created label equal to true
    And has a metric longhorn_snapshot_actual_size_bytes value
         equal to the size of the system-created snapshot,
         with the volume label equal to the volume name
         and the user_created label equal to false

    When 3 more snapshots are created by the user
    Then has 4 longhorn_snapshot_actual_size_bytes metrics
         with the volume label equal to the volume name
         and the user_created label equal to true
    And has 1 longhorn_snapshot_actual_size_bytes metric
         with the volume label equal to the volume name
         and the user_created label equal to false
    """
    self_hostId = get_self_host_id()

    # Create a volume and attach it to a node.
    volume_size = 50 * Mi
    client.create_volume(name=volume_name,
                         numberOfReplicas=1,
                         size=str(volume_size))
    volume = wait_for_volume_detached(client, volume_name)
    volume.attach(hostId=self_hostId)
    volume = wait_for_volume_healthy(client, volume_name)

    # Create the user snapshot.
    data_size = 10 * Mi
    user_snapshot_data_0 = {'pos': 0,
                            'len': data_size,
                            'content': generate_random_data(data_size)}
    write_volume_data(volume, user_snapshot_data_0)
    create_snapshot(client, volume_name)

    # Create the system snapshot by expanding the volume.
    system_snapshot_data_0 = {'pos': 0,
                              'len': data_size,
                              'content': generate_random_data(data_size)}
    write_volume_data(volume, system_snapshot_data_0)
    volume_size_expanded_0 = str(volume_size * 2)
    volume.expand(size=volume_size_expanded_0)
    wait_for_volume_expansion(client, volume_name)
    volume = client.by_id_volume(volume_name)
    assert volume.size == volume_size_expanded_0

    # Get the snapshot sizes.
    user_snapshot_size = 0
    system_snapshot_size = 0
    snapshots = volume.snapshotList()
    for snapshot in snapshots:
        if snapshot.name == "volume-head":
            continue
        if snapshot.usercreated:
            user_snapshot_size = int(snapshot.size)
        else:
            system_snapshot_size = int(snapshot.size)
    assert user_snapshot_size > 0
    assert system_snapshot_size > 0

    # Assert the metric values for the user snapshot.
    user_snapshot_metric_labels = {
        "volume": volume_name,
        "user_created": "true",
    }
    check_metric_sum_on_all_nodes(client, core_api,
                                  "longhorn_snapshot_actual_size_bytes",
                                  user_snapshot_metric_labels,
                                  user_snapshot_size)

    # Assert the metric values for the system snapshot.
    system_snapshot_metric_labels = {
        "volume": volume_name,
        "user_created": "false",
    }
    check_metric_sum_on_all_nodes(client, core_api,
                                  "longhorn_snapshot_actual_size_bytes",
                                  system_snapshot_metric_labels,
                                  system_snapshot_size)

    # Create 3 more user snapshots.
    create_snapshot(client, volume_name)
    create_snapshot(client, volume_name)
    create_snapshot(client, volume_name)

    wait_for_metric_count_all_nodes(client, core_api,
                                    "longhorn_snapshot_actual_size_bytes",
                                    user_snapshot_metric_labels, 4)
    wait_for_metric_count_all_nodes(client, core_api,
                                    "longhorn_snapshot_actual_size_bytes",
                                    system_snapshot_metric_labels, 1)
Scenario: test metric longhorn_snapshot_actual_size_bytes
Issue: https://github.com/longhorn/longhorn/issues/5869
Given a volume
When 1 snapshot is created by the user
And 1 snapshot is created by the system
Then has a metric longhorn_snapshot_actual_size_bytes value equal to the size of the user-created snapshot, with the volume label equal to the volume name and the user_created label equal to true
And has a metric longhorn_snapshot_actual_size_bytes value equal to the size of the system-created snapshot, with the volume label equal to the volume name and the user_created label equal to false

When 3 more snapshots are created by the user
Then has 4 longhorn_snapshot_actual_size_bytes metrics with the volume label equal to the volume name and the user_created label equal to true
And has 1 longhorn_snapshot_actual_size_bytes metric with the volume label equal to the volume name and the user_created label equal to false
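A condensed sketch of the count assertion in this scenario, under the same fixtures; after three extra user snapshots there should be four matching samples across all nodes.

user_snapshot_metric_labels = {
    "volume": volume_name,   # pytest fixture
    "user_created": "true",
}
wait_for_metric_count_all_nodes(client, core_api,
                                "longhorn_snapshot_actual_size_bytes",
                                user_snapshot_metric_labels, 4)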
def test_metric_longhorn_volume_file_system_read_only(client, core_api, volume_name, pod)
def test_metric_longhorn_volume_file_system_read_only(client, core_api, volume_name, pod):  # NOQA
    """
    Scenario: test metric longhorn_volume_file_system_read_only

    Issue: https://github.com/longhorn/longhorn/issues/8508

    Given a volume is created and attached to a pod
    And the volume is healthy

    When the volume is remounted as read-only
    And waiting for the volume to become healthy
    And writing data to the pod
    And flushing data to persistent storage in the pod with the sync command

    Then has 1 metric longhorn_volume_file_system_read_only with labels
    ... "pvc": pvc_name
    ... "volume": volume_name
    """
    pv_name = "pv-" + volume_name
    pvc_name = "pvc-" + volume_name
    pod_name = "pod-" + volume_name

    volume = create_and_check_volume(client, volume_name, size=str(1 * Gi))
    create_pv_for_volume(client, core_api, volume, pv_name)
    create_pvc_for_volume(client, core_api, volume, pvc_name)

    pod['metadata']['name'] = pod_name
    pod['spec']['volumes'] = [{
        'name': pod['spec']['containers'][0]['volumeMounts'][0]['name'],
        'persistentVolumeClaim': {
            'claimName': pvc_name,
        },
    }]
    create_and_wait_pod(core_api, pod)
    wait_for_volume_healthy(client, volume_name)

    metric_labels = {
        "pvc": pvc_name,
        "volume": volume_name,
    }

    for _ in range(RETRY_COUNTS):
        remount_volume_read_only(client, core_api, volume_name)
        wait_for_volume_healthy(client, volume_name)

        write_pod_volume_data(core_api, pod_name,
                              'longhorn-integration-test', filename='test')
        stream(core_api.connect_get_namespaced_pod_exec,
               pod_name, 'default', command=["sync"],
               stderr=True, stdin=False, stdout=True, tty=False)

        try:
            check_metric(core_api, "longhorn_volume_file_system_read_only",
                         metric_labels, 1.0, metric_node_id=None)
            return
        except AssertionError:
            print("Retrying to remount volume as read-only...")
            continue

    raise AssertionError("Failed to verify 'longhorn_volume_file_system_read_only' metric after all retries")  # NOQA
Scenario: test metric longhorn_volume_file_system_read_only
Issue: https://github.com/longhorn/longhorn/issues/8508
Given a volume is created and attached to a pod
And the volume is healthy

When the volume is remounted as read-only
And waiting for the volume to become healthy
And writing data to the pod
And flushing data to persistent storage in the pod with the sync command

Then has 1 metric longhorn_volume_file_system_read_only with labels
… "pvc": pvc_name
… "volume": volume_name
def test_node_metrics(client, core_api)
def test_node_metrics(client, core_api):  # NOQA
    lht_hostId = get_self_host_id()
    node = client.by_id_node(lht_hostId)
    disks = node.disks
    default_disk = None
    for _, disk in iter(disks.items()):
        if disk.path == DEFAULT_DISK_PATH:
            default_disk = disk
            break
    assert default_disk is not None

    metric_labels = {}
    check_metric(core_api, "longhorn_node_count_total",
                 metric_labels, expected_value=3.0)

    metric_labels = {
        "node": lht_hostId,
    }
    check_metric(core_api, "longhorn_node_cpu_capacity_millicpu",
                 metric_labels)
    check_metric(core_api, "longhorn_node_cpu_usage_millicpu",
                 metric_labels)
    check_metric(core_api, "longhorn_node_memory_capacity_bytes",
                 metric_labels)
    check_metric(core_api, "longhorn_node_memory_usage_bytes",
                 metric_labels)
    check_metric(core_api, "longhorn_node_storage_capacity_bytes",
                 metric_labels, default_disk.storageMaximum)
    check_metric(core_api, "longhorn_node_storage_usage_bytes",
                 metric_labels)
    check_metric(core_api, "longhorn_node_storage_reservation_bytes",
                 metric_labels, default_disk.storageReserved)

    # Check longhorn_node_status under 4 different conditions.
    metric_labels = {
        "condition": "mountpropagation",
        "condition_reason": "",
        "node": lht_hostId
    }
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 1.0)

    metric_labels = {
        "condition": "ready",
        "condition_reason": "",
        "node": lht_hostId
    }
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 1.0)

    metric_labels = {
        "condition": "allowScheduling",
        "condition_reason": "",
        "node": lht_hostId,
    }
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 1.0)

    node = client.by_id_node(lht_hostId)
    set_node_scheduling(client, node, allowScheduling=False, retry=True)
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 0.0)

    metric_labels = {
        "condition": "schedulable",
        "condition_reason": "",
        "node": lht_hostId
    }
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 1.0)

    metric_labels = {
        "condition": "schedulable",
        "condition_reason": "KubernetesNodeCordoned",
        "node": lht_hostId
    }
    set_node_cordon(core_api, lht_hostId, True)
    wait_for_node_update(client, lht_hostId, "allowScheduling", False)
    check_metric_with_condition(core_api, "longhorn_node_status",
                                metric_labels, 0.0)
def test_volume_metrics(client, core_api, volume_name, pvc_namespace)
@pytest.mark.parametrize("pvc_namespace", [LONGHORN_NAMESPACE, "default"])  # NOQA
def test_volume_metrics(client, core_api, volume_name, pvc_namespace):  # NOQA
    """
    https://longhorn.io/docs/master/monitoring/metrics/#volume

    The goal of this test case is to verify the accuracy of the volume
    metrics by sending HTTP requests to
    http://{longhorn-manager IP}:9500/metrics and using prometheus_client
    to validate the returned values.
    """
    lht_hostId = get_self_host_id()
    pv_name = volume_name + "-pv"
    pvc_name = volume_name + "-pvc"
    volume_size = str(1 * Gi)
    volume = create_and_check_volume(client,
                                     volume_name,
                                     num_of_replicas=3,
                                     size=volume_size)

    volume = client.by_id_volume(volume_name)
    create_pv_for_volume(client, core_api, volume, pv_name)
    create_pvc_for_volume(client, core_api, volume, pvc_name, pvc_namespace)

    volume = client.by_id_volume(volume_name)
    volume.attach(hostId=lht_hostId)
    volume = wait_for_volume_healthy(client, volume_name)

    data_size = 100 * Mi
    data = {'pos': 0,
            'len': data_size,
            'content': generate_random_data(data_size)}
    write_volume_data(volume, data)
    volume = client.by_id_volume(volume_name)
    capacity_size = float(volume.size)

    metric_labels = {
        "node": lht_hostId,
        "pvc": pvc_name,
        "volume": volume_name,
        "pvc_namespace": pvc_namespace
    }

    # Check the basic volume metrics.
    wait_for_metric_volume_actual_size(client, core_api,
                                       "longhorn_volume_actual_size_bytes",
                                       metric_labels, volume_name)
    check_metric(core_api, "longhorn_volume_capacity_bytes",
                 metric_labels, capacity_size)
    check_metric(core_api, "longhorn_volume_read_throughput",
                 metric_labels)
    check_metric(core_api, "longhorn_volume_write_throughput",
                 metric_labels)
    check_metric(core_api, "longhorn_volume_read_iops", metric_labels)
    check_metric(core_api, "longhorn_volume_write_iops", metric_labels)
    check_metric(core_api, "longhorn_volume_read_latency", metric_labels)
    check_metric(core_api, "longhorn_volume_write_latency", metric_labels)

    # Verify longhorn_volume_robustness when the volume is healthy,
    # degraded, faulted or unknown.
    volume.detach()
    volume = wait_for_volume_detached_unknown(client, volume_name)
    check_metric(core_api, "longhorn_volume_robustness", metric_labels,
                 longhorn_volume_robustness["unknown"])

    volume.attach(hostId=lht_hostId)
    volume = wait_for_volume_healthy(client, volume_name)
    check_metric(core_api, "longhorn_volume_robustness", metric_labels,
                 longhorn_volume_robustness["healthy"])

    volume.updateReplicaCount(replicaCount=4)
    volume = wait_for_volume_degraded(client, volume_name)
    check_metric(core_api, "longhorn_volume_robustness", metric_labels,
                 longhorn_volume_robustness["degraded"])

    volume.updateReplicaCount(replicaCount=3)
    volume = wait_for_volume_healthy(client, volume_name)
    delete_replica_processes(client, core_api, volume_name)
    volume = wait_for_volume_faulted(client, volume_name)
    check_metric(core_api, "longhorn_volume_robustness", metric_labels,
                 longhorn_volume_robustness["faulted"])

    # Verify longhorn_volume_state when the volume is attached or detached.
    volume = wait_for_volume_healthy(client, volume_name)
    check_metric(core_api, "longhorn_volume_state", metric_labels,
                 longhorn_volume_state["attached"])

    volume.detach()
    volume = wait_for_volume_detached(client, volume_name)
    check_metric(core_api, "longhorn_volume_state", metric_labels,
                 longhorn_volume_state["detached"])
https://longhorn.io/docs/master/monitoring/metrics/#volume
The goal of this test case is to verify the accuracy of the volume metrics by sending HTTP requests to http://{longhorn-manager IP}:9500/metrics and using prometheus_client to validate the returned values.
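A minimal sketch of the scrape-and-validate approach this test takes, assuming a reachable manager pod IP (hypothetical here); prometheus_client turns the scrape body into metric families whose samples expose .name, .labels, and .value.

import requests
from prometheus_client.parser import text_string_to_metric_families

body = requests.get("http://10.42.0.5:9500/metrics").text  # hypothetical IP
for family in text_string_to_metric_families(body):
    for sample in family.samples:
        if sample.name == "longhorn_volume_capacity_bytes":
            print(sample.labels["volume"], sample.value)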
def wait_for_metric(core_api, metric_name, metric_labels, expected_value, metric_node_id=get_self_host_id())
def wait_for_metric(core_api, metric_name, metric_labels, expected_value,
                    metric_node_id=get_self_host_id()):  # NOQA
    for _ in range(RETRY_COUNTS):
        time.sleep(RETRY_INTERVAL)

        try:
            check_metric(core_api, metric_name, metric_labels,
                         expected_value, metric_node_id=metric_node_id)
            return
        except AssertionError:
            continue

    check_metric(core_api, metric_name, metric_labels,
                 expected_value, metric_node_id=metric_node_id)
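A short usage sketch: polling until a gauge converges avoids flakes from scrape lag. The expected value 2.0 is an assumed encoding for the attached state, and "vol-1" is a hypothetical volume; the tests import the real longhorn_volume_state mapping from their common helpers.

metric_labels = {"volume": "vol-1", "node": get_self_host_id()}
# Retries check_metric up to RETRY_COUNTS times, then asserts once more so
# the final failure surfaces as a normal AssertionError.
wait_for_metric(core_api, "longhorn_volume_state", metric_labels, 2.0)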
def wait_for_metric_count_all_nodes(client, core_api, metric_name, metric_labels, expected_count)
def wait_for_metric_count_all_nodes(client, core_api, metric_name,
                                    metric_labels, expected_count):  # NOQA
    for _ in range(RETRY_COUNTS):
        time.sleep(RETRY_INTERVAL)

        try:
            check_metric_count_all_nodes(client, core_api, metric_name,
                                         metric_labels, expected_count)
            return
        except AssertionError:
            continue

    check_metric_count_all_nodes(client, core_api, metric_name,
                                 metric_labels, expected_count)
def wait_for_metric_sum_on_all_nodes(client, core_api, metric_name, metric_labels, expected_value)
def wait_for_metric_sum_on_all_nodes(client, core_api, metric_name,
                                     metric_labels, expected_value):  # NOQA
    for _ in range(RETRY_COUNTS):
        time.sleep(RETRY_INTERVAL)

        try:
            check_metric_sum_on_all_nodes(client, core_api, metric_name,
                                          metric_labels, expected_value)
            return
        except AssertionError:
            continue

    check_metric_sum_on_all_nodes(client, core_api, metric_name,
                                  metric_labels, expected_value)
def wait_for_metric_volume_actual_size(client, core_api, metric_name, metric_labels, volume_name)
def wait_for_metric_volume_actual_size(client, core_api, metric_name,
                                       metric_labels, volume_name):  # NOQA
    for _ in range(RETRY_COUNTS):
        time.sleep(RETRY_INTERVAL)

        volume = client.by_id_volume(volume_name)
        actual_size = int(volume.controllers[0].actualSize)

        try:
            check_metric(core_api, metric_name, metric_labels, actual_size)
            return
        except AssertionError:
            continue

    check_metric(core_api, metric_name, metric_labels, actual_size)