(feat): add date selection parameters to the Salesforce to S3 operator #8

Open · wants to merge 3 commits into master
46 changes: 39 additions & 7 deletions operators/salesforce_to_s3_operator.py
@@ -6,7 +6,7 @@
from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook

from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
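# Note: the new import path is provided by the apache-airflow-providers-salesforce
# package on Airflow 2.x; the airflow.contrib path above was removed in Airflow 2.0.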


class SalesforceBulkQueryToS3Operator(BaseOperator):
@@ -88,6 +88,20 @@ class SalesforceToS3Operator(BaseOperator):
- ndjson
*Default: csv*
:type fmt: string
:param from_date: *(optional)* An ISO-formatted datetime string used
                  as the lower bound of the query for incremental
                  ingestion, evaluated against the SystemModStamp
                  attribute. Not compatible with the query parameter.
                  (ex. 2021-01-01T00:00:00Z)
                  *Default: None*
:type from_date: string
:param to_date: *(optional)* An ISO-formatted datetime string used
                as the upper bound of the query for incremental
                ingestion, evaluated against the SystemModStamp
                attribute. Not compatible with the query parameter.
                (ex. 2021-01-01T00:00:00Z)
                *Default: None*
:type to_date: string
:param query: *(optional)* A specific query to run for
the given object. This will override
default query creation.
@@ -123,6 +137,8 @@ def __init__(self,
s3_bucket,
s3_key,
sf_fields=None,
from_date=None,
to_date=None,
fmt="csv",
query=None,
relationship_object=None,
@@ -136,6 +152,8 @@
self.sf_conn_id = sf_conn_id
self.object = sf_obj
self.fields = sf_fields
self.from_date = from_date
self.to_date = to_date
self.s3_conn_id = s3_conn_id
self.s3_bucket = s3_bucket
self.s3_key = s3_key
@@ -173,7 +191,7 @@ def execute(self, context):
with NamedTemporaryFile("w") as tmp:

# Load the SalesforceHook
hook = SalesforceHook(conn_id=self.sf_conn_id, output=tmp.name)
hook = SalesforceHook(conn_id=self.sf_conn_id)
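# Unlike the contrib hook call above, the provider hook takes no "output"
# argument; query results are written to the temp file explicitly via
# write_object_to_file below.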

# Attempt to login to Salesforce
# If this process fails, it will raise an error and die.
@@ -198,18 +216,34 @@
relationship_object=self.relationship_object
)
else:
query = hook.get_object_from_salesforce(self.object,
self.fields)
if self.to_date and self.from_date:
logging.info(f"Gathering items from date: {self.from_date} to date: {self.to_date}")
date_select = f"{self.object} WHERE SystemModStamp >= {self.from_date} AND SystemModStamp <= {self.to_date}"
query = hook.get_object_from_salesforce(date_select, self.fields)
elif self.from_date:
logging.info(f"Gathering items from date: {self.from_date}")
date_select = f"{self.object} WHERE SystemModStamp >= {self.from_date}"
query = hook.get_object_from_salesforce(date_select, self.fields)
elif self.to_date:
logging.info(f"Gathering items to date: {self.to_date}")
date_select = f"{self.object} WHERE SystemModStamp <= {self.to_date}"
query = hook.get_object_from_salesforce(date_select, self.fields)
else:
query = hook.get_object_from_salesforce(self.object, self.fields)
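                        # Illustration (hypothetical values): with sf_obj="Lead",
                        # from_date="2021-01-01T00:00:00Z", and to_date="2021-02-01T00:00:00Z",
                        # the hook receives the object string
                        #   "Lead WHERE SystemModStamp >= 2021-01-01T00:00:00Z AND SystemModStamp <= 2021-02-01T00:00:00Z"
                        # and expands it into the SELECT it builds from the
                        # object name and fields, so the date filter rides along.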
# output the records from the query to a file
# the list of records is stored under the "records" key
logging.info("Writing query results to: {0}".format(tmp.name))

hook.write_object_to_file(query['records'],
if query.get("records"):
hook.write_object_to_file(query['records'],
filename=tmp.name,
fmt=self.fmt,
coerce_to_timestamp=self.coerce_to_timestamp,
record_time_added=self.record_time_added)
else:
logging.info(f"No records found in the query: {query}")

# Flush the temp file and upload temp file to S3
tmp.flush()
@@ -223,8 +257,6 @@
replace=True
)

dest_s3.connection.close()

tmp.close()

logging.info("Query finished!")