diff --git a/src/hrflow_connectors/v2/core/connector.py b/src/hrflow_connectors/v2/core/connector.py index 07b7dd40..e511e9e8 100644 --- a/src/hrflow_connectors/v2/core/connector.py +++ b/src/hrflow_connectors/v2/core/connector.py @@ -362,7 +362,7 @@ def manifest(self, connectors_directory: Path = CONNECTORS_DIRECTORY) -> Manifes hrflow_auth_parameters=json_schema(HrFlowWarehouse.auth), origin=self.name if flow.direction is Direction.inbound else "HrFlow", origin_data_schema=json_schema(origin_aisle.schema), - supports_incremental=origin_aisle.read.supports_incremental, + supports_incremental=origin_aisle.read.supports_incremental(flow.mode), pull_parameters=json_schema(pull_parameters), target="HrFlow" if flow.direction is Direction.inbound else self.name, target_data_schema=json_schema(target_aisle.schema), diff --git a/src/hrflow_connectors/v2/core/run.py b/src/hrflow_connectors/v2/core/run.py index 5b6140fc..1b4193dd 100644 --- a/src/hrflow_connectors/v2/core/run.py +++ b/src/hrflow_connectors/v2/core/run.py @@ -322,10 +322,10 @@ def run( incremental_token = None if incremental: - if origin.read.supports_incremental is False: + if origin.read.supports_incremental(mode) is False: adapter.warning( f"Origin warehouse {metadata.origin_name} does not support incremetal" - " reading" + " reading for mode {mode}." ) return RunResult( status=Status.fatal, @@ -393,13 +393,15 @@ def run( last_item = origin_items[-1] # We know it's not None because of the check - # origin.read.supports_incremental + # origin.read.supports_incremental(mode) # Adding these kinds of asserts which are anyway removed # in optimized Python bytecode is for type checkers only - assert origin.read.get_incremental_token is not None + assert origin.read.incremental_token_handler is not None + token_handler = token_handler = origin.read.incremental_token_handler(mode) + assert token_handler is not None # for type-checker try: - next_incremental_token = origin.read.get_incremental_token(last_item) + next_incremental_token = token_handler(last_item) except Exception as e: events[Event.getting_incremental_token_failure] += 1 adapter.exception( diff --git a/src/hrflow_connectors/v2/core/warehouse.py b/src/hrflow_connectors/v2/core/warehouse.py index dbb5b2bb..ef253c3b 100644 --- a/src/hrflow_connectors/v2/core/warehouse.py +++ b/src/hrflow_connectors/v2/core/warehouse.py @@ -139,6 +139,24 @@ def for_mode(self, mode: Mode): return self.archive +@dataclass +class IncrementalTokenHandler: + create: t.Optional[t.Callable[[dict], str]] = None + update: t.Optional[t.Callable[[dict], str]] = None + archive: t.Optional[t.Callable[[dict], str]] = None + + def get_handler(self, mode: Mode) -> t.Optional[t.Callable[[dict], str]]: + if mode is Mode.create: + return self.create + if mode is Mode.update: + return self.update + if mode is Mode.archive: + return self.archive + + def __call__(self, mode: Mode) -> t.Optional[t.Callable[[dict], str]]: + return self.get_handler(mode) + + OperationT = t.TypeVar("OperationT", Read, Write) @@ -151,11 +169,13 @@ class Operation(t.Generic[OperationT]): @dataclass class ReadOperation(Operation[Read]): - get_incremental_token: t.Optional[t.Callable[[dict], str]] = None + # get_incremental_token: t.Optional[t.Callable[[dict], str]] = None + incremental_token_handler: t.Optional[IncrementalTokenHandler] = None - @property - def supports_incremental(self): - return self.get_incremental_token is not None + def supports_incremental(self, mode: Mode) -> bool: + if self.incremental_token_handler is None: + return False + return self.incremental_token_handler.get_handler(mode) is not None def __call__( self, diff --git a/tests/v2/core/src/hrflow_connectors/connectors/smartleads/aisles/leads.py b/tests/v2/core/src/hrflow_connectors/connectors/smartleads/aisles/leads.py index 938ba71b..f661fa4f 100644 --- a/tests/v2/core/src/hrflow_connectors/connectors/smartleads/aisles/leads.py +++ b/tests/v2/core/src/hrflow_connectors/connectors/smartleads/aisles/leads.py @@ -9,6 +9,7 @@ from hrflow_connectors.v2.core.warehouse import ( Aisle, Criterias, + IncrementalTokenHandler, ReadOperation, WriteOperation, merge, @@ -309,7 +310,9 @@ def read_archived( update=ReadUpdatedCriterias, archive=ReadArchivedCriterias, ), - get_incremental_token=lambda lead: str(lead["id"]), + incremental_token_handler=IncrementalTokenHandler( + create=lambda lead: str(lead["id"]), + ), ), write=WriteOperation( function=merge(create=create, update=update, archive=archive), diff --git a/tests/v2/core/test_connector.py b/tests/v2/core/test_connector.py index 0b8f914f..b9224587 100644 --- a/tests/v2/core/test_connector.py +++ b/tests/v2/core/test_connector.py @@ -500,7 +500,7 @@ def test_outcome_if_running_incremental_but_origin_does_not_support_it( ): assert LeadsAisle.read is not None - with mock.patch.object(LeadsAisle.read, "get_incremental_token", None): + with mock.patch.object(LeadsAisle.read, "incremental_token_handler", None): result = SmartLeads.create_jobs_in_hrflow( workflow_id="test", connector_auth=dict(smart_tag="smart::tag::smart"), @@ -569,7 +569,7 @@ def test_outcome_if_getting_incremental_token_fails( assert LeadsAisle.read is not None with mock.patch.object( - LeadsAisle.read, "get_incremental_token", side_effect=Exception + LeadsAisle.read.incremental_token_handler, "create", side_effect=Exception ): result = SmartLeads.create_jobs_in_hrflow( workflow_id=workflow_id, diff --git a/tests/v2/core/test_warehouse.py b/tests/v2/core/test_warehouse.py index 68253093..727ee076 100644 --- a/tests/v2/core/test_warehouse.py +++ b/tests/v2/core/test_warehouse.py @@ -8,6 +8,7 @@ from hrflow_connectors.v2.core.warehouse import ( Aisle, Criterias, + IncrementalTokenHandler, ModeIsNotSupported, ReadOperation, Warehouse, @@ -50,7 +51,7 @@ def test_read_supports_incremental(): ReadOperation( function=MagicMock(), criterias=Criterias(), - ).supports_incremental + ).supports_incremental(Mode.create) is False ) @@ -58,8 +59,10 @@ def test_read_supports_incremental(): ReadOperation( function=MagicMock(), criterias=Criterias(), - get_incremental_token=lambda *args, **kwargs: "token", - ).supports_incremental + incremental_token_handler=IncrementalTokenHandler( + create=lambda *args, **kwargs: "token", + ), + ).supports_incremental(Mode.create) is True )