Module tests.test_tagging
Functions
def generate_tag_name()
    def generate_tag_name():
        return "tag/" + "".join(
            random.choice(string.ascii_lowercase + string.digits)
            for _ in range(6))
def generate_unordered_tag_names()
    def generate_unordered_tag_names():
        unsorted = []
        is_sorted = []
        # Retry until the generated list is not already in sorted order, so
        # callers can detect whether the server sorts tags on update.
        while unsorted == is_sorted:
            unsorted = []
            for _ in range(3):
                unsorted.append(generate_tag_name())
            is_sorted = sorted(unsorted)
        return unsorted, is_sorted
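A quick usage sketch of the helper pair, assuming the module and its test dependencies are importable as tests.test_tagging. The retry loop above guarantees the returned list differs from its sorted counterpart, which is what lets the tests below detect server-side sorting:

    from tests.test_tagging import generate_unordered_tag_names

    unsorted_tags, sorted_tags = generate_unordered_tag_names()
    # e.g. unsorted_tags == ["tag/x4k2jq", "tag/a91mzz", "tag/p0q8rt"]
    assert unsorted_tags != sorted_tags          # guaranteed by the retry loop
    assert sorted(unsorted_tags) == sorted_tags  # same elements, sorted order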
def test_tag_basic(client)
    @pytest.mark.v2_volume_test  # NOQA
    def test_tag_basic(client):  # NOQA
        """
        Test that applying Tags to Nodes/Disks and retrieving them work as
        expected. Ensures that Tags are properly validated when updated.

        1. Generate tags and apply them to the disk and nodes
        2. Make sure the tags are applied
        3. Try to apply invalid tags to the disk and node. The action must fail.
        """
        host_id = get_self_host_id()
        node = client.by_id_node(host_id)
        disks = get_update_disks(node.disks)
        if DATA_ENGINE == "v2":
            node_disk_count = 2
        else:
            node_disk_count = 1
        assert len(node.disks) == node_disk_count
        assert len(node.disks[list(node.disks)[0]].tags) == 0, f"disks = {disks}"
        assert len(node.tags) == 0

        unsorted_disk, sorted_disk = generate_unordered_tag_names()
        unsorted_node, sorted_node = generate_unordered_tag_names()

        update_disks = get_update_disks(node.disks)
        update_disks[list(update_disks)[0]].tags = unsorted_disk
        node = update_node_disks(client, node.name, disks=update_disks)
        disks = get_update_disks(node.disks)
        assert disks[list(disks)[0]].tags == sorted_disk

        node = set_node_tags(client, node, unsorted_node)
        assert node.tags == sorted_node

        improper_tag_cases = [
            [""],   # Empty string
            [" "],  # Whitespace
            ["/"],  # Leading /
            [","],  # Illegal character
        ]
        for tags in improper_tag_cases:
            with pytest.raises(Exception) as e:
                set_node_tags(client, node, tags)
            assert "at least one error encountered while validating tags" in \
                str(e.value)
            with pytest.raises(Exception) as e:
                update_disks = get_update_disks(node.disks)
                update_disks[list(update_disks)[0]].tags = tags
                update_node_disks(client, node.name, disks=update_disks)
            assert "at least one error encountered while validating tags" in \
                str(e.value)

        update_disks = get_update_disks(node.disks)
        update_disks[list(update_disks)[0]].tags = []
        node = update_node_disks(client, node.name, disks=update_disks)
        disks = get_update_disks(node.disks)
        assert len(node.disks[list(node.disks)[0]].tags) == 0, f"disks = {disks}"
        node = set_node_tags(client, node)
        assert len(node.tags) == 0
Test that applying Tags to Nodes/Disks and retrieving them work as expected. Ensures that Tags are properly validated when updated.
- Generate tags and apply them to the disk and nodes
- Make sure the tags are applied
- Try to apply invalid tags to the disk and node. The action must fail (see the validation sketch below).
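For reference, a minimal sketch of the validation rules this test exercises. This is an illustrative approximation inferred from the improper_tag_cases in the test, not Longhorn's actual validator; the real API reports "at least one error encountered while validating tags":

    import re

    # Approximate rule set, inferred from improper_tag_cases: a tag must be
    # non-empty, contain no whitespace or commas, and must not begin with "/".
    # Interior slashes (e.g. the helper's "tag/abc123") are accepted.
    TAG_PATTERN = re.compile(r"^[^/\s,][^\s,]*$")

    def is_valid_tag(tag):
        return bool(TAG_PATTERN.match(tag))

    assert is_valid_tag("tag/abc123")   # helper-generated names pass
    assert not is_valid_tag("")         # empty string
    assert not is_valid_tag(" ")        # whitespace
    assert not is_valid_tag("/")        # leading /
    assert not is_valid_tag(",")        # illegal character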
def test_tag_scheduling(client, node_default_tags)
    @pytest.mark.v2_volume_test  # NOQA
    def test_tag_scheduling(client, node_default_tags):  # NOQA
        """
        Test successful scheduling with tags.

        Test prerequisite:
        - Set Replica Node Level Soft Anti-Affinity to enabled

        Case 1: Don't specify any tags; replicas should be scheduled to 3 disks.
        Case 2: Use disk tags to select two nodes for all replicas.
        Case 3: Use node tags to select two nodes for all replicas.
        Case 4: Combine node and disk tags to schedule all replicas on one node.
        """
        replica_node_soft_anti_affinity_setting = \
            client.by_id_setting(SETTING_REPLICA_NODE_SOFT_ANTI_AFFINITY)
        client.update(replica_node_soft_anti_affinity_setting, value="true")

        host_id = get_self_host_id()
        tag_specs = [
            # Select all Nodes.
            {
                "disk": [],
                "expected": 3,
                "node": []
            },
            # Selector works with AND on Disk Tags.
            {
                "disk": ["ssd", "nvme"],
                "expected": 2,
                "node": []
            },
            # Selector works with AND on Node Tags.
            {
                "disk": [],
                "expected": 2,
                "node": ["main", "storage"]
            },
            # Selector works based on combined Disk AND Node selector.
            {
                "disk": ["ssd", "nvme"],
                "expected": 1,
                "node": ["storage", "main"]
            }
        ]
        for specs in tag_specs:
            volume_name = generate_volume_name()  # NOQA
            client.create_volume(name=volume_name, size=SIZE,
                                 numberOfReplicas=3,
                                 diskSelector=specs["disk"],
                                 nodeSelector=specs["node"],
                                 dataEngine=DATA_ENGINE)
            volume = wait_for_volume_detached(client, volume_name)
            assert volume.diskSelector == specs["disk"]
            assert volume.nodeSelector == specs["node"]

            volume.attach(hostId=host_id)
            volume = wait_for_volume_healthy(client, volume_name)
            assert len(volume.replicas) == 3
            check_volume_replicas(volume, specs, node_default_tags)

            cleanup_volume(client, volume)
Test successful scheduling with tags.
Test prerequisite: set Replica Node Level Soft Anti-Affinity to enabled.
Case 1: Don't specify any tags; replicas should be scheduled to 3 disks.
Case 2: Use disk tags to select two nodes for all replicas.
Case 3: Use node tags to select two nodes for all replicas.
Case 4: Combine node and disk tags to schedule all replicas on one node (the selector semantics are sketched below).
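The "expected" counts in the test's tag_specs assume AND semantics: a replica can only land where every requested tag is present. A minimal sketch of that matching logic, over a hypothetical three-node layout chosen to reproduce the four cases (the test itself delegates this check to the check_volume_replicas helper):

    # Illustrative AND-matching; the node layout is hypothetical, not the
    # node_default_tags fixture's actual data.
    def satisfies(selector, tags):
        return all(t in tags for t in selector)

    nodes = {
        "node-1": {"disk": ["ssd", "nvme"], "node": ["main", "storage"]},
        "node-2": {"disk": ["ssd", "nvme"], "node": ["fallback"]},
        "node-3": {"disk": ["hdd"], "node": ["main", "storage"]},
    }

    def eligible(disk_selector, node_selector):
        return [n for n, tags in nodes.items()
                if satisfies(disk_selector, tags["disk"])
                and satisfies(node_selector, tags["node"])]

    assert len(eligible([], [])) == 3                                # Case 1
    assert len(eligible(["ssd", "nvme"], [])) == 2                   # Case 2
    assert len(eligible([], ["main", "storage"])) == 2               # Case 3
    assert len(eligible(["ssd", "nvme"], ["storage", "main"])) == 1  # Case 4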
def test_tag_scheduling_failure(client, node_default_tags)
    @pytest.mark.v2_volume_test  # NOQA
    def test_tag_scheduling_failure(client, node_default_tags):  # NOQA
        """
        Test that scheduling fails if no Nodes/Disks with the requested Tags
        are available.

        Case 1: Validate that specifying nonexistent tags in the volume causes
        the API call to fail.

        Case 2:
        1. Specify tags that exist but that no node or disk can satisfy.
        2. Validate that the volume fails scheduling.
        """
        invalid_tag_cases = [
            # Only one Disk Tag exists.
            {
                "disk": ["doesnotexist", "ssd"],
                "node": []
            },
            # Only one Node Tag exists.
            {
                "disk": [],
                "node": ["doesnotexist", "main"]
            }
        ]
        for tags in invalid_tag_cases:
            volume_name = generate_volume_name()  # NOQA
            with pytest.raises(Exception) as e:
                client.create_volume(name=volume_name, size=SIZE,
                                     numberOfReplicas=3,
                                     diskSelector=tags["disk"],
                                     nodeSelector=tags["node"],
                                     dataEngine=DATA_ENGINE)
            assert "does not exist" in str(e.value)

        unsatisfied_tag_cases = [
            {
                "disk": [],
                "node": ["main", "fallback"]
            },
            {
                "disk": ["ssd", "m2"],
                "node": []
            }
        ]
        for tags in unsatisfied_tag_cases:
            volume_name = generate_volume_name()
            client.create_volume(name=volume_name, size=SIZE,
                                 numberOfReplicas=3,
                                 diskSelector=tags["disk"],
                                 nodeSelector=tags["node"],
                                 dataEngine=DATA_ENGINE)
            volume = wait_for_volume_detached(client, volume_name)
            assert volume.diskSelector == tags["disk"]
            assert volume.nodeSelector == tags["node"]

            wait_scheduling_failure(client, volume_name)

            client.delete(volume)
            wait_for_volume_delete(client, volume.name)

        volumes = client.list_volume()
        assert len(volumes) == 0
Test that scheduling fails if no Nodes/Disks with the requested Tags are available.
Case 1: Validate that specifying nonexistent tags in the volume causes the API call to fail.
Case 2:
- Specify tags that exist but that no node or disk can satisfy.
- Validate that the volume fails scheduling (the distinction between the two cases is sketched below).
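The two cases fail at different stages: a selector naming a tag no node declares is rejected up front by the create-volume API ("does not exist"), while tags that exist individually but never on the same node pass creation and only fail later in the scheduler. An illustrative classification over hypothetical tag data (simplified to node tags; the real checks live in Longhorn's API and scheduler):

    # Hypothetical tag inventory, not the fixture's actual data.
    cluster_node_tags = {"main", "storage", "fallback"}   # union across nodes
    per_node_tags = [{"main", "storage"}, {"fallback"}]   # tags per node

    def classify(selector):
        if not set(selector) <= cluster_node_tags:
            return "rejected at creation (tag does not exist)"
        if not any(set(selector) <= tags for tags in per_node_tags):
            return "accepted, but scheduling fails (unsatisfiable)"
        return "schedulable"

    assert classify(["doesnotexist", "main"]).startswith("rejected")
    assert classify(["main", "fallback"]).startswith("accepted")
    assert classify(["main", "storage"]) == "schedulable"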
def test_tag_scheduling_on_update(client, node_default_tags, volume_name)
    @pytest.mark.v2_volume_test  # NOQA
    def test_tag_scheduling_on_update(client, node_default_tags, volume_name):  # NOQA
        """
        Test that Replicas get scheduled if a Node/Disk is updated with the
        proper Tags.

        Test prerequisite:
        - Set Replica Node Level Soft Anti-Affinity to enabled

        1. Create a volume with tags that cannot be satisfied
        2. Wait for the volume to fail scheduling
        3. Update the node and disk with extra tags to satisfy the volume
        4. Verify that the volume has now been scheduled
        5. Attach the volume and check that the replicas have been scheduled
           properly
        """
        replica_node_soft_anti_affinity_setting = \
            client.by_id_setting(SETTING_REPLICA_NODE_SOFT_ANTI_AFFINITY)
        client.update(replica_node_soft_anti_affinity_setting, value="true")

        tag_spec = {
            "disk": ["ssd", "m2"],
            "expected": 1,
            "node": ["main", "fallback"]
        }
        client.create_volume(name=volume_name, size=SIZE,
                             numberOfReplicas=3,
                             diskSelector=tag_spec["disk"],
                             nodeSelector=tag_spec["node"],
                             dataEngine=DATA_ENGINE)
        volume = wait_for_volume_detached(client, volume_name)
        assert volume.diskSelector == tag_spec["disk"]
        assert volume.nodeSelector == tag_spec["node"]

        wait_scheduling_failure(client, volume_name)

        host_id = get_self_host_id()
        node = client.by_id_node(host_id)
        update_disks = get_update_disks(node.disks)
        update_disks[list(update_disks)[0]].tags = tag_spec["disk"]
        node = update_node_disks(client, node.name, disks=update_disks)
        set_node_tags(client, node, tag_spec["node"])

        scheduled = False
        for i in range(RETRY_COUNTS):
            v = client.by_id_volume(volume_name)
            if v.conditions.Scheduled.status == "True":
                scheduled = True
            if scheduled:
                break
            sleep(RETRY_INTERVAL)
        assert scheduled

        volume.attach(hostId=host_id)
        volume = wait_for_volume_healthy(client, volume_name)
        nodes = client.list_node()
        node_mapping = {node.id: {"disk": node.disks[list(node.disks)[0]].tags,
                                  "node": node.tags}
                        for node in nodes}
        assert len(volume.replicas) == 3
        check_volume_replicas(volume, tag_spec, node_mapping)

        cleanup_volume(client, volume)
Test that Replicas get scheduled if a Node/Disk is updated with the proper Tags.
Test prerequisite: set Replica Node Level Soft Anti-Affinity to enabled.
- Create a volume with tags that cannot be satisfied
- Wait for the volume to fail scheduling
- Update the node and disk with extra tags to satisfy the volume
- Verify that the volume has now been scheduled
- Attach the volume and check that the replicas have been scheduled properly (the polling pattern used while waiting is sketched below)
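The wait in the fourth step uses the module's RETRY_COUNTS/RETRY_INTERVAL poll loop. The same pattern, factored into a standalone helper (a sketch only; the helper name and the default retry values are placeholders, not part of the module):

    from time import sleep

    def wait_for_scheduled(client, volume_name, retries=300, interval=2):
        # Placeholder defaults; the test uses the module's RETRY_COUNTS and
        # RETRY_INTERVAL constants. Re-read the volume until its Scheduled
        # condition turns "True", or give up after `retries` polls.
        for _ in range(retries):
            v = client.by_id_volume(volume_name)
            if v.conditions.Scheduled.status == "True":
                return True
            sleep(interval)
        return False

    # Usage in the test would then reduce to:
    # assert wait_for_scheduled(client, volume_name)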