Skip to content

Commit

Permalink
test(components): fix k8s_client 401 unauthorized error (#9749)
Browse files Browse the repository at this point in the history
* Initiate a new k8s client when calling _get_resource

* Remove k8s_client for methods that use _get_resource

* Initiate a new k8s client when calling _delete_resource
  • Loading branch information
rd-pong authored Jul 18, 2023
1 parent 81e989a commit fdb25f6
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
shallow_canary=True,
)
)
k8s_client = ack_utils.k8s_client()
input_model_name = utils.generate_random_string(10) + "-v2-model"
input_endpoint_config_name = (
utils.generate_random_string(10) + "-v2-endpoint-config"
Expand Down Expand Up @@ -63,7 +62,7 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
)

endpoint_describe = ack_utils._get_resource(
k8s_client, input_endpoint_name, "endpoints"
input_endpoint_name, "endpoints"
)

outputs = {
Expand Down Expand Up @@ -142,11 +141,11 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
)
utils.remove_dir(download_dir)
finally:
ack_utils._delete_resource(k8s_client, input_endpoint_name, "endpoints")
ack_utils._delete_resource(input_endpoint_name, "endpoints")
ack_utils._delete_resource(
k8s_client, input_endpoint_config_name, "endpointconfigs"
input_endpoint_config_name, "endpointconfigs"
)
ack_utils._delete_resource(k8s_client, input_model_name, "models")
ack_utils._delete_resource(input_model_name, "models")


@pytest.mark.v2
Expand All @@ -159,7 +158,6 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
input_model_name = utils.generate_random_string(10) + "-v2-model"
input_endpoint_config_name = (
utils.generate_random_string(10) + "-v2-endpoint-config"
Expand All @@ -181,22 +179,20 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
"running",
)
assert ack_utils.wait_for_condition(
k8s_client,
input_endpoint_name,
ack_utils.does_endpoint_exist,
wait_periods=12,
period_length=12,
)
kfp_client_utils.terminate_run(kfp_client, run_id)
assert ack_utils.wait_for_condition(
k8s_client,
input_endpoint_name,
ack_utils.is_endpoint_deleted,
wait_periods=20,
period_length=20,
)
finally:
ack_utils._delete_resource(
k8s_client, input_endpoint_config_name, "endpointconfigs"
input_endpoint_config_name, "endpointconfigs"
)
ack_utils._delete_resource(k8s_client, input_model_name, "models")
ack_utils._delete_resource(input_model_name, "models")
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
job_definition_name = (
utils.generate_random_string(10) + "-v2-" + test_params["TestName"]
)
Expand All @@ -56,7 +55,7 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi

