diff --git a/bin/qds.py b/bin/qds.py index bc434023..7f015f98 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -17,7 +17,7 @@ from qds_sdk.template import TemplateCmdLine from qds_sdk.clusterv2 import ClusterCmdLine from qds_sdk.sensors import * -from qds_sdk.quest import QuestCmdLine +from qds_sdk.pipelines import PipelinesCmdLine import os import sys import traceback @@ -90,8 +90,8 @@ " action --help\n" "\nScheduler subcommand:\n" " scheduler --help\n" - "\nQuest subcommand:\n" - " quest --help\n" + "\nPipelines subcommand:\n" + " pipelines --help\n" "\nTemplate subcommand:\n" " template --help\n" "\nAccount subcommand:\n" @@ -559,7 +559,7 @@ def templatemain(args): print(result) def questmain(args): - result = QuestCmdLine.run(args) + result = PipelinesCmdLine.run(args) print(result) @@ -706,13 +706,13 @@ def main(): return usermain(args) if a0 == "template": return templatemain(args) - if a0 == "quest": + if a0 == "pipelines": return questmain(args) cmdset = set(CommandClasses.keys()) sys.stderr.write("First command must be one of <%s>\n" % "|".join(cmdset.union(["cluster", "action", "scheduler", "report", - "dbtap", "role", "group", "app", "account", "nezha", "user", "template", "quest"]))) + "dbtap", "role", "group", "app", "account", "nezha", "user", "template", "pipelines"]))) usage(optparser) diff --git a/qds_sdk/quest.py b/qds_sdk/pipelines.py similarity index 87% rename from qds_sdk/quest.py rename to qds_sdk/pipelines.py index d522d69c..f1fa500d 100644 --- a/qds_sdk/quest.py +++ b/qds_sdk/pipelines.py @@ -1,6 +1,6 @@ """ -The quest module contains the base definition for -a generic quest commands. +The Pipelines module contains the base definition for +a generic Pipelines commands. """ from qds_sdk.actions import * import json @@ -14,13 +14,13 @@ _URI_RE = re.compile(r's3://([^/]+)/?(.*)') -class QuestCmdLine: - """qds_sdk.QuestCmdLine is the interface used by qds.py.""" +class PipelinesCmdLine: + """qds_sdk.PipelinesCmdLine is the interface used by qds.py.""" @staticmethod def parsers(): - argparser = ArgumentParser(prog="qds.py quest", - description="Quest client for Qubole Data Service.") + argparser = ArgumentParser(prog="qds.py pipelines", + description="Pipelines client for Qubole Data Service.") subparsers = argparser.add_subparsers() # Create @@ -51,7 +51,7 @@ def parsers(): create.add_argument("--command-line-options", dest="command_line_options", help="command line options on property page.") - create.set_defaults(func=QuestCmdLine.create) + create.set_defaults(func=PipelinesCmdLine.create) # Update/Edit update_properties = subparsers.add_parser("update-property", @@ -67,14 +67,14 @@ def parsers(): help="command line options on property page.") update_properties.add_argument("--can-retry", dest="can_retry", help="can retry true or false") - update_properties.set_defaults(func=QuestCmdLine.update_properties) + update_properties.set_defaults(func=PipelinesCmdLine.update_properties) update_code = subparsers.add_parser("update-code", help="Update code of a existing pipeline") update_code.add_argument( "-c", "--code", dest="code", help="query string") update_code.add_argument("-f", "--script-location", dest="script_location", help="Path where code to run is stored. local file path") - update_code.set_defaults(func=QuestCmdLine.update_code) + update_code.set_defaults(func=PipelinesCmdLine.update_code) update_code.add_argument( "--jar-path", dest="jar_path", @@ -94,37 +94,37 @@ def parsers(): delete = subparsers.add_parser("delete", help="Delete Pipeline") delete.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - delete.set_defaults(func=QuestCmdLine.delete) + delete.set_defaults(func=PipelinesCmdLine.delete) status = subparsers.add_parser("status", help="Status of Pipeline") status.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - status.set_defaults(func=QuestCmdLine.status) + status.set_defaults(func=PipelinesCmdLine.status) start = subparsers.add_parser("start", help="Start Pipeline") start.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - start.set_defaults(func=QuestCmdLine.start) + start.set_defaults(func=PipelinesCmdLine.start) pause = subparsers.add_parser("pause", help="pause Pipeline") pause.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - pause.set_defaults(func=QuestCmdLine.pause) + pause.set_defaults(func=PipelinesCmdLine.pause) clone = subparsers.add_parser("clone", help="clone Pipeline") clone.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - clone.set_defaults(func=QuestCmdLine.clone) + clone.set_defaults(func=PipelinesCmdLine.clone) archive = subparsers.add_parser("archive", help="archive Pipeline") archive.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - archive.set_defaults(func=QuestCmdLine.archive) + archive.set_defaults(func=PipelinesCmdLine.archive) health = subparsers.add_parser("health", help="health of Pipeline") health.add_argument("--pipeline-id", dest="pipeline_id", required=True, help='Id of pipeline which need to be started') - health.set_defaults(func=QuestCmdLine.health) + health.set_defaults(func=PipelinesCmdLine.health) # list index = subparsers.add_parser("list", help="list of Pipeline.") index.add_argument("--pipeline-status", dest="status", required=True, help='Id of pipeline which need to be started. ' 'Valid values = [active, archive, all, draft] ') - index.set_defaults(func=QuestCmdLine.index) + index.set_defaults(func=PipelinesCmdLine.index) return argparser @staticmethod @@ -134,7 +134,7 @@ def run(args): :param args: :return: """ - parser = QuestCmdLine.parsers() + parser = PipelinesCmdLine.parsers() parsed = parser.parse_args(args) return parsed.func(parsed) @@ -145,7 +145,7 @@ def delete(args): :param args: :return: """ - response = Quest.delete(args.pipeline_id) + response = Pipelines.delete(args.pipeline_id) return json.dumps( response, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -156,7 +156,7 @@ def pause(args): :param args: :return: """ - response = Quest.pause(args.pipeline_id) + response = Pipelines.pause(args.pipeline_id) return json.dumps( response, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -167,7 +167,7 @@ def archive(args): :param args: :return: """ - response = Quest.archive(args.pipeline_id) + response = Pipelines.archive(args.pipeline_id) return json.dumps( response, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -178,7 +178,7 @@ def clone(args): :param args: :return: """ - response = Quest.clone(args.pipeline_id) + response = Pipelines.clone(args.pipeline_id) return json.dumps(response, default=lambda o: o.attributes, sort_keys=True, indent=4) @staticmethod @@ -188,7 +188,7 @@ def status(args): :param args: :return: """ - response = Quest.get_status(args.pipeline_id) + response = Pipelines.get_status(args.pipeline_id) return json.dumps( response, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -199,7 +199,7 @@ def health(args): :param args: :return: """ - response = Quest.get_health(args.pipeline_id) + response = Pipelines.get_health(args.pipeline_id) return json.dumps( response, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -210,7 +210,7 @@ def start(args): :param args: :return: """ - response = Quest.start(args.pipeline_id) + response = Pipelines.start(args.pipeline_id) return json.dumps(response, sort_keys=True, indent=4) @staticmethod @@ -220,7 +220,7 @@ def index(args): :param args: :return: """ - pipelinelist = Quest.list(args.status) + pipelinelist = Pipelines.list(args.status) return json.dumps( pipelinelist, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -233,7 +233,7 @@ def create(args): """ pipeline = None if int(args.create_type) == 2: - pipeline = QuestJar.create_pipeline(pipeline_name=args.name, + pipeline = PipelinesJar.create_pipeline(pipeline_name=args.name, jar_path=args.jar_path, main_class_name=args.main_class_name, cluster_label=args.cluster_label, @@ -241,7 +241,7 @@ def create(args): command_line_options=args.command_line_options) elif int(args.create_type) == 3: if args.code: - pipeline = QuestCode.create_pipeline(pipeline_name=args.name, + pipeline = PipelinesCode.create_pipeline(pipeline_name=args.name, cluster_label=args.cluster_label, code=args.code, file_path=args.script_location, @@ -249,7 +249,7 @@ def create(args): user_arguments=args.user_arguments, command_line_options=args.command_line_options) elif args.script_location: - pipeline = QuestCode.create_pipeline(pipeline_name=args.name, + pipeline = PipelinesCode.create_pipeline(pipeline_name=args.name, cluster_label=args.cluster_label, code=args.code, file_path=args.script_location, @@ -268,7 +268,7 @@ def update_properties(args): """ params = args.__dict__ log.debug(params) - Quest.add_property(pipeline_id=args.pipeline_id, + Pipelines.add_property(pipeline_id=args.pipeline_id, cluster_label=args.cluster_label, can_retry=args.can_retry, command_line_options=args.command_line_options) @@ -281,7 +281,7 @@ def update_code(args): :return: """ if args.jar_path or args.main_class_name: - response = QuestJar.save_code(pipeline_id=args.pipeline_id, + response = PipelinesJar.save_code(pipeline_id=args.pipeline_id, code=args.code, file_path=args.script_location, language=args.language, @@ -289,7 +289,7 @@ def update_code(args): user_arguments=args.user_arguments, main_class_name=args.main_class_name) elif args.code or args.script_location: - response = QuestCode.save_code(pipeline_id=args.pipeline_id, + response = PipelinesCode.save_code(pipeline_id=args.pipeline_id, code=args.code, file_path=args.script_location, language=args.language, @@ -299,8 +299,8 @@ def update_code(args): return json.dumps(response, sort_keys=True, indent=4) -class Quest(Resource): - """qds_sdk.Quest is the base Qubole Quest class.""" +class Pipelines(Resource): + """qds_sdk.Pipelines is the base Qubole Pipelines class.""" """ all commands use the /pipelines endpoint""" @@ -326,7 +326,7 @@ def list(status=None): else: params = {"filter": status.lower()} conn = Qubole.agent() - url_path = Quest.rest_entity_path + url_path = Pipelines.rest_entity_path pipeline_list = conn.get(url_path, params) return pipeline_list @@ -352,9 +352,9 @@ def create(cls, pipeline_name, create_type, **kwargs): "create_type": create_type}, "type": "pipelines"} } - url = Quest.rest_entity_path + "?mode=wizard" + url = Pipelines.rest_entity_path + "?mode=wizard" response = conn.post(url, data) - cls.pipeline_id = Quest.get_pipline_id(response) + cls.pipeline_id = Pipelines.get_pipline_id(response) cls.pipeline_name = pipeline_name @staticmethod @@ -365,9 +365,9 @@ def start(pipeline_id): :return: response """ conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + "/start" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/start" response = conn.put(url) - pipeline_status = Quest.get_status(pipeline_id) + pipeline_status = Pipelines.get_status(pipeline_id) while pipeline_status == 'waiting': log.info("Pipeline is in waiting state....") time.sleep(10) @@ -410,7 +410,7 @@ def add_property(pipeline_id, } } log.info("Data {}".format(data)) - url = Quest.rest_entity_path + "/" + pipeline_id + "/properties" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/properties" response = conn.put(url, data) log.debug(response) return response @@ -481,7 +481,7 @@ def get_health(pipeline_id): :return: """ conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + url = Pipelines.rest_entity_path + "/" + pipeline_id response = conn.get(url) log.info(response) return response.get("data").get("attributes").get("health") @@ -493,7 +493,7 @@ def clone(pipeline_id): :param pipeline_id: :return: """ - url = Quest.rest_entity_path + "/" + pipeline_id + "/duplicate" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/duplicate" log.info("Cloning pipeline with id {}".format(pipeline_id)) conn = Qubole.agent() return conn.post(url) @@ -505,7 +505,7 @@ def pause(pipeline_id): :param pipeline_id: :return: """ - url = Quest.rest_entity_path + "/" + pipeline_id + "/pause" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/pause" log.info("Pausing pipeline with id {}".format(pipeline_id)) conn = Qubole.agent() return conn.put(url) @@ -517,7 +517,7 @@ def archive(pipeline_id): :param pipeline_id: :return: """ - url = Quest.rest_entity_path + "/" + pipeline_id + "/archive" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/archive" log.info("Archiving pipeline with id {}".format(pipeline_id)) conn = Qubole.agent() return conn.put(url) @@ -530,7 +530,7 @@ def get_status(pipeline_id): :return: """ conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + url = Pipelines.rest_entity_path + "/" + pipeline_id response = conn.get(url) log.debug(response) return response.get("data").get( @@ -544,7 +544,7 @@ def delete(pipeline_id): :return: """ conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + "/delete" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/delete" log.info("Deleting Pipeline with id: {}".format(pipeline_id)) response = conn.put(url) log.info(response) @@ -559,7 +559,7 @@ def edit_pipeline_name(pipeline_id, pipeline_name): :return: """ conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + url = Pipelines.rest_entity_path + "/" + pipeline_id data = { "data": { "attributes": { @@ -584,7 +584,7 @@ def set_alert(pipeline_id, channel_id): } } conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + "/alerts" + url = Pipelines.rest_entity_path + "/" + pipeline_id + "/alerts" return conn.put(url, data) @staticmethod @@ -594,14 +594,14 @@ def get_code(pipeline_id): :param pipeline_id: :return: """ - url = Quest.rest_entity_path + "/" + pipeline_id + url = Pipelines.rest_entity_path + "/" + pipeline_id conn = Qubole.agent() reponse = conn.get(url) code = reponse.get("meta")["command_details"]["code"] return code -class QuestCode(Quest): +class PipelinesCode(Pipelines): create_type = 3 @staticmethod @@ -627,24 +627,24 @@ def create_pipeline(pipeline_name, :param channel_id: :return: """ - QuestCode.create(pipeline_name, QuestCode.create_type) - pipeline_id = QuestCode.pipeline_id - response = QuestCode.add_property(pipeline_id, cluster_label, + PipelinesCode.create(pipeline_name, PipelinesCode.create_type) + pipeline_id = PipelinesCode.pipeline_id + response = PipelinesCode.add_property(pipeline_id, cluster_label, can_retry=can_retry, command_line_options=command_line_options) log.debug(response) - response = QuestCode.save_code(pipeline_id, + response = PipelinesCode.save_code(pipeline_id, code=code, file_path=file_path, language=language, user_arguments=user_arguments) if channel_id: - response = Quest.set_alert(pipeline_id, channel_id) + response = Pipelines.set_alert(pipeline_id, channel_id) log.info(response) return response -class QuestJar(Quest): +class PipelinesJar(Pipelines): create_type = 2 @staticmethod @@ -668,24 +668,24 @@ def create_pipeline(pipeline_name, :param user_arguments: :return: """ - QuestJar.create(pipeline_name, QuestJar.create_type) - pipeline_id = QuestJar.pipeline_id - QuestJar.add_property(pipeline_id, + PipelinesJar.create(pipeline_name, PipelinesJar.create_type) + pipeline_id = PipelinesJar.pipeline_id + PipelinesJar.add_property(pipeline_id, cluster_label, can_retry=can_retry, command_line_options=command_line_options) - QuestJar.save_code(pipeline_id, + PipelinesJar.save_code(pipeline_id, jar_path=jar_path, main_class_name=main_class_name, user_arguments=user_arguments) - QuestJar.jar_path = jar_path + PipelinesJar.jar_path = jar_path if channel_id: - response = Quest.set_alert(pipeline_id, channel_id) + response = Pipelines.set_alert(pipeline_id, channel_id) log.info(response) - return QuestJar + return PipelinesJar -class QuestAssisted(Quest): +class PipelinesAssisted(Pipelines): create_type = 1 @staticmethod diff --git a/tests/test_quest.py b/tests/test_quest.py index 17ae4274..0ee50385 100644 --- a/tests/test_quest.py +++ b/tests/test_quest.py @@ -1,7 +1,7 @@ from __future__ import print_function from test_base import QdsCliTestCase from test_base import print_command -from qds_sdk.quest import QuestCode +from qds_sdk.pipelines import PipelinesCode from qds_sdk.connection import Connection import qds from mock import * @@ -19,7 +19,7 @@ class TestQuestList(QdsCliTestCase): def test_list_pipeline(self): - sys.argv = ['qds.py', 'quest', 'list', '--pipeline-status', 'draft'] + sys.argv = ['qds.py', 'pipelines', 'list', '--pipeline-status', 'draft'] print_command() Connection._api_call = Mock(return_value={}) params = {'filter': "draft"} @@ -28,7 +28,7 @@ def test_list_pipeline(self): "GET", "pipelines", params=params) def test_pause_pipeline(self): - sys.argv = ['qds.py', 'quest', 'pause', '--pipeline-id', '153'] + sys.argv = ['qds.py', 'pipelines', 'pause', '--pipeline-id', '153'] print_command() Connection._api_call = Mock(return_value={}) qds.main() @@ -36,7 +36,7 @@ def test_pause_pipeline(self): "PUT", "pipelines/153/pause", None) def test_clone_pipeline(self): - sys.argv = ['qds.py', 'quest', 'clone', '--pipeline-id', '153'] + sys.argv = ['qds.py', 'pipelines', 'clone', '--pipeline-id', '153'] print_command() Connection._api_call = Mock(return_value={}) qds.main() @@ -44,7 +44,7 @@ def test_clone_pipeline(self): "POST", "pipelines/153/duplicate", None) def test_archive_pipeline(self): - sys.argv = ['qds.py', 'quest', 'archive', '--pipeline-id', '153'] + sys.argv = ['qds.py', 'pipelines', 'archive', '--pipeline-id', '153'] print_command() Connection._api_call = Mock(return_value={}) qds.main() @@ -52,7 +52,7 @@ def test_archive_pipeline(self): "PUT", "pipelines/153/archive", None) def test_delete_pipeline(self): - sys.argv = ['qds.py', 'quest', 'delete', '--pipeline-id', '153'] + sys.argv = ['qds.py', 'pipelines', 'delete', '--pipeline-id', '153'] print_command() Connection._api_call = Mock(return_value={}) qds.main() @@ -60,7 +60,7 @@ def test_delete_pipeline(self): "PUT", "pipelines/153/delete", None) def test_create_pipeline(self): - sys.argv = ['qds.py', 'quest', 'create', '--create-type', '3', '--pipeline-name', 'test_pipeline_name', + sys.argv = ['qds.py', 'pipelines', 'create', '--create-type', '3', '--pipeline-name', 'test_pipeline_name', '--cluster-label', 'spark', '-c', 'print("hello")', '--language', 'python', '--user-arguments', 'users_argument'] print_command() d1 = {"data": {"attributes": {"name": "test_pipeline_name", "status": "DRAFT", "create_type": 3}, @@ -79,9 +79,9 @@ def test_create_pipeline(self): "owner_name": "eam-airflow", "pipeline_instance_status": "draft", "create_type": 3, "health": "UNKNOWN"}}} - QuestCode.pipeline_id = '1' - QuestCode.pipeline_code = """print("helloworld")""" - QuestCode.pipeline_name = "test_pipeline_name" + PipelinesCode.pipeline_id = '1' + PipelinesCode.pipeline_code = """print("helloworld")""" + PipelinesCode.pipeline_name = "test_pipeline_name" d2 = {"data": {"attributes": {"cluster_label": "spark", "can_retry": True, "checkpoint_location": None, "trigger_interval": None, "output_mode": None,