Commit
adding changes
harshits committed Dec 23, 2019
1 parent eee24af commit 7863503
Showing 1 changed file with 185 additions and 126 deletions.
311 changes: 185 additions & 126 deletions qds_sdk/quest.py
@@ -111,7 +111,8 @@ def start_pause(args):
         elif args.status:
             response = Quest.status(args.pipeline_id)
         else:
-            raise ParseError("Please select only one param out of --start, --pause, --delete, --archive, --clone and --edit.")
+            raise ParseError(
+                "Please select only one param out of --start, --pause, --delete, --archive, --clone and --edit.")
         return json.dumps(response, default=lambda o: o.attributes, sort_keys=True, indent=4)

     @staticmethod
@@ -122,13 +123,13 @@ def index(args):
     @staticmethod
     def create(args):
         pipeline = None
-        print "create type = ", type(args.create_type)
+        # print "create type = ", type(args.create_type)
         if args.create_type == "1":
-            pipeline = Assisted.create(args.name, args.create_type, args)
+            pipeline = QuestAssisted.create(args.name, args.create_type, args)
         elif args.create_type == "2":
-            pipeline = Jar.create(args.name, args.create_type, args)
+            pipeline = QuestJar.create(args.name, args.create_type, args)
         elif args.create_type == "3":
-            c = Code()
+            c = QuestCode()
             c.create(args.name, args)
         return json.dumps(pipeline, default=lambda o: o.attributes, sort_keys=True, indent=4)

@@ -143,7 +144,11 @@ class Quest(Resource):
     rest_entity_path = "pipelines"

     @staticmethod
-    def index(status=None):
+    def get_pipline_id(response):
+        return response.get('data').get('id')
+
+    @staticmethod
+    def list(status=None):
         if status is None or status == 'all':
             params = {"filter": "draft,archive,active"}
         else:
@@ -154,58 +159,28 @@ def index(status=None):
         return questjson

     @staticmethod
-    def create(name, args):
-        pass
-
-    def add_source(self, pipeline_id, args):
-        data_store = args.data_store
-        if data_store == 's3':
-            return self.s3_source(pipeline_id, args)
-        elif data_store == 'kafka':
-            return self.kafka_source(pipeline_id, args)
-        elif data_store == 'kinesis':
-            return self.kinesis_source(pipeline_id, args)
-        else:
-            log.error("Provide a valid data source {s3, kafka, kinesis}")
-
-    def add_sink(self, pipeline_id, args):
-        data_store = args.get("data_store")
-        if data_store == 's3':
-            return self.s3_sink(pipeline_id, args)
-        elif data_store == 'kafka':
-            return self.kafka_sink(pipeline_id, args)
-        elif data_store == 'kinesis':
-            return self.kinesis_sink(pipeline_id, args)
-        elif data_store == 'snowflake':
-            return self.kinesis_sink(pipeline_id, args)
-        elif data_store == 'hive':
-            return self.kinesis_sink(pipeline_id, args)
-        else:
-            log.error("Provide a valid data source {s3, kafka, kinesis}")
-
-    def s3_source(self, pipeline_id, args):
-        data_store = "s3"
-        type = "source"
-        schema = args.schema
-        name = args.source_name
-        path = args.source_path
-        format = args.format
-        other_settings = {"fileNameOnly": "false", "latestFirst": "false"}
-        conn = Qubole.agent()
-        url = self.rest_entity_path + "/" + str(pipeline_id) + '/node'
-        data = {"data": {"attributes": {"fields": {"name": name, "path": path, "format": format, "schema": schema,
-                                                   "other_settings": other_settings}, "data_store": data_store},
-                         "type": type}}
-        return conn.post(url, data)
-
-    def add_property(self):
-        pass
-
-    @staticmethod
-    def pause(pipeline_id):
-        url = Quest.rest_entity_path + "/" + pipeline_id + "/pause"
-        conn = Qubole.agent()
-        return conn.put(url)
+    def create(pipeline_name, create_type, **kwargs):
+        """
+        Create a pipeline object by issuing a POST request to the /pipelines?mode=wizard endpoint.
+        Note: this creates the pipeline in draft mode.
+
+        Args:
+            pipeline_name: name of the pipeline
+            create_type: 1 -> Assisted, 2 -> BYOJ, 3 -> BYOC
+            **kwargs: keyword arguments specific to the create type
+
+        Returns:
+            response of the POST request
+        """
+        conn = Qubole.agent()
+        if create_type is None:
+            raise ParseError("Please enter create_type. 1:Assisted Mode, 2:BYOJ, 3:BYOC")
+        if pipeline_name is None:
+            raise ParseError("Enter pipeline name")
+        data = {"data": {
+            "attributes": {"name": pipeline_name, "status": "DRAFT", "create_type": create_type},
+            "type": "pipelines"}}
+        url = Quest.rest_entity_path + "?mode=wizard"
+        response = conn.post(url, data)
+        return response

     @staticmethod
     def start(pipeline_id):
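Together with get_pipline_id above, the new create flow can be exercised as follows; a minimal sketch, assuming a configured account (the token and pipeline name are illustrative):

    from qds_sdk.qubole import Qubole
    from qds_sdk.quest import Quest

    Qubole.configure(api_token="<api-token>")     # illustrative placeholder
    response = Quest.create("demo-pipeline", 3)   # create_type 3 = BYOC; pipeline starts in DRAFT status
    pipeline_id = Quest.get_pipline_id(response)  # id exactly as returned by the API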
@@ -222,101 +197,185 @@ def start(pipeline_id):
         return response

     @staticmethod
-    def status(pipeline_id):
+    def add_property(pipeline_id, cluster_label, checkpoint_location, trigger_interval=None, output_mode=None, can_retry=True):
         conn = Qubole.agent()
-        url = Quest.rest_entity_path + "/" + pipeline_id + "/status"
-        response = conn.put(url)
-        log.info(response)
+        data = {"data": {"attributes": {
+            "cluster_label": cluster_label,
+            "can_retry": can_retry,
+            "checkpoint_location": checkpoint_location,
+            "trigger_interval": trigger_interval,
+            "output_mode": output_mode,
+            "command_line_options": "--conf spark.driver.extraLibraryPath=/usr/lib/hadoop2/lib/native\n--conf spark.eventLog.compress=true\n--conf spark.eventLog.enabled=true\n--conf spark.sql.streaming.qubole.enableStreamingEvents=true\n--conf spark.qubole.event.enabled=true"
+        },
+            "type": "pipeline/properties"
+        }
+        }
+        url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/properties/"
+        response = conn.put(url, data)
         return response
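A draft can then be given its runtime properties; a sketch continuing the example above (the cluster label, checkpoint path, and output mode are illustrative values, not defaults of the API):

    Quest.add_property(pipeline_id,
                       cluster_label="spark-streaming",                # illustrative cluster label
                       checkpoint_location="s3://bucket/checkpoints",  # illustrative path
                       output_mode="append")                           # optional, as is trigger_interval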

     @staticmethod
-    def delete(pipeline_id):
-        conn = Qubole.agent()
-        url = Quest.rest_entity_path + "/" + pipeline_id + "/delete"
-        response = conn.put(url)
-        log.info(response)
-        return response
-
-    def get_pipline_id(self, response):
-        return response.get('data').get('id')
-
-    def kafka_source(self, **kwargs):
-        pass
+    def save_code(pipeline_id, create_type, code_or_fileLoc=None, jar_file_loc=None, language='scala'):
+        try:
+            code = None
+            if code_or_fileLoc:
+                if os.path.isfile(code_or_fileLoc):
+                    code = open(code_or_fileLoc).read()
+                else:
+                    code = code_or_fileLoc
+            elif jar_file_loc:
+                code = jar_file_loc
+        except IOError as e:
+            raise ParseError("Unable to open script location, or script location and code are both empty")
+
+        data = {"data": {"attributes": {"create_type": create_type, "code": str(code), "language": str(language)}}}
+        conn = Qubole.agent()
+        url = Quest.rest_entity_path + "/" + str(pipeline_id) + "/save_code"
+        response = conn.put(url, data)
+        return response
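Code is attached the same way; a sketch continuing the example (the file path is illustrative, and a raw code string may be passed in code_or_fileLoc instead of a path):

    Quest.save_code(pipeline_id, create_type=3,
                    code_or_fileLoc="/path/to/streaming_job.scala",  # illustrative local path
                    language="scala")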

-    def kinesis_source(self, **kwargs):
-        pass
-
-    def health(self):
-        pass
-
-    def clone(self):
-        pass
-
-    def save_code(self):
-        pass
-
-    def edit(self):
-        pass
+    @staticmethod
+    def clone(pipeline_id):
+        url = Quest.rest_entity_path + "/" + pipeline_id + "/duplicate"
+        log.info("Cloning pipeline with id {}".format(pipeline_id))
+        conn = Qubole.agent()
+        return conn.put(url)
+
+    @staticmethod
+    def pause(pipeline_id):
+        url = Quest.rest_entity_path + "/" + pipeline_id + "/pause"
+        log.info("Pausing pipeline with id {}".format(pipeline_id))
+        conn = Qubole.agent()
+        return conn.put(url)
+
+    @staticmethod
+    def edit(pipeline_id, **kwargs):
+        pass

-class Code(Quest):
-    def __init__(self):
-        self.create_type = 3
-
-    def create(self, name, args):
-        conn = Qubole.agent()
-        data = {"data": {"attributes": {"name": name, "status": "DRAFT", "description": args.description,
-                                        "create_type": self.create_type}, "type": "pipelines"}}
-        url = Quest.rest_entity_path + "?mode=wizard"
-        response = conn.post(url, data)
-        pipeline_id = response.get("data").get("id")
-        property = self.add_property(pipeline_id, cluster_label=args.cluster_label)
-        save_code = self.save_code(pipeline_id, script_location=args.script_location, code=args.code,
-                                   language=args.language)
-        return save_code
+    @staticmethod
+    def archive(pipeline_id):
+        url = Quest.rest_entity_path + "/" + pipeline_id + "/archive"
+        log.info("Archiving pipeline with id {}".format(pipeline_id))
+        conn = Qubole.agent()
+        return conn.put(url)
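The lifecycle helpers are then one-liners; note that these endpoints build the URL by string concatenation, so the id must be passed as a string (the id below is illustrative):

    Quest.pause("123")    # pause a running pipeline
    Quest.clone("123")    # duplicate it as a new draft
    Quest.archive("123")  # archive it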

+    # @staticmethod
+    # def status(pipeline_id):
+    #     conn = Qubole.agent()
+    #     url = Quest.rest_entity_path + "/" + pipeline_id + "/status"
+    #     response = conn.put(url)
+    #     log.info(response)
+    #     return response

-    def add_property(self, pipeline_id, **kwargs):
-        conn = Qubole.agent()
-        data = {"data": {"attributes": {
-            "cluster_label": kwargs.get("cluster_label"),
-            "can_retry": "true",
-            "checkpoint_location": kwargs.get("checkpoint_location"),
-            "trigger_interval": kwargs.get("trigger_interval"),
-            "output_mode": kwargs.get("output_mode"),
-            "command_line_options": "--conf spark.driver.extraLibraryPath=/usr/lib/hadoop2/lib/native\n--conf spark.eventLog.compress=true\n--conf spark.eventLog.enabled=true\n--conf spark.sql.streaming.qubole.enableStreamingEvents=true\n--conf spark.qubole.event.enabled=true"
-        },
-            "type": "pipeline/properties"
-        }
-        }
-        url = self.rest_entity_path + "/" + str(pipeline_id) + "/properties/"
-        response = conn.put(url, data)
-        return response
+    @staticmethod
+    def delete(pipeline_id):
+        conn = Qubole.agent()
+        url = Quest.rest_entity_path + "/" + pipeline_id + "/delete"
+        log.info("Deleting Pipeline with id: {}".format(pipeline_id))
+        response = conn.put(url)
+        log.info(response)
+        return response

-    def save_code(self, pipeline_id, **kwargs):
-        try:
-            code = None
-            if kwargs.get("code"):
-                code = kwargs.get('code')
-            elif kwargs.get("script_location"):
-                q = open(kwargs.get("script_location")).read()
-                code = q
-        except IOError as e:
-            raise ParseError("Unable to open script location or script location and code both are empty")
-
-        data = {"data": {"attributes": {"create_type": self.create_type, "code": str(code), "language": str(language)}}}
-        conn = Qubole.agent()
-        url = self.rest_entity_path + "/" + str(pipeline_id) + "/save_code"
-        response = conn.put(url, data)
-        return response
+    @staticmethod
+    def edit_pipeline_name(pipeline_id, pipeline_name):
+        conn = Qubole.agent()
+        url = Quest.rest_entity_path + "/" + pipeline_id
+        data = {"data": {"attributes": {"name": pipeline_name}, "type": "pipelines"}}
+        return conn.put(url, data)
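Renaming and deleting follow the same pattern; a sketch with an illustrative id and name:

    Quest.edit_pipeline_name("123", "demo-pipeline-v2")  # id passed as a string, as above
    Quest.delete("123")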


+class QuestCode(Quest):
+    @staticmethod
+    def create_pipeline(pipeline_name, create_type, code_or_fileLoc, cluster_label, checkpoint_location, language='scala', **kwargs):
+        """
+        Method to create a pipeline in BYOC mode in one go.
+
+        :param pipeline_name: name by which the pipeline will be created
+        :param create_type: 1 -> Assisted mode, 2 -> BYOJ mode, 3 -> BYOC mode
+        :param code_or_fileLoc: code, or the location of a file containing it
+        :param cluster_label: label of the cluster the pipeline runs on
+        :param checkpoint_location: checkpoint location for the streaming query
+        :param language: scala/python
+        :param kwargs: other params: trigger_interval, output_mode, can_retry (True by default)
+        :return: pipeline id
+        """
+        response = Quest.create(pipeline_name, create_type)
+        log.info(response)
+        pipeline_id = Quest.get_pipline_id(response)
+        property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
+                                      trigger_interval=kwargs.get("trigger_interval"),
+                                      output_mode=kwargs.get("output_mode"),
+                                      can_retry=kwargs.get("can_retry", True))
+        log.info(property)
+        save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code_or_fileLoc, language=language)
+        pipeline_id = Quest.get_pipline_id(save_response)
+        return pipeline_id
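End to end, BYOC creation collapses into a single call; a sketch with illustrative argument values:

    from qds_sdk.quest import QuestCode

    pipeline_id = QuestCode.create_pipeline(
        "byoc-demo", create_type=3,
        code_or_fileLoc="/path/to/streaming_job.scala",  # illustrative; inline code also works
        cluster_label="spark-streaming",                 # illustrative label
        checkpoint_location="s3://bucket/checkpoints",   # illustrative location
        language="scala")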

+    @staticmethod
+    def edit(pipeline_id, **kwargs):
+        """
+        Method to edit a BYOC pipeline.
+
+        :param pipeline_id: pipeline id
+        :param kwargs: cluster_label, checkpoint_location, code_or_fileLoc, language, trigger_interval, output_mode, can_retry
+        :return: response of the save_code call
+        """
+        checkpoint_location = kwargs.get("checkpoint_location")
+        cluster_label = kwargs.get("cluster_label")
+        code = kwargs.get("code_or_fileLoc")
+        language = kwargs.get("language")
+        trigger_interval = kwargs.get("trigger_interval")
+        output_mode = kwargs.get("output_mode")
+        can_retry = kwargs.get("can_retry", True)
+        property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
+                                      trigger_interval=trigger_interval, output_mode=output_mode,
+                                      can_retry=can_retry)
+        log.info(property)
+        save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language)
+        return save_response


+class QuestJar(Quest):
+    @staticmethod
+    def create_pipeline(pipeline_name, create_type, jar_file, cluster_label, checkpoint_location, language='scala', **kwargs):
+        """
+        Method to create a pipeline in BYOJ mode in one go.
+
+        :param pipeline_name: name by which the pipeline will be created
+        :param create_type: 1 -> Assisted mode, 2 -> BYOJ mode, 3 -> BYOC mode
+        :param jar_file: location of the jar file
+        :param cluster_label: label of the cluster the pipeline runs on
+        :param checkpoint_location: checkpoint location for the streaming query
+        :param language: scala/python
+        :param kwargs: other params: trigger_interval, output_mode, can_retry (True by default)
+        :return: pipeline id
+        """
+        response = Quest.create(pipeline_name, create_type)
+        log.info(response)
+        pipeline_id = Quest.get_pipline_id(response)
+        property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location, **kwargs)
+        log.info(property)
+        save_response = Quest.save_code(pipeline_id, create_type, jar_file_loc=jar_file, language=language)
+        pipeline_id = Quest.get_pipline_id(save_response)
+        return pipeline_id
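The BYOJ variant mirrors it, pointing at a jar instead of source code; a sketch with illustrative values:

    from qds_sdk.quest import QuestJar

    pipeline_id = QuestJar.create_pipeline(
        "byoj-demo", create_type=2,
        jar_file="s3://bucket/jars/streaming-job.jar",  # illustrative jar location
        cluster_label="spark-streaming",                # illustrative label
        checkpoint_location="s3://bucket/checkpoints")  # illustrative location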

-    def start(self, pipeline_id):
-        return super(Quest, self).start(pipeline_id)
+    @staticmethod
+    def edit(pipeline_id, **kwargs):
+        """
+        Method to edit a BYOJ pipeline.
+
+        :param pipeline_id: pipeline id
+        :param kwargs: cluster_label, checkpoint_location, code_or_fileLoc, language, trigger_interval, output_mode, can_retry
+        :return: response of the save_code call
+        """
+        checkpoint_location = kwargs.get("checkpoint_location")
+        cluster_label = kwargs.get("cluster_label")
+        code = kwargs.get("code_or_fileLoc")
+        language = kwargs.get("language")
+        trigger_interval = kwargs.get("trigger_interval")
+        output_mode = kwargs.get("output_mode")
+        can_retry = kwargs.get("can_retry", True)
+        property = Quest.add_property(pipeline_id, cluster_label, checkpoint_location,
+                                      trigger_interval=trigger_interval, output_mode=output_mode,
+                                      can_retry=can_retry)
+        log.info(property)
+        save_response = Quest.save_code(pipeline_id, create_type=3, code_or_fileLoc=code, language=language)
+        return save_response


-class Jar(Quest):
-    pass
-
-class Assisted(Quest):
-    pass
+class QuestAssisted(Quest):
+    def kafka_source(self, **kwargs):
+        pass
+
+    def kinesis_source(self, **kwargs):
+        pass
