Skip to content

Commit

Permalink
fix(low-code cdk pagination): Fix the offset strategy so that it rese…
Browse files Browse the repository at this point in the history
…ts back to 0 when a stream is an incremental data feed (#202)
  • Loading branch information
brianjlai authored Jan 6, 2025
1 parent f8cb659 commit 884897e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def next_page_token(
return self._offset

def reset(self, reset_value: Optional[Any] = 0) -> None:
if not isinstance(reset_value, int):
if reset_value is None:
self._offset = 0
elif not isinstance(reset_value, int):
raise ValueError(
f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def next_page_token(
return self._delegate.next_page_token(response, last_page_size, last_record)

def reset(self, reset_value: Optional[Any] = None) -> None:
self._delegate.reset(reset_value)
if reset_value:
self._delegate.reset(reset_value)
else:
self._delegate.reset()

def get_page_size(self) -> Optional[int]:
return self._delegate.get_page_size()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests

from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
DefaultPaginator,
Expand All @@ -22,6 +23,10 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import (
OffsetIncrement,
)
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import (
CursorStopCondition,
StopConditionPaginationStrategyDecorator,
)
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath


Expand Down Expand Up @@ -356,6 +361,40 @@ def test_reset(inject_on_first_request):
assert request_parameters_for_second_request != request_parameters_after_reset


def test_data_feed_paginator_with_stop_page_condition():
config = {}

cursor = DatetimeBasedCursor(
cursor_field="updated_at",
datetime_format="%Y-%m-%d",
start_datetime="2024-01-01",
config=config,
parameters={},
)

wrapped_strategy = StopConditionPaginationStrategyDecorator(
_delegate=OffsetIncrement(
config={}, page_size=2, inject_on_first_request=False, parameters={}
),
stop_condition=CursorStopCondition(cursor=cursor),
)

paginator = DefaultPaginator(
pagination_strategy=wrapped_strategy,
config=config,
url_base="https://airbyte.io",
parameters={},
page_size_option=RequestOption(
inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={}
),
page_token_option=RequestOption(
inject_into=RequestOptionType.request_parameter, field_name="offset", parameters={}
),
)

paginator.reset()


def test_initial_token_with_offset_pagination():
page_size_request_option = RequestOption(
inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,5 @@ def test_offset_increment_reset(reset_value, expected_initial_token, expected_er
with pytest.raises(expected_error):
paginator_strategy.reset(reset_value=reset_value)
else:
if reset_value is None:
paginator_strategy.reset()
else:
paginator_strategy.reset(reset_value=reset_value)
paginator_strategy.reset(reset_value=reset_value)
assert paginator_strategy.initial_token == expected_initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def test_when_reset_then_delegate(mocked_pagination_strategy, mocked_stop_condit
mocked_pagination_strategy, mocked_stop_condition
)
decorator.reset()
mocked_pagination_strategy.reset.assert_called_once_with(None)
mocked_pagination_strategy.reset.assert_called_once_with()


def test_when_get_page_size_then_delegate(mocked_pagination_strategy, mocked_stop_condition):
Expand Down

0 comments on commit 884897e

Please sign in to comment.