Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrent cursor): attempt at clamping datetime #234

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,29 @@ definitions:
type:
type: string
enum: [DatetimeBasedCursor]
clamping:
title: Date Range Clamping
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
properties:
target:
title: Target
description: The period of time that datetime windows will be clamped by
# This should ideally be an enum. However, we don't use an enum because we want to allow for connectors
# to support interpolation on the connector config to get the target which is an arbitrary string
type: string
interpolation_context:
- config
examples:
- "DAY"
- "WEEK"
- "MONTH"
- "{{ config['target'] }}"
target_details:
type: object
additionalProperties: true
cursor_field:
title: Cursor Field
description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ class Config:
type: Optional[Literal["LegacyToPerPartitionStateMigration"]] = None


class Clamping(BaseModel):
target: str = Field(
...,
description="The period of time that datetime windows will be clamped by",
examples=["DAY", "WEEK", "MONTH", "{{ config['target'] }}"],
title="Target",
)
target_details: Optional[Dict[str, Any]] = None


class Algorithm(Enum):
HS256 = "HS256"
HS384 = "HS384"
Expand Down Expand Up @@ -719,7 +729,7 @@ class HttpResponseFilter(BaseModel):
class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
condition: Optional[str]
condition: Optional[str] = None


class SchemaTypeIdentifier(BaseModel):
Expand Down Expand Up @@ -797,14 +807,11 @@ class DpathFlattenFields(BaseModel):
field_path: List[str] = Field(
...,
description="A path to field that needs to be flattened.",
examples=[
["data"],
["data", "*", "field"],
],
examples=[["data"], ["data", "*", "field"]],
title="Field Path",
)
delete_origin_value: Optional[bool] = Field(
False,
None,
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
Expand Down Expand Up @@ -1454,6 +1461,11 @@ class AuthFlow(BaseModel):

class DatetimeBasedCursor(BaseModel):
type: Literal["DatetimeBasedCursor"]
clamping: Optional[Clamping] = Field(
None,
description="This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)",
title="Date Range Clamping",
)
cursor_field: str = Field(
...,
description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import importlib
import inspect
import re
from datetime import timedelta
from functools import partial
from typing import (
Any,
Expand Down Expand Up @@ -101,6 +102,7 @@
LegacyToPerPartitionStateMigration,
)
from airbyte_cdk.sources.declarative.models import (
Clamping,
CustomStateMigration,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Expand Down Expand Up @@ -457,6 +459,15 @@
LogAppenderMessageRepositoryDecorator,
MessageRepository,
)
from airbyte_cdk.sources.streams.concurrent.clamping import (
ClampingEndProvider,
ClampingStrategy,
DayClampingStrategy,
MonthClampingStrategy,
NoClamping,
WeekClampingStrategy,
Weekday,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
CustomFormatConcurrentStreamStateConverter,
Expand Down Expand Up @@ -1043,6 +1054,53 @@ def create_concurrent_cursor_from_datetime_based_cursor(
if evaluated_step:
step_length = parse_duration(evaluated_step)

clamping_strategy: ClampingStrategy = NoClamping()
if datetime_based_cursor_model.clamping:
# While it is undesirable to interpolate within the model factory (as opposed to at runtime),
# it is still better than shifting interpolation low-code concept into the ConcurrentCursor runtime
# object which we want to keep agnostic of being low-code
target = InterpolatedString(
string=datetime_based_cursor_model.clamping.target,
parameters=datetime_based_cursor_model.parameters or {},
)
evaluated_target = target.eval(config=config)
match evaluated_target:
case "DAY":
clamping_strategy = DayClampingStrategy()
end_date_provider = ClampingEndProvider(
DayClampingStrategy(is_ceiling=False),
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
granularity=cursor_granularity or timedelta(seconds=1),
)
case "WEEK":
if (
not datetime_based_cursor_model.clamping.target_details
or "weekday" not in datetime_based_cursor_model.clamping.target_details
):
raise ValueError(
"Given WEEK clamping, weekday needs to be provided as target_details"
)
weekday = self._assemble_weekday(
datetime_based_cursor_model.clamping.target_details["weekday"]
)
clamping_strategy = WeekClampingStrategy(weekday)
end_date_provider = ClampingEndProvider(
WeekClampingStrategy(weekday, is_ceiling=False),
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
granularity=cursor_granularity or timedelta(days=1),
)
case "MONTH":
clamping_strategy = MonthClampingStrategy()
end_date_provider = ClampingEndProvider(
MonthClampingStrategy(is_ceiling=False),
end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
granularity=cursor_granularity or timedelta(days=1),
)
case _:
raise ValueError(
f"Invalid clamping target {evaluated_target}, expected DAY, WEEK, MONTH"
)

return ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
Expand All @@ -1057,7 +1115,27 @@ def create_concurrent_cursor_from_datetime_based_cursor(
lookback_window=lookback_window,
slice_range=step_length,
cursor_granularity=cursor_granularity,
)
clamping_strategy=clamping_strategy,
)

def _assemble_weekday(self, weekday: str) -> Weekday:
match weekday:
case "MONDAY":
return Weekday.MONDAY
case "TUESDAY":
return Weekday.TUESDAY
case "WEDNESDAY":
return Weekday.WEDNESDAY
case "THURSDAY":
return Weekday.THURSDAY
case "FRIDAY":
return Weekday.FRIDAY
case "SATURDAY":
return Weekday.SATURDAY
case "SUNDAY":
return Weekday.SUNDAY
case _:
raise ValueError(f"Unknown weekday {weekday}")

@staticmethod
def create_constant_backoff_strategy(
Expand Down
99 changes: 99 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/clamping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from abc import ABC
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable

from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType


class ClampingStrategy(ABC):
def clamp(self, value: CursorValueType) -> CursorValueType:
raise NotImplementedError()


class NoClamping(ClampingStrategy):
def clamp(self, value: CursorValueType) -> CursorValueType:
return value


class ClampingEndProvider:
def __init__(
self,
clamping_strategy: ClampingStrategy,
end_provider: Callable[[], CursorValueType],
granularity: timedelta,
) -> None:
self._clamping_strategy = clamping_strategy
self._end_provider = end_provider
self._granularity = granularity

def __call__(self) -> CursorValueType:
return self._clamping_strategy.clamp(self._end_provider()) - self._granularity


class DayClampingStrategy(ClampingStrategy):
def __init__(self, is_ceiling: bool = True) -> None:
self._is_ceiling = is_ceiling

def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
if self._is_ceiling:
return return_value + timedelta(days=1)
return return_value


class MonthClampingStrategy(ClampingStrategy):
def __init__(self, is_ceiling: bool = True) -> None:
self._is_ceiling = is_ceiling

def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
needs_to_round = value.day != 1
if not needs_to_round:
return return_value

return self._ceil(return_value) if self._is_ceiling else return_value.replace(day=1)

def _ceil(self, value: datetime) -> datetime:
return value.replace(
year=value.year + 1 if value.month == 12 else value.year,
month=(value.month % 12) + 1,
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)


class Weekday(Enum):
"""
These integer values map to the same ones used by the Datetime.date.weekday() implementation
"""

MONDAY = 0
TUESDAY = 1
WEDNESDAY = 2
THURSDAY = 3
FRIDAY = 4
SATURDAY = 5
SUNDAY = 6


class WeekClampingStrategy(ClampingStrategy):
def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
self._day_of_week = day_of_week.value
self._is_ceiling = is_ceiling

def clamp(self, value: datetime) -> datetime: # type: ignore # datetime implements method from CursorValueType
days_diff_to_ceiling = (
7 - (value.weekday() - self._day_of_week)
if value.weekday() > self._day_of_week
else abs(value.weekday() - self._day_of_week)
)
delta = (
timedelta(days_diff_to_ceiling)
if self._is_ceiling
else timedelta(days_diff_to_ceiling - 7)
)
return value.replace(hour=0, minute=0, second=0, microsecond=0) + delta
Loading
Loading