Adding changes
harshits committed Jan 17, 2020
1 parent 58da3b4 commit c7b1777
Showing 1 changed file with 92 additions and 38 deletions.
130 changes: 92 additions & 38 deletions qds_sdk/quest.py
@@ -556,7 +556,8 @@ def add_source(pipeline_id, schema, format, data_store, topic_type, topics=None,
raise ParseError("Please add only one valid source out of [kafka, s3, kinesis]")

@staticmethod
- def add_sink(pipeline_id, data_store,
+ def add_sink(pipeline_id, data_store, warehouse=None, catalog_name=None, streaming_stage=None,
+ snowflake_database=None, snowflake_table_name=None,
data_format=None,
kafka_bootstrap_server=None,
topic=None,
@@ -567,12 +568,18 @@ def add_sink(pipeline_id, data_store,
other_gs_configurations=None,
table_name=None,
hive_database=None,
- other_hive_configurations=None):
+ other_hive_configurations=None,
+ snowflake_name=None, snowflake_other_configurations=None):
"""
Method to add a sink for the given pipeline.
:param pipeline_id:
- :param format:
:param data_store:
+ :param warehouse:
+ :param catalog_name:
+ :param streaming_stage:
+ :param snowflake_database:
+ :param snowflake_table_name:
+ :param data_format:
:param kafka_bootstrap_server:
:param topic:
:param other_kafka_settings:
@@ -583,6 +590,8 @@ def add_sink(pipeline_id, data_store,
:param table_name:
:param hive_database:
:param other_hive_configurations:
+ :param snowflake_name:
+ :param snowflake_other_configurations:
:return:
"""
url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
@@ -593,7 +602,9 @@ def add_sink(pipeline_id, data_store,
return QuestAssisted._sink_s3(url, data_format, sink_path, partition_by,
other_configurations=other_s3_configurations)
if data_store == "snowflake":
- return QuestAssisted._sink_snowflake(url, data_format)
+ return QuestAssisted._sink_snowflake(url, warehouse, catalog_name, streaming_stage, snowflake_database,
+ snowflake_table_name, name=snowflake_name,
+ other_configurations=snowflake_other_configurations)
if data_store == "google_storage":
return QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by,
other_configurations=other_gs_configurations)
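With this change, a Snowflake sink could be attached as sketched below. This is a hypothetical call, not part of the commit; the API token, pipeline id, warehouse, catalog, stage, database, and table values are illustrative placeholders.

# Hypothetical usage of the extended add_sink (all values are placeholders):
from qds_sdk.qubole import Qubole
from qds_sdk.quest import QuestAssisted

Qubole.configure(api_token="<API_TOKEN>")
QuestAssisted.add_sink(
    pipeline_id="153",
    data_store="snowflake",
    warehouse="DEMO_WH",
    catalog_name="SNOWFLAKE_CATALOG",
    streaming_stage="DEMO_STAGE",
    snowflake_database="DEMO_DB",
    snowflake_table_name="events",
    snowflake_name="snowflake_sink",
    snowflake_other_configurations={"sfSchema": "PUBLIC"},
)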
@@ -637,34 +648,38 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
other_gs_configurations=None,
table_name=None,
hive_database=None,
- other_hive_configurations=None):
+ other_hive_configurations=None,
+ warehouse=None,
+ catalog_name=None,
+ streaming_stage=None,
+ snowflake_database=None,
+ snowflake_table_name=None,
+ snowflake_name=None,
+ snowflake_other_configurations=None):
"""
+ :param watermark_frequency:
+ :param sink_topics:
+ :param source_topics:
+ :param sink_data_format:
+ :param source_data_format:
+ :param filter_column_name:
:param pipeline_name:
:param schema:
- :param format:
- :param source_data_format:
:param source_data_store:
:param sink_data_store:
:param checkpoint_location:
:param cluster_label:
:param output_mode:
- :param sink_data_format:
- :param source_topics:
- :param sink_topics:
:param trigger_interval:
:param can_retry:
:param topic_type:
:param command_line_options:
- :param operators: dict of operators
+ :param operators:
:param channel_id:
:param endpoint_url:
:param stream_name:
:param starting_position:
:param other_kinesis_settings:
:param broker:
- :param topic_type:
:param use_registry:
:param registry_subject:
:param registry_id:
@@ -674,7 +689,6 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
:param s3_other_settings:
:param gs_other_settings:
:param kafka_bootstrap_server:
- :param topic:
:param other_kafka_settings:
:param sink_path:
:param partition_by:
@@ -683,13 +697,13 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
:param table_name:
:param hive_database:
:param other_hive_configurations:
- :param condition:
- :param value:
- :param select_column_names: List of column names
- :param frequency:
- :param sliding_window_value_frequency:
- :param window_interval_frequency:
- :param other_columns:
+ :param warehouse:
+ :param catalog_name:
+ :param streaming_stage:
+ :param snowflake_database:
+ :param snowflake_table_name:
+ :param snowflake_name:
+ :param snowflake_other_configurations:
:return:
"""
response = Quest.create(pipeline_name, QuestAssisted.create_type)
Expand Down Expand Up @@ -724,7 +738,13 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
other_gs_configurations=other_gs_configurations,
table_name=table_name,
hive_database=hive_database,
- other_hive_configurations=other_hive_configurations)
+ other_hive_configurations=other_hive_configurations,
+ warehouse=warehouse, catalog_name=catalog_name,
+ streaming_stage=streaming_stage,
+ snowflake_database=snowflake_database,
+ snowflake_table_name=snowflake_table_name,
+ snowflake_name=snowflake_name,
+ snowflake_other_configurations=snowflake_other_configurations)
log.info(sink_reponse)
property_response = QuestAssisted.add_property(pipeline_id, cluster_label, checkpoint_location, output_mode,
trigger_interval=trigger_interval,
@@ -741,20 +761,28 @@ def create_pipeline(pipeline_name, schema, source_data_format, source_data_store
filter_column_name=operator["filter"]["column_name"])
elif operator.items()[0][0] == "select":
operator_response = QuestAssisted.add_operator(pipeline_id, operator="select",
- select_column_names=operator["select"]["column_names"])
+ select_column_names=operator["select"][
+ "column_names"])
elif operator.items()[0][0] == "watermark":
operator_response = QuestAssisted.add_operator(pipeline_id, operator="watermark",
- watermark_column_name=operator["watermark"]["column_name"],
- watermark_frequency=operator["watermark"]["frequency"])
+ watermark_column_name=operator["watermark"][
+ "column_name"],
+ watermark_frequency=operator["watermark"][
+ "frequency"])

elif operator.items()[0][0] == "windowed_group":
operator_response = QuestAssisted.add_operator(pipeline_id, operator="windowed_group",
- groupby_column_name=operator["windowed_group"]["column_name"],
- sliding_window_value=operator["windowed_group"].get("sliding_window_value"),
- window_interval_frequency=operator["windowed_group"]["window_interval_frequency"],
- other_columns=operator["windowed_group"]["other_columns"])
+ groupby_column_name=operator["windowed_group"][
+ "column_name"],
+ sliding_window_value=operator["windowed_group"].get(
+ "sliding_window_value"),
+ window_interval_frequency=operator["windowed_group"][
+ "window_interval_frequency"],
+ other_columns=operator["windowed_group"][
+ "other_columns"])
else:
- raise ParseError("Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")
+ raise ParseError(
+ "Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")

log.info(operator_response)
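As the branches above show, each entry in operators is expected to be a single-key dict keyed by the operator name. A sketch of a payload this loop would accept follows; the column names and frequency values are illustrative.

# Illustrative operators list matching the dispatch above:
operators = [
    {"filter": {"column_name": "event_type"}},
    {"select": {"column_names": ["id", "ts", "event_type"]}},
    {"watermark": {"column_name": "ts", "frequency": "5"}},
    {"windowed_group": {"column_name": "ts",
                        "sliding_window_value": "2",
                        "window_interval_frequency": "5",
                        "other_columns": ["id"]}},
]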

@@ -806,7 +834,9 @@ def add_operator(pipeline_id, operator,
if operator == "windowed_group":
return QuestAssisted._window_group_operator(url, groupby_column_name, sliding_window_value,
window_interval_frequency, other_columns)
- raise ParseError("Please add only one valid sink out of [filter, select, watermark, windowed_group]. Given value is {}".format(operator))
+ raise ParseError(
+ "Please add only one valid operator out of [filter, select, watermark, windowed_group]. Given value is {}".format(
+ operator))

@staticmethod
def _select_operator(url, column_names):
@@ -1081,15 +1111,39 @@ def _sink_hive(url, table_name, databases="default", default_other_configuration
return conn.put(url, data)

@staticmethod
- def _sink_snowflake(url, format, **kwargs):
+ def _sink_snowflake(url, warehouse, catalog_name, streaming_stage, database, table_name, name=None,
+ other_configurations=None):
"""
- :param url: API url with pipeline id
- :param format:
- :param kwargs:
+ :param url:
+ :param name:
+ :param warehouse:
+ :param catalog_name:
+ :param streaming_stage:
+ :param database:
+ :param table_name:
+ :param other_configurations:
:return:
"""
- pass
+ conn = Qubole.agent()
+ if other_configurations is None:
+     other_configurations = {"sfSchema": "PUBLIC"}
+ data = {"data": {
+     "attributes": {
+         "fields": {
+             "name": name,
+             "catalog_name": catalog_name,
+             "warehouse": warehouse,
+             "streaming_stage": streaming_stage,
+             "database": database,
+             "table_name": table_name,
+             "other_configurations": other_configurations
+         },
+         "data_store": "snowflake"
+     },
+     "type": "sink"
+ }}
+ return conn.put(url, data)
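Given the body assembled above, the PUT payload for the example values used earlier would look roughly like the following sketch; sfSchema is PUBLIC here because that is the default applied when other_configurations is omitted, and all other values are placeholders.

# Approximate PUT body produced by _sink_snowflake (placeholder values):
{"data": {"attributes": {"fields": {"name": "snowflake_sink",
                                    "catalog_name": "SNOWFLAKE_CATALOG",
                                    "warehouse": "DEMO_WH",
                                    "streaming_stage": "DEMO_STAGE",
                                    "database": "DEMO_DB",
                                    "table_name": "events",
                                    "other_configurations": {"sfSchema": "PUBLIC"}},
                         "data_store": "snowflake"},
          "type": "sink"}}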

@staticmethod
def _sink_google_storage(url, format, sink_path, partition_by, other_configurations):