# Verify if the job definition CR is created
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, test_params["Plural"]
job_definition_name, test_params["Plural"]
)
assert (
job_definition_name
Expand Down Expand Up @@ -88,5 +87,5 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi

finally:
ack_utils._delete_resource(
k8s_client, job_definition_name, test_params["Plural"]
job_definition_name, test_params["Plural"]
)
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def test_create_v2_monitoring_schedule(
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()

# parameters for model bias job definition
job_definition_name = (
Expand Down Expand Up @@ -92,7 +91,7 @@ def test_create_v2_monitoring_schedule(

# Verify if the job definition CR is created properly
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, "modelbiasjobdefinitions"
job_definition_name, "modelbiasjobdefinitions"
)
assert (
job_definition_name
Expand All @@ -107,7 +106,7 @@ def test_create_v2_monitoring_schedule(

# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
Expand All @@ -124,14 +123,12 @@ def test_create_v2_monitoring_schedule(

finally:
ack_utils._delete_resource(
k8s_client,
job_definition_name,
"modelbiasjobdefinitions",
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
monitoring_schedule_name,
"monitoringschedules",
wait_periods=10,
Expand Down Expand Up @@ -164,7 +161,6 @@ def test_update_v2_monitoring_schedule(
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()

# parameters for job definition
test_params["Arguments"][test_params["JobInputName"]]["endpointInput"][
Expand Down Expand Up @@ -202,7 +198,7 @@ def test_update_v2_monitoring_schedule(

# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
Expand All @@ -221,7 +217,7 @@ def test_update_v2_monitoring_schedule(

# Verify if the job definition CR is created properly
job_definition_1_describe = ack_utils._get_resource(
k8s_client, job_definition_name_1, "dataqualityjobdefinitions"
job_definition_name_1, "dataqualityjobdefinitions"
)
assert (
job_definition_name_1
Expand Down Expand Up @@ -262,7 +258,7 @@ def test_update_v2_monitoring_schedule(

# Verify if monitoring schedule is updated with correct job definition
monitoring_schedule_updated_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_updated_describe["status"]["monitoringScheduleStatus"]
Expand All @@ -277,7 +273,7 @@ def test_update_v2_monitoring_schedule(

# Verify if the new job definition CR is created properly
job_definition_2_describe = ack_utils._get_resource(
k8s_client, job_definition_name_2, "dataqualityjobdefinitions"
job_definition_name_2, "dataqualityjobdefinitions"
)
assert (
job_definition_name_2
Expand All @@ -296,21 +292,18 @@ def test_update_v2_monitoring_schedule(

finally:
ack_utils._delete_resource(
k8s_client,
job_definition_name_1,
test_params["Plural"],
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
job_definition_name_2,
test_params["Plural"],
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
monitoring_schedule_name,
"monitoringschedules",
wait_periods=10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
],
)
def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):
k8s_client = ack_utils.k8s_client()
test_file_dir = "resources/config/ack-training-job"
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
test_params = utils.load_params(
Expand Down Expand Up @@ -68,7 +67,7 @@ def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):

# Verify Training job was successful on SageMaker
print(f"training job name: {input_job_name}")
train_response = ack_utils._get_resource(k8s_client, input_job_name, "trainingjobs")
train_response = ack_utils._get_resource(input_job_name, "trainingjobs")
assert (
train_response["status"]["trainingJobStatus"]
== output_training_job_status
Expand All @@ -87,7 +86,6 @@ def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):

@pytest.mark.v2
def test_terminate_trainingjob(kfp_client, experiment_id):
k8s_client = ack_utils.k8s_client()
test_file_dir = "resources/config/ack-training-job"
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated_terminate"))

Expand All @@ -114,7 +112,7 @@ def test_terminate_trainingjob(kfp_client, experiment_id):
kfp_client_utils.terminate_run(kfp_client, run_id)
desiredStatuses = ["Stopping", "Stopped"]
training_status_reached = ack_utils.wait_for_trainingjob_status(
k8s_client, input_job_name, desiredStatuses, 10, 6
input_job_name, desiredStatuses, 10, 6
)
assert training_status_reached

Expand Down
27 changes: 14 additions & 13 deletions components/aws/sagemaker/tests/integration_tests/utils/ack_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ def k8s_client():
return config.new_client_from_config()


def _get_resource(k8s_client, job_name, plural):
def _get_resource(job_name, plural):
"""Get the custom resource detail similar to: kubectl describe <resource> JOB_NAME -n NAMESPACE.
Returns:
None or object: None if the resource doesn't exist in server or there is an error, otherwise the
custom object.
"""
_api = client.CustomObjectsApi(k8s_client)
# Instantiate a new client every time to avoid connection issues.
_api = client.CustomObjectsApi(k8s_client())
namespace = os.environ.get("NAMESPACE")
try:
job_description = _api.get_namespaced_custom_object(
Expand All @@ -29,12 +30,12 @@ def _get_resource(k8s_client, job_name, plural):
return job_description


def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_length=20):
def _delete_resource(job_name, plural, wait_periods=10, period_length=20):
"""Delete the custom resource
Returns:
True or False: True if the resource is deleted, False if the resource deletion times out
"""
_api = client.CustomObjectsApi(k8s_client)
_api = client.CustomObjectsApi(k8s_client())
namespace = os.environ.get("NAMESPACE")

try:
Expand All @@ -50,7 +51,7 @@ def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_lengt

for _ in range(wait_periods):
sleep(period_length)
if _get_resource(k8s_client, job_name, plural) is None:
if _get_resource(job_name, plural) is None:
print(f"Resource {job_name} deleted successfully.")
return True

Expand All @@ -60,30 +61,30 @@ def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_lengt

# TODO: Make this a generalized function for non-job resources.
def wait_for_trainingjob_status(
k8s_client, training_job_name, desiredStatuses, wait_periods, period_length
training_job_name, desiredStatuses, wait_periods, period_length
):
for _ in range(wait_periods):
response = _get_resource(k8s_client, training_job_name, "trainingjobs")
response = _get_resource(training_job_name, "trainingjobs")
if response["status"]["trainingJobStatus"] in desiredStatuses:
return True
sleep(period_length)
return False


def wait_for_condition(
k8s_client, resource_name, validator_function, wait_periods=10, period_length=8
resource_name, validator_function, wait_periods=10, period_length=8
):
for _ in range(wait_periods):
if not validator_function(k8s_client, resource_name):
if not validator_function(resource_name):
sleep(period_length)
else:
return True
return False


def does_endpoint_exist(k8s_client, endpoint_name):
def does_endpoint_exist(endpoint_name):
try:
response = _get_resource(k8s_client, endpoint_name, "endpoints")
response = _get_resource(endpoint_name, "endpoints")
if response:
return True
if response is None: # kubernetes module error
Expand All @@ -92,8 +93,8 @@ def does_endpoint_exist(k8s_client, endpoint_name):
return False


def is_endpoint_deleted(k8s_client, endpoint_name):
response = _get_resource(k8s_client, endpoint_name, "endpoints")
def is_endpoint_deleted(endpoint_name):
response = _get_resource(endpoint_name, "endpoints")
if response:
return False
if response is None:
Expand Down

0 comments on commit fdb25f6

Please sign in to comment.