Skip to content

Commit

Permalink
added fixes and removed edit pipeline common method
Browse files Browse the repository at this point in the history
  • Loading branch information
harshits committed Jan 7, 2020
1 parent 60b18ab commit 4dfbd67
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions qds_sdk/quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ class Quest(Resource):

rest_entity_path = "pipelines"
pipeline_id = None

pipeline_name = None
pipeline_code = None
jar_path = None
@staticmethod
def get_pipline_id(response):
return str(response.get('data').get('id'))
Expand Down Expand Up @@ -184,6 +186,7 @@ def create(pipeline_name, create_type, **kwargs):
url = Quest.rest_entity_path + "?mode=wizard"
response = conn.post(url, data)
Quest.pipeline_id = Quest.get_pipline_id(response)
Quest.pipeline_name = pipeline_name
return response

@staticmethod
Expand All @@ -196,7 +199,7 @@ def start(pipeline_id):
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + pipeline_id + "/start"
response = conn.put(url)
pipeline_status = response.get('data').get('pipeline_instance_status')
pipeline_status = Quest.get_status(pipeline_id)
while pipeline_status == 'waiting':
log.info("Pipeline is in waiting state....")
time.sleep(10)
Expand Down Expand Up @@ -238,8 +241,18 @@ def add_property(pipeline_id, cluster_label, checkpoint_location, output_mode, t
response = conn.put(url, data)
return response

def health(self):
pass
@staticmethod
def get_health(pipeline_id):
"""
Get Pipeline Health
:param pipeline_id:
:return:
"""
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + pipeline_id
response = conn.get(url)
log.info(response)
return response.get("data").get("attributes").get("health")

@staticmethod
def clone(pipeline_id):
Expand All @@ -265,9 +278,6 @@ def pause(pipeline_id):
conn = Qubole.agent()
return conn.put(url)

@staticmethod
def edit(pipeline_id, **kwargs):
pass

@staticmethod
def archive(pipeline_id):
Expand All @@ -281,13 +291,18 @@ def archive(pipeline_id):
conn = Qubole.agent()
return conn.put(url)

# @staticmethod
# def status(pipeline_id):
# conn = Qubole.agent()
# url = Quest.rest_entity_path + "/" + pipeline_id + "/status"
# response = conn.put(url)
# log.info(response)
# return response
@staticmethod
def get_status(pipeline_id):
"""
Get pipeline status
:param pipeline_id:
:return:
"""
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + pipeline_id
response = conn.get(url)
log.info(response)
return response.get("data").get("attributes").get("pipeline_instance_status")

@staticmethod
def delete(pipeline_id):
Expand Down Expand Up @@ -333,6 +348,11 @@ def set_alert(pipeline_id, channel_id):

@staticmethod
def get_code(pipeline_id):
"""
Get pipeline code
:param pipeline_id:
:return:
"""
url = Quest.rest_entity_path + "/" + pipeline_id
conn = Qubole.agent()
reponse = conn.get(url)
Expand Down Expand Up @@ -366,16 +386,16 @@ def create_pipeline(pipeline_name, code_or_fileLoc, cluster_label, checkpoint_lo
response = Quest.create(pipeline_name, QuestCode.create_type)
log.info(response)
pipeline_id = Quest.get_pipline_id(response)
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
response = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
trigger_interval=trigger_interval,
output_mode=output_mode,
can_retry=can_retry)
log.info(property)
save_response = QuestCode.save_code(pipeline_id, code_or_fileLoc=code_or_fileLoc, language=language)
log.info(response)
response = QuestCode.save_code(pipeline_id, code_or_fileLoc=code_or_fileLoc, language=language)
if channel_id:
response = Quest.set_alert(pipeline_id, channel_id)
log.info(response)
pipeline_id = Quest.get_pipline_id(save_response)
QuestCode.pipeline_id = QuestCode.get_pipline_id(response)
return pipeline_id

@staticmethod
Expand All @@ -398,7 +418,7 @@ def save_code(pipeline_id, code_or_fileLoc, language=None):

except IOError as e:
raise ParseError("Unable to open script location or script location and code both are empty")

QuestCode.pipeline_code = code
data = {"data": {
"attributes": {"create_type": QuestCode.create_type, "code": str(code), "language": str(language)}}}
conn = Qubole.agent()
Expand Down Expand Up @@ -436,19 +456,19 @@ def create_pipeline(pipeline_name, jar_path, cluster_label, checkpoint_location,
response = Quest.create(pipeline_name, QuestJar.create_type)
log.info(response)
pipeline_id = Quest.get_pipline_id(response)
property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
response = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
output_mode=output_mode,
trigger_interval=trigger_interval,
can_retry=can_retry,
command_line_options=command_line_options)
log.info(property)
save_response = QuestJar.save_code(pipeline_id, jar_path, main_class_name, user_arguments=user_arguments)
pipeline_id = Quest.get_pipline_id(save_response)
log.info(response)
response = QuestJar.save_code(pipeline_id, jar_path, main_class_name, user_arguments=user_arguments)
QuestCode.pipeline_id = QuestJar.get_pipline_id(response)
QuestJar.jar_path = jar_path
if channel_id:
response = Quest.set_alert(pipeline_id, channel_id)
log.info(response)
return pipeline_id

return response

@staticmethod
def save_code(pipeline_id, jar_path, main_class_name, user_arguments=None):
Expand All @@ -467,6 +487,7 @@ def save_code(pipeline_id, jar_path, main_class_name, user_arguments=None):
"language": main_class_name}}}
conn = Qubole.agent()
url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/save_code"
QuestJar.jar_path = jar_path
response = conn.put(url, data)
return response

Expand Down Expand Up @@ -731,6 +752,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
response = QuestAssisted.set_alert(pipeline_id, channel_id)
log.info(response)
final_response = response
QuestAssisted.pipeline_code = QuestAssisted.get_code(pipeline_id)
return final_response

@staticmethod
Expand Down Expand Up @@ -1111,7 +1133,7 @@ def switch_from_assisted(pipeline_id):
create_type = response.get("data").get("attributes").get("create_type")
if create_type == 1:
url = QuestAssisted.rest_entity_path + '/' + pipeline_id + '/switch'
data = {"data": {"attributes": {"create_type": 3}}}
data = {"data": {"attributes": {"create_type": QuestAssisted.create_type}}}
response = conn.get(url, data)
return response
else:
Expand Down

0 comments on commit 4dfbd67

Please sign in to comment.