From 20d9d7cb90b585475ff5b5866a9f6b2f774f753a Mon Sep 17 00:00:00 2001 From: harshits Date: Thu, 2 Jan 2020 14:49:59 +0530 Subject: [PATCH] adding changes in request format of sink_s3 --- qds_sdk/quest.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py index 4bfba693..6b9f109a 100644 --- a/qds_sdk/quest.py +++ b/qds_sdk/quest.py @@ -563,7 +563,7 @@ def add_source(pipeline_id, schema, format, data_store, raise ParseError("Please add only one valid source out of [kafka, s3, kinesis]") @staticmethod - def add_sink(pipeline_id, format, data_store, + def add_sink(pipeline_id, data_format, data_store, kafka_bootstrap_server=None, topic=None, other_kafka_settings=None, @@ -593,15 +593,15 @@ def add_sink(pipeline_id, format, data_store, """ url = Quest.rest_entity_path + "/" + pipeline_id + "/node" if data_store == "kafka": - return QuestAssisted._sink_kafka(url, format, kafka_bootstrap_server, topic, + return QuestAssisted._sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_settings=other_kafka_settings) if data_store == "s3": - return QuestAssisted._sink_s3(url, format, sink_path, partition_by, + return QuestAssisted._sink_s3(url, data_format, sink_path, partition_by, other_configurations=other_s3_configurations) if data_store == "snowflake": - return QuestAssisted._sink_snowflake(url, format) + return QuestAssisted._sink_snowflake(url, data_format) if data_store == "google_storage": - QuestAssisted._sink_google_storage(url, format, sink_path, partition_by, + QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by, other_configurations=other_gs_configurations) if data_store == "hive": QuestAssisted._sink_hive(url, table_name, databases=hive_database, @@ -1003,7 +1003,7 @@ def _sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_set return conn.put(url, data) @staticmethod - def _sink_s3(url, data_format, path, partition, other_configurations=None): + def _sink_s3(url, data_format, path, partition_by, other_configurations=None): """ :param url: @@ -1014,10 +1014,18 @@ def _sink_s3(url, data_format, path, partition, other_configurations=None): :return: """ conn = Qubole.agent() - data = {"data": {"attributes": { - "fields": {"path": path, "partition_by": partition, - "other_configurations": other_configurations, "format": data_format}, - "data_store": "s3"}, "type": "sink"}} + if other_configurations is None: + other_configurations = {} + if partition_by is None: + partition_by = "" + data = {"data": + {"attributes": { + "fields": {"path": path, "partition_by": partition_by, + "other_configurations": other_configurations}, + "format": data_format, + "data_store": "s3"}, + "type": "sink"} + } return conn.put(url, data) @staticmethod