Module tests.test_infra
Functions
def detect_cloudprovider()
-
Expand source code
def detect_cloudprovider(): cloudprovider_name = os.getenv("CLOUDPROVIDER") if cloudprovider_name == "aws": cloudprovider = aws() return cloudprovider else: assert False
def is_infra_k3s()
-
Expand source code
def is_infra_k3s(): exec_cmd = ["kubectl", "version"] if "k3s" in str(subprocess.check_output(exec_cmd)): return True else: return False
def is_node_ready_k8s(node_name, k8s_api_client)
-
Expand source code
def is_node_ready_k8s(node_name, k8s_api_client): print(f'==> check node {node_name} is ready ...') node_status_k8s = k8s_api_client.read_node_status(node_name) for node_condition in node_status_k8s.status.conditions: if node_condition.type == "Ready": print(f'type = {node_condition.type}, ' f'status = {node_condition.status}') if node_condition.type == "Ready" and \ node_condition.status == "True": node_ready = True else: node_ready = False return node_ready
def is_node_ready_longhorn(node_name, longhorn_api_client)
-
Expand source code
def is_node_ready_longhorn(node_name, longhorn_api_client): node = longhorn_api_client.by_id_node(node_name) if node.conditions.Ready.status == 'False': node_ready = False else: node_ready = True return node_ready
def reset_cluster_ready_status(request)
-
Expand source code
@pytest.fixture def reset_cluster_ready_status(request): yield node_controlplane_label = 'node-role.kubernetes.io/control-plane' node_ip_annotation = "flannel.alpha.coreos.com/public-ip" node_ipv6_annotation = "flannel.alpha.coreos.com/public-ipv6" k8s_api_client = get_core_api_client() longhorn_api_client = get_longhorn_api_client() cloudprovider = detect_cloudprovider() print('==> test completed! reset cluster ready status ...') for node_item in k8s_api_client.list_node().items: if node_controlplane_label not in node_item.metadata.labels: node_name = node_item.metadata.name try: node_ip = node_item.metadata.annotations[node_ip_annotation] except KeyError: node_ip = node_item.metadata.annotations[node_ipv6_annotation] # TODO: Implement support for IPv6 in the cloud provider # client. # # For now, IPv6 addresses are skipped, and host-level # operations will be covered by the robot test cases. print(f'Skipping node {node_name} with IPv6 address {node_ip}') continue node = cloudprovider.instance_id_by_ip(node_ip) else: continue if is_node_ready_k8s(node_name, k8s_api_client) is False: cloudprovider.instance_start(node) print(f'==> wait for aws node {node_name} up ...') aws_node_up = wait_for_node_up_aws(cloudprovider, node) assert aws_node_up, f'expect aws node {node_name} up' node_up_k8s = wait_for_node_up_k8s(node_name, k8s_api_client) assert node_up_k8s else: continue node_up_longhorn =\ wait_for_node_up_longhorn(node_name, longhorn_api_client) assert node_up_longhorn
def test_offline_node(reset_cluster_ready_status)
-
Expand source code
@pytest.mark.infra @pytest.mark.order(-1) def test_offline_node(reset_cluster_ready_status): """ Test offline node 1. Bring down one of the nodes in Kubernetes cluster (avoid current node) 2. Make sure the Longhorn node state become `down` """ pod_lable_selector = "longhorn-test=test-job" node_controlplane_label = 'node-role.kubernetes.io/control-plane' node_ip_annotation = "flannel.alpha.coreos.com/public-ip" node_ipv6_annotation = "flannel.alpha.coreos.com/public-ipv6" k8s_api_client = get_core_api_client() longhorn_api_client = get_longhorn_api_client() cloudprovider = detect_cloudprovider() for pod in k8s_api_client.list_namespaced_pod( 'default', label_selector=pod_lable_selector).items: if pod.metadata.name == "longhorn-test": longhorn_test_node_name = pod.spec.node_name for node_item in k8s_api_client.list_node().items: if node_controlplane_label not in node_item.metadata.labels: node_name = node_item.metadata.name try: node_ip = node_item.metadata.annotations[node_ip_annotation] except KeyError: node_ip = node_item.metadata.annotations[node_ipv6_annotation] # TODO: Implement support for IPv6 in the cloud provider # client. # # For now, IPv6 addresses are skipped, and host-level # operations will be covered by the robot test cases. print(f'Skipping test; found node {node_name} with IPv6 address {node_ip}') # NOQA return if node_name == longhorn_test_node_name: continue else: node = cloudprovider.instance_id_by_ip(node_ip) break print(f'==> stop node: {node_name}') cloudprovider.instance_stop(node) aws_node_down = wait_for_node_down_aws(cloudprovider, node) assert aws_node_down k8s_node_down = wait_for_node_down_k8s(node_name, k8s_api_client) assert k8s_node_down longhorn_api_client = get_longhorn_api_client() longhorn_node_down = wait_for_node_down_longhorn(node_name, longhorn_api_client) assert longhorn_node_down
Test offline node
- Bring down one of the nodes in Kubernetes cluster (avoid current node)
- Make sure the Longhorn node state become
down
def wait_for_node_down_aws(cloudprovider, node)
-
Expand source code
def wait_for_node_down_aws(cloudprovider, node): aws_node_down = False for i in range(RETRY_COUNTS): if cloudprovider.instance_status(node) == 'stopped': aws_node_down = True break else: time.sleep(RETRY_INTERVAL_LONG) continue return aws_node_down
def wait_for_node_down_k8s(node_name, k8s_api_client)
-
Expand source code
def wait_for_node_down_k8s(node_name, k8s_api_client): node_down = False for i in range(RETRY_COUNTS): if is_node_ready_k8s(node_name, k8s_api_client) is False: node_down = True break else: time.sleep(RETRY_INTERVAL_LONG) continue return node_down
def wait_for_node_down_longhorn(node_name, longhorn_api_client)
-
Expand source code
def wait_for_node_down_longhorn(node_name, longhorn_api_client): longhorn_node_down = False for i in range(RETRY_COUNTS): if is_node_ready_longhorn(node_name, longhorn_api_client) is False: longhorn_node_down = True break else: time.sleep(RETRY_INTERVAL) continue return longhorn_node_down
def wait_for_node_up_aws(cloudprovider, node)
-
Expand source code
def wait_for_node_up_aws(cloudprovider, node): aws_node_up = False for i in range(RETRY_COUNTS): status = cloudprovider.instance_status(node) print(f'instance status = {status}') if status == 'running': aws_node_up = True break else: time.sleep(RETRY_INTERVAL_LONG) continue return aws_node_up
def wait_for_node_up_k8s(node_name, k8s_api_client)
-
Expand source code
def wait_for_node_up_k8s(node_name, k8s_api_client): node_up = False for i in range(RETRY_COUNTS): if is_node_ready_k8s(node_name, k8s_api_client) is True: node_up = True break else: time.sleep(RETRY_INTERVAL) continue return node_up
def wait_for_node_up_longhorn(node_name, longhorn_api_client)
-
Expand source code
def wait_for_node_up_longhorn(node_name, longhorn_api_client): longhorn_node_up = False for i in range(RETRY_COUNTS): if is_node_ready_longhorn(node_name, longhorn_api_client) is True: longhorn_node_up = True break else: time.sleep(RETRY_INTERVAL) continue return longhorn_node_up