added fixes and removed edit pipeline common method
harshits committed Jan 7, 2020
1 parent 9dcb367 commit 60b18ab
Showing 1 changed file with 86 additions and 90 deletions.
176 changes: 86 additions & 90 deletions qds_sdk/quest.py
@@ -177,8 +177,10 @@ def create(pipeline_name, create_type, **kwargs):
if pipeline_name is None:
raise ParseError("Enter pipeline name")
data = {"data": {
"attributes": {"name": pipeline_name, "status": "DRAFT", "create_type": create_type},
"type": "pipelines"}}
"attributes":
{"name": pipeline_name, "status": "DRAFT", "create_type": create_type},
"type": "pipelines"}
}
url = Quest.rest_entity_path + "?mode=wizard"
response = conn.post(url, data)
Quest.pipeline_id = Quest.get_pipline_id(response)
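A quick sketch of what this wizard-mode create now sends (the pipeline name is a placeholder; create_type comes from the calling subclass, e.g. QuestAssisted.create_type):

response = Quest.create("demo-pipeline", QuestAssisted.create_type)
pipeline_id = Quest.get_pipline_id(response)  # get_pipline_id is the existing helper name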
@@ -343,9 +345,9 @@ class QuestCode(Quest):

@staticmethod
def create_pipeline(pipeline_name, code_or_fileLoc, cluster_label, checkpoint_location,
language='scala',
language=None,
trigger_interval=None,
output_mode="Append",
output_mode=None,
can_retry=True,
channel_id=None):
"""
@@ -377,32 +379,12 @@ def create_pipeline(pipeline_name, code_or_fileLoc, cluster_label, checkpoint_lo
return pipeline_id
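Because language and output_mode no longer default to "scala" and "Append", callers that relied on those defaults should now pass them explicitly. A minimal sketch, with a hypothetical cluster label and S3 locations:

pipeline_id = QuestCode.create_pipeline("demo-pipeline",
    "s3://bucket/code/job.scala",       # code file location (placeholder)
    "spark-streaming-cluster",          # cluster label (placeholder)
    "s3://bucket/checkpoints/demo",     # checkpoint location (placeholder)
    language="scala",
    output_mode="Append")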

@staticmethod
def edit(pipeline_id, **kwargs):
"""
Method to Edit pipeline
:param pipeline_id: pipeline id
:return:
"""
checkpoint_location = kwargs.get("checkpoint_location")
cluster_label = kwargs.get("cluster_label")
code = kwargs.get("code_or_fileLoc")
language = kwargs.get("language")
trigger_interval = kwargs.get("trigger_interval")
output_mode = kwargs.get("output_mode")
can_retry = kwargs.get("can_retry")
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry)
log.info(property)
save_response = QuestCode.save_code(pipeline_id, code, language=language)
return save_response

@staticmethod
def save_code(pipeline_id, code_or_fileLoc, language='scala'):
def save_code(pipeline_id, code_or_fileLoc, language=None):
"""
Method to save code
:param pipeline_id:
:param code_or_fileLoc:
:param language:
:param language: scala or python
:return:
"""
try:
@@ -431,7 +413,7 @@ class QuestJar(Quest):
@staticmethod
def create_pipeline(pipeline_name, jar_path, cluster_label, checkpoint_location, main_class_name,
channel_id=None,
output_mode="Append",
output_mode=None,
trigger_interval=None,
can_retry=True,
command_line_options=None,
@@ -467,41 +449,21 @@ def create_pipeline(pipeline_name, jar_path, cluster_label, checkpoint_location,
log.info(response)
return pipeline_id

@staticmethod
def edit(pipeline_id, **kwargs):
"""
Method to Edit pipeline
:param pipeline_id: pipeline id
:param kwargs: checkpoint_location, cluster_label, code_or_fileLoc
:return:
"""
checkpoint_location = kwargs.get("checkpoint_location")
cluster_label = kwargs.get("cluster_label")
code = kwargs.get("code_or_fileLoc")
language = kwargs.get("language")
trigger_interval = kwargs.get("trigger_interval")
output_mode = kwargs.get("output_mode")
can_retry = kwargs.get("can_retry")
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
trigger_interval=trigger_interval, output_mode=output_mode, can_retry=can_retry)
log.info(property)
save_response = QuestJar.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language)
return save_response

@staticmethod
def save_code(pipeline_id, jar_path, main_class_name, user_arguments=None):
"""
:param user_arguments:
:param pipeline_id:
:param jar_path:
:param main_class_name:
:param kwargs: user_arguments
:return:
"""
if jar_path is None:
raise ParseError("Unable to open script location or script location and code both are empty")

data = {"data": {
"attributes": {"create_type": 2, "user_arguments": kwargs.get("user_arguments"), "jar_path": jar_path,
"attributes": {"create_type": 2, "user_arguments": user_arguments, "jar_path": jar_path,
"language": main_class_name}}}
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/save_code"
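With user_arguments promoted from an undefined kwargs lookup to an explicit parameter, a call now looks roughly like this (all values are placeholders):

response = QuestJar.save_code("153",
    "s3://bucket/jars/streaming-app.jar",   # jar path (placeholder)
    "com.example.StreamingApp",             # main class name (placeholder)
    user_arguments="--env prod")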
@@ -520,10 +482,10 @@ def add_source(pipeline_id, schema, format, data_store, topics=None,
other_kinesis_settings=None,
broker=None,
topic_type=None,
use_registry="write",
use_registry=None,
registry_subject=None,
registry_id=None,
starting_offsets="latest",
starting_offsets=None,
other_kafka_consumer_settings=None,
source_path=None,
s3_other_settings=None,
@@ -583,7 +545,7 @@ def add_sink(pipeline_id, data_format, data_store,
other_s3_configurations=None,
other_gs_configurations=None,
table_name=None,
hive_database="default",
hive_database=None,
other_hive_configurations=None):
"""
Method to add sink for given pipeline.
@@ -612,16 +574,20 @@ def add_sink(pipeline_id, data_format, data_store,
if data_store == "snowflake":
return QuestAssisted._sink_snowflake(url, data_format)
if data_store == "google_storage":
QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by,
other_configurations=other_gs_configurations)
return QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by,
other_configurations=other_gs_configurations)
if data_store == "hive":
QuestAssisted._sink_hive(url, table_name, databases=hive_database,
default_other_configurations=other_hive_configurations)
return QuestAssisted._sink_hive(url, table_name, databases=hive_database,
default_other_configurations=other_hive_configurations)
raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

@staticmethod
def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store, checkpoint_location,
cluster_label, output_mode, topics=None,
def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store,
checkpoint_location,
cluster_label,
output_mode,
source_topics=None,
sink_topics=None,
trigger_interval=None,
can_retry=True,
command_line_options=None,
@@ -632,10 +598,10 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
starting_position=None,
other_kinesis_settings=None,
broker=None, topic_type=None,
use_registry="write",
use_registry=None,
registry_subject=None,
registry_id=None,
starting_offsets="latest",
starting_offsets=None,
other_kafka_consumer_settings=None,
source_path=None,
s3_other_settings=None,
@@ -651,13 +617,19 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
other_hive_configurations=None,
condition=None,
value=None,
column_name=None,
filter_column_name=None,
select_column_names=None,
frequency=None,
sliding_window_value_frequency=None,
window_interval_frequency=None,
other_columns=None):
"""
:param sink_topics:
:param source_topics:
:param sink_data_format:
:param source_data_format:
:param filter_column_name:
:param pipeline_name:
:param schema:
:param format:
@@ -697,7 +669,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
:param other_hive_configurations:
:param condition:
:param value:
:param column_name:
:param select_column_names: List of column names
:param frequency:
:param sliding_window_value_frequency:
:param window_interval_frequency:
@@ -706,11 +678,11 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
"""
response = Quest.create(pipeline_name, QuestAssisted.create_type)
log.info(response)
final_reponse = None
final_response = None
pipeline_id = Quest.get_pipline_id(response)
pipeline_id = str(pipeline_id)
src_response = QuestAssisted.add_source(pipeline_id, schema, source_data_format, source_data_store,
topics=topics,
topics=source_topics,
endpoint_url=endpoint_url,
stream_name=stream_name,
starting_position=starting_position,
@@ -728,7 +700,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
log.info(src_response)
sink_reponse = QuestAssisted.add_sink(pipeline_id, sink_data_format, sink_data_store,
kafka_bootstrap_server=kafka_bootstrap_server,
topic=topics,
topic=sink_topics,
other_kafka_settings=other_kafka_settings,
sink_path=sink_path,
partition_by=partition_by,
@@ -743,40 +715,41 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
can_retry=can_retry,
command_line_options=command_line_options)
log.info(property_response)
final_reponse = property_response
final_response = property_response
if operator:
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
condition=condition,
value=value,
column_name=column_name,
frequency=frequency,
filter_column_name=filter_column_name,
select_column_names=select_column_names,
sliding_window_value_frequency=sliding_window_value_frequency,
window_interval_frequency=window_interval_frequency,
other_columns=other_columns)
log.info(operator_response)
final_reponse = operator_response
final_response = operator_response
if channel_id:
response = QuestAssisted.set_alert(pipeline_id, channel_id)
log.info(response)
final_reponse = response
return final_reponse
final_response = response
return final_response
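Splitting topics into source_topics and sink_topics lets a Kafka-to-Kafka pipeline read from one topic and write to another. A minimal sketch of the assisted flow (broker, topics, cluster label, and paths are placeholders):

response = QuestAssisted.create_pipeline("demo-assisted", {"id": "string"},
    "json", "json", "kafka", "kafka",
    "s3://bucket/checkpoints/demo",     # checkpoint location (placeholder)
    "spark-streaming-cluster",          # cluster label (placeholder)
    "append",
    source_topics="events_raw",
    sink_topics="events_enriched",
    broker="broker-1:9092",
    kafka_bootstrap_server="broker-1:9092")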

@staticmethod
def add_operator(pipeline_id, operator,
condition=None,
value=None,
column_name=None,
frequency=None,
filter_column_name=None,
select_column_names=None,
sliding_window_value_frequency=None,
window_interval_frequency=None,
other_columns=None):
"""
:param select_column_names:
:param filter_column_name:
:param pipeline_id:
:param operator:
:param condition:
:param value:
:param column_name:
:param frequency:
:param sliding_window_value_frequency:
:param window_interval_frequency:
@@ -787,13 +760,13 @@ def add_operator(pipeline_id, operator,
if operator is None:
return
if operator == "filter":
return QuestAssisted._filter_operator(url, column_name, condition, value)
return QuestAssisted._filter_operator(url, filter_column_name, condition, value)
if operator == "select":
return QuestAssisted._select_operator(url, column_name)
return QuestAssisted._select_operator(url, select_column_names)
if operator == "watermark":
return QuestAssisted._watermark_operator(url, column_name, frequency)
return QuestAssisted._watermark_operator(url, filter_column_name, frequency)
if operator == "window_group":
return QuestAssisted._window_group_operator(url, column_name, sliding_window_value_frequency,
return QuestAssisted._window_group_operator(url, filter_column_name, sliding_window_value_frequency,
window_interval_frequency, other_columns)
raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

@@ -848,18 +821,22 @@ def _window_group_operator(url, column_name, sliding_window_value_frequency, win
"""
conn = Qubole.agent()
data = {"data": {"attributes": {"operator": "windowed_group",
"window_expression": {"column_name": column_name, "sliding_window_value": {
"frequency": sliding_window_value_frequency,
"unit": "minute"},
"window_interval": {
"frequency": window_interval_frequency,
"unit": "minute"}}, "other_columns": other_columns,
"window_expression":
{"column_name": column_name,
"sliding_window_value": {
"frequency": sliding_window_value_frequency,
"unit": "minute"},
"window_interval": {
"frequency": window_interval_frequency,
"unit": "minute"}},
"other_columns": other_columns,
"action": "count"}}}
return conn.put(url, data)
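As a worked example of the payload above: with column_name="event_time", a 5-minute sliding window, a 10-minute interval, and other_columns=["user_id"] (all illustrative values), the request body serializes to:

{"data": {"attributes": {"operator": "windowed_group",
                         "window_expression": {"column_name": "event_time",
                                               "sliding_window_value": {"frequency": 5, "unit": "minute"},
                                               "window_interval": {"frequency": 10, "unit": "minute"}},
                         "other_columns": ["user_id"],
                         "action": "count"}}}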

@staticmethod
def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple", use_registry="write",
registry_subject=None, registry_id=None, starting_offsets="latest",
def _source_kafka(url, schema, data_format, broker, topics, topic_type, starting_offsets, use_registry,
registry_subject=None,
registry_id=None,
other_kafka_consumer_settings=None):
"""
@@ -877,9 +854,13 @@ def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple
:return:
"""
conn = Qubole.agent()
if "other_kafka_consumer_settings" is None:
if other_kafka_consumer_settings is None:
other_kafka_consumer_settings = {"kafkaConsumer.pollTimeoutMs": 512, "fetchOffset.numRetries": 3,
"fetchOffset.retryIntervalMs": 10}

if use_registry != "write" and (registry_subject is None or registry_id is None):
raise ParseError("registry_subject and registry_id cannot be empty when use_registry is auto fetch")

data = {
"data": {
"attributes": {
@@ -906,10 +887,9 @@ def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple
return response
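The corrected registry guard above means any non-"write" registry mode must supply both registry_subject and registry_id. A sketch through the public add_source wrapper (broker, topic, and registry values are placeholders):

response = QuestAssisted.add_source("153", {"id": "string"}, "avro", "kafka",
    topics="events_raw",
    broker="broker-1:9092",
    topic_type="multiple",
    starting_offsets="latest",
    use_registry="fetch",
    registry_subject="events_raw-value",
    registry_id="1")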

@staticmethod
def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, starting_position="latest",
def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, starting_position=None,
other_kinesis_settings=None):
"""
:param url:
:param schema:
:param format:
@@ -924,6 +904,7 @@ def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, startin
other_kinesis_settings = {"kinesis.executor.maxFetchTimeInMs": 1000,
"kinesis.executor.maxFetchRecordsPerShard": 100000,
"kinesis.executor.maxRecordPerRead": 10000}

data = {
"data": {
"attributes": {
@@ -1035,7 +1016,8 @@ def _sink_s3(url, data_format, path, partition_by, other_configurations=None):
partition_by = ""
data = {"data": {"attributes": {
"fields": {"path": path, "partition_by": partition_by,
"other_configurations": other_configurations, "format": data_format,}, "data_store": "s3"}, "type": "sink"}}
"other_configurations": other_configurations, "format": data_format, }, "data_store": "s3"},
"type": "sink"}}
return conn.put(url, data)

@staticmethod
@@ -1120,3 +1102,17 @@ def add_registry(registry_name, host,
"gateway_port": gateway_port, "gateway_username": gateway_username,
"gateway_private_key": gateway_private_key}, "type": "schemas"}}
conn.post(url, data)

@staticmethod
def switch_from_assisted(pipeline_id):
"""
Method to switch a pipeline out of assisted mode (create_type 1 -> 3).
:param pipeline_id: pipeline id
:return: API response when the switch is issued, otherwise None
"""
conn = Qubole.agent()
url = QuestAssisted.rest_entity_path + '/' + pipeline_id
response = conn.get(url)
create_type = response.get("data").get("attributes").get("create_type")
if create_type == 1:
url = QuestAssisted.rest_entity_path + '/' + pipeline_id + '/switch'
data = {"data": {"attributes": {"create_type": 3}}}
response = conn.put(url, data)  # switching create_type mutates the pipeline, matching the other write calls in this module
return response
else:
log.info("Pipeline is already in non-Assisted mode, create_type = {}".format(create_type))
