Skip to content

Commit

Permalink
fix: support distinct incremental methods per action (create, update,…
Browse files Browse the repository at this point in the history
… archive)
  • Loading branch information
itsnedhir committed Dec 2, 2024
1 parent d6bd087 commit 722e8c7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/hrflow_connectors/v2/core/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def manifest(self, connectors_directory: Path = CONNECTORS_DIRECTORY) -> Manifes
hrflow_auth_parameters=json_schema(HrFlowWarehouse.auth),
origin=self.name if flow.direction is Direction.inbound else "HrFlow",
origin_data_schema=json_schema(origin_aisle.schema),
supports_incremental=origin_aisle.read.supports_incremental,
supports_incremental=origin_aisle.read.supports_incremental(flow.mode),
pull_parameters=json_schema(pull_parameters),
target="HrFlow" if flow.direction is Direction.inbound else self.name,
target_data_schema=json_schema(target_aisle.schema),
Expand Down
12 changes: 7 additions & 5 deletions src/hrflow_connectors/v2/core/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ def run(

incremental_token = None
if incremental:
if origin.read.supports_incremental is False:
if origin.read.supports_incremental(mode) is False:
adapter.warning(
f"Origin warehouse {metadata.origin_name} does not support incremetal"
" reading"
" reading for mode {mode}."
)
return RunResult(
status=Status.fatal,
Expand Down Expand Up @@ -393,13 +393,15 @@ def run(
last_item = origin_items[-1]

# We know it's not None because of the check
# origin.read.supports_incremental
# origin.read.supports_incremental(mode)
# Adding these kinds of asserts which are anyway removed
# in optimized Python bytecode is for type checkers only
assert origin.read.get_incremental_token is not None
assert origin.read.incremental_token_handler is not None
token_handler = token_handler = origin.read.incremental_token_handler(mode)
assert token_handler is not None # for type-checker

try:
next_incremental_token = origin.read.get_incremental_token(last_item)
next_incremental_token = token_handler(last_item)
except Exception as e:
events[Event.getting_incremental_token_failure] += 1
adapter.exception(
Expand Down
28 changes: 24 additions & 4 deletions src/hrflow_connectors/v2/core/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ def for_mode(self, mode: Mode):
return self.archive


@dataclass
class IncrementalTokenHandler:
create: t.Optional[t.Callable[[dict], str]] = None
update: t.Optional[t.Callable[[dict], str]] = None
archive: t.Optional[t.Callable[[dict], str]] = None

def get_handler(self, mode: Mode) -> t.Optional[t.Callable[[dict], str]]:
if mode is Mode.create:
return self.create
if mode is Mode.update:
return self.update
if mode is Mode.archive:
return self.archive

def __call__(self, mode: Mode) -> t.Optional[t.Callable[[dict], str]]:
return self.get_handler(mode)


OperationT = t.TypeVar("OperationT", Read, Write)


Expand All @@ -151,11 +169,13 @@ class Operation(t.Generic[OperationT]):

@dataclass
class ReadOperation(Operation[Read]):
get_incremental_token: t.Optional[t.Callable[[dict], str]] = None
# get_incremental_token: t.Optional[t.Callable[[dict], str]] = None
incremental_token_handler: t.Optional[IncrementalTokenHandler] = None

@property
def supports_incremental(self):
return self.get_incremental_token is not None
def supports_incremental(self, mode: Mode) -> bool:
if self.incremental_token_handler is None:
return False
return self.incremental_token_handler.get_handler(mode) is not None

def __call__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from hrflow_connectors.v2.core.warehouse import (
Aisle,
Criterias,
IncrementalTokenHandler,
ReadOperation,
WriteOperation,
merge,
Expand Down Expand Up @@ -309,7 +310,9 @@ def read_archived(
update=ReadUpdatedCriterias,
archive=ReadArchivedCriterias,
),
get_incremental_token=lambda lead: str(lead["id"]),
incremental_token_handler=IncrementalTokenHandler(
create=lambda lead: str(lead["id"]),
),
),
write=WriteOperation(
function=merge(create=create, update=update, archive=archive),
Expand Down
4 changes: 2 additions & 2 deletions tests/v2/core/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def test_outcome_if_running_incremental_but_origin_does_not_support_it(
):
assert LeadsAisle.read is not None

with mock.patch.object(LeadsAisle.read, "get_incremental_token", None):
with mock.patch.object(LeadsAisle.read, "incremental_token_handler", None):
result = SmartLeads.create_jobs_in_hrflow(
workflow_id="test",
connector_auth=dict(smart_tag="smart::tag::smart"),
Expand Down Expand Up @@ -569,7 +569,7 @@ def test_outcome_if_getting_incremental_token_fails(
assert LeadsAisle.read is not None

with mock.patch.object(
LeadsAisle.read, "get_incremental_token", side_effect=Exception
LeadsAisle.read.incremental_token_handler, "create", side_effect=Exception
):
result = SmartLeads.create_jobs_in_hrflow(
workflow_id=workflow_id,
Expand Down
9 changes: 6 additions & 3 deletions tests/v2/core/test_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from hrflow_connectors.v2.core.warehouse import (
Aisle,
Criterias,
IncrementalTokenHandler,
ModeIsNotSupported,
ReadOperation,
Warehouse,
Expand Down Expand Up @@ -50,16 +51,18 @@ def test_read_supports_incremental():
ReadOperation(
function=MagicMock(),
criterias=Criterias(),
).supports_incremental
).supports_incremental(Mode.create)
is False
)

assert (
ReadOperation(
function=MagicMock(),
criterias=Criterias(),
get_incremental_token=lambda *args, **kwargs: "token",
).supports_incremental
incremental_token_handler=IncrementalTokenHandler(
create=lambda *args, **kwargs: "token",
),
).supports_incremental(Mode.create)
is True
)

Expand Down

0 comments on commit 722e8c7

Please sign in to comment.