Commit 507fee7
Fix linter errors in DataSciencePipelinesKfp.py and ray_integration.py (#1325)

* Apply `poetry run ruff check ods_ci/ --fix`

Signed-off-by: Jiri Daněk <jdanek@redhat.com>

* Apply `poetry run black ods_ci/`

Signed-off-by: Jiri Daněk <jdanek@redhat.com>

---------

Signed-off-by: Jiri Daněk <jdanek@redhat.com>
Co-authored-by: Jorge <jgarciao@users.noreply.github.com>
jiridanek and jgarciao authored Mar 27, 2024
1 parent a3de25b commit 507fee7
Showing 5 changed files with 36 additions and 39 deletions.
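
Both commands are mechanical rewrites: ruff's `--fix` applies its auto-fixable findings (in this diff, almost entirely normalizing string literals to double quotes), while black re-flows code against the project's line-length limit (evidently above black's default 88 columns, since several joined lines run past 110 characters), collapsing short hand-wrapped calls and splitting genuinely long ones. A representative before/after pair taken from the diff below:

    # before black
    result = client.create_run_from_pipeline_func(
        pipeline_func=pipeline,
        arguments=pipeline_params
    )

    # after black
    result = client.create_run_from_pipeline_func(pipeline_func=pipeline, arguments=pipeline_params)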
4 changes: 1 addition & 3 deletions ods_ci/libs/DataSciencePipelinesAPI.py
@@ -92,9 +92,7 @@ def add_role_to_user(self, name, user, project):
     def do_http_request(self, url):
         assert self.route != "", "Login First"
         response = requests.get(
-            f"http://{self.route}/{url}",
-            headers={"Authorization": f"Bearer {self.sa_token}"},
-            verify=self.get_cert()
+            f"http://{self.route}/{url}", headers={"Authorization": f"Bearer {self.sa_token}"}, verify=self.get_cert()
         )
         assert response.status_code == 200
         return response.url
50 changes: 24 additions & 26 deletions ods_ci/libs/DataSciencePipelinesKfp.py
@@ -3,6 +3,7 @@
 import os
 import sys
 import time
+
 from DataSciencePipelinesAPI import DataSciencePipelinesAPI
 from robotlibcore import keyword
 from urllib3.exceptions import MaxRetryError, SSLError
@@ -18,7 +19,7 @@ def __init__(self):
         self.client = None
         self.api = None

-    def get_client(self, user, pwd, project, route_name='ds-pipeline-dspa'):
+    def get_client(self, user, pwd, project, route_name="ds-pipeline-dspa"):
         if self.client is None:
             self.api = DataSciencePipelinesAPI()
             self.api.login_and_wait_dsp_route(user, pwd, project, route_name)
@@ -44,7 +45,11 @@ def get_client(self, user, pwd, project, route_name='ds-pipeline-dspa'):
                 # we assume it is a cluster with self-signed certs
                 if type(e.reason) == SSLError:
                     # try to retrieve the certificate
-                    self.client = Client(host=f"https://{self.api.route}/", existing_token=self.api.sa_token, ssl_ca_cert=self.api.get_cert())
+                    self.client = Client(
+                        host=f"https://{self.api.route}/",
+                        existing_token=self.api.sa_token,
+                        ssl_ca_cert=self.api.get_cert(),
+                    )
         return self.client, self.api

     def get_bucket_name(self, api, project):
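
For context on the fallback above: against a cluster using self-signed certificates, the first connection attempt fails TLS verification, and the client is rebuilt trusting the cluster's own CA bundle. A standalone sketch of that pattern (hypothetical connect helper, not the repository's code; host, existing_token, and ssl_ca_cert are real kfp.Client parameters):

    from kfp import Client
    from urllib3.exceptions import MaxRetryError, SSLError

    def connect(host: str, token: str, ca_bundle: str) -> Client:
        try:
            client = Client(host=host, existing_token=token)
            client.list_experiments()  # force a request so the TLS handshake happens now
            return client
        except MaxRetryError as e:
            # urllib3 wraps the TLS error; re-raise anything that is not an SSL failure
            if not isinstance(e.reason, SSLError):
                raise
            # retry, trusting the cluster's own CA certificate instead of the system store
            return Client(host=host, existing_token=token, ssl_ca_cert=ca_bundle)
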
@@ -71,18 +76,15 @@ def setup_client(self, user, pwd, project):

     @keyword
     def import_run_pipeline(self, pipeline_url, pipeline_params):
-        print(f'pipeline_params({type(pipeline_params)}): {pipeline_params}')
-        print(f'downloading: {pipeline_url}')
+        print(f"pipeline_params({type(pipeline_params)}): {pipeline_params}")
+        print(f"downloading: {pipeline_url}")
         test_pipeline_run_yaml, _ = self.api.do_get(pipeline_url, skip_ssl=True)
         pipeline_file = "/tmp/test_pipeline_run_yaml.yaml"
         with open(pipeline_file, "w", encoding="utf-8") as f:
             f.write(test_pipeline_run_yaml)
-        print(f'{pipeline_url} content stored at {pipeline_file}')
-        print('create a run from pipeline')
-        response = self.client.create_run_from_pipeline_package(
-            pipeline_file=pipeline_file,
-            arguments=pipeline_params
-        )
+        print(f"{pipeline_url} content stored at {pipeline_file}")
+        print("create a run from pipeline")
+        response = self.client.create_run_from_pipeline_package(pipeline_file=pipeline_file, arguments=pipeline_params)
         print(response)
         return response.run_id

@@ -109,9 +111,9 @@ def delete_run(self, run_id):

     @keyword
     def create_run_from_pipeline_func(
-        self, user, pwd, project, source_code, fn, pipeline_params={}, current_path=None, route_name='ds-pipeline-dspa'
+        self, user, pwd, project, source_code, fn, pipeline_params={}, current_path=None, route_name="ds-pipeline-dspa"
     ):
-        print(f'pipeline_params: {pipeline_params}')
+        print(f"pipeline_params: {pipeline_params}")
         client, api = self.get_client(user, pwd, project, route_name)
         mlpipeline_minio_artifact_secret = api.get_secret(project, "ds-pipeline-s3-dspa")
         bucket_name = self.get_bucket_name(api, project)
@@ -128,23 +130,19 @@ def create_run_from_pipeline_func(
         # pipeline_params
         # there are some special keys to retrieve argument values dynamically
         # in pipeline v2, we must match the parameters names
-        if 'mlpipeline_minio_artifact_secret' in pipeline_params:
-            pipeline_params['mlpipeline_minio_artifact_secret'] = str(mlpipeline_minio_artifact_secret["data"])
-        if 'bucket_name' in pipeline_params:
-            pipeline_params['bucket_name'] = bucket_name
-        if 'openshift_server' in pipeline_params:
-            pipeline_params['openshift_server'] = self.api.get_openshift_server()
-        if 'openshift_token' in pipeline_params:
-            pipeline_params['openshift_token'] = self.api.get_openshift_token()
-        print(f'pipeline_params modified with dynamic values: {pipeline_params}')
+        if "mlpipeline_minio_artifact_secret" in pipeline_params:
+            pipeline_params["mlpipeline_minio_artifact_secret"] = str(mlpipeline_minio_artifact_secret["data"])
+        if "bucket_name" in pipeline_params:
+            pipeline_params["bucket_name"] = bucket_name
+        if "openshift_server" in pipeline_params:
+            pipeline_params["openshift_server"] = self.api.get_openshift_server()
+        if "openshift_token" in pipeline_params:
+            pipeline_params["openshift_token"] = self.api.get_openshift_token()
+        print(f"pipeline_params modified with dynamic values: {pipeline_params}")

         # create_run_from_pipeline_func will compile the code
         # if you need to see the yaml, for debugging purpose, call: TektonCompiler().compile(pipeline, f'{fn}.yaml')
-        result = client.create_run_from_pipeline_func(
-            pipeline_func=pipeline,
-            arguments=pipeline_params
-        )
+        result = client.create_run_from_pipeline_func(pipeline_func=pipeline, arguments=pipeline_params)
         # easy to debug and double check failures
         print(result)
         return result.run_id
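
The special-key block above implements an opt-in, by-name contract: a test passes placeholder entries, and the keyword overwrites only the keys the test declared. A trimmed sketch of the caller's side (hypothetical values):

    # a Robot Framework test supplies placeholders for the values it needs
    pipeline_params = {"openshift_server": "", "openshift_token": ""}
    # after substitution inside create_run_from_pipeline_func, roughly:
    # {"openshift_server": "https://api.cluster.example:6443", "openshift_token": "sha256~..."}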

@@ -48,9 +48,7 @@ def print_msg(msg: str):
 def flipcoin_pipeline():
     flip_coin_op = components.create_component_from_func(flip_coin, base_image=DataSciencePipelinesKfp.base_image)
     print_op = components.create_component_from_func(print_msg, base_image=DataSciencePipelinesKfp.base_image)
-    random_num_op = components.create_component_from_func(
-        random_num, base_image=DataSciencePipelinesKfp.base_image
-    )
+    random_num_op = components.create_component_from_func(random_num, base_image=DataSciencePipelinesKfp.base_image)

     flip = flip_coin_op()
     with dsl.Condition(flip.output == "heads"):
@@ -1,8 +1,9 @@
 from kfp import dsl
+
 from ods_ci.libs.DataSciencePipelinesKfp import DataSciencePipelinesKfp


-@dsl.component(packages_to_install=['codeflare-sdk'], base_image=DataSciencePipelinesKfp.base_image)
+@dsl.component(packages_to_install=["codeflare-sdk"], base_image=DataSciencePipelinesKfp.base_image)
 def ray_fn(openshift_server: str, openshift_token: str) -> int:
     import ray
     from codeflare_sdk.cluster.auth import TokenAuthentication
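
Aside, on the decorator touched above: in KFP v2 lightweight components, packages_to_install is pip-installed into the base image when the step container starts, and only the decorated function's source ships to that container, which is why ray_fn does its imports inside the body. A minimal sketch (hypothetical component, not from this repository):

    from kfp import dsl

    @dsl.component(packages_to_install=["requests"], base_image="python:3.11")
    def fetch_status(url: str) -> int:
        # imports live inside the body: module-level imports from the defining
        # file do not exist in the step container
        import requests

        return requests.get(url).status_code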
@@ -1,5 +1,7 @@
"""Test pipeline to exercise various data flow mechanisms."""

import kfp

from ods_ci.libs.DataSciencePipelinesKfp import DataSciencePipelinesKfp


@@ -45,7 +47,7 @@ def receive_file(
     shutil.copyfile(incomingfile, saveartifact)


-@kfp.dsl.component(packages_to_install=['minio'], base_image=DataSciencePipelinesKfp.base_image)
+@kfp.dsl.component(packages_to_install=["minio"], base_image=DataSciencePipelinesKfp.base_image)
 def test_uploaded_artifact(
     previous_step: kfp.dsl.InputPath(),
     file_size_bytes: int,
@@ -54,12 +56,13 @@ def test_uploaded_artifact(
 ):
     import base64
     import json
+
     from minio import Minio

     def inner_decode(my_str):
         return base64.b64decode(my_str).decode("utf-8")

-    mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret.replace("\'", "\""))
+    mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret.replace("'", '"'))
     host = inner_decode(mlpipeline_minio_artifact_secret["host"])
     port = inner_decode(mlpipeline_minio_artifact_secret["port"])
     access_key = inner_decode(mlpipeline_minio_artifact_secret["accesskey"])
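
On the json.loads line above: the keyword sent str(secret["data"]) through a pipeline parameter, i.e. a Python-dict repr with single quotes, so the component swaps quote characters to get valid JSON and then base64-decodes each field, as Kubernetes secret data always is. A standalone sketch with hypothetical values:

    import base64
    import json

    raw = "{'host': 'bWluaW8tc2VydmljZQ==', 'port': 'OTAwMA=='}"  # hypothetical secret data
    secret = json.loads(raw.replace("'", '"'))  # dict repr -> valid JSON
    host = base64.b64decode(secret["host"]).decode("utf-8")  # "minio-service"
    port = base64.b64decode(secret["port"]).decode("utf-8")  # "9000"

The quote swap is safe only while no value contains a quote character; since the values here are base64 strings, that holds.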
@@ -68,8 +71,8 @@ def inner_decode(my_str):
     secure = secure.lower() == "true"
     client = Minio(f"{host}:{port}", access_key=access_key, secret_key=secret_key, secure=secure)

-    store_object = previous_step.replace(f'/s3/{bucket_name}/', '')
-    print(f'parsing {previous_step} to {store_object} ')
+    store_object = previous_step.replace(f"/s3/{bucket_name}/", "")
+    print(f"parsing {previous_step} to {store_object} ")
     data = client.get_object(bucket_name, store_object)

     with open("my-testfile", "wb") as file_data:
@@ -88,7 +91,6 @@ def inner_decode(my_str):
name="Test Data Passing Pipeline 1",
)
def wire_up_pipeline(mlpipeline_minio_artifact_secret: str, bucket_name: str):

file_size_mb = 20
file_size_bytes = file_size_mb * 1024 * 1024

@@ -102,5 +104,5 @@ def wire_up_pipeline(mlpipeline_minio_artifact_secret: str, bucket_name: str):
         previous_step=receive_file_task.output,
         file_size_bytes=file_size_bytes,
         mlpipeline_minio_artifact_secret=mlpipeline_minio_artifact_secret,
-        bucket_name=bucket_name
+        bucket_name=bucket_name,
     )
