From 1b43fd99c18ad7f41483991c1653ac357a1b4589 Mon Sep 17 00:00:00 2001 From: harshits Date: Tue, 24 Dec 2019 16:36:33 +0530 Subject: [PATCH] adding changes --- qds_sdk/quest.py | 263 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 243 insertions(+), 20 deletions(-) diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py index 479d3d23..28d49f22 100644 --- a/qds_sdk/quest.py +++ b/qds_sdk/quest.py @@ -197,7 +197,8 @@ def start(pipeline_id): return response @staticmethod - def add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=None, output_mode=None, can_retry=True): + def add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=None, output_mode=None, + can_retry=True): conn = Qubole.agent() data = {"data": {"attributes": { "cluster_label": cluster_label, @@ -225,7 +226,7 @@ def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None, else: code = code_or_fileLoc elif jar_file_loc: - code=jar_file_loc + code = jar_file_loc except IOError as e: raise ParseError("Unable to open script location or script location and code both are empty") @@ -236,7 +237,6 @@ def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None, response = conn.put(url, data) return response - def health(self): pass @@ -256,7 +256,7 @@ def pause(pipeline_id): @staticmethod def edit(pipeline_id, **kwargs): - + pass @staticmethod def archive(pipeline_id): @@ -286,13 +286,14 @@ def delete(pipeline_id): def edit_pipeline_name(pipeline_id, pipeline_name): conn = Qubole.agent() url = Quest.rest_entity_path + "/" + pipeline_id - data = {"data":{"attributes":{"name":pipeline_name},"type":"pipelines"}} + data = {"data": {"attributes": {"name": pipeline_name}, "type": "pipelines"}} return conn.put(url, data) class QuestCode(Quest): @staticmethod - def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, checkpoint_location, language='scala', **kwargs): + def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, checkpoint_location, + language='scala', **kwargs): """ Method to create pipeline in BYOC mode in one go. :param pipeline_name: Name by which pipeline will be created. 
@@ -305,7 +306,9 @@ def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, response = Quest.create(pipeline_name, create_type) log.info(response) pipeline_id = Quest.get_pipline_id(response) - property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=kwargs.get("trigger_interval"), output_mode=kwargs.get("output_mode"), can_retry=kwargs.get("can_retry")) + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, + trigger_interval=kwargs.get("trigger_interval"), + output_mode=kwargs.get("output_mode"), can_retry=kwargs.get("can_retry")) log.info(property) save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code_or_fileLoc, language=language) pipeline_id = Quest.get_pipline_id(save_response) @@ -322,10 +325,11 @@ def edit(pipeline_id, **kwargs): cluster_label = kwargs.get("cluster_label") code = kwargs.get("code_or_fileLoc") language = kwargs.get("language") - trigger_interval=kwargs.get("trigger_interval") - output_mode=kwargs.get("output_mode") - can_retry=kwargs.get("can_retry") - property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) + trigger_interval = kwargs.get("trigger_interval") + output_mode = kwargs.get("output_mode") + can_retry = kwargs.get("can_retry") + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, + trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) log.info(property) save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language) return save_response @@ -333,7 +337,8 @@ def edit(pipeline_id, **kwargs): class QuestJar(Quest): @staticmethod - def create_pipeline(pipeline_name, create_type, jar_file, cluster_label, checkpoint_location, language='scala', **kwargs): + def create_pipeline(pipeline_name, create_type, jar_file, cluster_label, checkpoint_location, language='scala', + **kwargs): """ Method to create pipeline in BYOC mode in one go. :param pipeline_name: Name by which pipeline will be created. @@ -363,19 +368,237 @@ def edit(pipeline_id, **kwargs): cluster_label = kwargs.get("cluster_label") code = kwargs.get("code_or_fileLoc") language = kwargs.get("language") - trigger_interval=kwargs.get("trigger_interval") - output_mode=kwargs.get("output_mode") - can_retry=kwargs.get("can_retry") - property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) + trigger_interval = kwargs.get("trigger_interval") + output_mode = kwargs.get("output_mode") + can_retry = kwargs.get("can_retry") + property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, + trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry) log.info(property) save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language) return save_response - class QuestAssisted(Quest): - def kafka_source(self, **kwargs): - pass + @staticmethod + def add_source(pipeline_id, source_path, schema, format, data_store, **kwargs): + """ + Method to add source in assisted mode pipeline. + :param pipeline_id: id of pipeline for which source need to be added/updated. + :param source_path: location of data + :param schema: key value pair eg: {"id":"Integer"} + :param format: JSON/Avro/Parquet/ORC. 
+ :param data_store: source data store, one of [kafka, kinesis, s3, google_storage].
+ :param other_settings: key value pairs eg: {"fileNameOnly": "false", "latestFirst": "false"}
+ :return: response in dict format
+ """
+ url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
+ sources = {"kafka": QuestAssisted._source_kafka, "kinesis": QuestAssisted._source_kinesis,
+ "s3": QuestAssisted._source_s3, "google_storage": QuestAssisted._source_google_storage}
+ kwargs.setdefault("path", source_path)  # file based sources read the data location from kwargs
+ if data_store in sources:
+ return sources[data_store](url, schema, format, **kwargs)
+ raise ParseError("Please add only one valid source out of [kafka, kinesis, s3, google_storage]")
+
+ @staticmethod
+ def add_sink(pipeline_id, source_path, schema, format, data_store, **kwargs):
+ """
+ Method to add sink for given pipeline.
+ :param pipeline_id: id of pipeline for which sink need to be added/updated.
+ :param source_path: location where the sink writes its output (used by file based sinks).
+ :param schema: key value pair eg: {"id":"Integer"}
+ :param format: JSON/Avro/Parquet/ORC.
+ :param data_store: sink data store, one of [kafka, snowflake, hive, s3, google_storage].
+ :param kwargs: sink specific settings, eg: table_name for hive, topic for kafka.
+ :return: response in dict format
+ """
+ url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
+ sinks = {"kafka": QuestAssisted._sink_kafka, "snowflake": QuestAssisted._sink_snowflake,
+ "hive": QuestAssisted._sink_hive,
+ "s3": QuestAssisted._sink_s3, "google_storage": QuestAssisted._sink_google_storage}
+ kwargs.setdefault("path", source_path)  # file based sinks read the output location from kwargs
+ if data_store in sinks:
+ return sinks[data_store](url, format, **kwargs)
+ raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")
+
+ @staticmethod
+ def add_operator(operator, pipeline_id, column_name, **kwargs):
+ """
+ Method to add an operator (transformation) to the given pipeline.
+ :param operator: one of [filter, select, watermark, window_group].
+ :param pipeline_id: id of pipeline for which operator need to be added.
+ :param column_name: column on which the operator is applied.
+ :param kwargs: operator specific settings, eg: condition/value for filter, frequency for watermark.
+ :return: response in dict format
+ """
+ url = Quest.rest_entity_path + "/" + pipeline_id + "/operator"
+ if operator == 'filter':
+ return QuestAssisted._filter_operator(url, column_name, kwargs.get("condition"), kwargs.get("value"))
+ if operator == "select":
+ return QuestAssisted._select_operator(url, column_name)
+ if operator == "watermark":
+ return QuestAssisted._watermark_operator(url, column_name, kwargs.get("frequency"))
+ if operator == "window_group":
+ return QuestAssisted._window_group_operator(url, column_name, kwargs.get("sliding_window_value_frequency"),
+ kwargs.get("window_interval_frequency"),
+ kwargs.get("other_columns"))
+ raise ParseError("Please add only one valid operator out of [filter, select, watermark, window_group]")
+
+ @staticmethod
+ def _select_operator(url, column_names):
+ conn = Qubole.agent()
+ data = {"data": {"attributes": {"operator": "select", "column_names": column_names}}}
+ return conn.put(url, data)
+
+ @staticmethod
+ def _filter_operator(url, column_name, condition, value):
+ conn = Qubole.agent()
+ data = {"data": {"attributes": {"operator": "filter", "column_name": column_name,
+ "condition": condition, "value": value}}}
+ return conn.put(url, data)
+
+ @staticmethod
+ def _watermark_operator(url, column_name, frequency):
+ conn = Qubole.agent()
+ data = {"data": {"attributes": {"operator": "watermark", "column_name": column_name,
+ "frequency": frequency, "unit": "minute"}}}
+ return conn.put(url, data)
+
+ @staticmethod
+ def _window_group_operator(url, column_name, sliding_window_value_frequency, window_interval_frequency,
+ other_columns):
+ conn = Qubole.agent()
+ data = {"data": {"attributes": {"operator": "windowed_group",
+ "window_expression": {"column_name": column_name, "sliding_window_value": {
+ "frequency": sliding_window_value_frequency,
+ "unit": "minute"},
+ "window_interval": {
+ "frequency": window_interval_frequency,
+ "unit": "minute"}}, "other_columns": other_columns,
+ "action": "count"}}}
+ return conn.put(url, data)
+
+ @staticmethod
+ def _source_kafka(url, schema, format, **kwargs):
+ conn = Qubole.agent()
+ 
default_kafka = {"kafkaConsumer.pollTimeoutMs": 512, "fetchOffset.numRetries": 3, + "fetchOffset.retryIntervalMs": 10} + data = { + "data": { + "attributes": { + "fields": { + "name": kwargs.get("name", "source_kafka"), + "brokers": kwargs.get("broker"), + "topics": kwargs.get("topics"), + "topic_type": kwargs.get("topic_type", "multiple"), + "schema": schema, + "use_registry": kwargs.get("use_registry", "write"), + "registry_subject": kwargs.get("registry_subject", None), + "registry_id": kwargs.get("registry_id", None), + "starting_offsets": kwargs.get("starting_offsets", "latest"), + "format": format, + "other_kafka_consumer_settings": kwargs.get("other_kafka_consumer_settings", default_kafka) + }, + "data_store": "kafka" + }, + "type": "source" + } + } + return conn.put(url, data) - def kinesis_source(self, **kwargs): + @staticmethod + def _source_kinesis(url, schema, format, **kwargs): + conn = Qubole.agent() + other_kinesis_settings = {"kinesis.executor.maxFetchTimeInMs": 1000, + "kinesis.executor.maxFetchRecordsPerShard": 100000, + "kinesis.executor.maxRecordPerRead": 10000} + data = { + "data": { + "attributes": { + "fields": { + "name": kwargs.get("name", "source_kinesis"), + "endpoint_url": kwargs.get("endpoint_url"), + "stream_name": kwargs.get("stream_name"), + "schema": schema, + "starting_position": kwargs.get("starting_position", "latest"), + "format": format, + "other_kinesis_settings": kwargs.get("other_kinesis_settings", other_kinesis_settings) + }, + "data_store": "kinesis" + }, + "type": "source" + } + } + return conn.put(url, data) + + @staticmethod + def _source_s3(url, schema, format, **kwargs): + conn = Qubole.agent() + other_settings = {"fileNameOnly": "false", "latestFirst": "false"} + data = { + "data": { + "attributes": { + "fields": { + "name": kwargs.get("name", "source_s3"), + "path": kwargs.get("path"), + "schema": schema, + "format": format, + "other_settings": kwargs.get("other_settings", other_settings) + }, + "data_store": "s3" + }, + "type": "source" + } + } + return conn.put(url, data) + + @staticmethod + def _source_google_storage(url, schema, format, **kwargs): + conn = Qubole.agent() + other_settings = {"fileNameOnly": "false", "latestFirst": "false"} + data = {"data": + {"attributes": + {"fields": + {"path": kwargs.get("path"), + "format": format, + "schema": schema, + "other_settings": other_settings + }, + "data_store": "googleStorage"}, + "type": "source" + } + } + return conn.put(url, data) + + @staticmethod + def _sink_kafka(url, format, **kwargs): + conn = Qubole.agent() + default = {"kafka.max.block.ms": 60000} + data = {"data": {"attributes": { + "fields": {"kafka_bootstrap_server": kwargs.get("kafka_bootstrap_server"), "topic": kwargs.get("topic"), + "format": format, "other_kafka_settings": kwargs.get("other_kafka_settings", default)}, + "data_store": "kafka"}, + "type": "sink"}} + return conn.put(url, data) + + @staticmethod + def _sink_s3(url, format, **kwargs): + conn = Qubole.agent() + data = {"data": {"attributes": { + "fields": {"path": kwargs.get("path"), "partition_by": kwargs.get("partition_by"), + "other_configurations": kwargs.get("other_configurations"), "format": format}, + "data_store": "s3"}, "type": "sink"}} + return conn.put(url, data) + + @staticmethod + def _sink_hive(url, format, **kwargs): + conn = Qubole.agent() + default_other_configurations = {"table.metastore.stopOnFailure": "false", + "table.metastore.updateIntervalSeconds": 10} + data = {"data": {"attributes": { + "fields": {"database": kwargs.get("databases", 
"default"), "table_name": kwargs.get("table_name"), + "other_configurations": kwargs.get("other_configuration", default_other_configurations)}, + "data_store": "hive"}, + "type": "sink"}} + return conn.put(url, data) + + @staticmethod + def _sink_snowflake(url, format, **kwargs): pass + + @staticmethod + def _sink_google_storage(url, format, **kwargs): + conn = Qubole.agent() + data = {"data": {"attributes": { + "fields": {"path": kwargs.get("path"), "partition_by": kwargs.get("partition_by"), + "other_configurations": kwargs.get("other_configurations"), "format": format}, + "data_store": "googleStorage"}, "type": "sink"}} + return conn.put(url, data)