diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py
index c1ce32db..4bfba693 100644
--- a/qds_sdk/quest.py
+++ b/qds_sdk/quest.py
@@ -609,7 +609,7 @@ def add_sink(pipeline_id, format, data_store,
         raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")
 
     @staticmethod
-    def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_store, checkpoint_location,
+    def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store, checkpoint_location,
                         cluster_label, output_mode,
                         trigger_interval=None,
                         can_retry=True,
@@ -698,7 +698,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_
         log.info(response)
         pipeline_id = Quest.get_pipline_id(response)
         pipeline_id = str(pipeline_id)
-        src_response = QuestAssisted.add_source(pipeline_id, schema, format, source_data_store,
+        src_response = QuestAssisted.add_source(pipeline_id, schema, source_data_format, source_data_store,
                                                 endpoint_url=endpoint_url,
                                                 stream_name=stream_name,
                                                 starting_position=starting_position,
@@ -714,7 +714,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_
                                                 s3_other_settings=s3_other_settings,
                                                 gs_other_settings=gs_other_settings)
         log.info(src_response)
-        sink_reponse = QuestAssisted.add_sink(pipeline_id, format, sink_data_store,
+        sink_reponse = QuestAssisted.add_sink(pipeline_id, sink_data_format, sink_data_store,
                                               kafka_bootstrap_server=kafka_bootstrap_server,
                                               topic=topic,
                                               other_kafka_settings=other_kafka_settings,
@@ -843,14 +843,14 @@ def _window_group_operator(url, column_name, sliding_window_value_frequency, win
         return conn.put(url, data)
 
     @staticmethod
-    def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", use_registry="write",
+    def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple", use_registry="write",
                       registry_subject=None, registry_id=None, starting_offsets="latest",
                       other_kafka_consumer_settings=None):
         """
 
         :param url:
         :param schema:
-        :param format:
+        :param data_format:
         :param broker:
         :param topics:
         :param topic_type:
@@ -877,7 +877,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us
                         "registry_subject": registry_subject,
                         "registry_id": registry_id,
                         "starting_offsets": starting_offsets,
-                        "format": format,
+                        "format": data_format,
                         "other_kafka_consumer_settings": other_kafka_consumer_settings
                     },
                     "data_store": "kafka"
@@ -891,7 +891,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us
         return response
 
     @staticmethod
-    def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_position="latest",
+    def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, starting_position="latest",
                         other_kinesis_settings=None):
         """
 
@@ -917,7 +917,7 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos
                         "stream_name": stream_name,
                         "schema": schema,
                         "starting_position": starting_position,
-                        "format": format,
+                        "format": data_format,
                         "other_kinesis_settings": other_kinesis_settings
                     },
                     "data_store": "kinesis"
@@ -928,11 +928,11 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos
         return conn.put(url, data)
 
     @staticmethod
-    def _source_s3(url, schema, format, path, other_settings=None):
+    def _source_s3(url, schema, data_format, path, other_settings=None):
         """
         :param url: API url with pipeline id
         :param schema:
-        :param format:
+        :param data_format:
         :param other_settings: {"fileNameOnly": "false", "latestFirst": "false"}
         :return:
         """
@@ -945,7 +945,7 @@ def _source_s3(url, schema, format, path, other_settings=None):
                     "fields": {
                         "path": path,
                         "schema": schema,
-                        "format": format,
+                        "format": data_format,
                         "other_settings": other_settings
                     },
                     "data_store": "s3"
@@ -956,11 +956,11 @@ def _source_s3(url, schema, format, path, other_settings=None):
         return conn.put(url, data)
 
     @staticmethod
-    def _source_google_storage(url, schema, format, source_path, other_settings=None):
+    def _source_google_storage(url, schema, data_format, source_path, other_settings=None):
         """
         :param url: API url with pipeline id
         :param schema:
-        :param format:
+        :param data_format:
         :param other_settings:
         :return:
         """
@@ -971,7 +971,7 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None
                     {"attributes":
                          {"fields":
                               {"path": source_path,
-                               "format": format,
+                               "format": data_format,
                                "schema": schema,
                                "other_settings": other_settings
                                },
@@ -982,11 +982,11 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None
         return conn.put(url, data)
 
     @staticmethod
-    def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings=None):
+    def _sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_settings=None):
         """
 
         :param url:
-        :param format:
+        :param data_format:
         :param kafka_bootstrap_server:
         :param topic:
         :param other_kafka_settings:
@@ -997,17 +997,17 @@ def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings
             other_kafka_settings = {"kafka.max.block.ms": 60000}
         data = {"data": {"attributes": {
             "fields": {"kafka_bootstrap_server": kafka_bootstrap_server, "topic": topic,
-                       "format": format, "other_kafka_settings": other_kafka_settings},
+                       "format": data_format, "other_kafka_settings": other_kafka_settings},
             "data_store": "kafka"},
             "type": "sink"}}
         return conn.put(url, data)
 
     @staticmethod
-    def _sink_s3(url, format, path, partition, other_configurations=None):
+    def _sink_s3(url, data_format, path, partition, other_configurations=None):
         """
 
         :param url:
-        :param format:
+        :param data_format:
         :param path:
         :param partition:
         :param other_configurations:
@@ -1016,8 +1016,8 @@ def _sink_s3(url, format, path, partition, other_configurations=None):
         conn = Qubole.agent()
         data = {"data": {"attributes": {
             "fields": {"path": path, "partition_by": partition,
-                       "other_configurations": other_configurations, "format": format},
-            "data_store": "s3"}, "type": "sink"}}
+                       "other_configurations": other_configurations, "format": data_format},
+                        "data_store": "s3"}, "type": "sink"}}
         return conn.put(url, data)
 
     @staticmethod
@@ -1072,9 +1072,15 @@ def _sink_google_storage(url, format, sink_path, partition_by, other_configurati
         return conn.put(url, data)
 
     @staticmethod
-    def add_registry(registry_name, host, port=8081, registry_type='schema', use_gateway=False, gateway_ip=None,
+    def add_registry(registry_name, host,
+                     port=8081,
+                     registry_type='schema',
+                     use_gateway=False,
+                     gateway_ip=None,
                      gateway_port=None,
-                     gateway_username=None, gateway_private_key=None, **kwargs):
+                     gateway_username=None,
+                     gateway_private_key=None,
+                     **kwargs):
         """
         :param registry_name: Name of Registry
         :param registry_type: