Skip to content

Commit

Permalink
Source Stripe: add availability strategy (airbytehq#22659)
Browse files Browse the repository at this point in the history
* Source Stripe: add availability strategy

* Source Stripe: bump version, update changelog

* Source Stripe: optimize substream availability strategy, add tests

* Source Stripe: better names

* Source Stripe: small review fixes

* Update docs/integrations/sources/stripe.md

fix changelog message

Co-authored-by: Ella Rohm-Ensing <erohmensing@gmail.com>

* Update airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py

assert availability result

Co-authored-by: Ella Rohm-Ensing <erohmensing@gmail.com>

---------

Co-authored-by: Ella Rohm-Ensing <erohmensing@gmail.com>
  • Loading branch information
roman-yermilov-gl and erohmensing authored Feb 14, 2023
1 parent de51dff commit 1d406ef
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ COPY main.py ./
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.version=1.0.2
LABEL io.airbyte.name=airbyte/source-stripe
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/full_refresh_configured_catalog.json"
empty_streams: ["external_account_bank_accounts"]
empty_streams: ["bank_accounts", "checkout_sessions", "checkout_sessions_line_items", "external_account_bank_accounts"]
# TEST 1 - Reading catalog without invoice_line_items
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/non_invoice_line_items_catalog.json"
empty_streams: ["transfers"]
timeout_seconds: 3600
# TEST 2 - Reading data from account that has no records for stream Disputes
- config_path: "secrets/connected_account_config.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging
from typing import Optional, Tuple

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


class StripeSubStreamAvailabilityStrategy(HttpAvailabilityStrategy):
    """Availability strategy that verifies a sub-stream's parent stream before the sub-stream itself."""

    def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source]) -> Tuple[bool, Optional[str]]:
        """Check the parent stream (if any) first, then run the regular HTTP availability check.

        :param stream: stream to check; it is treated as a sub-stream when it has a truthy ``parent`` attribute
        :param logger: logger forwarded to every availability check
        :param source: source forwarded to every availability check
        :return: ``(is_available, reason)``; when the parent is reported unavailable, the parent's result is returned unchanged
        """
        # A stream with no `parent` attribute, or with a falsy one, goes straight to the base HTTP check below.
        if getattr(stream, "parent", None):
            parent = stream.get_parent_stream_instance()
            # Reading the `availability_strategy` property instantiates the parent's strategy under the hood.
            # When the parent uses this same strategy, its own parent is checked in turn by the call below.
            strategy = parent.availability_strategy
            if strategy:
                parent_available, parent_reason = strategy.check_availability(parent, logger, source)
                if not parent_available:
                    return parent_available, parent_reason
        return super().check_availability(stream, logger, source)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream

from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy

