Commit 1b43fd9

adding changes

harshits committed Dec 24, 2019
1 parent 7863503 commit 1b43fd9
Showing 1 changed file with 243 additions and 20 deletions.
263 changes: 243 additions & 20 deletions qds_sdk/quest.py
@@ -197,7 +197,8 @@ def start(pipeline_id):
return response

@staticmethod
def add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=None, output_mode=None,
                 can_retry=True):
conn = Qubole.agent()
data = {"data": {"attributes": {
"cluster_label": cluster_label,
@@ -225,7 +226,7 @@ def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None,
else:
code = code_or_fileLoc
elif jar_file_loc:
code = jar_file_loc

except IOError as e:
raise ParseError("Unable to open script location or script location and code both are empty")
@@ -236,7 +237,6 @@ def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None,
response = conn.put(url, data)
return response


def health(self):
pass

@@ -256,7 +256,7 @@ def pause(pipeline_id):

@staticmethod
def edit(pipeline_id, **kwargs):
pass

@staticmethod
def archive(pipeline_id):
@@ -286,13 +286,14 @@ def delete(pipeline_id):
def edit_pipeline_name(pipeline_id, pipeline_name):
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + pipeline_id
data = {"data":{"attributes":{"name":pipeline_name},"type":"pipelines"}}
data = {"data": {"attributes": {"name": pipeline_name}, "type": "pipelines"}}
return conn.put(url, data)
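
# Illustrative usage sketch (not part of this commit): renaming a pipeline after
# Qubole.configure(api_token=...) has been called; the id and name are placeholders.
#
#   Quest.edit_pipeline_name("153", "daily-clickstream")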


class QuestCode(Quest):
@staticmethod
def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, checkpoint_location,
                    language='scala', **kwargs):
"""
Method to create pipeline in BYOC mode in one go.
:param pipeline_name: Name by which pipeline will be created.
@@ -305,7 +306,9 @@ def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label,
response = Quest.create(pipeline_name, create_type)
log.info(response)
pipeline_id = Quest.get_pipline_id(response)
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
                              trigger_interval=kwargs.get("trigger_interval"),
                              output_mode=kwargs.get("output_mode"), can_retry=kwargs.get("can_retry"))
log.info(property)
save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code_or_fileLoc, language=language)
pipeline_id = Quest.get_pipline_id(save_response)
@@ -322,18 +325,20 @@ def edit(pipeline_id, **kwargs):
cluster_label = kwargs.get("cluster_label")
code = kwargs.get("code_or_fileLoc")
language = kwargs.get("language")
trigger_interval = kwargs.get("trigger_interval")
output_mode = kwargs.get("output_mode")
can_retry = kwargs.get("can_retry")
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
                              trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry)
log.info(property)
save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language)
return save_response
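
# Illustrative usage sketch (not part of this commit): creating and then editing a
# BYOC pipeline. The names, locations, and the create_type value are placeholders,
# and create_pipeline is assumed here to return the new pipeline id.
#
#   pid = QuestCode.create_pipeline("demo-pipeline", 3, "s3://bucket/job.py",
#                                   "spark-streaming", "s3://bucket/checkpoints",
#                                   language="python", output_mode="append")
#   QuestCode.edit(pid, cluster_label="spark-streaming", code_or_fileLoc="s3://bucket/job_v2.py",
#                  language="python", output_mode="append", can_retry=True)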


class QuestJar(Quest):
@staticmethod
def create_pipeline(pipeline_name, create_type, jar_file, cluster_label, checkpoint_location, language='scala',
                    **kwargs):
"""
Method to create pipeline in BYOC mode in one go.
:param pipeline_name: Name by which pipeline will be created.
@@ -363,19 +368,237 @@ def edit(pipeline_id, **kwargs):
cluster_label = kwargs.get("cluster_label")
code = kwargs.get("code_or_fileLoc")
language = kwargs.get("language")
trigger_interval = kwargs.get("trigger_interval")
output_mode = kwargs.get("output_mode")
can_retry = kwargs.get("can_retry")
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
                              trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry)
log.info(property)
save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language)
return save_response
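
# Illustrative usage sketch (not part of this commit): creating a jar-based pipeline;
# all values, including the create_type, are placeholders.
#
#   QuestJar.create_pipeline("demo-jar-pipeline", 3, "s3://bucket/streaming-job.jar",
#                            "spark-streaming", "s3://bucket/checkpoints")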



class QuestAssisted(Quest):
@staticmethod
def add_source(pipeline_id, source_path, schema, format, data_store, **kwargs):
"""
Method to add source in assisted mode pipeline.
:param pipeline_id: id of pipeline for which source need to be added/updated.
:param source_path: location of data
:param schema: key value pair eg: {"id":"Integer"}
:param format: JSON/Avro/Parquet/ORC.
:param other_settings: key value pairs eg: {"fileNameOnly": "false", "latestFirst": "false"}
:return: response in dict format
"""
url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
sources = {"kafka": QuestAssisted._source_kafka, "kinesis": QuestAssisted._source_kinesis,
"s3": QuestAssisted._source_s3, "google_storage": QuestAssisted._source_google_storage}
if data_store in sources:
# each source handler accepts (url, schema, format, **kwargs); store-specific fields such as "path" or "broker" come in via kwargs
return sources[data_store](url, schema, format, **kwargs)
raise ParseError("Please add only one valid source out of [kafka, kinesis, s3, google_storage]")

@staticmethod
def add_sink(pipeline_id, source_path, schema, format, data_store, **kwargs):
"""
Method to add sink for given pipeline.
:param pipeline_id:
:param source_path:
:param schema:
:param format:
:param data_store:
:param kwargs:
:return:
"""
url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
sources = {"kafka": QuestAssisted._sink_kafka, "snowflake": QuestAssisted._sink_snowflake,
"hive": QuestAssisted._sink_hive,
"s3": QuestAssisted._sink_s3, "google_storage": QuestAssisted._sink_google_storage}
if data_store in sources:
# each sink handler accepts (url, format, **kwargs)
return sources[data_store](url, format, **kwargs)
raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

@staticmethod
def add_operator(operator, pipeline_id, column_name, **kwargs):
url = Quest.rest_entity_path + "/" + pipeline_id + "/operator"
if operator == 'filter':
return QuestAssisted._filter_operator(url, column_name, kwargs.get("condition"), kwargs.get("value"))
if operator == "select":
return QuestAssisted._select_operator(url, column_name)
if operator == "watermark":
return QuestAssisted._watermark_operator(url, column_name, kwargs.get("frequency"))
if operator == "window_group":
return QuestAssisted._window_group_operator(url, column_name, kwargs.get("sliding_window_value_frequency"),
kwargs.get("window_interval_frequency"),
kwargs.get("other_columns"))
raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

@staticmethod
def _select_operator(url, column_names):
conn = Qubole.agent()
data = {"data": {"attributes": {"operator": "select", "column_names": column_names}}}
return conn.put(url, data)

@staticmethod
def _filter_operator(url, column_name, condition, value):
conn = Qubole.agent()
data = {"data": {"attributes": {"operator": "filter", "column_name": column_name,
"condition": condition, "value": value}}}
return conn.put(url, data)

@staticmethod
def _watermark_operator(url, column_name, frequency):
conn = Qubole.agent()
data = {"data": {"attributes": {"operator": "watermark", "column_name": column_name,
"frequency": frequency, "unit": "minute"}}}
return conn.put(url, data)

@staticmethod
def _window_group_operator(url, column_name, sliding_window_value_frequency, window_interval_frequency,
other_columns):
conn = Qubole.agent()
data = {"data": {"attributes": {"operator": "windowed_group",
"window_expression": {"column_name": column_name, "sliding_window_value": {
"frequency": sliding_window_value_frequency,
"unit": "minute"},
"window_interval": {
"frequency": window_interval_frequency,
"unit": "minute"}}, "other_columns": other_columns,
"action": "count"}}}
return conn.put(url, data)

@staticmethod
def _source_kafka(url, schema, format, **kwargs):
conn = Qubole.agent()
default_kafka = {"kafkaConsumer.pollTimeoutMs": 512, "fetchOffset.numRetries": 3,
"fetchOffset.retryIntervalMs": 10}
data = {
"data": {
"attributes": {
"fields": {
"name": kwargs.get("name", "source_kafka"),
"brokers": kwargs.get("broker"),
"topics": kwargs.get("topics"),
"topic_type": kwargs.get("topic_type", "multiple"),
"schema": schema,
"use_registry": kwargs.get("use_registry", "write"),
"registry_subject": kwargs.get("registry_subject", None),
"registry_id": kwargs.get("registry_id", None),
"starting_offsets": kwargs.get("starting_offsets", "latest"),
"format": format,
"other_kafka_consumer_settings": kwargs.get("other_kafka_consumer_settings", default_kafka)
},
"data_store": "kafka"
},
"type": "source"
}
}
return conn.put(url, data)
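
# Illustrative sketch (not part of this commit): the kwargs _source_kafka reads when
# invoked through add_source; broker and topic values are placeholders.
#
#   QuestAssisted.add_source("153", None, {"id": "Integer"}, "avro", "kafka",
#                            broker="broker-1:9092,broker-2:9092", topics="clickstream",
#                            starting_offsets="earliest")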

@staticmethod
def _source_kinesis(url, schema, format, **kwargs):
conn = Qubole.agent()
other_kinesis_settings = {"kinesis.executor.maxFetchTimeInMs": 1000,
"kinesis.executor.maxFetchRecordsPerShard": 100000,
"kinesis.executor.maxRecordPerRead": 10000}
data = {
"data": {
"attributes": {
"fields": {
"name": kwargs.get("name", "source_kinesis"),
"endpoint_url": kwargs.get("endpoint_url"),
"stream_name": kwargs.get("stream_name"),
"schema": schema,
"starting_position": kwargs.get("starting_position", "latest"),
"format": format,
"other_kinesis_settings": kwargs.get("other_kinesis_settings", other_kinesis_settings)
},
"data_store": "kinesis"
},
"type": "source"
}
}
return conn.put(url, data)

@staticmethod
def _source_s3(url, schema, format, **kwargs):
conn = Qubole.agent()
other_settings = {"fileNameOnly": "false", "latestFirst": "false"}
data = {
"data": {
"attributes": {
"fields": {
"name": kwargs.get("name", "source_s3"),
"path": kwargs.get("path"),
"schema": schema,
"format": format,
"other_settings": kwargs.get("other_settings", other_settings)
},
"data_store": "s3"
},
"type": "source"
}
}
return conn.put(url, data)

@staticmethod
def _source_google_storage(url, schema, format, **kwargs):
conn = Qubole.agent()
other_settings = {"fileNameOnly": "false", "latestFirst": "false"}
data = {"data":
{"attributes":
{"fields":
{"path": kwargs.get("path"),
"format": format,
"schema": schema,
"other_settings": other_settings
},
"data_store": "googleStorage"},
"type": "source"
}
}
return conn.put(url, data)

@staticmethod
def _sink_kafka(url, format, **kwargs):
conn = Qubole.agent()
default = {"kafka.max.block.ms": 60000}
data = {"data": {"attributes": {
"fields": {"kafka_bootstrap_server": kwargs.get("kafka_bootstrap_server"), "topic": kwargs.get("topic"),
"format": format, "other_kafka_settings": kwargs.get("other_kafka_settings", default)},
"data_store": "kafka"},
"type": "sink"}}
return conn.put(url, data)

@staticmethod
def _sink_s3(url, format, **kwargs):
conn = Qubole.agent()
data = {"data": {"attributes": {
"fields": {"path": kwargs.get("path"), "partition_by": kwargs.get("partition_by"),
"other_configurations": kwargs.get("other_configurations"), "format": format},
"data_store": "s3"}, "type": "sink"}}
return conn.put(url, data)

@staticmethod
def _sink_hive(url, format, **kwargs):
conn = Qubole.agent()
default_other_configurations = {"table.metastore.stopOnFailure": "false",
"table.metastore.updateIntervalSeconds": 10}
data = {"data": {"attributes": {
"fields": {"database": kwargs.get("databases", "default"), "table_name": kwargs.get("table_name"),
"other_configurations": kwargs.get("other_configuration", default_other_configurations)},
"data_store": "hive"},
"type": "sink"}}
return conn.put(url, data)

@staticmethod
def _sink_snowflake(url, format, **kwargs):
pass

@staticmethod
def _sink_google_storage(url, format, **kwargs):
conn = Qubole.agent()
data = {"data": {"attributes": {
"fields": {"path": kwargs.get("path"), "partition_by": kwargs.get("partition_by"),
"other_configurations": kwargs.get("other_configurations"), "format": format},
"data_store": "googleStorage"}, "type": "sink"}}
return conn.put(url, data)
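
# Illustrative end-to-end assisted flow (not part of this commit); the ids, labels,
# locations, and the create_type value are placeholders.
#
#   resp = Quest.create("assisted-demo", 1)
#   pid = Quest.get_pipline_id(resp)
#   Quest.add_property(pid, "spark-streaming", "s3://bucket/checkpoints", output_mode="append")
#   QuestAssisted.add_source(pid, "s3://bucket/in", {"id": "Integer"}, "json", "s3", path="s3://bucket/in")
#   QuestAssisted.add_sink(pid, "s3://bucket/out", {"id": "Integer"}, "parquet", "s3", path="s3://bucket/out")
#   Quest.start(pid)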
