From 14375fe9a80c943ba04f92650b48a05d3600dd54 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:23:03 -0500 Subject: [PATCH] feat(concurrent cursor): attempt at clamping datetime (#234) Co-authored-by: octavia-squidington-iii Co-authored-by: brianjlai --- .../declarative_component_schema.yaml | 23 ++++ .../models/declarative_component_schema.py | 24 +++- .../parsers/model_to_component_factory.py | 80 ++++++++++- .../sources/streams/concurrent/clamping.py | 99 ++++++++++++++ .../sources/streams/concurrent/cursor.py | 57 +++----- .../streams/concurrent/cursor_types.py | 32 +++++ .../test_model_to_component_factory.py | 113 ++++++++++++++++ .../streams/concurrent/test_clamping.py | 111 +++++++++++++++ .../sources/streams/concurrent/test_cursor.py | 128 ++++++++++++++++++ 9 files changed, 623 insertions(+), 44 deletions(-) create mode 100644 airbyte_cdk/sources/streams/concurrent/clamping.py create mode 100644 airbyte_cdk/sources/streams/concurrent/cursor_types.py create mode 100644 unit_tests/sources/streams/concurrent/test_clamping.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 687943360..82706ae92 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -784,6 +784,29 @@ definitions: type: type: string enum: [DatetimeBasedCursor] + clamping: + title: Date Range Clamping + description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month) + type: object + required: + - target + properties: + target: + title: Target + description: The period of time that datetime windows will be clamped by + # This should ideally be an enum. However, we don't use an enum because we want to allow for connectors + # to support interpolation on the connector config to get the target which is an arbitrary string + type: string + interpolation_context: + - config + examples: + - "DAY" + - "WEEK" + - "MONTH" + - "{{ config['target'] }}" + target_details: + type: object + additionalProperties: true cursor_field: title: Cursor Field description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fa4a00d18..d0664aa2a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -328,6 +328,16 @@ class Config: type: Optional[Literal["LegacyToPerPartitionStateMigration"]] = None +class Clamping(BaseModel): + target: str = Field( + ..., + description="The period of time that datetime windows will be clamped by", + examples=["DAY", "WEEK", "MONTH", "{{ config['target'] }}"], + title="Target", + ) + target_details: Optional[Dict[str, Any]] = None + + class Algorithm(Enum): HS256 = "HS256" HS384 = "HS384" @@ -719,7 +729,7 @@ class HttpResponseFilter(BaseModel): class TypesMap(BaseModel): target_type: Union[str, List[str]] current_type: Union[str, List[str]] - condition: Optional[str] + condition: Optional[str] = None class SchemaTypeIdentifier(BaseModel): @@ -797,14 +807,11 @@ class DpathFlattenFields(BaseModel): field_path: List[str] = Field( ..., description="A path to field that needs to be flattened.", - examples=[ - ["data"], - ["data", "*", "field"], - ], + examples=[["data"], ["data", "*", "field"]], title="Field Path", ) delete_origin_value: Optional[bool] = Field( - False, + None, description="Whether to delete the origin value or keep it. Default is False.", title="Delete Origin Value", ) @@ -1454,6 +1461,11 @@ class AuthFlow(BaseModel): class DatetimeBasedCursor(BaseModel): type: Literal["DatetimeBasedCursor"] + clamping: Optional[Clamping] = Field( + None, + description="This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)", + title="Date Range Clamping", + ) cursor_field: str = Field( ..., description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c39ae0a68..4fb38acd1 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -8,7 +8,6 @@ import importlib import inspect import re -import sys from functools import partial from typing import ( Any, @@ -102,6 +101,7 @@ LegacyToPerPartitionStateMigration, ) from airbyte_cdk.sources.declarative.models import ( + Clamping, CustomStateMigration, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( @@ -462,6 +462,15 @@ LogAppenderMessageRepositoryDecorator, MessageRepository, ) +from airbyte_cdk.sources.streams.concurrent.clamping import ( + ClampingEndProvider, + ClampingStrategy, + DayClampingStrategy, + MonthClampingStrategy, + NoClamping, + WeekClampingStrategy, + Weekday, +) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, @@ -1048,6 +1057,53 @@ def create_concurrent_cursor_from_datetime_based_cursor( if evaluated_step: step_length = parse_duration(evaluated_step) + clamping_strategy: ClampingStrategy = NoClamping() + if datetime_based_cursor_model.clamping: + # While it is undesirable to interpolate within the model factory (as opposed to at runtime), + # it is still better than shifting interpolation low-code concept into the ConcurrentCursor runtime + # object which we want to keep agnostic of being low-code + target = InterpolatedString( + string=datetime_based_cursor_model.clamping.target, + parameters=datetime_based_cursor_model.parameters or {}, + ) + evaluated_target = target.eval(config=config) + match evaluated_target: + case "DAY": + clamping_strategy = DayClampingStrategy() + end_date_provider = ClampingEndProvider( + DayClampingStrategy(is_ceiling=False), + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + granularity=cursor_granularity or datetime.timedelta(seconds=1), + ) + case "WEEK": + if ( + not datetime_based_cursor_model.clamping.target_details + or "weekday" not in datetime_based_cursor_model.clamping.target_details + ): + raise ValueError( + "Given WEEK clamping, weekday needs to be provided as target_details" + ) + weekday = self._assemble_weekday( + datetime_based_cursor_model.clamping.target_details["weekday"] + ) + clamping_strategy = WeekClampingStrategy(weekday) + end_date_provider = ClampingEndProvider( + WeekClampingStrategy(weekday, is_ceiling=False), + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + granularity=cursor_granularity or datetime.timedelta(days=1), + ) + case "MONTH": + clamping_strategy = MonthClampingStrategy() + end_date_provider = ClampingEndProvider( + MonthClampingStrategy(is_ceiling=False), + end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + granularity=cursor_granularity or datetime.timedelta(days=1), + ) + case _: + raise ValueError( + f"Invalid clamping target {evaluated_target}, expected DAY, WEEK, MONTH" + ) + return ConcurrentCursor( stream_name=stream_name, stream_namespace=stream_namespace, @@ -1062,7 +1118,27 @@ def create_concurrent_cursor_from_datetime_based_cursor( lookback_window=lookback_window, slice_range=step_length, cursor_granularity=cursor_granularity, - ) + clamping_strategy=clamping_strategy, + ) + + def _assemble_weekday(self, weekday: str) -> Weekday: + match weekday: + case "MONDAY": + return Weekday.MONDAY + case "TUESDAY": + return Weekday.TUESDAY + case "WEDNESDAY": + return Weekday.WEDNESDAY + case "THURSDAY": + return Weekday.THURSDAY + case "FRIDAY": + return Weekday.FRIDAY + case "SATURDAY": + return Weekday.SATURDAY + case "SUNDAY": + return Weekday.SUNDAY + case _: + raise ValueError(f"Unknown weekday {weekday}") @staticmethod def create_constant_backoff_strategy( diff --git a/airbyte_cdk/sources/streams/concurrent/clamping.py b/airbyte_cdk/sources/streams/concurrent/clamping.py new file mode 100644 index 000000000..022534bc7 --- /dev/null +++ b/airbyte_cdk/sources/streams/concurrent/clamping.py @@ -0,0 +1,99 @@ +from abc import ABC +from datetime import datetime, timedelta +from enum import Enum +from typing import Callable + +from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType + + +class ClampingStrategy(ABC): + def clamp(self, value: CursorValueType) -> CursorValueType: + raise NotImplementedError() + + +class NoClamping(ClampingStrategy): + def clamp(self, value: CursorValueType) -> CursorValueType: + return value + + +class ClampingEndProvider: + def __init__( + self, + clamping_strategy: ClampingStrategy, + end_provider: Callable[[], CursorValueType], + granularity: timedelta, + ) -> None: + self._clamping_strategy = clamping_strategy + self._end_provider = end_provider + self._granularity = granularity + + def __call__(self) -> CursorValueType: + return self._clamping_strategy.clamp(self._end_provider()) - self._granularity + + +class DayClampingStrategy(ClampingStrategy): + def __init__(self, is_ceiling: bool = True) -> None: + self._is_ceiling = is_ceiling + + def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType + return_value = value.replace(hour=0, minute=0, second=0, microsecond=0) + if self._is_ceiling: + return return_value + timedelta(days=1) + return return_value + + +class MonthClampingStrategy(ClampingStrategy): + def __init__(self, is_ceiling: bool = True) -> None: + self._is_ceiling = is_ceiling + + def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType + return_value = value.replace(hour=0, minute=0, second=0, microsecond=0) + needs_to_round = value.day != 1 + if not needs_to_round: + return return_value + + return self._ceil(return_value) if self._is_ceiling else return_value.replace(day=1) + + def _ceil(self, value: datetime) -> datetime: + return value.replace( + year=value.year + 1 if value.month == 12 else value.year, + month=(value.month % 12) + 1, + day=1, + hour=0, + minute=0, + second=0, + microsecond=0, + ) + + +class Weekday(Enum): + """ + These integer values map to the same ones used by the Datetime.date.weekday() implementation + """ + + MONDAY = 0 + TUESDAY = 1 + WEDNESDAY = 2 + THURSDAY = 3 + FRIDAY = 4 + SATURDAY = 5 + SUNDAY = 6 + + +class WeekClampingStrategy(ClampingStrategy): + def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None: + self._day_of_week = day_of_week.value + self._is_ceiling = is_ceiling + + def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType + days_diff_to_ceiling = ( + 7 - (value.weekday() - self._day_of_week) + if value.weekday() > self._day_of_week + else abs(value.weekday() - self._day_of_week) + ) + delta = ( + timedelta(days_diff_to_ceiling) + if self._is_ceiling + else timedelta(days_diff_to_ceiling - 7) + ) + return value.replace(hour=0, minute=0, second=0, microsecond=0) + delta diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index cbce82a94..73e45cdd1 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -13,7 +13,6 @@ Mapping, MutableMapping, Optional, - Protocol, Tuple, Union, ) @@ -21,6 +20,8 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY +from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping +from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ( @@ -35,36 +36,6 @@ def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any: return functools.reduce(lambda a, b: a[b], path, mapping) -class GapType(Protocol): - """ - This is the representation of gaps between two cursor values. Examples: - * if cursor values are datetimes, GapType is timedelta - * if cursor values are integer, GapType will also be integer - """ - - pass - - -class CursorValueType(Protocol): - """Protocol for annotating comparable types.""" - - @abstractmethod - def __lt__(self: "CursorValueType", other: "CursorValueType") -> bool: - pass - - @abstractmethod - def __ge__(self: "CursorValueType", other: "CursorValueType") -> bool: - pass - - @abstractmethod - def __add__(self: "CursorValueType", other: GapType) -> "CursorValueType": - pass - - @abstractmethod - def __sub__(self: "CursorValueType", other: GapType) -> "CursorValueType": - pass - - class CursorField: def __init__(self, cursor_field_key: str) -> None: self.cursor_field_key = cursor_field_key @@ -172,6 +143,7 @@ def __init__( lookback_window: Optional[GapType] = None, slice_range: Optional[GapType] = None, cursor_granularity: Optional[GapType] = None, + clamping_strategy: ClampingStrategy = NoClamping(), ) -> None: self._stream_name = stream_name self._stream_namespace = stream_namespace @@ -193,6 +165,7 @@ def __init__( self._cursor_granularity = cursor_granularity # Flag to track if the logger has been triggered (per stream) self._should_be_synced_logger_triggered = False + self._clamping_strategy = clamping_strategy @property def state(self) -> MutableMapping[str, Any]: @@ -408,10 +381,12 @@ def _split_per_slice_range( lower = max(lower, self._start) if self._start else lower if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper: + clamped_lower = self._clamping_strategy.clamp(lower) + clamped_upper = self._clamping_strategy.clamp(upper) start_value, end_value = ( - (lower, upper - self._cursor_granularity) + (clamped_lower, clamped_upper - self._cursor_granularity) if self._cursor_granularity and not upper_is_end - else (lower, upper) + else (clamped_lower, clamped_upper) ) yield StreamSlice( partition={}, @@ -433,11 +408,21 @@ def _split_per_slice_range( ) has_reached_upper_boundary = current_upper_boundary >= upper + clamped_upper = ( + self._clamping_strategy.clamp(current_upper_boundary) + if current_upper_boundary != upper + else current_upper_boundary + ) + clamped_lower = self._clamping_strategy.clamp(current_lower_boundary) + if clamped_lower >= clamped_upper: + # clamping collapsed both values which means that it is time to stop processing + # FIXME should this be replace by proper end_provider + break start_value, end_value = ( - (current_lower_boundary, current_upper_boundary - self._cursor_granularity) + (clamped_lower, clamped_upper - self._cursor_granularity) if self._cursor_granularity and (not upper_is_end or not has_reached_upper_boundary) - else (current_lower_boundary, current_upper_boundary) + else (clamped_lower, clamped_upper) ) yield StreamSlice( partition={}, @@ -450,7 +435,7 @@ def _split_per_slice_range( ]: self._connector_state_converter.output_format(end_value), }, ) - current_lower_boundary = current_upper_boundary + current_lower_boundary = clamped_upper if current_upper_boundary >= upper: stop_processing = True diff --git a/airbyte_cdk/sources/streams/concurrent/cursor_types.py b/airbyte_cdk/sources/streams/concurrent/cursor_types.py new file mode 100644 index 000000000..4a4dbe63c --- /dev/null +++ b/airbyte_cdk/sources/streams/concurrent/cursor_types.py @@ -0,0 +1,32 @@ +from abc import abstractmethod +from typing import Protocol + + +class GapType(Protocol): + """ + This is the representation of gaps between two cursor values. Examples: + * if cursor values are datetimes, GapType is timedelta + * if cursor values are integer, GapType will also be integer + """ + + pass + + +class CursorValueType(Protocol): + """Protocol for annotating comparable types.""" + + @abstractmethod + def __lt__(self: "CursorValueType", other: "CursorValueType") -> bool: + pass + + @abstractmethod + def __ge__(self: "CursorValueType", other: "CursorValueType") -> bool: + pass + + @abstractmethod + def __add__(self: "CursorValueType", other: GapType) -> "CursorValueType": + pass + + @abstractmethod + def __sub__(self: "CursorValueType", other: GapType) -> "CursorValueType": + pass diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index c50e9e6e9..7bfdc0379 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -135,6 +135,14 @@ from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource +from airbyte_cdk.sources.streams.concurrent.clamping import ( + ClampingEndProvider, + DayClampingStrategy, + MonthClampingStrategy, + NoClamping, + WeekClampingStrategy, + Weekday, +) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, @@ -3321,6 +3329,111 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): } +@pytest.mark.parametrize( + "clamping,expected_clamping_strategy,expected_error", + [ + pytest.param( + {"target": "DAY", "target_details": {}}, + DayClampingStrategy, + None, + id="test_day_clamping_strategy", + ), + pytest.param( + {"target": "WEEK", "target_details": {"weekday": "SUNDAY"}}, + WeekClampingStrategy, + None, + id="test_week_clamping_strategy", + ), + pytest.param( + {"target": "MONTH", "target_details": {}}, + MonthClampingStrategy, + None, + id="test_month_clamping_strategy", + ), + pytest.param( + {"target": "WEEK", "target_details": {}}, + None, + ValueError, + id="test_week_clamping_strategy_no_target_details", + ), + pytest.param( + {"target": "FAKE", "target_details": {}}, + None, + ValueError, + id="test_invalid_clamping_target", + ), + pytest.param( + {"target": "{{ config['clamping_target'] }}"}, + MonthClampingStrategy, + None, + id="test_clamping_with_interpolation", + ), + ], +) +def test_create_concurrent_cursor_from_datetime_based_cursor_with_clamping( + clamping, + expected_clamping_strategy, + expected_error, +): + config = { + "start_time": "2024-08-01T00:00:00.000000Z", + "end_time": "2024-10-15T00:00:00.000000Z", + "clamping_target": "MONTH", + } + + cursor_component_definition = { + "type": "DatetimeBasedCursor", + "cursor_field": "updated_at", + "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ", + "start_datetime": "{{ config['start_time'] }}", + "end_datetime": "{{ config['end_time'] }}", + "partition_field_start": "custom_start", + "partition_field_end": "custom_end", + "step": "P10D", + "cursor_granularity": "PT1S", + "lookback_window": "P3D", + "clamping": clamping, + } + + connector_state_manager = ConnectorStateManager() + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + + stream_name = "test" + + if expected_error: + with pytest.raises(ValueError): + connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor( + state_manager=connector_state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=cursor_component_definition, + stream_name=stream_name, + stream_namespace=None, + config=config, + stream_state={}, + ) + + else: + concurrent_cursor = ( + connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor( + state_manager=connector_state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=cursor_component_definition, + stream_name=stream_name, + stream_namespace=None, + config=config, + stream_state={}, + ) + ) + + assert concurrent_cursor._clamping_strategy.__class__ == expected_clamping_strategy + assert isinstance(concurrent_cursor._end_provider, ClampingEndProvider) + assert isinstance( + concurrent_cursor._end_provider._clamping_strategy, expected_clamping_strategy + ) + assert concurrent_cursor._end_provider._granularity == datetime.timedelta(seconds=1) + + class CustomRecordExtractor(RecordExtractor): def extract_records( self, diff --git a/unit_tests/sources/streams/concurrent/test_clamping.py b/unit_tests/sources/streams/concurrent/test_clamping.py new file mode 100644 index 000000000..f66f4fe5d --- /dev/null +++ b/unit_tests/sources/streams/concurrent/test_clamping.py @@ -0,0 +1,111 @@ +from datetime import datetime +from unittest import TestCase + +from airbyte_cdk.sources.streams.concurrent.clamping import ( + DayClampingStrategy, + MonthClampingStrategy, + WeekClampingStrategy, + Weekday, +) + +_DATETIME_ON_TUESDAY = datetime(2025, 1, 14) +_DATETIME_ON_WEDNESDAY = datetime(2025, 1, 15) + + +class DayClampingStrategyTest(TestCase): + def setUp(self) -> None: + self._strategy = DayClampingStrategy() + + def test_when_clamp_then_remove_every_unit_smaller_than_days(self) -> None: + result = self._strategy.clamp(datetime(2024, 1, 1, 20, 23, 3, 2039)) + assert result.hour == 0 + assert result.minute == 0 + assert result.second == 0 + assert result.microsecond == 0 + + def test_given_last_day_of_month_when_clamp_then_result_is_next_month(self) -> None: + result = self._strategy.clamp(datetime(2024, 1, 31)) + assert result == datetime(2024, 2, 1) + + def test_given_is_not_ceiling_when_clamp_then_just_remove_unit_smaller_than_days(self) -> None: + strategy = DayClampingStrategy(is_ceiling=False) + result = strategy.clamp(datetime(2024, 1, 1, 20, 23, 3, 1029)) + assert result == datetime(2024, 1, 1) + + +class MonthClampingStrategyTest(TestCase): + def setUp(self) -> None: + self._strategy = MonthClampingStrategy() + + def test_when_clamp_then_remove_every_unit_smaller_than_days(self) -> None: + result = self._strategy.clamp(datetime(2024, 1, 1, 20, 23, 3, 2039)) + assert result.hour == 0 + assert result.minute == 0 + assert result.second == 0 + assert result.microsecond == 0 + + def test_given_first_day_of_month_when_clamp_then_return_same_date(self) -> None: + first_day_of_the_month = datetime(2024, 1, 1) + result = self._strategy.clamp(first_day_of_the_month) + assert result == first_day_of_the_month + + def test_given_day_of_month_is_not_1_when_clamp_then_return_first_day_of_next_month( + self, + ) -> None: + result = self._strategy.clamp(datetime(2024, 1, 2)) + assert result == datetime(2024, 2, 1) + + def test_given_not_ceiling_and_day_of_month_is_not_1_when_clamp_then_return_first_day_of_next_month( + self, + ) -> None: + strategy = MonthClampingStrategy(is_ceiling=False) + result = strategy.clamp(datetime(2024, 1, 2)) + assert result == datetime(2024, 1, 1) + + +class WeekClampingStrategyTest(TestCase): + def setUp(self) -> None: + self._strategy = WeekClampingStrategy(Weekday.TUESDAY) + + def test_when_clamp_then_remove_every_unit_smaller_than_days(self) -> None: + result = self._strategy.clamp(datetime(2024, 1, 1, 20, 23, 3, 2039)) + assert result.hour == 0 + assert result.minute == 0 + assert result.second == 0 + assert result.microsecond == 0 + + def test_given_same_weekday_when_clamp_then_return_same_date(self) -> None: + strategy = WeekClampingStrategy(Weekday.TUESDAY) + result = strategy.clamp(_DATETIME_ON_TUESDAY) + assert result == _DATETIME_ON_TUESDAY + + def test_given_not_weekday_before_target_when_clamp_then_return_next_occurrence_of_same_weekday( + self, + ) -> None: + strategy = WeekClampingStrategy(Weekday.TUESDAY) + result = strategy.clamp(_DATETIME_ON_WEDNESDAY) + assert result == datetime( + _DATETIME_ON_WEDNESDAY.year, + _DATETIME_ON_WEDNESDAY.month, + _DATETIME_ON_WEDNESDAY.day + 6, + ) + + def test_given_not_weekday_after_target_when_clamp_then_return_next_occurrence_of_same_weekday( + self, + ) -> None: + strategy = WeekClampingStrategy(Weekday.FRIDAY) + result = strategy.clamp(_DATETIME_ON_WEDNESDAY) + assert result == datetime( + _DATETIME_ON_WEDNESDAY.year, + _DATETIME_ON_WEDNESDAY.month, + _DATETIME_ON_WEDNESDAY.day + 2, + ) + + def test_given_not_ceiling_when_clamp_then_round_down(self) -> None: + strategy = WeekClampingStrategy(Weekday.FRIDAY, is_ceiling=False) + result = strategy.clamp(_DATETIME_ON_WEDNESDAY) + assert result == datetime( + _DATETIME_ON_WEDNESDAY.year, + _DATETIME_ON_WEDNESDAY.month, + _DATETIME_ON_WEDNESDAY.day - 5, + ) diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 008b5f780..e87976964 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -16,6 +16,13 @@ from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.concurrent.clamping import ( + ClampingEndProvider, + ClampingStrategy, + MonthClampingStrategy, + WeekClampingStrategy, + Weekday, +) from airbyte_cdk.sources.streams.concurrent.cursor import ( ConcurrentCursor, CursorField, @@ -26,6 +33,7 @@ ConcurrencyCompatibleStateType, ) from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + CustomFormatConcurrentStreamStateConverter, EpochValueConcurrentStreamStateConverter, IsoMillisConcurrentStreamStateConverter, ) @@ -38,6 +46,7 @@ _NO_PARTITION_IDENTIFIER = None _NO_SLICE = None _NO_SLICE_BOUNDARIES = None +_NOT_SEQUENTIAL = False _LOWER_SLICE_BOUNDARY_FIELD = "lower_boundary" _UPPER_SLICE_BOUNDARY_FIELD = "upper_boundary" _SLICE_BOUNDARY_FIELDS = (_LOWER_SLICE_BOUNDARY_FIELD, _UPPER_SLICE_BOUNDARY_FIELD) @@ -942,6 +951,125 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic ) +class ClampingIntegrationTest(TestCase): + def setUp(self) -> None: + self._message_repository = Mock(spec=MessageRepository) + self._state_manager = Mock(spec=ConnectorStateManager) + + def _cursor( + self, + start: datetime, + end_provider, + slice_range: timedelta, + granularity: Optional[timedelta], + clamping_strategy: ClampingStrategy, + ) -> ConcurrentCursor: + return ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + {}, + self._message_repository, + self._state_manager, + CustomFormatConcurrentStreamStateConverter( + "%Y-%m-%dT%H:%M:%SZ", is_sequential_state=_NOT_SEQUENTIAL + ), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + start, + end_provider, + slice_range=slice_range, + cursor_granularity=granularity, + clamping_strategy=clamping_strategy, + ) + + @freezegun.freeze_time(time_to_freeze=datetime(2025, 1, 3, tzinfo=timezone.utc)) + def test_given_monthly_clamp_without_granularity_when_stream_slices_then_upper_boundaries_equals_next_lower_boundary( + self, + ) -> None: + cursor = self._cursor( + start=datetime(2023, 12, 31, tzinfo=timezone.utc), + end_provider=ClampingEndProvider( + MonthClampingStrategy(is_ceiling=False), + CustomFormatConcurrentStreamStateConverter.get_end_provider(), + granularity=timedelta(days=1), + ), + slice_range=timedelta(days=27), + granularity=None, + clamping_strategy=MonthClampingStrategy(), + ) + stream_slices = list(cursor.stream_slices()) + assert stream_slices == [ + {"lower_boundary": "2024-01-01T00:00:00Z", "upper_boundary": "2024-02-01T00:00:00Z"}, + {"lower_boundary": "2024-02-01T00:00:00Z", "upper_boundary": "2024-03-01T00:00:00Z"}, + {"lower_boundary": "2024-03-01T00:00:00Z", "upper_boundary": "2024-04-01T00:00:00Z"}, + {"lower_boundary": "2024-04-01T00:00:00Z", "upper_boundary": "2024-05-01T00:00:00Z"}, + {"lower_boundary": "2024-05-01T00:00:00Z", "upper_boundary": "2024-06-01T00:00:00Z"}, + {"lower_boundary": "2024-06-01T00:00:00Z", "upper_boundary": "2024-07-01T00:00:00Z"}, + {"lower_boundary": "2024-07-01T00:00:00Z", "upper_boundary": "2024-08-01T00:00:00Z"}, + {"lower_boundary": "2024-08-01T00:00:00Z", "upper_boundary": "2024-09-01T00:00:00Z"}, + {"lower_boundary": "2024-09-01T00:00:00Z", "upper_boundary": "2024-10-01T00:00:00Z"}, + {"lower_boundary": "2024-10-01T00:00:00Z", "upper_boundary": "2024-11-01T00:00:00Z"}, + {"lower_boundary": "2024-11-01T00:00:00Z", "upper_boundary": "2024-12-01T00:00:00Z"}, + {"lower_boundary": "2024-12-01T00:00:00Z", "upper_boundary": "2025-01-01T00:00:00Z"}, + ] + + @freezegun.freeze_time(time_to_freeze=datetime(2025, 1, 3, tzinfo=timezone.utc)) + def test_given_monthly_clamp_and_granularity_when_stream_slices_then_consider_number_of_days_per_month( + self, + ) -> None: + cursor = self._cursor( + start=datetime(2023, 12, 31, tzinfo=timezone.utc), + end_provider=ClampingEndProvider( + MonthClampingStrategy(is_ceiling=False), + CustomFormatConcurrentStreamStateConverter.get_end_provider(), + granularity=timedelta(days=1), + ), + slice_range=timedelta(days=27), + granularity=timedelta(days=1), + clamping_strategy=MonthClampingStrategy(), + ) + stream_slices = list(cursor.stream_slices()) + assert stream_slices == [ + {"lower_boundary": "2024-01-01T00:00:00Z", "upper_boundary": "2024-01-31T00:00:00Z"}, + {"lower_boundary": "2024-02-01T00:00:00Z", "upper_boundary": "2024-02-29T00:00:00Z"}, + {"lower_boundary": "2024-03-01T00:00:00Z", "upper_boundary": "2024-03-31T00:00:00Z"}, + {"lower_boundary": "2024-04-01T00:00:00Z", "upper_boundary": "2024-04-30T00:00:00Z"}, + {"lower_boundary": "2024-05-01T00:00:00Z", "upper_boundary": "2024-05-31T00:00:00Z"}, + {"lower_boundary": "2024-06-01T00:00:00Z", "upper_boundary": "2024-06-30T00:00:00Z"}, + {"lower_boundary": "2024-07-01T00:00:00Z", "upper_boundary": "2024-07-31T00:00:00Z"}, + {"lower_boundary": "2024-08-01T00:00:00Z", "upper_boundary": "2024-08-31T00:00:00Z"}, + {"lower_boundary": "2024-09-01T00:00:00Z", "upper_boundary": "2024-09-30T00:00:00Z"}, + {"lower_boundary": "2024-10-01T00:00:00Z", "upper_boundary": "2024-10-31T00:00:00Z"}, + {"lower_boundary": "2024-11-01T00:00:00Z", "upper_boundary": "2024-11-30T00:00:00Z"}, + {"lower_boundary": "2024-12-01T00:00:00Z", "upper_boundary": "2024-12-31T00:00:00Z"}, + ] + + @freezegun.freeze_time(time_to_freeze=datetime(2024, 1, 31, tzinfo=timezone.utc)) + def test_given_weekly_clamp_and_granularity_when_stream_slices_then_slice_per_week( + self, + ) -> None: + cursor = self._cursor( + start=datetime( + 2023, 12, 31, tzinfo=timezone.utc + ), # this is Sunday so we expect start to be 2 days after + end_provider=ClampingEndProvider( + WeekClampingStrategy(Weekday.TUESDAY, is_ceiling=False), + CustomFormatConcurrentStreamStateConverter.get_end_provider(), + granularity=timedelta(days=1), + ), + slice_range=timedelta(days=7), + granularity=timedelta(days=1), + clamping_strategy=WeekClampingStrategy(Weekday.TUESDAY), + ) + stream_slices = list(cursor.stream_slices()) + assert stream_slices == [ + {"lower_boundary": "2024-01-02T00:00:00Z", "upper_boundary": "2024-01-08T00:00:00Z"}, + {"lower_boundary": "2024-01-09T00:00:00Z", "upper_boundary": "2024-01-15T00:00:00Z"}, + {"lower_boundary": "2024-01-16T00:00:00Z", "upper_boundary": "2024-01-22T00:00:00Z"}, + {"lower_boundary": "2024-01-23T00:00:00Z", "upper_boundary": "2024-01-29T00:00:00Z"}, + ] + + @freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @pytest.mark.parametrize( "start_datetime,end_datetime,step,cursor_field,lookback_window,state,expected_slices",