Commit
Merge pull request #296 from Riminder/feature/bullhorn-v2
Feature/bullhorn v2
Thomas65535 authored Nov 26, 2024
2 parents 796e00b + 9ac253a commit 2f10960
Showing 19 changed files with 735 additions and 147 deletions.
285 changes: 262 additions & 23 deletions manifest.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/hrflow_connectors/core/backend/s3.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import secrets
 import typing as t
 from io import BytesIO

@@ -55,7 +56,7 @@ class CannotReadFromS3Error(Exception):
 
 def check_store_pydantic(state: InternalState):
     root_model = create_model("S3StoreRoot", root="HrFlow Connectors", store=NAME)
-    root_key = "__root"
+    root_key = f"__root_{secrets.token_hex(4)}"
     root = root_model()
     try:
         save(state, root_key, root)
@@ -77,7 +78,7 @@ def check_store_msgspec(state: InternalState):
     root_model = defstruct(
         "S3StoreRoot", [("root", str, "HrFlow Connectors"), ("store", str, NAME)]
     )
-    root_key = "__root"
+    root_key = f"__root_{secrets.token_hex(4)}"
     root = root_model()
     try:
         save(state, root_key, root)
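Context for the s3.py change above: the backend's store check writes a small probe object to verify S3 access. With the fixed key `__root`, concurrent checks against the same bucket would read and write the same object; suffixing the key with `secrets.token_hex(4)` gives every check its own key. A minimal sketch of the idea (standalone, not the module's actual helpers):

```python
import secrets

def make_probe_key(prefix: str = "__root") -> str:
    # 4 random bytes -> 8 hex characters, e.g. "__root_a3f09c12"
    return f"{prefix}_{secrets.token_hex(4)}"

# Two store checks running at the same time now target distinct keys
key_a, key_b = make_probe_key(), make_probe_key()
assert key_a != key_b
```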
2 changes: 1 addition & 1 deletion src/hrflow_connectors/v2/connectors/bullhorn/README.md
@@ -42,7 +42,7 @@ This new connector will enable:
 | ------- | ----------- |
 | [**Create jobs in hrflow**](docs/create_jobs_in_hrflow.md) | Send **created** 'job(s)' _from_ _to_ HrFlow |
 | [**Update jobs in hrflow**](docs/update_jobs_in_hrflow.md) | Send **updated** 'job(s)' _from_ _to_ HrFlow |
-| [**Update jobs in hrflow**](docs/update_jobs_in_hrflow.md) | Send **updated** 'job(s)' _from_ _to_ HrFlow |
+| [**Archive jobs in hrflow**](docs/archive_jobs_in_hrflow.md) | Send **archived** 'job(s)' _from_ _to_ HrFlow |
 | [**Create profiles in hrflow**](docs/create_profiles_in_hrflow.md) | Send **created** 'profile(s)' _from_ _to_ HrFlow |
 | [**Update profiles in hrflow**](docs/update_profiles_in_hrflow.md) | Send **updated** 'profile(s)' _from_ _to_ HrFlow |
 | [**Archive profiles in hrflow**](docs/archive_profiles_in_hrflow.md) | Send **archived** 'profile(s)' _from_ _to_ HrFlow |
186 changes: 116 additions & 70 deletions src/hrflow_connectors/v2/connectors/bullhorn/aisles.py
@@ -97,10 +97,10 @@ class BaseJobsParameters(BaseParameters, kw_only=True):
             "customText2,customText3,customText4,customText5,customText6,"
             "customText7,customText8,customText9,customTextBlock1,customTextBlock2,"
             "customTextBlock3,customTextBlock4,customTextBlock5,dateAdded,dateEnd,"
-            "degreeList,description,durationWeeks,educationDegree,employmentType,"
-            "feeArrangement,hoursOfOperation,hoursPerWeek,isOpen,isWorkFromHome,"
-            "markUpPercentage,numOpenings,onSite,payRate,salary,salaryUnit,skills,"
-            "skillList,source,specialties,startDate,status,title,type,willRelocate,"
+            "degreeList,description,publicDescription,durationWeeks,educationDegree,"
+            "employmentType,feeArrangement,hoursOfOperation,hoursPerWeek,isOpen,"
+            "isWorkFromHome,markUpPercentage,numOpenings,onSite,payRate,salary,salaryUnit,"
+            "skills,skillList,source,specialties,startDate,status,title,id,type,willRelocate,"
             "owner"
         )
     query: Annotated[
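The `fields` string above is the projection sent to Bullhorn's REST search endpoints as a comma-separated query parameter; adding `publicDescription` and `id` makes those attributes come back with each JobOrder. A hedged sketch of the request shape (URL, token, and query values are placeholders, not taken from this repository):

```python
import requests

rest_url = "https://rest.example.bullhornstaffing.com/rest-services/abc123/"  # placeholder
params = {
    "query": "dateAdded:[20240101 TO *]",
    "fields": "id,title,publicDescription,dateAdded,dateLastModified",
    "BhRestToken": "REPLACE_ME",  # placeholder session token
}
# search/JobOrder returns only the attributes listed in `fields`
response = requests.get(f"{rest_url}search/JobOrder", params=params)
jobs = response.json().get("data", [])
```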
@@ -242,7 +242,7 @@ class ReadArchivedProfilesCriterias(BaseParameters, kw_only=True):
                 " the specified conditions"
             ),
         ),
-    ] = "isDeleted:0"
+    ] = "isDeleted:1"
     fields: Annotated[
         str,
         Meta(
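The default-criteria fix above matters because Bullhorn soft-deletes candidates: archived records carry `isDeleted:1`, so a reader meant to return archived profiles was previously selecting only active ones (`isDeleted:0`). The flag composes with other Lucene-style terms if the query is extended, e.g. (illustrative values only):

```python
last_sync = "20240101000000"  # placeholder watermark from a previous run
query = f"isDeleted:1 AND dateLastModified:[{last_sync} TO *]"
```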
@@ -253,7 +253,7 @@ class ReadArchivedProfilesCriterias(BaseParameters, kw_only=True):
 
 def make_request(
     method, url, params, auth_parameters: AuthParameters, adapter, json=None
-):
+) -> t.Optional[dict]:
     response = method(url, params=params, data=json)
     if response.status_code == 401:
         adapter.info("Auth token expired, regenerating...")
@@ -268,7 +268,7 @@ def make_request(
     return handle_response(response, adapter)
 
 
-def handle_response(response, adapter):
+def handle_response(response, adapter) -> t.Optional[dict]:
     if not response.ok:
         adapter.error(
             f"Request failed with status_code={response.status_code},"
@@ -292,31 +292,59 @@ def search_entity(
     return response
 
 
-def create_entity(entity, rest_url, params, data, auth_parameters, adapter):
+def create_entity(
+    entity: str,
+    rest_url: str,
+    params: dict,
+    data: dict,
+    auth_parameters: AuthParameters,
+    adapter: LoggerAdapter,
+) -> t.Optional[dict]:
     url = f"{rest_url}entity/{entity}"
     response = make_request(
         requests.post, url, params, auth_parameters, adapter, json.dumps(data)
     )
     return response
 
 
-def update_entity(entity, entity_id, rest_url, params, data, auth_parameters, adapter):
+def update_entity(
+    entity: str,
+    entity_id: str,
+    rest_url: str,
+    params: dict,
+    data: dict,
+    auth_parameters: AuthParameters,
+    adapter: LoggerAdapter,
+) -> t.Optional[dict]:
     url = f"{rest_url}entity/{entity}/{entity_id}"
     response = make_request(
         requests.put, url, params, auth_parameters, adapter, json.dumps(data)
     )
     return response
 
 
-def check_entity_files(entity, rest_url, params, entity_id, auth_parameters, adapter):
+def check_entity_files(
+    entity: str,
+    entity_id: str,
+    rest_url: str,
+    params: dict,
+    auth_parameters: AuthParameters,
+    adapter: LoggerAdapter,
+) -> t.Optional[dict]:
     url = f"{rest_url}entityFiles/{entity}/{entity_id}"
     response = make_request(requests.get, url, params, auth_parameters, adapter)
     return response
 
 
 def upload_attachment(
-    entity, entity_id, rest_url, params, attachment, adapter, auth_parameters
-):
+    entity: str,
+    entity_id: str,
+    rest_url: str,
+    params: dict,
+    auth_parameters: AuthParameters,
+    adapter: LoggerAdapter,
+    attachment,
+) -> t.Optional[dict]:
     url = f"{rest_url}file/{entity}/{entity_id}"
     attachment_response = make_request(
         requests.put, url, params, auth_parameters, adapter, json.dumps(attachment)
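A pattern worth noting in the four signatures above: the helpers now take typed parameters in a consistent order (`entity`, `entity_id`, `rest_url`, `params`, `auth_parameters`, `adapter`), and the call-site fixes below swap `parameters` for `auth_parameters` where the wrong object had been passed positionally. Keyword calls make that class of bug visible at the call site; a runnable illustration with stand-in types (not the connector's real classes):

```python
from dataclasses import dataclass

@dataclass
class AuthParameters:
    client_id: str

@dataclass
class ActionParameters:  # easily confused with AuthParameters when passed positionally
    job_id: int

def check_entity_files(entity: str, entity_id: str, auth_parameters: AuthParameters) -> str:
    return f"GET entityFiles/{entity}/{entity_id} as {auth_parameters.client_id}"

auth = AuthParameters(client_id="acme")
action = ActionParameters(job_id=42)

# With keywords, passing `action` into the auth slot cannot happen silently
print(check_entity_files(entity="Candidate", entity_id="12345", auth_parameters=auth))
```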
@@ -368,7 +396,7 @@ def update_application(
         if search_results["count"] == 0:
             adapter.info(f"Creating candidate with email: {email}")
             candidate_response = create_entity(
-                "Candidate", rest_url, params, profile, parameters, adapter
+                "Candidate", rest_url, params, profile, auth_parameters, adapter
             )
             if not candidate_response:
                 failed_profiles.append(profile)
@@ -403,7 +431,7 @@ def update_application(
                 rest_url,
                 params,
                 profile,
-                parameters,
+                auth_parameters,
                 adapter,
             )
 
@@ -416,12 +444,18 @@ def update_application(
                     f"Checking if attachment exists for candidate {candidate_id}"
                 )
                 entity_files = check_entity_files(
-                    "Candidate", rest_url, params, candidate_id, parameters, adapter
+                    "Candidate",
+                    candidate_id,
+                    rest_url,
+                    params,
+                    auth_parameters,
+                    adapter,
                 )
-                attachment_exists = any(
-                    file["name"] == attachment["name"]
-                    for file in entity_files.get("EntityFiles", [])
-                )
+                if entity_files:
+                    attachment_exists = any(
+                        file["name"] == attachment["name"]
+                        for file in entity_files.get("EntityFiles", [])
+                    )
 
                 if not attachment_exists:
                     adapter.info("Uploading attachment")
@@ -430,9 +464,9 @@ def update_application(
                         candidate_id,
                         rest_url,
                         params,
-                        attachment,
+                        auth_parameters,
                         adapter,
-                        parameters,
+                        attachment,
                     )
                     if not attachment_response:
                         failed_profiles.append(profile)
@@ -452,39 +486,37 @@ def update_application(
                 parameters,
             )
 
-            job_submission_exists = job_submission_results.get("count", 0) > 0
-            job_submission_id = (
-                job_submission_results["data"][0]["id"] if job_submission_exists else None
-            )
-
-            job_submission_payload = {
-                "candidate": {"id": candidate_id},
-                "jobOrder": {"id": parameters.job_id},
-                "status": parameters.status_when_created,
-                "dateWebResponse": int(time.time() * 1000),
-            }
-
-            adapter.info("Creating or updating JobSubmission")
-            job_submission_response = (
-                update_entity(
-                    "JobSubmission",
-                    job_submission_id,
-                    rest_url,
-                    params,
-                    job_submission_payload,
-                    parameters,
-                    adapter,
-                )
-                if job_submission_exists
-                else create_entity(
-                    "JobSubmission",
-                    rest_url,
-                    params,
-                    job_submission_payload,
-                    parameters,
-                    adapter,
-                )
-            )
+            if job_submission_results:
+                job_submission_exists = job_submission_results.get("count", 0) > 0
+
+                job_submission_payload = {
+                    "candidate": {"id": candidate_id},
+                    "jobOrder": {"id": parameters.job_id},
+                    "status": parameters.status_when_created,
+                    "dateWebResponse": int(time.time() * 1000),
+                }
+                if job_submission_exists:
+                    job_submission_id = job_submission_results["data"][0]["id"]
+                    adapter.info("Updating JobSubmission")
+                    job_submission_response = update_entity(
+                        "JobSubmission",
+                        job_submission_id,
+                        rest_url,
+                        params,
+                        job_submission_payload,
+                        auth_parameters,
+                        adapter,
+                    )
+                else:
+                    adapter.info("Creating JobSubmission")
+                    job_submission_response = create_entity(
+                        "JobSubmission",
+                        rest_url,
+                        params,
+                        job_submission_payload,
+                        auth_parameters,
+                        adapter,
+                    )
 
             if not job_submission_response:
                 failed_profiles.append(profile)
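The rewrite above turns a dense conditional expression into an explicit search-then-upsert branch, and only runs it when the search call itself succeeded. The shape of the pattern as a runnable toy, with an in-memory dict standing in for Bullhorn's JobSubmission store:

```python
import typing as t

store: dict[int, dict] = {}  # stand-in for JobSubmission records

def search(candidate_id: int) -> t.Optional[dict]:
    # None here would mean a failed request; a dict means the search ran
    matches = [r for r in store.values() if r["candidate"] == candidate_id]
    return {"count": len(matches), "data": matches}

def upsert_submission(candidate_id: int, status: str) -> None:
    results = search(candidate_id)
    if results:  # skip entirely when the search request failed
        if results.get("count", 0) > 0:
            results["data"][0]["status"] = status   # update the existing submission
        else:
            store[candidate_id] = {"candidate": candidate_id, "status": status}  # create

upsert_submission(7, "New Lead")
upsert_submission(7, "Submitted")
assert store[7]["status"] == "Submitted"
```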
@@ -580,16 +612,23 @@ def _pull_items(
                     should_break = True
                     break
 
+                date_added = job.get("dateAdded")
+                date_last_modified = job.get("dateLastModified")
+
+                transformed_date_added = transform_timestamp_read_from(date_added)
+                transformed_date_last_modified = transform_timestamp_read_from(
+                    date_last_modified
+                )
+
                 if (
                     action is Mode.create
-                    and transform_timestamp_read_from(job.get("dateAdded"))[:19]
-                    != transform_timestamp_read_from(job.get("dateLastModified"))[
+                    and transformed_date_added is not None
+                    and transformed_date_last_modified is not None
+                    and transformed_date_added[:19]
+                    != transformed_date_last_modified[
                         :19
                     ]  # ignore microsecond difference created by Bullhorn
-                ) or (
-                    action is Mode.update
-                    and job.get("dateAdded") == job.get("dateLastModified")
-                ):
+                ) or (action is Mode.update and date_added == date_last_modified):
                     continue
 
                 if (
@@ -726,18 +765,23 @@ def __pull_items(
                     should_break = True
                     break
 
+                date_added = profile.get("dateAdded")
+                date_last_modified = profile.get("dateLastModified")
+
+                transformed_date_added = transform_timestamp_read_from(date_added)
+                transformed_date_last_modified = transform_timestamp_read_from(
+                    date_last_modified
+                )
+
                 if (
                     action is Mode.create
-                    and transform_timestamp_read_from(profile.get("dateAdded"))[:19]
-                    != transform_timestamp_read_from(
-                        profile.get("dateLastModified")
-                    )[
+                    and transformed_date_added is not None
+                    and transformed_date_last_modified is not None
+                    and transformed_date_added[:19]
+                    != transformed_date_last_modified[
                         :19
                     ]  # ignore microsecond difference created by Bullhorn
-                ) or (
-                    action is Mode.update
-                    and profile.get("dateAdded") == profile.get("dateLastModified")
-                ):
+                ) or (action is Mode.update and date_added == date_last_modified):
                     continue
 
                 if (
@@ -753,7 +797,10 @@ def __pull_items(
                     total_returned += 1
                     continue
 
-            if parameters.parse_resume:
+            if (
+                not isinstance(parameters, ReadArchivedProfilesCriterias)
+                and parameters.parse_resume
+            ):
                 profile["cvFile"] = None
                 url_files = (
                     authentication["restUrl"]
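The guard above exists because `ReadArchivedProfilesCriterias`, unlike the other profile-read parameter classes, defines no `parse_resume` field, yet all modes flow through the same `__pull_items`; the `isinstance` check narrows the type before the attribute is touched. The idea in isolation, with stand-in classes:

```python
class BaseParameters:
    pass

class ReadProfilesCriterias(BaseParameters):
    parse_resume: bool = True

class ReadArchivedProfilesCriterias(BaseParameters):
    pass  # no parse_resume attribute at all

def should_parse_resume(parameters: BaseParameters) -> bool:
    # Narrow first: archived reads never parse resumes, and reading
    # .parse_resume on them would raise AttributeError
    if isinstance(parameters, ReadArchivedProfilesCriterias):
        return False
    return getattr(parameters, "parse_resume", False)

assert should_parse_resume(ReadProfilesCriterias()) is True
assert should_parse_resume(ReadArchivedProfilesCriterias()) is False
```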
@@ -922,7 +969,6 @@ def item_to_read_from_update_or_archive(item: dict) -> str:
     schema=BullhornProfile,
 )
 
-# FIXME generic_job_pulling doesn't seem to handle the archive mode
 JobsAisle = Aisle(
     name=Entity.job,
     read=ReadOperation(
