From 7863503ce7a3b06d979a7d2e7b90075f58c146f3 Mon Sep 17 00:00:00 2001 From: harshits Date: Mon, 23 Dec 2019 18:03:33 +0530 Subject: [PATCH] adding changes --- qds_sdk/quest.py | 311 ++++++++++++++++++++++++++++------------------- 1 file changed, 185 insertions(+), 126 deletions(-) diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py index e3e0f68d..479d3d23 100644 --- a/qds_sdk/quest.py +++ b/qds_sdk/quest.py @@ -111,7 +111,8 @@ def start_pause(args): elif args.status: response = Quest.status(args.pipeline_id) else: - raise ParseError("Please select only one param out of --start, --pause, --delete, --archive, --clone and --edit.") + raise ParseError( + "Please select only one param out of --start, --pause, --delete, --archive, --clone and --edit.") return json.dumps(response, default=lambda o: o.attributes, sort_keys=True, indent=4) @staticmethod @@ -122,13 +123,13 @@ def index(args): @staticmethod def create(args): pipeline = None - print "create type = ", type(args.create_type) + # print "create type = ", type(args.create_type) if args.create_type == "1": - pipeline = Assisted.create(args.name, args.create_type, args) + pipeline = QuestAssisted.create(args.name, args.create_type, args) elif args.create_type == "2": - pipeline = Jar.create(args.name, args.create_type, args) + pipeline = QuestJar.create(args.name, args.create_type, args) elif args.create_type == "3": - c = Code() + c = QuestCode() c.create(args.name, args) return json.dumps(pipeline, default=lambda o: o.attributes, sort_keys=True, indent=4) @@ -143,7 +144,11 @@ class Quest(Resource): rest_entity_path = "pipelines" @staticmethod - def index(status=None): + def get_pipline_id(response): + return response.get('data').get('id') + + @staticmethod + def list(status=None): if status is None or status == 'all': params = {"filter": "draft,archive,active"} else: @@ -154,58 +159,28 @@ def index(status=None): return questjson @staticmethod - def create(name, args): - pass + def create(pipeline_name, create_type, **kwargs): + """ + Create a pipeline object by issuing a POST request to the /pipelin?mode=wizard endpoint + Note - this creates pipeline in draft mode - def add_source(self, pipeline_id, args): - data_store = args.data_store - if data_store == 's3': - return self.s3_source(pipeline_id, args) - elif data_store == 'kafka': - return self.kafka_source(pipeline_id, args) - elif data_store == 'kinesis': - return self.kinesis_source(pipeline_id, args) - else: - log.error("Provide a valid data source {s3, kafka, kinesis}") - - def add_sink(self, pipeline_id, args): - data_store = args.get("data_store") - if data_store == 's3': - return self.s3_sink(pipeline_id, args) - elif data_store == 'kafka': - return self.kafka_sink(pipeline_id, args) - elif data_store == 'kinesis': - return self.kinesis_sink(pipeline_id, args) - elif data_store == 'snowflake': - return self.kinesis_sink(pipeline_id, args) - elif data_store == 'hive': - return self.kinesis_sink(pipeline_id, args) - else: - log.error("Provide a valid data source {s3, kafka, kinesis}") - - def s3_source(self, pipeline_id, args): - data_store = "s3" - type = "source" - schema = args.schema - name = args.source_name - path = args.source_path - format = args.format - other_settings = {"fileNameOnly": "false", "latestFirst": "false"} - conn = Qubole.agent() - url = self.rest_entity_path + "/" + str(pipeline_id) + '/node' - data = {"data": {"attributes": {"fields": {"name": name, "path": path, "format": format, "schema": schema, - "other_settings": other_settings}, "data_store": data_store}, - "type": type}} - return conn.post(url, data) + Args: + **kwargs: keyword arguments specific to create type - def add_property(self): - pass - - @staticmethod - def pause(pipeline_id): - url = Quest.rest_entity_path + "/" + pipeline_id + "/pause" + Returns: + Command object + """ conn = Qubole.agent() - return conn.put(url) + if create_type is None: + raise ParseError("Please enter create_type. 1:Assisted Mode, 2:BYOJ, 3:BYOC") + if pipeline_name is None: + raise ParseError("Enter pipeline name") + data = {"data": { + "attributes": {"name": pipeline_name, "status": "DRAFT", "create_type": create_type}, + "type": "pipelines"}} + url = Quest.rest_entity_path + "?mode=wizard" + response = conn.post(url, data) + return response @staticmethod def start(pipeline_id): @@ -222,101 +197,185 @@ def start(pipeline_id): return response @staticmethod - def status(pipeline_id): + def add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=None, output_mode=None, can_retry=True): conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + "/status" - response = conn.put(url) - log.info(response) + data = {"data": {"attributes": { + "cluster_label": cluster_label, + "can_retry": can_retry, + "checkpoint_location": checkpoint_location, + "trigger_interval": trigger_interval, + "output_mode": output_mode, + "command_line_options": "--conf spark.driver.extraLibraryPath=/usr/lib/hadoop2/lib/native\n--conf spark.eventLog.compress=true\n--conf spark.eventLog.enabled=true\n--conf spark.sql.streaming.qubole.enableStreamingEvents=true\n--conf spark.qubole.event.enabled=true" + }, + "type": "pipeline/properties" + } + } + url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/properties/" + response = conn.put(url, data) return response @staticmethod - def delete(pipeline_id): - conn = Qubole.agent() - url = Quest.rest_entity_path + "/" + pipeline_id + "/delete" - response = conn.put(url) - log.info(response) - return response + def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None, language='scala'): + try: + code = None + if code_or_fileLoc: + if os.path.isdir(code_or_fileLoc): + q = open(code_or_fileLoc).read() + code = q + else: + code = code_or_fileLoc + elif jar_file_loc: + code=jar_file_loc - def get_pipline_id(self, response): - return response.get('data').get('id') + except IOError as e: + raise ParseError("Unable to open script location or script location and code both are empty") - def kafka_source(self, **kwargs): - pass + data = {"data": {"attributes": {"create_type": create_type, "code": str(code), "language": str(language)}}} + conn = Qubole.agent() + url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/save_code" + response = conn.put(url, data) + return response - def kinesis_source(self, **kwargs): - pass def health(self): pass - def clone(self): - pass - - def save_code(self): - pass + @staticmethod + def clone(pipeline_id): + url = Quest.rest_entity_path + "/" + pipeline_id + "/duplicate" + log.info("Cloning pipeline with id {}".format(pipeline_id)) + conn = Qubole.agent() + return conn.put(url) - def edit(self): - pass + @staticmethod + def pause(pipeline_id): + url = Quest.rest_entity_path + "/" + pipeline_id + "/pause" + log.info("Pausing pipeline with id {}".format(pipeline_id)) + conn = Qubole.agent() + return conn.put(url) + @staticmethod + def edit(pipeline_id, **kwargs): -class Code(Quest): - def __init__(self): - self.create_type = 3 - def create(self, name, args): + @staticmethod + def archive(pipeline_id): + url = Quest.rest_entity_path + "/" + pipeline_id + "/archive" + log.info("Archiving pipeline with id {}".format(pipeline_id)) conn = Qubole.agent() - data = {"data": {"attributes": {"name": name, "status": "DRAFT", "description": args.description, - "create_type": self.create_type}, "type": "pipelines"}} - url = Quest.rest_entity_path + "?mode=wizard" - response = conn.post(url, data) - pipeline_id = response.get("data").get("id") - property = self.add_property(pipeline_id, cluster_label=args.cluster_label) - save_code = self.save_code(pipeline_id, script_location=args.script_location, code=args.code, - language=args.language) - return save_code + return conn.put(url) + + # @staticmethod + # def status(pipeline_id): + # conn = Qubole.agent() + # url = Quest.rest_entity_path + "/" + pipeline_id + "/status" + # response = conn.put(url) + # log.info(response) + # return response - def add_property(self, pipeline_id, **kwargs): + @staticmethod + def delete(pipeline_id): conn = Qubole.agent() - data = {"data": {"attributes": { - "cluster_label": kwargs.get("cluster_label"), - "can_retry": "true", - "checkpoint_location": kwargs.get("checkpoint_location"), - "trigger_interval": kwargs.get("trigger_interval"), - "output_mode": kwargs.get("output_mode"), - "command_line_options": "--conf spark.driver.extraLibraryPath=/usr/lib/hadoop2/lib/native\n--conf spark.eventLog.compress=true\n--conf spark.eventLog.enabled=true\n--conf spark.sql.streaming.qubole.enableStreamingEvents=true\n--conf spark.qubole.event.enabled=true" - }, - "type": "pipeline/properties" - } - } - url = self.rest_entity_path + "/" + str(pipeline_id) + "/properties/" - response = conn.put(url, data) + url = Quest.rest_entity_path + "/" + pipeline_id + "/delete" + log.info("Deleting Pipeline with id: {}".format(pipeline_id)) + response = conn.put(url) + log.info(response) return response - def save_code(self, pipeline_id, **kwargs): - try: - code = None - if kwargs.get("code"): - code = kwargs.get('code') - elif kwargs.get("script_location"): - q = open(kwargs.get("script_location")).read() - code = q - except IOError as e: - raise ParseError("Unable to open script location or script location and code both are empty") + @staticmethod + def edit_pipeline_name(pipeline_id, pipeline_name): + conn = Qubole.agent() + url = Quest.rest_entity_path + "/" + pipeline_id + data = {"data":{"attributes":{"name":pipeline_name},"type":"pipelines"}} + return conn.put(url, data) + +class QuestCode(Quest): + @staticmethod + def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, checkpoint_location, language='scala', **kwargs): + """ + Method to create pipeline in BYOC mode in one go. + :param pipeline_name: Name by which pipeline will be created. + :param create_type: 1->Assisted Mode, 2->BYOJ mode, 3->BYOC mode + :param code_or_fileLoc: code/location of file + :param language: scala/python + :param kwargs: other params are checkpoint_location, trigger_interval, outputmode, can_retry(true by default) + :return: pipeline id + """ + response = Quest.create(pipeline_name, create_type) + log.info(response) + pipeline_id = Quest.get_pipline_id(response) + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=kwargs.get("trigger_interval"), output_mode=kwargs.get("output_mode"), can_retry=kwargs.get("can_retry")) + log.info(property) + save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code_or_fileLoc, language=language) + pipeline_id = Quest.get_pipline_id(save_response) + return pipeline_id + + @staticmethod + def edit(pipeline_id, **kwargs): + """ + Method to Edit pipeline + :param pipeline_id: pipeline id + :return: + """ + checkpoint_location = kwargs.get("checkpoint_location") + cluster_label = kwargs.get("cluster_label") + code = kwargs.get("code_or_fileLoc") language = kwargs.get("language") - data = {"data": {"attributes": {"create_type": self.create_type, "code": str(code), "language": str(language)}}} - conn = Qubole.agent() - url = self.rest_entity_path + "/" + str(pipeline_id) + "/save_code" - response = conn.put(url, data) - return response + trigger_interval=kwargs.get("trigger_interval") + output_mode=kwargs.get("output_mode") + can_retry=kwargs.get("can_retry") + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) + log.info(property) + save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language) + return save_response + + +class QuestJar(Quest): + @staticmethod + def create_pipeline(pipeline_name, create_type, jar_file, cluster_label, checkpoint_location, language='scala', **kwargs): + """ + Method to create pipeline in BYOC mode in one go. + :param pipeline_name: Name by which pipeline will be created. + :param create_type: 1->Assisted Mode, 2->BYOJ mode, 3->BYOC mode + :param code_or_fileLoc: code/location of file + :param language: scala/python + :param kwargs: other params are checkpoint_location, trigger_interval, outputmode, can_retry(true by default) + :return: pipeline id + """ + response = Quest.create(pipeline_name, create_type) + log.info(response) + pipeline_id = Quest.get_pipline_id(response) + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, **kwargs) + log.info(property) + save_response = Quest.save_code(pipeline_id, create_type, jar_file_loc=jar_file, language=language) + pipeline_id = Quest.get_pipline_id(save_response) + return pipeline_id - def start(self, pipeline_id): - return super(Quest, self).start(pipeline_id) + @staticmethod + def edit(pipeline_id, **kwargs): + """ + Method to Edit pipeline + :param pipeline_id: pipeline id + :return: + """ + checkpoint_location = kwargs.get("checkpoint_location") + cluster_label = kwargs.get("cluster_label") + code = kwargs.get("code_or_fileLoc") + language = kwargs.get("language") + trigger_interval=kwargs.get("trigger_interval") + output_mode=kwargs.get("output_mode") + can_retry=kwargs.get("can_retry") + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) + log.info(property) + save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language) + return save_response -class Jar(Quest): - pass +class QuestAssisted(Quest): + def kafka_source(self, **kwargs): + pass -class Assisted(Quest): - pass + def kinesis_source(self, **kwargs): + pass