From c7b17775ea81537e44830bd7511c16513b76909a Mon Sep 17 00:00:00 2001
From: harshits
Date: Fri, 17 Jan 2020 15:00:52 +0530
Subject: [PATCH] Adding changes

---
 qds_sdk/quest.py | 130 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 92 insertions(+), 38 deletions(-)

diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py
index 714dd1ba..27ce2ea3 100644
--- a/qds_sdk/quest.py
+++ b/qds_sdk/quest.py
@@ -556,7 +556,8 @@ def add_source(pipeline_id, schema, format, data_store, topic_type, topics=None,
         raise ParseError("Please add only one valid source out of [kafka, s3, kinesis]")
 
     @staticmethod
-    def add_sink(pipeline_id, data_store,
+    def add_sink(pipeline_id, data_store, warehouse=None, catalog_name=None, streaming_stage=None,
+                 snowflake_database=None, snowflake_table_name=None,
                  data_format=None,
                  kafka_bootstrap_server=None, topic=None,
@@ -567,12 +568,18 @@ def add_sink(pipeline_id, data_store,
                  other_gs_configurations=None,
                  table_name=None,
                  hive_database=None,
-                 other_hive_configurations=None):
+                 other_hive_configurations=None,
+                 snowflake_name=None, snowflake_other_configurations=None):
         """
-        Method to add sink for given pipeline.
+        :param pipeline_id:
 
-        :param format:
         :param data_store:
+        :param warehouse:
+        :param catalog_name:
+        :param streaming_stage:
+        :param snowflake_database:
+        :param snowflake_table_name:
+        :param data_format:
         :param kafka_bootstrap_server:
         :param topic:
         :param other_kafka_settings:
@@ -583,6 +590,8 @@ def add_sink(pipeline_id, data_store,
         :param table_name:
         :param hive_database:
         :param other_hive_configurations:
+        :param snowflake_name:
+        :param snowflake_other_configurations:
         :return:
         """
         url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
@@ -593,7 +602,9 @@ def add_sink(pipeline_id, data_store,
             return QuestAssisted._sink_s3(url, data_format, sink_path, partition_by,
                                           other_configurations=other_s3_configurations)
         if data_store == "snowflake":
-            return QuestAssisted._sink_snowflake(url, data_format)
+            return QuestAssisted._sink_snowflake(url, warehouse, catalog_name, streaming_stage, snowflake_database,
+                                                 snowflake_table_name, name=snowflake_name,
+                                                 other_configurations=snowflake_other_configurations)
         if data_store == "google_storage":
             return QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by,
                                                       other_configurations=other_gs_configurations)
@@ -637,34 +648,38 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
                         other_gs_configurations=None,
                         table_name=None,
                         hive_database=None,
-                        other_hive_configurations=None):
+                        other_hive_configurations=None,
+                        warehouse=None,
+                        catalog_name=None,
+                        streaming_stage=None,
+                        snowflake_database=None,
+                        snowflake_table_name=None,
+                        snowflake_name=None,
+                        snowflake_other_configurations=None):
         """
-        :param watermark_frequency:
-        :param sink_topics:
-        :param source_topics:
-        :param sink_data_format:
-        :param source_data_format:
-        :param filter_column_name:
         :param pipeline_name:
         :param schema:
-        :param format:
+        :param source_data_format:
         :param source_data_store:
         :param sink_data_store:
         :param checkpoint_location:
         :param cluster_label:
         :param output_mode:
+        :param sink_data_format:
+        :param source_topics:
+        :param sink_topics:
         :param trigger_interval:
         :param can_retry:
+        :param topic_type:
         :param command_line_options:
-        :param operators: dict of operators
+        :param operators:
         :param channel_id:
         :param endpoint_url:
         :param stream_name:
         :param starting_position:
         :param other_kinesis_settings:
         :param broker:
-        :param topic_type:
         :param use_registry:
         :param registry_subject:
         :param registry_id:
@@ -674,7 +689,6 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
         :param s3_other_settings:
         :param gs_other_settings:
         :param kafka_bootstrap_server:
-        :param topic:
         :param other_kafka_settings:
         :param sink_path:
         :param partition_by:
@@ -683,13 +697,13 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
         :param table_name:
         :param hive_database:
         :param other_hive_configurations:
-        :param condition:
-        :param value:
-        :param select_column_names: List of column names
-        :param frequency:
-        :param sliding_window_value_frequency:
-        :param window_interval_frequency:
-        :param other_columns:
+        :param warehouse:
+        :param catalog_name:
+        :param streaming_stage:
+        :param snowflake_database:
+        :param snowflake_table_name:
+        :param snowflake_name:
+        :param snowflake_other_configurations:
         :return:
         """
         response = Quest.create(pipeline_name, QuestAssisted.create_type)
@@ -724,7 +738,13 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
                                               other_gs_configurations=other_gs_configurations,
                                               table_name=table_name,
                                               hive_database=hive_database,
-                                              other_hive_configurations=other_hive_configurations)
+                                              other_hive_configurations=other_hive_configurations,
+                                              warehouse=warehouse, catalog_name=catalog_name,
+                                              streaming_stage=streaming_stage,
+                                              snowflake_database=snowflake_database,
+                                              snowflake_table_name=snowflake_table_name,
+                                              snowflake_name=snowflake_name,
+                                              snowflake_other_configurations=snowflake_other_configurations)
         log.info(sink_reponse)
         property_response = QuestAssisted.add_property(pipeline_id, cluster_label, checkpoint_location, output_mode,
                                                        trigger_interval=trigger_interval,
@@ -741,20 +761,28 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
                                                                    filter_column_name=operator["filter"]["column_name"])
                 elif operator.items()[0][0] is "select":
                     operator_response = QuestAssisted.add_operator(pipeline_id, operator="select",
-                                                                   select_column_names=operator["select"]["column_names"])
+                                                                   select_column_names=operator["select"][
+                                                                       "column_names"])
                 elif operator.items()[0][0] is "watermark":
                     operator_response = QuestAssisted.add_operator(pipeline_id, operator="watermark",
-                                                                   watermark_column_name=operator["watermark"]["column_name"],
-                                                                   watermark_frequency=operator["watermark"]["frequency"])
+                                                                   watermark_column_name=operator["watermark"][
+                                                                       "column_name"],
+                                                                   watermark_frequency=operator["watermark"][
+                                                                       "frequency"])
                 elif operator.items()[0][0] is "windowed_group":
                     operator_response = QuestAssisted.add_operator(pipeline_id, operator="windowed_group",
-                                                                   groupby_column_name=operator["windowed_group"]["column_name"],
-                                                                   sliding_window_value=operator["windowed_group"].get("sliding_window_value"),
-                                                                   window_interval_frequency=operator["windowed_group"]["window_interval_frequency"],
-                                                                   other_columns=operator["windowed_group"]["other_columns"])
+                                                                   groupby_column_name=operator["windowed_group"][
+                                                                       "column_name"],
+                                                                   sliding_window_value=operator["windowed_group"].get(
+                                                                       "sliding_window_value"),
+                                                                   window_interval_frequency=operator["windowed_group"][
+                                                                       "window_interval_frequency"],
+                                                                   other_columns=operator["windowed_group"][
+                                                                       "other_columns"])
                 else:
-                    raise ParseError("Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")
+                    raise ParseError(
+                        "Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")
                 log.info(operator_response)
@@ -806,7 +834,9 @@ def add_operator(pipeline_id, operator,
         if operator == "windowed_group":
             return QuestAssisted._window_group_operator(url, groupby_column_name, sliding_window_value,
                                                         window_interval_frequency, other_columns)
-        raise ParseError("Please add only one valid sink out of [filter, select, watermark, windowed_group]. Given value is {}".format(operator))
+        raise ParseError(
+            "Please add only one valid operator out of [filter, select, watermark, windowed_group]. Given value is {}".format(
+                operator))
 
     @staticmethod
     def _select_operator(url, column_names):
@@ -1081,15 +1111,39 @@ def _sink_hive(url, table_name, databases="default", default_other_configuration
         return conn.put(url, data)
 
     @staticmethod
-    def _sink_snowflake(url, format, **kwargs):
+    def _sink_snowflake(url, warehouse, catalog_name, streaming_stage, database, table_name, name=None,
+                        other_configurations=None):
         """
-        :param url: API url with pipeline id
-        :param format:
-        :param kwargs:
+        :param url:
+        :param name:
+        :param warehouse:
+        :param catalog_name:
+        :param streaming_stage:
+        :param database:
+        :param table_name:
+        :param other_configurations:
         :return:
         """
-        pass
+        conn = Qubole.agent()
+        if other_configurations is None:
+            other_configurations = {"sfSchema": "PUBLIC"}
+        data = {"data": {
+            "attributes": {
+                "fields": {
+                    "name": name,
+                    "catalog_name": catalog_name,
+                    "warehouse": warehouse,
+                    "streaming_stage": streaming_stage,
+                    "database": database,
+                    "table_name": table_name,
+                    "other_configurations": other_configurations
+                },
+                "data_store": "snowflake"
+            },
+            "type": "sink"
+        }}
+        return conn.put(url, data)
 
     @staticmethod
     def _sink_google_storage(url, format, sink_path, partition_by, other_configurations):
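
For reference, a minimal sketch of calling the Snowflake sink path this patch wires up through QuestAssisted.add_sink. The API token, pipeline id, and all Snowflake field values below are hypothetical placeholders for illustration, not values taken from the patch:

    from qds_sdk.qubole import Qubole
    from qds_sdk.quest import QuestAssisted

    # Hypothetical credentials and identifiers, for illustration only.
    Qubole.configure(api_token="<AUTH_TOKEN>")

    # data_store="snowflake" routes add_sink to _sink_snowflake, which PUTs
    # these fields to <rest_entity_path>/<pipeline_id>/node as a "sink" node.
    QuestAssisted.add_sink(
        pipeline_id="123",
        data_store="snowflake",
        warehouse="LOAD_WH",
        catalog_name="SNOWFLAKE_CATALOG",
        streaming_stage="STREAM_STAGE",
        snowflake_database="ANALYTICS",
        snowflake_table_name="events",
        snowflake_name="snowflake_sink",
        snowflake_other_configurations={"sfSchema": "PUBLIC"})

If snowflake_other_configurations is omitted, _sink_snowflake defaults it to {"sfSchema": "PUBLIC"}.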
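
Similarly, the operators loop in create_pipeline expects each entry to be a single-key dict whose key names the operator and whose value holds the fields the dispatch code subscripts. A sketch with made-up column names and frequencies, its shape inferred from the subscripting in this patch:

    operators = [
        {"filter": {"column_name": "event_type"}},
        {"select": {"column_names": ["event_type", "event_ts"]}},
        {"watermark": {"column_name": "event_ts", "frequency": "5 minutes"}},
        {"windowed_group": {"column_name": "event_type",
                            "sliding_window_value": "1 minute",
                            "window_interval_frequency": "5 minutes",
                            "other_columns": ["region"]}},
    ]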