Skip to content

Commit

Permalink
feat: do not use FIELDS(CUSTOM) for salesforce and use query_all_iter…
Browse files Browse the repository at this point in the history
… for large result set
  • Loading branch information
the-forest-tree authored and the-forest-tree committed Dec 4, 2023
1 parent 0d4b5f2 commit 22b0b05
Showing 1 changed file with 83 additions and 7 deletions.
90 changes: 83 additions & 7 deletions src/hrflow_connectors/connectors/salesforce/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,48 @@
WarehouseWriteAction,
)

SOQL_MAX_RETURNED_ROWS = 2000

SELECT_PROFILES_SOQL = """
SELECT
LastModifiedDate,
FIELDS(CUSTOM),
Id__c,
Dataset_Id__c,
Date_Edition__c,
Date_Reception__c,
Text__c,
Email__c,
URLs__c,
Name__c,
Gender__c,
Location_Text__c,
Location_Lat__c,
Location_Lng__c,
Location_Gmaps__c,
Phone__c,
Timestamp__c,
Summary__c,
Hash_Id__c,
Text_Language__c,
Reference__c,
Skills__c,
Languages__c,
Interests__c,
Seniority__c,
Location_Fields__c,
Experiences_Duration__c,
Educations_Duration__c,
Archive__c,
Picture__c,
First_Name__c,
Last_Name__c,
Labels__c,
Metadatas__c,
Tags__c,
Certifications__c,
Courses__c,
Tasks__c,
Date_Birth__c,
(
SELECT
FIELDS(CUSTOM)
Expand All @@ -46,15 +84,48 @@
FROM HrFlow_Profile__c
WHERE LastModifiedDate >= {last_modified_date}
ORDER BY LastModifiedDate, Id__c
{limit_placeholder}
"""

SELECT_JOBS_SOQL = """
SELECT
LastModifiedDate,
FIELDS(CUSTOM)
Id__c,
Name__c,
Slug__c,
URL__c,
Summary__c,
Status__c,
Date_Edition__c,
Skills__c,
Archive__c,
Hash_Id__c,
Location_Text__c,
Location_Lat__c,
Location_Lng__c,
Tags__c,
Board_Id__c,
Sections__c,
Reference__c,
Languages__c,
Ranges_Float__c,
Ranges_Date__c,
Location_Gmaps__c,
Location_Fields__c,
Metadatas__c,
Certifications__c,
Courses__c,
Tasks__c,
Picture__c,
Culture__c,
Responsibilities__c,
Requirements__c,
Benefits__c,
Interviews__c
FROM HrFlow_Job__c
WHERE LastModifiedDate >= {last_modified_date}
ORDER BY LastModifiedDate, Id__c
{limit_placeholder}
"""


Expand Down Expand Up @@ -85,7 +156,6 @@ class SalesforceBaseParameters(ParametersModel):
sf_organization_id: str = Field(
...,
description=(
"Security Token to access Salesforce API."
"See below for instructions: "
"How to find your organization id "
" https://help.salesforce.com/s/articleView?id=000385215&type=1"
Expand Down Expand Up @@ -158,21 +228,27 @@ def _read_items(
)
last_id = 0

remaining = parameters.limit or float("inf")
query = soql_query.format(
last_modified_date=last_modified_date,
limit_placeholder=(
"LIMIT {}".format(remaining + 5)
if remaining + 5 < SOQL_MAX_RETURNED_ROWS
else ""
),
)

pulled_items = 0
for item in sf.query_all_iter(query):
if parameters.limit is not None and pulled_items >= parameters.limit:
if remaining <= 0:
break
if (
item["LastModifiedDate"] == last_modified_date
and item["Id__c"] <= last_id
):
# This is why limit_placeholder is set with remainig + 5
# In order to account for first items that might be already pulled
continue
pulled_items += 1
yield item
remaining -= 1

return _read_items

Expand Down

0 comments on commit 22b0b05

Please sign in to comment.