Skip to content

Commit

Permalink
ref(source-stripe): remove stripe py-package (#45348)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 authored Sep 10, 2024
1 parent ba63c03 commit 21348af
Show file tree
Hide file tree
Showing 6 changed files with 508 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.5.3
dockerImageTag: 5.5.4
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships
Expand Down
655 changes: 449 additions & 206 deletions airbyte-integrations/connectors/source-stripe/poetry.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions airbyte-integrations/connectors/source-stripe/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.5.3"
version = "5.5.4"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,6 @@ include = "source_stripe"

[tool.poetry.dependencies]
python = "^3.10,<3.12"
stripe = "==2.56.0"
pendulum = "==2.1.2"
airbyte-cdk = "^4"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
import stripe
from airbyte_cdk.entrypoint import logger as entrypoint_logger
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
Expand Down Expand Up @@ -107,14 +106,29 @@ def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> Mutable
return config

def check_connection(self, logger: logging.Logger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]:
self.validate_and_fill_with_defaults(config)
stripe.api_key = config["client_secret"]
args = self._get_stream_base_args(config)
account_stream = StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args)
try:
stripe.Account.retrieve(config["account_id"])
except (stripe.error.AuthenticationError, stripe.error.PermissionError) as e:
return False, str(e)
next(account_stream.read_records(sync_mode=SyncMode.full_refresh))
except AirbyteTracedException as error:
if error.failure_type == FailureType.config_error:
return False, error.message
raise error
return True, None

def _get_stream_base_args(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])
start_timestamp = self._start_date_to_timestamp(config)
args = {
"authenticator": authenticator,
"account_id": config["account_id"],
"start_date": start_timestamp,
"slice_range": config["slice_range"],
"api_budget": self.get_api_call_budget(config),
}
return args

@staticmethod
def customers(**args):
# The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code.
Expand Down Expand Up @@ -174,17 +188,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
return HttpAPIBudget(policies=policies)

def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])

start_timestamp = self._start_date_to_timestamp(config)
args = {
"authenticator": authenticator,
"account_id": config["account_id"],
"start_date": start_timestamp,
"slice_range": config["slice_range"],
"api_budget": self.get_api_call_budget(config),
}
args = self._get_stream_base_args(config)
incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]}
subscriptions = IncrementalStripeStream(
name="subscriptions",
Expand Down Expand Up @@ -532,7 +536,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
),
]

state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state)
state_manager = ConnectorStateManager(state=self._state)
return [
self._to_concurrent(
stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
import datetime
import logging
from contextlib import nullcontext as does_not_raise
from unittest.mock import patch

import pytest
import source_stripe
import stripe
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
Expand Down Expand Up @@ -50,11 +47,6 @@ def _a_valid_config():
return {"account_id": 1, "client_secret": "secret"}


@patch.object(source_stripe.source, "stripe")
def test_source_check_connection_ok(mocked_client, config):
assert SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=config) == (True, None)


def test_streams_are_unique(config):
stream_names = [s.name for s in SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).streams(config=config)]
assert len(stream_names) == len(set(stream_names)) == 46
Expand All @@ -69,25 +61,10 @@ def test_streams_are_unique(config):
(_a_valid_config(), None),
),
)
@patch.object(source_stripe.source.stripe, "Account")
def test_config_validation(mocked_client, input_config, expected_error_msg):
def test_config_validation(input_config, expected_error_msg):
context = pytest.raises(AirbyteTracedException, match=expected_error_msg) if expected_error_msg else does_not_raise()
with context:
SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=input_config)


@pytest.mark.parametrize(
"exception",
(
stripe.error.AuthenticationError,
stripe.error.PermissionError,
),
)
@patch.object(source_stripe.source.stripe, "Account")
def test_given_stripe_error_when_check_connection_then_connection_not_available(mocked_client, exception):
mocked_client.retrieve.side_effect = exception
is_available, _ = SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=_a_valid_config())
assert not is_available
SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).validate_and_fill_with_defaults(config=input_config)


def test_when_streams_return_full_refresh_as_concurrent():
Expand Down
Loading

0 comments on commit 21348af

Please sign in to comment.