STRIPE_ERROR_CODES: List = [
# stream requires additional permissions
"more_permissions_required",
Expand All @@ -32,10 +34,6 @@ def __init__(self, start_date: int, account_id: str, slice_range: int = DEFAULT_
self.start_date = start_date
self.slice_range = slice_range or self.DEFAULT_SLICE_RANGE

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
if bool(decoded_response.get("has_more", "False")) and decoded_response.get("data", []):
Expand Down Expand Up @@ -318,6 +316,10 @@ def sub_items_attr(self) -> str:
If the stream has no primary keys, return None.
"""

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
    """Return a fresh ``StripeSubStreamAvailabilityStrategy`` on every access."""
    strategy = StripeSubStreamAvailabilityStrategy()
    return strategy

def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs):
params = super().request_params(stream_slice=stream_slice, **kwargs)

Expand All @@ -327,8 +329,11 @@ def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs):

return params

def get_parent_stream_instance(self):
    """Instantiate ``self.parent`` with this stream's authenticator, account id and start date."""
    parent_factory = self.parent
    init_kwargs = {
        "authenticator": self.authenticator,
        "account_id": self.account_id,
        "start_date": self.start_date,
    }
    return parent_factory(**init_kwargs)

def read_records(self, sync_mode: SyncMode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
parent_stream = self.parent(authenticator=self.authenticator, account_id=self.account_id, start_date=self.start_date)
parent_stream = self.get_parent_stream_instance()
slices = parent_stream.stream_slices(sync_mode=SyncMode.full_refresh)
for _slice in slices:
for record in parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import pendulum
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy

from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy
from source_stripe.streams import InvoiceLineItems, Invoices


def test_traverse_over_substreams(mocker):
    """Every stream in a chain of available streams is checked, starting from the root."""
    # Replace the base HttpAvailabilityStrategy check so each call is recorded and reports success
    base_check = mocker.MagicMock(return_value=(True, None))
    mocker.patch(
        "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability",
        base_check,
    )

    # Root stream: plain HTTP strategy and no parent
    root = mocker.Mock()
    root.availability_strategy = HttpAvailabilityStrategy()
    root.parent = None

    # Three nested sub-streams, each one pointing at the previous element of the chain as its parent
    chain = [root]
    for _ in range(3):
        child = mocker.Mock()
        child.availability_strategy = StripeSubStreamAvailabilityStrategy()
        child.get_parent_stream_instance.return_value = chain[-1]
        chain.append(child)
    leaf = chain[-1]

    # Start the traversal from the deepest sub-stream
    is_available, reason = leaf.availability_strategy.check_availability(leaf, mocker.Mock(), mocker.Mock())

    assert is_available
    assert reason is None

    # The base check ran exactly once per stream in the chain
    assert base_check.call_count == 4

    # ...and in root-to-leaf order, each time with the matching stream instance
    checked_streams = [call.args[0] for call in base_check.call_args_list]
    for checked, expected in zip(checked_streams, chain):
        assert checked is expected


def test_traverse_over_substreams_failure(mocker):
    """The traversal stops at the first unavailable stream and propagates its reason."""
    # First recorded call (root) succeeds, second one (child_1) reports unavailability
    base_check = mocker.MagicMock(side_effect=[(True, None), (False, "child_1")])
    mocker.patch(
        "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability",
        base_check,
    )

    # Root stream: plain HTTP strategy and no parent
    root = mocker.Mock()
    root.availability_strategy = HttpAvailabilityStrategy()
    root.parent = None

    # Three nested sub-streams, each one pointing at the previous element of the chain as its parent
    chain = [root]
    for _ in range(3):
        child = mocker.Mock()
        child.availability_strategy = StripeSubStreamAvailabilityStrategy()
        child.get_parent_stream_instance.return_value = chain[-1]
        chain.append(child)
    leaf = chain[-1]

    # Start the traversal from the deepest sub-stream
    is_available, reason = leaf.availability_strategy.check_availability(leaf, mocker.Mock(), mocker.Mock())

    assert not is_available
    assert reason == "child_1"

    # Only the root and the first child were checked; the deeper streams were never reached
    assert base_check.call_count == 2

    root_call, child_call = base_check.call_args_list
    assert root_call.args[0] is chain[0]
    assert child_call.args[0] is chain[1]


def test_substream_availability(mocker):
    """Checking ``InvoiceLineItems`` checks its parent ``Invoices`` stream first, then the stream itself."""
    # Record every call to the base HttpAvailabilityStrategy check and make it report success
    base_check = mocker.MagicMock(return_value=(True, None))
    mocker.patch(
        "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability",
        base_check,
    )

    three_days_ago = pendulum.today().subtract(days=3).int_timestamp
    stream = InvoiceLineItems(start_date=three_days_ago, account_id="None")
    strategy = stream.availability_strategy
    is_available, reason = strategy.check_availability(stream, mocker.Mock(), mocker.Mock())
    assert is_available
    assert reason is None

    # One call for the parent, one for the sub-stream, in that order
    assert base_check.call_count == 2
    parent_call, substream_call = base_check.call_args_list
    assert isinstance(parent_call.args[0], Invoices)
    assert isinstance(substream_call.args[0], InvoiceLineItems)


def test_substream_availability_no_parent(mocker):
    """A sub-stream whose ``parent`` is ``None`` is checked on its own, with no parent check."""
    # Record every call to the base HttpAvailabilityStrategy check and make it report success
    check_availability_mock = mocker.MagicMock()
    check_availability_mock.return_value = (True, None)
    mocker.patch(
        "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability",
        check_availability_mock,
    )

    stream = InvoiceLineItems(start_date=pendulum.today().subtract(days=3).int_timestamp, account_id="None")
    # Drop the parent so the strategy has nothing to traverse
    stream.parent = None

    is_available, reason = stream.availability_strategy.check_availability(stream, mocker.Mock(), mocker.Mock())

    # The result of the base check must be passed through unchanged (previously the result was not asserted)
    assert is_available and reason is None

    # Only the stream itself was checked
    assert check_availability_mock.call_count == 1
    assert isinstance(check_availability_mock.call_args_list[0].args[0], InvoiceLineItems)
assert isinstance(check_availability_mock.call_args_list[0].args[0], InvoiceLineItems)
1 change: 1 addition & 0 deletions docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.2 | 2023-02-09 | [22659](https://github.com/airbytehq/airbyte/pull/22659) | Set `AvailabilityStrategy` for all streams |
| 1.0.1 | 2023-01-27 | [22042](https://github.com/airbytehq/airbyte/pull/22042) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 1.0.0 | 2023-01-25 | [21858](https://github.com/airbytehq/airbyte/pull/21858) | Update the `Subscriptions` and `Invoices` stream schemas |
| 0.1.40 | 2022-10-20 | [18228](https://github.com/airbytehq/airbyte/pull/18228) | Update the `Payment Intents` stream schema |
Expand Down

0 comments on commit 1d406ef

Please sign in to comment.