-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
source-stripe: pass type checks #35587
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources import Source | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http import HttpStream | ||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy | ||
from requests import HTTPError | ||
|
||
|
@@ -29,8 +30,9 @@ def _check_availability_for_sync_mode( | |
sync_mode: SyncMode, | ||
logger: logging.Logger, | ||
source: Optional["Source"], | ||
stream_state: Optional[Mapping[str, Any]], | ||
stream_state: Optional[Mapping[Any, Any]], | ||
) -> Tuple[bool, Optional[str]]: | ||
reason: Optional[str] = None | ||
try: | ||
# Some streams need a stream slice to read records (e.g. if they have a SubstreamPartitionRouter) | ||
# Streams that don't need a stream slice will return `None` as their first stream slice. | ||
|
@@ -90,7 +92,12 @@ def handle_http_error( | |
raise error | ||
doc_ref = self._visit_docs_message(logger, source) | ||
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} " | ||
response_error_message = stream.parse_response_error_message(error.response) | ||
# TODO alafanechere | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't because of the inheritance we have in place right now. This comment describes the situation |
||
# Can we make the HTTPAvailabilityStrategy handle HttpStream instead of Stream? | ||
# The parse_response_error_message method is only available on HttpStream | ||
response_error_message = None | ||
if isinstance(stream, HttpStream): | ||
response_error_message = stream.parse_response_error_message(error.response) | ||
if response_error_message: | ||
reason += response_error_message | ||
return False, reason | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,11 +5,10 @@ | |
import logging | ||
import os | ||
from datetime import timedelta | ||
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple | ||
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union | ||
|
||
import pendulum | ||
import stripe | ||
from airbyte_cdk import AirbyteLogger | ||
from airbyte_cdk.entrypoint import logger as entrypoint_logger | ||
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType | ||
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource | ||
|
@@ -24,7 +23,7 @@ | |
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter | ||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator | ||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||
from airbyte_protocol.models import SyncMode | ||
from airbyte_protocol.models import SyncMode # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we document why we have this type ignore? I'm unclear as to why we would ignore this |
||
from source_stripe.streams import ( | ||
CreatedCursorIncrementalStripeStream, | ||
CustomerBalanceTransactions, | ||
|
@@ -61,7 +60,13 @@ class SourceStripe(ConcurrentSourceAdapter): | |
CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"), | ||
} | ||
|
||
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs): | ||
def __init__( | ||
self, | ||
catalog: Optional[ConfiguredAirbyteCatalog], | ||
config: Optional[Mapping[str, Any]], | ||
state: Union[list[Any], MutableMapping[str, Any], None], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we keep TState here? I'm not very knowledgeable as to how MyPy handles |
||
**kwargs: Any, | ||
): | ||
if config: | ||
concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY) | ||
else: | ||
|
@@ -83,13 +88,14 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional | |
self._streams_configured_as_full_refresh = set() | ||
|
||
@staticmethod | ||
def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]: | ||
def validate_and_fill_with_defaults(config: Mapping[str, Any]) -> Mapping[str, Any]: | ||
mutable_config = dict(config) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't MutableMapping allow for modifying the config? In practice, it is probably a dict anyway here |
||
lookback_window_days, slice_range = ( | ||
config.get("lookback_window_days"), | ||
config.get("slice_range"), | ||
) | ||
if lookback_window_days is None: | ||
config["lookback_window_days"] = 0 | ||
mutable_config["lookback_window_days"] = 0 | ||
elif not isinstance(lookback_window_days, int) or lookback_window_days < 0: | ||
message = f"Invalid lookback window {lookback_window_days}. Please use only positive integer values or 0." | ||
raise AirbyteTracedException( | ||
|
@@ -99,20 +105,20 @@ def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> Mutable | |
) | ||
|
||
# verifies the start_date in the config is valid | ||
SourceStripe._start_date_to_timestamp(config) | ||
SourceStripe._start_date_to_timestamp(mutable_config) | ||
if slice_range is None: | ||
config["slice_range"] = 365 | ||
mutable_config["slice_range"] = 365 | ||
elif not isinstance(slice_range, int) or slice_range < 1: | ||
message = f"Invalid slice range value {slice_range}. Please use positive integer values only." | ||
raise AirbyteTracedException( | ||
message=message, | ||
internal_message=message, | ||
failure_type=FailureType.config_error, | ||
) | ||
return config | ||
return mutable_config | ||
|
||
def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]: | ||
self.validate_and_fill_with_defaults(config) | ||
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: | ||
config = self.validate_and_fill_with_defaults(config) | ||
stripe.api_key = config["client_secret"] | ||
try: | ||
stripe.Account.retrieve(config["account_id"]) | ||
|
@@ -121,7 +127,7 @@ def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, An | |
return True, None | ||
|
||
@staticmethod | ||
def customers(**args): | ||
def customers(**args: Any) -> IncrementalStripeStream: | ||
# The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code. | ||
# It can be used with and without expanded items (as an independent stream or as a parent stream for other streams). | ||
return IncrementalStripeStream( | ||
|
@@ -178,7 +184,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: | |
|
||
return HttpAPIBudget(policies=policies) | ||
|
||
def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: | ||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
config = self.validate_and_fill_with_defaults(config) | ||
authenticator = TokenAuthenticator(config["client_secret"]) | ||
|
||
|
@@ -522,7 +528,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: | |
state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state) | ||
return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams] | ||
|
||
def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream: | ||
def _to_concurrent(self, stream: Stream, fallback_start: Any, state_manager: ConnectorStateManager) -> Stream: | ||
if stream.name in self._streams_configured_as_full_refresh: | ||
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor()) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This typing change seems odd to me. Why is this needed?
get_first_stream_slice
should expect aMapping[str, Any]
and it seems like the only place this is used