Skip to content

Commit

Permalink
Fix salesforce default content type (#1724)
Browse files Browse the repository at this point in the history
  • Loading branch information
dlstadther authored Jun 27, 2016
1 parent 831e390 commit 09598c6
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions luigi/contrib/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def is_soql_file(self):

@property
def content_type(self):
"""Override to use a different content type. (e.g. XML)"""
"""
Override to use a different content type. Salesforce allows XML, CSV, ZIP_CSV, or ZIP_XML. Defaults to CSV.
"""
return "CSV"

def run(self):
Expand All @@ -155,7 +157,7 @@ def run(self):
salesforce().sb_security_token,
self.sandbox_name)

job_id = sf.create_operation_job('query', self.object_name)
job_id = sf.create_operation_job('query', self.object_name, content_type=self.content_type)
logger.info("Started query job %s in salesforce for object %s" % (job_id, self.object_name))

batch_id = ''
Expand All @@ -165,7 +167,7 @@ def run(self):
with open(self.soql, 'r') as infile:
self.soql = infile.read()

batch_id = sf.create_batch(job_id, self.soql)
batch_id = sf.create_batch(job_id, self.soql, self.content_type)
logger.info("Creating new batch %s to query: %s for job: %s." % (batch_id, self.object_name, job_id))
status = sf.block_on_batch(job_id, batch_id)
if status['state'].lower() == 'failed':
Expand Down Expand Up @@ -213,7 +215,7 @@ def merge_batch_results(self, result_ids):
"""
outfile = open(self.output().path, 'w')

if self.content_type == 'CSV':
if self.content_type.lower() == 'csv':
for i, result_id in enumerate(result_ids):
with open("%s.%d" % (self.output().path, i), 'r') as f:
header = f.readline()
Expand Down Expand Up @@ -398,9 +400,6 @@ def create_operation_job(self, operation, obj, external_id_field_name=None, cont
:param obj: Parent SF object
:param external_id_field_name: Optional.
"""
if content_type is None:
content_type = self.content_type

if not self.has_active_session():
self.start_session()

Expand Down Expand Up @@ -458,7 +457,7 @@ def close_job(self, job_id):

return response

def create_batch(self, job_id, data, file_type=None):
def create_batch(self, job_id, data, file_type):
"""
Creates a batch with either a string of data or a file containing data.
Expand All @@ -474,9 +473,6 @@ def create_batch(self, job_id, data, file_type=None):
if not job_id or not self.has_active_session():
raise Exception("Can not create a batch without a valid job_id and an active session.")

if file_type is None:
file_type = self.content_type.lower()

headers = self._get_create_batch_content_headers(file_type)
headers['Content-Length'] = len(data)

Expand Down

0 comments on commit 09598c6

Please sign in to comment.