From db92c9cf0a2068f53d4d21e6f048fba814382245 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 23 Feb 2024 00:51:31 +0100 Subject: [PATCH] source-stripe: pass type checks --- .../integration_tests/expected_records.jsonl | 8 +- .../connectors/source-stripe/metadata.yaml | 2 +- .../connectors/source-stripe/poetry.lock | 97 +++++++- .../connectors/source-stripe/pyproject.toml | 5 +- .../source_stripe/availability_strategy.py | 11 +- .../source-stripe/source_stripe/run.py | 6 +- .../source-stripe/source_stripe/source.py | 34 +-- .../source_stripe/stream_helpers.py | 7 +- .../source-stripe/source_stripe/streams.py | 210 +++++++++++------- .../source-stripe/unit_tests/conftest.py | 2 +- .../integration/test_application_fees.py | 2 +- .../test_application_fees_refunds.py | 2 +- .../integration/test_authorizations.py | 2 +- .../integration/test_bank_accounts.py | 2 +- .../unit_tests/integration/test_cards.py | 2 +- .../integration/test_early_fraud_warnings.py | 2 +- .../unit_tests/integration/test_events.py | 2 +- .../test_external_account_bank_accounts.py | 2 +- .../test_external_account_cards.py | 2 +- .../integration/test_payment_methods.py | 2 +- .../unit_tests/integration/test_reviews.py | 2 +- .../integration/test_transactions.py | 2 +- .../source-stripe/unit_tests/test_source.py | 14 +- docs/integrations/sources/stripe.md | 2 + 24 files changed, 292 insertions(+), 130 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl index c6602a1a5df70..24ce5ff9c38b3 100644 --- a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl @@ -47,8 +47,8 @@ {"stream": "products", "data": {"id": "prod_KouQ5ez86yREmB", "object": "product", "active": true, "attributes": [], "created": 1640124902, "default_price": "price_1K9GbqEcXtiJtvvhJ3lZe4i5", "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "edgao-test-product", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839715, "url": null}, "emitted_at": 1697627307635} {"stream": "products", "data": {"id": "prod_NHcKselSHfKdfc", "object": "product", "active": true, "attributes": [], "created": 1675345504, "default_price": "price_1MX364EcXtiJtvvhE3WgTl4O", "description": "Test Product 1 description", "features": [], "images": ["https://files.stripe.com/links/MDB8YWNjdF8xSndub2lFY1h0aUp0dnZofGZsX3Rlc3RfdjBOT09UaHRiNVl2WmJ6clNYRUlmcFFD00cCBRNHnV"], "livemode": false, "metadata": {}, "name": "Test Product 1", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10301000", "type": "service", "unit_label": null, "updated": 1696839789, "url": null}, "emitted_at": 1697627307877} {"stream": "products", "data": {"id": "prod_NCgx1XP2IFQyKF", "object": "product", "active": true, "attributes": [], "created": 1674209524, "default_price": null, "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "tu", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839225, "url": null}, "emitted_at": 1697627307879} -{"stream": "subscriptions", "data": {"id": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": 1697550676.0, "billing_cycle_anchor_config": null, "billing_thresholds": null, "cancel_at": null, "cancel_at_period_end": false, "canceled_at": 1697550676.0, "cancellation_details": {"comment": null, "feedback": null, "reason": "cancellation_requested"}, "collection_method": "charge_automatically", "created": 1697550676, "currency": "usd", "current_period_end": 1705499476.0, "current_period_start": 1702821076, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": 1705329724.0, "invoice_settings": {"account_tax_ids": null, "issuer": {"type": "self"}}, "items": {"object": "list", "data": [{"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1O2Dg0EcXtiJtvvhz7Q4zS0n"}, "latest_invoice": "in_1OOKkUEcXtiJtvvheUUavyuB", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": null}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "start_date": 1697550676, "status": "canceled", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1697550676}, "emitted_at": 1707158969393} -{"stream":"subscription_schedule","data":{"id":"sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP","object":"subscription_schedule","application":null,"canceled_at":"1705329724","completed_at":null,"created":1697550676,"current_phase":null,"customer":"cus_NGoTFiJFVbSsvZ","default_settings":{"application_fee_percent":null,"automatic_tax":{"enabled":false, "liability": null},"billing_cycle_anchor":"automatic","billing_thresholds":null,"collection_method":"charge_automatically","default_payment_method":null,"default_source":null,"description":"Test Test","invoice_settings":"{'account_tax_ids': None, 'days_until_due': None, 'issuer': {'type': 'self'}}","on_behalf_of":null,"transfer_data":null},"end_behavior":"cancel","livemode":false,"metadata":{},"phases":[{"add_invoice_items":[],"application_fee_percent":null,"automatic_tax":{"enabled":true, "liability": {"type": "self"}},"billing_cycle_anchor":null,"billing_thresholds":null,"collection_method":"charge_automatically","coupon":null,"currency":"usd","default_payment_method":null,"default_tax_rates":[],"description":"Test Test","end_date":1705499476,"invoice_settings":"{'account_tax_ids': None, 'days_until_due': None, 'issuer': None}","items":[{"billing_thresholds":null,"metadata":{},"plan":"price_1MSHZoEcXtiJtvvh6O8TYD8T","price":"price_1MSHZoEcXtiJtvvh6O8TYD8T","quantity":1,"tax_rates":[]}],"metadata":{},"on_behalf_of":null,"proration_behavior":"create_prorations","start_date":1697550676,"transfer_data":null,"trial_end":null}],"released_at":null,"released_subscription":null,"renewal_interval":null,"status":"canceled","subscription":"sub_1O2Dg0EcXtiJtvvhz7Q4zS0n","test_clock":null,"updated":1697550676},"emitted_at":1705636378620} +{"stream": "subscriptions", "data": {"id": "sub_1OoDDUEcXtiJtvvh4elaXYFT", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": 1708988652.0, "billing_cycle_anchor_config": null, "billing_thresholds": null, "cancel_at": null, "cancel_at_period_end": false, "canceled_at": null, "cancellation_details": {"comment": null, "feedback": null, "reason": null}, "collection_method": "charge_automatically", "created": 1708988652, "currency": "usd", "current_period_end": 1711494252.0, "current_period_start": 1708988652, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": null, "invoice_settings": {"account_tax_ids": null, "issuer": {"type": "self"}}, "items": {"object": "list", "data": [{"id": "si_PdUBTrsn2C4ShU", "object": "subscription_item", "billing_thresholds": null, "created": 1708988653, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1OoDDUEcXtiJtvvh4elaXYFT", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1OoDDUEcXtiJtvvh4elaXYFT"}, "latest_invoice": "in_1OoDDUEcXtiJtvvhxhAEJymZ", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": "off"}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1OoDexEcXtiJtvvhVrIXDZrd", "start_date": 1708988652, "status": "active", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1708988652}, "emitted_at": 1708990377529} +{"stream": "subscription_schedule", "data": {"id": "sub_sched_1OoDexEcXtiJtvvhVrIXDZrd", "object": "subscription_schedule", "application": null, "canceled_at": null, "completed_at": null, "created": 1708990355, "current_phase": {"end_date": 1735603200, "start_date": 1708988652}, "customer": "cus_NGoTFiJFVbSsvZ", "default_settings": {"application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": "automatic", "billing_thresholds": null, "collection_method": "charge_automatically", "default_payment_method": null, "default_source": null, "description": null, "invoice_settings": "{'account_tax_ids': None, 'days_until_due': None, 'issuer': {'type': 'self'}}", "on_behalf_of": null, "transfer_data": null}, "end_behavior": "cancel", "livemode": false, "metadata": {}, "phases": [{"add_invoice_items": [], "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": null, "billing_thresholds": null, "collection_method": "charge_automatically", "coupon": null, "currency": "usd", "default_payment_method": null, "default_tax_rates": [], "description": null, "end_date": 1735603200, "invoice_settings": "{'account_tax_ids': None, 'days_until_due': None, 'issuer': None}", "items": [{"billing_thresholds": null, "metadata": {}, "plan": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "price": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "quantity": 1, "tax_rates": []}], "metadata": {}, "on_behalf_of": null, "proration_behavior": "create_prorations", "start_date": 1708988652, "transfer_data": null, "trial_end": null}, {"add_invoice_items": [], "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": null, "billing_thresholds": null, "collection_method": "charge_automatically", "coupon": null, "currency": "usd", "default_payment_method": null, "default_tax_rates": [], "description": null, "end_date": 1742947200, "invoice_settings": "{'account_tax_ids': None, 'days_until_due': None, 'issuer': None}", "items": [{"billing_thresholds": null, "metadata": {}, "plan": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "price": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "quantity": 1, "tax_rates": []}], "metadata": {}, "on_behalf_of": null, "proration_behavior": "create_prorations", "start_date": 1735603200, "transfer_data": null, "trial_end": null}], "released_at": null, "released_subscription": null, "renewal_interval": null, "status": "active", "subscription": "sub_1OoDDUEcXtiJtvvh4elaXYFT", "test_clock": null, "updated": 1708990355}, "emitted_at": 1708990377789} {"stream": "transfers", "data": {"id": "tr_1NH18zEcXtiJtvvhnd827cNO", "object": "transfer", "amount": 10000, "amount_reversed": 0, "balance_transaction": "txn_1NH190EcXtiJtvvhBO3PeR7p", "created": 1686301085, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NH18zEYmRTj5on1GkCCsqLK", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [], "has_more": false, "total_count": 0.0, "url": "/v1/transfers/tr_1NH18zEcXtiJtvvhnd827cNO/reversals"}, "reversed": false, "source_transaction": null, "source_type": "card", "transfer_group": null, "updated": 1686301085}, "emitted_at": 1697627313262} {"stream": "transfers", "data": {"id": "tr_1NGoaCEcXtiJtvvhjmHtOGOm", "object": "transfer", "amount": 100, "amount_reversed": 100, "balance_transaction": "txn_1NGoaDEcXtiJtvvhsZrNMsdJ", "created": 1686252800, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NGoaCEYmRTj5on1LAlAIG3a", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [{"id": "trr_1NGolCEcXtiJtvvhOYPck3CP", "object": "transfer_reversal", "amount": 100, "balance_transaction": "txn_1NGolCEcXtiJtvvhZRy4Kd5S", "created": 1686253482, "currency": "usd", "destination_payment_refund": "pyr_1NGolBEYmRTj5on1STal3rmp", "metadata": {}, "source_refund": null, "transfer": "tr_1NGoaCEcXtiJtvvhjmHtOGOm"}], "has_more": false, "total_count": 1.0, "url": "/v1/transfers/tr_1NGoaCEcXtiJtvvhjmHtOGOm/reversals"}, "reversed": true, "source_transaction": null, "source_type": "card", "transfer_group": "ORDER10", "updated": 1686252800}, "emitted_at": 1697627313264} {"stream": "refunds", "data": {"id": "re_3MVuZyEcXtiJtvvh0A6rSbeJ", "object": "refund", "amount": 200000, "balance_transaction": "txn_3MVuZyEcXtiJtvvh0v0QyAMx", "charge": "ch_3MVuZyEcXtiJtvvh0tiVC7DI", "created": 1675074488, "currency": "usd", "destination_details": {"card": {"reference": "5871771120000631", "reference_status": "available", "reference_type": "acquirer_reference_number", "type": "refund"}, "type": "card"}, "metadata": {}, "payment_intent": "pi_3MVuZyEcXtiJtvvh07Ehi4cx", "reason": "fraudulent", "receipt_number": "3278-5368", "source_transfer_reversal": null, "status": "succeeded", "transfer_reversal": null}, "emitted_at": 1701882752716} @@ -68,6 +68,6 @@ {"stream": "invoice_line_items", "data": {"id": "il_1K9GKLEcXtiJtvvhhHaYMebN", "object": "line_item", "amount": 8400, "amount_excluding_tax": 8400, "currency": "usd", "description": "a box of parsnips", "discount_amounts": [], "discountable": true, "discounts": [], "invoice_item": "ii_1K9GKLEcXtiJtvvhmr2AYOAx", "livemode": false, "metadata": {}, "period": {"end": 1640123817, "start": 1640123817}, "plan": null, "price": {"id": "price_1K9GKLEcXtiJtvvhXbrg33lq", "object": "price", "active": false, "billing_scheme": "per_unit", "created": 1640123817, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_Kou8cQxtIpF1p7", "recurring": null, "tax_behavior": "unspecified", "tiers_mode": null, "transform_quantity": null, "type": "one_time", "unit_amount": 8400, "unit_amount_decimal": "8400"}, "proration": false, "proration_details": {"credited_items": null}, "quantity": 1, "subscription": null, "tax_amounts": [], "tax_rates": [], "type": "invoiceitem", "unit_amount_excluding_tax": "8400", "invoice_id": "in_1K9GK0EcXtiJtvvhSo2LvGqT"}, "emitted_at": 1697627336438} {"stream": "invoice_line_items", "data": {"id": "il_1MX384EcXtiJtvvh3j2K123f", "object": "line_item", "amount": 6000, "amount_excluding_tax": 6000, "currency": "usd", "description": "Test Product 1", "discount_amounts": [{"amount": 500, "discount": "di_1MX384EcXtiJtvvhkOrY57Ep"}], "discountable": true, "discounts": ["di_1MX384EcXtiJtvvhkOrY57Ep"], "invoice_item": "ii_1MX384EcXtiJtvvhguyn3iYb", "livemode": false, "metadata": {}, "period": {"end": 1675345628, "start": 1675345628}, "plan": null, "price": {"id": "price_1MX364EcXtiJtvvhE3WgTl4O", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1675345504, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NHcKselSHfKdfc", "recurring": null, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "one_time", "unit_amount": 2000, "unit_amount_decimal": "2000"}, "proration": false, "proration_details": {"credited_items": null}, "quantity": 3, "subscription": null, "tax_amounts": [{"amount": 0, "inclusive": false, "tax_rate": "txr_1MX384EcXtiJtvvhAhVE20Ii", "taxability_reason": "not_collecting", "taxable_amount": 0}], "tax_rates": [], "type": "invoiceitem", "unit_amount_excluding_tax": "2000", "invoice_id": "in_1MX37hEcXtiJtvvhRSl1KbQm"}, "emitted_at": 1697627336446} {"stream": "invoice_line_items", "data": {"id": "il_1MX2yfEcXtiJtvvhiunY2j1x", "object": "line_item", "amount": 25200, "amount_excluding_tax": 25200, "currency": "usd", "description": "edgao-test-product", "discount_amounts": [{"amount": 2520, "discount": "di_1MX2ysEcXtiJtvvh8ORqRVKm"}], "discountable": true, "discounts": ["di_1MX2ysEcXtiJtvvh8ORqRVKm"], "invoice_item": "ii_1MX2yfEcXtiJtvvhfhyOG7SP", "livemode": false, "metadata": {}, "period": {"end": 1675345045, "start": 1675345045}, "plan": null, "price": {"id": "price_1K9GbqEcXtiJtvvhJ3lZe4i5", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1640124902, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_KouQ5ez86yREmB", "recurring": null, "tax_behavior": "inclusive", "tiers_mode": null, "transform_quantity": null, "type": "one_time", "unit_amount": 12600, "unit_amount_decimal": "12600"}, "proration": false, "proration_details": {"credited_items": null}, "quantity": 2, "subscription": null, "tax_amounts": [{"amount": 0, "inclusive": true, "tax_rate": "txr_1MX2yfEcXtiJtvvhVcMEMTRj", "taxability_reason": "not_collecting", "taxable_amount": 0}], "tax_rates": [], "type": "invoiceitem", "unit_amount_excluding_tax": "12600", "invoice_id": "in_1MX2yFEcXtiJtvvhMXhUCgKx"}, "emitted_at": 1697627336449} -{"stream": "subscription_items", "data": {"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}, "emitted_at": 1697627337431} +{"stream": "subscription_items", "data": {"id": "si_PdUBTrsn2C4ShU", "object": "subscription_item", "billing_thresholds": null, "created": 1708988653, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1OoDDUEcXtiJtvvh4elaXYFT", "tax_rates": []}, "emitted_at": 1708990377538} {"stream": "transfer_reversals", "data": {"id": "trr_1NGolCEcXtiJtvvhOYPck3CP", "object": "transfer_reversal", "amount": 100, "balance_transaction": "txn_1NGolCEcXtiJtvvhZRy4Kd5S", "created": 1686253482, "currency": "usd", "destination_payment_refund": "pyr_1NGolBEYmRTj5on1STal3rmp", "metadata": {}, "source_refund": null, "transfer": "tr_1NGoaCEcXtiJtvvhjmHtOGOm"}, "emitted_at": 1697627338960} -{"stream": "usage_records", "data": {"id": "sis_1OUqWiEcXtiJtvvh3WGqc4Vk", "object": "usage_record_summary", "invoice": null, "livemode": false, "period": {"end": null, "start": 1702821076}, "subscription_item": "si_OptSP2o3XZUBpx", "total_usage": 1}, "emitted_at": 1700233660884} +{"stream": "usage_records", "data": {"id": "sis_1OoDfJEcXtiJtvvhLAO1T8SS", "object": "usage_record_summary", "invoice": null, "livemode": false, "period": {"end": null, "start": 1708988652}, "subscription_item": "si_PdUBTrsn2C4ShU", "total_usage": 1}, "emitted_at": 1708990377651} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 75c4e5b6eccb9..b80d1697bf074 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 5.2.4 + dockerImageTag: 5.2.5 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/poetry.lock b/airbyte-integrations/connectors/source-stripe/poetry.lock index 250155d9a76c7..c306a266b4d70 100644 --- a/airbyte-integrations/connectors/source-stripe/poetry.lock +++ b/airbyte-integrations/connectors/source-stripe/poetry.lock @@ -479,6 +479,64 @@ files = [ {file = "MarkupSafe-2.1.5.tar.gz", hash = "sha256:d283d37a890ba4c1ae73ffadf8046435c76e7bc2247bbb63c00bd1a709c6544b"}, ] +[[package]] +name = "mypy" +version = "1.8.0" +description = "Optional static typing for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "mypy-1.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:485a8942f671120f76afffff70f259e1cd0f0cfe08f81c05d8816d958d4577d3"}, + {file = "mypy-1.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:df9824ac11deaf007443e7ed2a4a26bebff98d2bc43c6da21b2b64185da011c4"}, + {file = "mypy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2afecd6354bbfb6e0160f4e4ad9ba6e4e003b767dd80d85516e71f2e955ab50d"}, + {file = "mypy-1.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8963b83d53ee733a6e4196954502b33567ad07dfd74851f32be18eb932fb1cb9"}, + {file = "mypy-1.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:e46f44b54ebddbeedbd3d5b289a893219065ef805d95094d16a0af6630f5d410"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:855fe27b80375e5c5878492f0729540db47b186509c98dae341254c8f45f42ae"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4c886c6cce2d070bd7df4ec4a05a13ee20c0aa60cb587e8d1265b6c03cf91da3"}, + {file = "mypy-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d19c413b3c07cbecf1f991e2221746b0d2a9410b59cb3f4fb9557f0365a1a817"}, + {file = "mypy-1.8.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9261ed810972061388918c83c3f5cd46079d875026ba97380f3e3978a72f503d"}, + {file = "mypy-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:51720c776d148bad2372ca21ca29256ed483aa9a4cdefefcef49006dff2a6835"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:52825b01f5c4c1c4eb0db253ec09c7aa17e1a7304d247c48b6f3599ef40db8bd"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f5ac9a4eeb1ec0f1ccdc6f326bcdb464de5f80eb07fb38b5ddd7b0de6bc61e55"}, + {file = "mypy-1.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afe3fe972c645b4632c563d3f3eff1cdca2fa058f730df2b93a35e3b0c538218"}, + {file = "mypy-1.8.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:42c6680d256ab35637ef88891c6bd02514ccb7e1122133ac96055ff458f93fc3"}, + {file = "mypy-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:720a5ca70e136b675af3af63db533c1c8c9181314d207568bbe79051f122669e"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:028cf9f2cae89e202d7b6593cd98db6759379f17a319b5faf4f9978d7084cdc6"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4e6d97288757e1ddba10dd9549ac27982e3e74a49d8d0179fc14d4365c7add66"}, + {file = "mypy-1.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f1478736fcebb90f97e40aff11a5f253af890c845ee0c850fe80aa060a267c6"}, + {file = "mypy-1.8.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:42419861b43e6962a649068a61f4a4839205a3ef525b858377a960b9e2de6e0d"}, + {file = "mypy-1.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:2b5b6c721bd4aabaadead3a5e6fa85c11c6c795e0c81a7215776ef8afc66de02"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5c1538c38584029352878a0466f03a8ee7547d7bd9f641f57a0f3017a7c905b8"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4ef4be7baf08a203170f29e89d79064463b7fc7a0908b9d0d5114e8009c3a259"}, + {file = "mypy-1.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7178def594014aa6c35a8ff411cf37d682f428b3b5617ca79029d8ae72f5402b"}, + {file = "mypy-1.8.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ab3c84fa13c04aeeeabb2a7f67a25ef5d77ac9d6486ff33ded762ef353aa5592"}, + {file = "mypy-1.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:99b00bc72855812a60d253420d8a2eae839b0afa4938f09f4d2aa9bb4654263a"}, + {file = "mypy-1.8.0-py3-none-any.whl", hash = "sha256:538fd81bb5e430cc1381a443971c0475582ff9f434c16cd46d2c66763ce85d9d"}, + {file = "mypy-1.8.0.tar.gz", hash = "sha256:6ff8b244d7085a0b425b56d327b480c3b29cafbd2eff27316a004f9a7391ae07"}, +] + +[package.dependencies] +mypy-extensions = ">=1.0.0" +tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} +typing-extensions = ">=4.1.0" + +[package.extras] +dmypy = ["psutil (>=4.0)"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + [[package]] name = "packaging" version = "23.2" @@ -764,7 +822,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -921,6 +978,42 @@ files = [ {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, ] +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + +[[package]] +name = "types-requests" +version = "2.31.0.20240218" +description = "Typing stubs for requests" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-requests-2.31.0.20240218.tar.gz", hash = "sha256:f1721dba8385958f504a5386240b92de4734e047a08a40751c1654d1ac3349c5"}, + {file = "types_requests-2.31.0.20240218-py3-none-any.whl", hash = "sha256:a82807ec6ddce8f00fe0e949da6d6bc1fbf1715420218a9640d695f70a9e5a9b"}, +] + +[package.dependencies] +urllib3 = ">=2" + +[[package]] +name = "types-stripe" +version = "3.5.2.20240106" +description = "Typing stubs for stripe" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-stripe-3.5.2.20240106.tar.gz", hash = "sha256:63a36958f0dc4a71685b027f0b6d807ff197ee337135ac19a0f8b6132365ca52"}, + {file = "types_stripe-3.5.2.20240106-py3-none-any.whl", hash = "sha256:9ea7bc9b9889a3d8606114c52dcc70a8fc62d44a46da4bc5ee19f0d463674bb8"}, +] + [[package]] name = "typing-extensions" version = "4.9.0" @@ -1059,4 +1152,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9,<3.12" -content-hash = "e1b6a4bb5a2d863623daeb1a4194106b45024cdba1d06cfbfe85a91949cad482" +content-hash = "2b1c376517aed689be0c84ed0ffd9e478d2b34a1ca6bc8f2e6253de73838c0d3" diff --git a/airbyte-integrations/connectors/source-stripe/pyproject.toml b/airbyte-integrations/connectors/source-stripe/pyproject.toml index fc915f21ffaa8..f3a71c7959744 100644 --- a/airbyte-integrations/connectors/source-stripe/pyproject.toml +++ b/airbyte-integrations/connectors/source-stripe/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "5.2.4" +version = "5.2.5" name = "source-stripe" description = "Source implementation for Stripe." authors = [ "Airbyte ",] @@ -20,6 +20,7 @@ python = "^3.9,<3.12" stripe = "==2.56.0" pendulum = "==2.1.2" airbyte-cdk = "==0.60.1" +mypy = "^1.8.0" [tool.poetry.scripts] source-stripe = "source_stripe.run:run" @@ -29,3 +30,5 @@ requests-mock = "^1.11.0" pytest = "^6.1" freezegun = "==1.2.2" pytest-mock = "^3.6.1" +types-stripe = "^3.5.2.20240106" +types-requests = "^2.31.0.20240218" diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py b/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py index 6226ffc12ea97..46e1926bdc0d1 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py @@ -8,6 +8,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from requests import HTTPError @@ -29,8 +30,9 @@ def _check_availability_for_sync_mode( sync_mode: SyncMode, logger: logging.Logger, source: Optional["Source"], - stream_state: Optional[Mapping[str, Any]], + stream_state: Optional[Mapping[Any, Any]], ) -> Tuple[bool, Optional[str]]: + reason: Optional[str] = None try: # Some streams need a stream slice to read records (e.g. if they have a SubstreamPartitionRouter) # Streams that don't need a stream slice will return `None` as their first stream slice. @@ -90,7 +92,12 @@ def handle_http_error( raise error doc_ref = self._visit_docs_message(logger, source) reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} " - response_error_message = stream.parse_response_error_message(error.response) + # TODO alafanechere + # Can we make the HTTPAvailabilityStrategy handle HttpStream instead of Stream? + # The parse_response_error_message method is only available on HttpStream + response_error_message = None + if isinstance(stream, HttpStream): + response_error_message = stream.parse_response_error_message(error.response) if response_error_message: reason += response_error_message return False, reason diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/run.py b/airbyte-integrations/connectors/source-stripe/source_stripe/run.py index b5a3219863590..aa643cf8b3cbd 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/run.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/run.py @@ -6,14 +6,14 @@ import sys import traceback from datetime import datetime -from typing import List +from typing import List, Optional from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type from source_stripe import SourceStripe -def _get_source(args: List[str]): +def _get_source(args: List[str]) -> Optional[SourceStripe]: catalog_path = AirbyteEntrypoint.extract_catalog(args) config_path = AirbyteEntrypoint.extract_config(args) state_path = AirbyteEntrypoint.extract_state(args) @@ -40,7 +40,7 @@ def _get_source(args: List[str]): return None -def run(): +def run() -> None: _args = sys.argv[1:] source = _get_source(_args) if source: diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 49479c5cc78ec..e695674293690 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -5,11 +5,10 @@ import logging import os from datetime import timedelta -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple +from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union import pendulum import stripe -from airbyte_cdk import AirbyteLogger from airbyte_cdk.entrypoint import logger as entrypoint_logger from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource @@ -24,7 +23,7 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from airbyte_protocol.models import SyncMode +from airbyte_protocol.models import SyncMode # type: ignore from source_stripe.streams import ( CreatedCursorIncrementalStripeStream, CustomerBalanceTransactions, @@ -61,7 +60,13 @@ class SourceStripe(ConcurrentSourceAdapter): CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"), } - def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs): + def __init__( + self, + catalog: Optional[ConfiguredAirbyteCatalog], + config: Optional[Mapping[str, Any]], + state: Union[list[Any], MutableMapping[str, Any], None], + **kwargs: Any, + ): if config: concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY) else: @@ -83,13 +88,14 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional self._streams_configured_as_full_refresh = set() @staticmethod - def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def validate_and_fill_with_defaults(config: Mapping[str, Any]) -> Mapping[str, Any]: + mutable_config = dict(config) lookback_window_days, slice_range = ( config.get("lookback_window_days"), config.get("slice_range"), ) if lookback_window_days is None: - config["lookback_window_days"] = 0 + mutable_config["lookback_window_days"] = 0 elif not isinstance(lookback_window_days, int) or lookback_window_days < 0: message = f"Invalid lookback window {lookback_window_days}. Please use only positive integer values or 0." raise AirbyteTracedException( @@ -99,9 +105,9 @@ def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> Mutable ) # verifies the start_date in the config is valid - SourceStripe._start_date_to_timestamp(config) + SourceStripe._start_date_to_timestamp(mutable_config) if slice_range is None: - config["slice_range"] = 365 + mutable_config["slice_range"] = 365 elif not isinstance(slice_range, int) or slice_range < 1: message = f"Invalid slice range value {slice_range}. Please use positive integer values only." raise AirbyteTracedException( @@ -109,10 +115,10 @@ def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> Mutable internal_message=message, failure_type=FailureType.config_error, ) - return config + return mutable_config - def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]: - self.validate_and_fill_with_defaults(config) + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + config = self.validate_and_fill_with_defaults(config) stripe.api_key = config["client_secret"] try: stripe.Account.retrieve(config["account_id"]) @@ -121,7 +127,7 @@ def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, An return True, None @staticmethod - def customers(**args): + def customers(**args: Any) -> IncrementalStripeStream: # The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code. # It can be used with and without expanded items (as an independent stream or as a parent stream for other streams). return IncrementalStripeStream( @@ -178,7 +184,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: return HttpAPIBudget(policies=policies) - def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) @@ -522,7 +528,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state) return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams] - def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream: + def _to_concurrent(self, stream: Stream, fallback_start: Any, state_manager: ConnectorStateManager) -> Stream: if stream.name in self._streams_configured_as_full_refresh: return StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor()) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py b/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py index dad073ae485b3..647e079b48207 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py @@ -8,7 +8,7 @@ from airbyte_cdk.sources.streams.core import Stream, StreamData -def get_first_stream_slice(stream, sync_mode, stream_state) -> Optional[Mapping[str, Any]]: +def get_first_stream_slice(stream: Stream, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: """ Gets the first stream_slice from a given stream's stream_slices. :param stream: stream @@ -17,9 +17,12 @@ def get_first_stream_slice(stream, sync_mode, stream_state) -> Optional[Mapping[ :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty) :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice) """ + cursor_field: Optional[list[str]] = None + if isinstance(stream.cursor_field, str): + cursor_field = [stream.cursor_field] # We wrap the return output of stream_slices() because some implementations return types that are iterable, # but not iterators such as lists or tuples - slices = iter(stream.stream_slices(sync_mode=sync_mode, cursor_field=stream.cursor_field, stream_state=stream_state)) + slices = iter(stream.stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)) return next(slices) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index d8958d9453b7e..2d306a64cd49b 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -7,7 +7,7 @@ import os from abc import ABC, abstractmethod from itertools import chain -from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union import pendulum import requests @@ -84,7 +84,7 @@ def extract_records( current_cursor_value = record.get(self.legacy_cursor_field, pendulum.now().int_timestamp) # yield the record with the added cursor_field - yield record | {self.cursor_field: current_cursor_value} + yield {**record, **{self.cursor_field: current_cursor_value}} class StripeStream(HttpStream, ABC): @@ -106,10 +106,9 @@ def name(self) -> str: return self._name return super().name - def path(self, *args, **kwargs) -> str: - if self._path: - return self._path if isinstance(self._path, str) else self._path(self, *args, **kwargs) - return super().path(*args, **kwargs) + def path(self, *args: Any, **kwargs: Any) -> str: + assert self._path, "path must be set" + return self._path if isinstance(self._path, str) else self._path(self, *args, **kwargs) @property def use_cache(self) -> bool: @@ -119,7 +118,7 @@ def use_cache(self) -> bool: def expand_items(self) -> Optional[List[str]]: return self._expand_items - def extra_request_params(self, *args, **kwargs) -> Mapping[str, Any]: + def extra_request_params(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]: if callable(self._extra_request_params): return self._extra_request_params(self, *args, **kwargs) return self._extra_request_params or {} @@ -132,7 +131,7 @@ def __init__( self, start_date: int, account_id: str, - *args, + *args: Any, slice_range: int = DEFAULT_SLICE_RANGE, record_extractor: Optional[IRecordExtractor] = None, name: Optional[str] = None, @@ -143,7 +142,7 @@ def __init__( response_filter: Optional[Callable] = None, slice_data_retriever: Optional[Callable] = None, primary_key: Optional[str] = "id", - **kwargs, + **kwargs: Any, ): self.account_id = account_id self.start_date = start_date @@ -162,12 +161,13 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, if "has_more" in decoded_response and decoded_response["has_more"] and decoded_response.get("data", []): last_object_id = decoded_response["data"][-1]["id"] return {"starting_after": last_object_id} + return None def request_params( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: # Stripe default pagination is 10, max is 100 params = { @@ -186,18 +186,24 @@ def parse_response( self, response: requests.Response, *, - stream_state: Mapping[str, Any], + stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: yield from self.record_extractor.extract_records(response.json().get("data", []), stream_slice) - def request_headers(self, **kwargs) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: headers = {"Stripe-Version": STRIPE_API_VERSION} if self.account_id: headers["Stripe-Account"] = self.account_id return headers + @property def retry_factor(self) -> float: """ Override for testing purposes @@ -213,7 +219,8 @@ def get_parent_stream(self, stream_state: Mapping[str, Any]) -> StripeStream: class CreatedCursorIncrementalStripeStream(StripeStream): # Stripe returns most recently created objects first, so we don't want to persist state until the entire stream has been read - state_checkpoint_interval = math.inf + # TODO: alafanechere - confirm None is correct instead of math.inf + state_checkpoint_interval = None @property def cursor_field(self) -> str: @@ -221,11 +228,11 @@ def cursor_field(self) -> str: def __init__( self, - *args, + *args: Any, lookback_window_days: int = 0, start_date_max_days_from_now: Optional[int] = None, cursor_field: str = "created", - **kwargs, + **kwargs: Any, ): super().__init__(*args, **kwargs) self.lookback_window_days = lookback_window_days @@ -245,11 +252,14 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late def request_params( self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: params = super(CreatedCursorIncrementalStripeStream, self).request_params(stream_state, stream_slice, next_page_token) + assert isinstance(stream_slice, dict), "stream_slice must be a dictionary" + assert "created[gte]" in stream_slice, "stream_slice must contain 'created[gte]' key" + assert "created[lte]" in stream_slice, "stream_slice must contain 'created[lte]' key" return {"created[gte]": stream_slice["created[gte]"], "created[lte]": stream_slice["created[lte]"], **params} def chunk_dates(self, start_date_ts: int) -> Iterable[Tuple[int, int]]: @@ -262,7 +272,7 @@ def chunk_dates(self, start_date_ts: int) -> Iterable[Tuple[int, int]]: after_ts = before_ts + 1 def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: stream_state = stream_state or {} start_ts = self.get_start_timestamp(stream_state) @@ -271,7 +281,7 @@ def stream_slices( for start, end in self.chunk_dates(start_ts): yield {"created[gte]": start, "created[lte]": end} - def get_start_timestamp(self, stream_state) -> int: + def get_start_timestamp(self, stream_state: Mapping[str, Any]) -> int: start_point = self.start_date # we use +1 second because date range is inclusive start_point = max(start_point, stream_state.get(self.cursor_field, 0) + 1) @@ -295,22 +305,22 @@ class Events(CreatedCursorIncrementalStripeStream): API docs: https://stripe.com/docs/api/events/list """ - def __init__(self, *args, event_types: Optional[Iterable[str]] = None, **kwargs): + def __init__(self, *args: Any, event_types: Optional[Iterable[str]] = None, **kwargs: Any): super().__init__(*args, **kwargs) self.event_types = event_types def request_params( self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) if self.event_types: params["types[]"] = self.event_types return params - def path(self, **kwargs): + def path(self, **kwargs: Any) -> str: return "events" @@ -323,27 +333,27 @@ class UpdatedCursorIncrementalStripeStream(StripeStream): """ @property - def cursor_field(self): + def cursor_field(self) -> str: return self._cursor_field @property - def legacy_cursor_field(self): + def legacy_cursor_field(self) -> Optional[str]: return self._legacy_cursor_field @property - def event_types(self) -> Iterable[str]: + def event_types(self) -> Optional[Iterable[str]]: """A list of event types that are associated with entity.""" return self._event_types def __init__( self, - *args, + *args: Any, cursor_field: str = "updated", legacy_cursor_field: Optional[str] = "created", event_types: Optional[List[str]] = None, record_extractor: Optional[IRecordExtractor] = None, response_filter: Optional[Callable] = None, - **kwargs, + **kwargs: Any, ): self._event_types = event_types self._cursor_field = cursor_field @@ -367,7 +377,7 @@ def __init__( record_extractor=EventRecordExtractor(cursor_field=self.cursor_field, response_filter=response_filter), ) - def update_cursor_field(self, stream_state: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def update_cursor_field(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: if not self.legacy_cursor_field: # Streams that used to support only full_refresh mode. # Now they support event-based incremental syncs but have a cursor field only in that mode. @@ -376,7 +386,7 @@ def update_cursor_field(self, stream_state: MutableMapping[str, Any]) -> Mutable current_stream_state_value = stream_state.get(self.cursor_field, stream_state.get(self.legacy_cursor_field, 0)) return {self.cursor_field: current_stream_state_value} - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: latest_record_value = latest_record.get(self.cursor_field) current_stream_state = self.update_cursor_field(current_stream_state) current_state_value = current_stream_state.get(self.cursor_field) @@ -385,7 +395,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.cursor_field: latest_record_value} def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: # When reading from a stream, a `read_records` is called once per slice. # We yield a single slice here because we don't want to make duplicate calls for event based incremental syncs. @@ -425,7 +435,7 @@ def __init__( self._created_cursor_stream = created_cursor_incremental_stream self._updated_cursor_stream = updated_cursor_incremental_stream - def get_parent_stream(self, stream_state: Mapping[str, Any]) -> StripeStream: + def get_parent_stream(self, stream_state: Optional[Mapping[str, Any]]) -> StripeStream: return self._updated_cursor_stream if stream_state else self._created_cursor_stream @@ -437,11 +447,11 @@ class IncrementalStripeStream(StripeStream): def __init__( self, - *args, + *args: Any, cursor_field: str = "updated", legacy_cursor_field: Optional[str] = "created", event_types: Optional[List[str]] = None, - **kwargs, + **kwargs: Any, ): super().__init__(*args, **kwargs) self._cursor_field = cursor_field @@ -460,15 +470,15 @@ def __init__( event_types=event_types, **kwargs, ) - self._parent_stream = None + self._parent_stream: Optional[StripeStream] = None self.stream_selector = IncrementalStripeStreamSelector(created_cursor_stream, updated_cursor_stream) @property - def parent_stream(self): + def parent_stream(self) -> Optional[StripeStream]: return self._parent_stream @parent_stream.setter - def parent_stream(self, stream): + def parent_stream(self, stream: StripeStream) -> None: self._parent_stream = stream @property @@ -476,12 +486,13 @@ def cursor_field(self) -> Union[str, List[str]]: return self._cursor_field def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: self.parent_stream = self.stream_selector.get_parent_stream(stream_state) - yield from self.parent_stream.stream_slices(sync_mode, cursor_field, stream_state) + yield from self.parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + assert self.parent_stream, "parent_stream must be set before calling get_updated_state" return self.parent_stream.get_updated_state(current_stream_state, latest_record) def read_records( @@ -491,6 +502,7 @@ def read_records( stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[StreamData]: + assert self.parent_stream, "parent_stream must be set before calling read_records" yield from self.parent_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) @@ -499,7 +511,7 @@ class CustomerBalanceTransactions(StripeStream): API docs: https://stripe.com/docs/api/customer_balance_transactions/list """ - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.parent = IncrementalStripeStream( name="customers", @@ -511,7 +523,9 @@ def __init__(self, *args, **kwargs): start_date=self.start_date, ) - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): + def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> str: + assert isinstance(stream_slice, dict), "stream_slice must be a dictionary" + assert "id" in stream_slice, "stream_slice must contain 'id' key" return f"customers/{stream_slice['id']}/balance_transactions" @property @@ -519,7 +533,7 @@ def availability_strategy(self) -> Optional[AvailabilityStrategy]: return StripeSubStreamAvailabilityStrategy() def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: slices = self.parent.stream_slices(sync_mode=SyncMode.full_refresh) for _slice in slices: @@ -537,7 +551,7 @@ class SetupAttempts(CreatedCursorIncrementalStripeStream, HttpSubStream): Docs: https://stripe.com/docs/api/setup_attempts/list """ - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: # SetupAttempts needs lookback_window, but it's parent class does not parent_kwargs = copy.copy(kwargs) parent_kwargs.pop("lookback_window_days") @@ -555,7 +569,7 @@ def __init__(self, **kwargs): ) super().__init__(parent=parent, **kwargs) - def path(self, **kwargs) -> str: + def path(self, **kwargs: Any) -> str: return "setup_attempts" @property @@ -565,7 +579,7 @@ def availability_strategy(self) -> Optional[AvailabilityStrategy]: return HttpAvailabilityStrategy() def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: # this is a unique combination of CreatedCursorIncrementalStripeStream and HttpSubStream, # so we need to have all the parent IDs multiplied by all the date slices @@ -576,17 +590,22 @@ def stream_slices( ) if incremental_slices: parent_records = HttpSubStream.stream_slices(self, sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) - yield from (slice | rec for rec in parent_records for slice in incremental_slices) + yield from ( + slice_ | rec + for rec in parent_records + for slice_ in incremental_slices + if isinstance(slice_, dict) and isinstance(rec, dict) + ) else: yield from [] def request_params( self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: - setup_intent_id = stream_slice.get("parent", {}).get("id") + setup_intent_id = stream_slice.get("parent", {}).get("id") if stream_slice else None params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) params.update(setup_intent=setup_intent_id) return params @@ -599,7 +618,7 @@ class Persons(UpdatedCursorIncrementalStripeStream, HttpSubStream): event_types = ["person.created", "person.updated", "person.deleted"] - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any): parent = StripeStream(*args, name="accounts", path="accounts", use_cache=USE_CACHE, **kwargs) super().__init__(*args, parent=parent, **kwargs) @@ -607,14 +626,26 @@ def __init__(self, *args, **kwargs): def availability_strategy(self) -> Optional[AvailabilityStrategy]: return StripeSubStreamAvailabilityStrategy() - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): + def path( + self, + *, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> str: + assert isinstance(stream_slice, dict), "stream_slice must be a dictionary" + assert "parent" in stream_slice, "stream_slice must contain 'parent' key" + assert isinstance(stream_slice["parent"], dict), "stream_slice['parent'] must be a dictionary" + assert "id" in stream_slice["parent"], "stream_slice['parent'] must contain 'id' key" return f"accounts/{stream_slice['parent']['id']}/persons" def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: - parent = HttpSubStream if not stream_state else UpdatedCursorIncrementalStripeStream - yield from parent.stream_slices(self, sync_mode, cursor_field=cursor_field, stream_state=stream_state) + Parent: Union[Type[HttpSubStream], Type[UpdatedCursorIncrementalStripeStream]] = ( + HttpSubStream if not stream_state else UpdatedCursorIncrementalStripeStream + ) + yield from Parent.stream_slices(self, sync_mode, cursor_field=cursor_field, stream_state=stream_state) class StripeSubStream(StripeStream, HttpSubStream): @@ -666,7 +697,7 @@ class StripeLazySubStream(StripeStream, HttpSubStream): """ @property - def sub_items_attr(self) -> str: + def sub_items_attr(self) -> Optional[str]: """ :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None. @@ -675,9 +706,9 @@ def sub_items_attr(self) -> str: def __init__( self, - *args, + *args: Any, sub_items_attr: Optional[str] = None, - **kwargs, + **kwargs: Any, ): super().__init__(*args, **kwargs) self._sub_items_attr = sub_items_attr @@ -686,8 +717,13 @@ def __init__( def availability_strategy(self) -> Optional[AvailabilityStrategy]: return StripeSubStreamAvailabilityStrategy() - def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs): - params = super().request_params(stream_slice=stream_slice, **kwargs) + def request_params( + self, + stream_state: Optional[Mapping[str, Any]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) # add 'starting_after' param if not params.get("starting_after") and stream_slice and stream_slice.get("starting_after"): @@ -695,16 +731,28 @@ def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs): return params - def read_records(self, sync_mode: SyncMode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[Mapping[str, Any]]: + if not stream_slice or not stream_slice.get("parent"): + return [] items_obj = stream_slice["parent"].get(self.sub_items_attr, {}) if not items_obj: - return + return [] - items_next_pages = [] items = list(self.record_extractor.extract_records(items_obj.get("data", []), stream_slice)) if items_obj.get("has_more") and items: stream_slice = {"starting_after": items[-1]["id"], **stream_slice} - items_next_pages = super().read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, **kwargs) + items_next_pages = super().read_records( + sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + else: + items_next_pages = [] + yield from chain(items, items_next_pages) @@ -713,7 +761,7 @@ def __init__(self, updated_cursor_incremental_stream: UpdatedCursorIncrementalSt self._updated_incremental_stream = updated_cursor_incremental_stream self._lazy_sub_stream = lazy_sub_stream - def get_parent_stream(self, stream_state: Mapping[str, Any]) -> StripeStream: + def get_parent_stream(self, stream_state: Optional[Mapping[str, Any]]) -> StripeStream: return self._updated_incremental_stream if stream_state else self._lazy_sub_stream @@ -726,13 +774,13 @@ class UpdatedCursorIncrementalStripeLazySubStream(StripeStream, ABC): def __init__( self, parent: StripeStream, - *args, + *args: Any, cursor_field: str = "updated", legacy_cursor_field: Optional[str] = "created", event_types: Optional[List[str]] = None, sub_items_attr: Optional[str] = None, response_filter: Optional[Callable] = None, - **kwargs, + **kwargs: Any, ): super().__init__(*args, **kwargs) self._cursor_field = cursor_field @@ -753,7 +801,7 @@ def __init__( ), **kwargs, ) - self._parent_stream = None + self._parent_stream: Optional[StripeStream] = None self.stream_selector = IncrementalStripeLazySubStreamSelector(self.updated_cursor_incremental_stream, self.lazy_substream) @property @@ -761,18 +809,18 @@ def cursor_field(self) -> Union[str, List[str]]: return self._cursor_field @property - def parent_stream(self): + def parent_stream(self) -> Optional[StripeStream]: return self._parent_stream @parent_stream.setter - def parent_stream(self, stream): + def parent_stream(self, stream: Optional[StripeStream]) -> None: self._parent_stream = stream def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: self.parent_stream = self.stream_selector.get_parent_stream(stream_state) - yield from self.parent_stream.stream_slices(sync_mode, cursor_field=cursor_field, stream_state=stream_state) + yield from self.parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: # important note: do not call self.parent_stream here as one of the parents does not have the needed method implemented @@ -785,6 +833,7 @@ def read_records( stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[StreamData]: + assert self.parent_stream, "Parent stream must be set before calling read_records" yield from self.parent_stream.read_records( sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state ) @@ -801,12 +850,12 @@ class ParentIncrementalStipeSubStream(StripeSubStream): def cursor_field(self) -> str: return self._cursor_field - def __init__(self, cursor_field: str, *args, **kwargs): + def __init__(self, cursor_field: str, *args: Any, **kwargs: Any): self._cursor_field = cursor_field super().__init__(*args, **kwargs) def stream_slices( - self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[Any, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: stream_state = stream_state or {} if stream_state: @@ -827,9 +876,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late def raise_on_http_errors(self) -> bool: return False - def parse_response(self, response: requests.Response, *args, **kwargs) -> Iterable[Mapping[str, Any]]: - if response.status_code == 200: - return super().parse_response(response, *args, **kwargs) + def parse_response(self, response: requests.Response, *args: Any, **kwargs: Optional[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]: if response.status_code == 404: # When running incremental sync with state, the returned parent object very likely will not contain sub-items # as the events API does not support expandable items. Parent class will try getting sub-items from this object, @@ -841,6 +888,7 @@ def parse_response(self, response: requests.Response, *args, **kwargs) -> Iterab ) return [] response.raise_for_status() + return super().parse_response(response, *args, **kwargs) @property def availability_strategy(self) -> Optional[AvailabilityStrategy]: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 0463e204fdb08..c54935840bed4 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -42,7 +42,7 @@ def stream_by_name(config): from source_stripe.source import SourceStripe def mocker(stream_name, source_config=config): - source = SourceStripe(None, source_config, StateBuilder().build()) + source = SourceStripe(catalog=None, config=source_config, state=StateBuilder().build()) streams = source.streams(source_config) for stream in streams: if stream.name == stream_name: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py index b7028590f4464..85cae045e7a20 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py index bfde5e409d11a..52e31cb0991ed 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py @@ -64,7 +64,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py index 90e61aad3166a..87f760fde1bdb 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py index db5c32d5d9a3a..fadc4231935e7 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py @@ -68,7 +68,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py index 413c1e15d2a95..c30c8ef0908ba 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py index 7f8a0800b97e8..cbd2e2f28abf4 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py index 14942b03c54c2..0d0cd80dd4416 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py @@ -49,7 +49,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _a_record() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py index cbd08bce1a5cb..1fa6a3bd39262 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py @@ -57,7 +57,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py index 3635f7bd6d6d8..396d65765e64f 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py @@ -57,7 +57,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py index 10785b6c47770..d06fd3c52aee2 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py @@ -61,7 +61,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py index d454faec79e8a..ac5135ea518ac 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py index f0a04e093760e..cbd059dac42be 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py @@ -56,7 +56,7 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: - return SourceStripe(catalog, config, state) + return SourceStripe(catalog=catalog, config=config, state=state) def _an_event() -> RecordBuilder: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index 5a2f6e06a719a..be4491476f0a0 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -52,11 +52,11 @@ def _a_valid_config(): @patch.object(source_stripe.source, "stripe") def test_source_check_connection_ok(mocked_client, config): - assert SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=config) == (True, None) + assert SourceStripe(catalog=_ANY_CATALOG, config=_ANY_CONFIG, state=_NO_STATE).check_connection(logger, config=config) == (True, None) def test_streams_are_unique(config): - stream_names = [s.name for s in SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).streams(config=config)] + stream_names = [s.name for s in SourceStripe(catalog=_ANY_CATALOG, config=_ANY_CONFIG, state=_NO_STATE).streams(config=config)] assert len(stream_names) == len(set(stream_names)) == 46 @@ -73,7 +73,7 @@ def test_streams_are_unique(config): def test_config_validation(mocked_client, input_config, expected_error_msg): context = pytest.raises(AirbyteTracedException, match=expected_error_msg) if expected_error_msg else does_not_raise() with context: - SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=input_config) + SourceStripe(catalog=_ANY_CATALOG, config=_ANY_CONFIG, state=_NO_STATE).check_connection(logger, config=input_config) @pytest.mark.parametrize( @@ -86,15 +86,15 @@ def test_config_validation(mocked_client, input_config, expected_error_msg): @patch.object(source_stripe.source.stripe, "Account") def test_given_stripe_error_when_check_connection_then_connection_not_available(mocked_client, exception): mocked_client.retrieve.side_effect = exception - is_available, _ = SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=_a_valid_config()) + is_available, _ = SourceStripe(catalog=_ANY_CATALOG, config=_ANY_CONFIG, state=_NO_STATE).check_connection(logger, config=_a_valid_config()) assert not is_available def test_when_streams_return_full_refresh_as_concurrent(): streams = SourceStripe( - CatalogBuilder().with_stream("bank_accounts", SyncMode.full_refresh).with_stream("customers", SyncMode.incremental).build(), - _a_valid_config(), - _NO_STATE, + catalog=CatalogBuilder().with_stream("bank_accounts", SyncMode.full_refresh).with_stream("customers", SyncMode.incremental).build(), + config=_a_valid_config(), + state=_NO_STATE, ).streams(_a_valid_config()) # bank_accounts (as it is defined as full_refresh) diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 2e5bb5e957c38..223dbcf06c867 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -223,6 +223,8 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | Version | Date | Pull Request | Subject | | :------ | :--------- | :-------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 5.2.5 | 2024-02-23 | [35587](https://github.com/airbytehq/airbyte/pull/35587) | Make `mypy` pass. | + | 5.2.4 | 2024-02-12 | [35137](https://github.com/airbytehq/airbyte/pull/35137) | Fix license in `pyproject.toml` | | 5.2.3 | 2024-02-09 | [35068](https://github.com/airbytehq/airbyte/pull/35068) | Manage dependencies with Poetry. | | 5.2.2 | 2024-01-31 | [34619](https://github.com/airbytehq/airbyte/pull/34619) | Events stream concurrent on incremental syncs |