fix(source-iterable): fix OOM errors #41983

Merged 11 commits on Jul 22, 2024
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerImageTag: 0.5.11
dockerImageTag: 0.6.0
dockerRepository: airbyte/source-iterable
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
githubIssueLabel: source-iterable
224 changes: 148 additions & 76 deletions airbyte-integrations/connectors/source-iterable/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.5.11"
version = "0.6.0"
name = "source-iterable"
description = "Source implementation for Iterable."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -18,16 +18,15 @@ include = "source_iterable"
[tool.poetry.dependencies]
python = "^3.9,<3.12"
pendulum = "==2.1.2"
airbyte-cdk = "0.90.0"
requests = "==2.31.0"
airbyte-cdk = "^3"
python-dateutil = "==2.8.2"

[tool.poetry.scripts]
source-iterable = "source_iterable.run:run"

[tool.poetry.group.dev.dependencies]
pytest = "^6.1"
pytest = "^6.2"
pytest-mock = "^3.6.1"
freezegun = "==1.1.0"
freezegun = "==1.5.1"
requests-mock = "^1.9.3"
responses = "==0.23.1"
Original file line number Diff line number Diff line change
@@ -2,40 +2,21 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
from dataclasses import dataclass
from io import StringIO
from typing import Any, Iterable, Mapping

import requests
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState


@dataclass
class XJsonRecordExtractor(DpathExtractor):
def extract_records(self, response: requests.Response) -> list[Record]:
return [json.loads(record) for record in response.iter_lines()]


@dataclass
class ListUsersRecordExtractor(DpathExtractor):
def extract_records(self, response: requests.Response) -> list[Record]:
return [{"email": record.decode()} for record in response.iter_lines()]


@dataclass
class EventsRecordExtractor(DpathExtractor):
common_fields = ("itblInternal", "_type", "createdAt", "email")

def extract_records(self, response: requests.Response) -> list[Record]:
jsonl_records = StringIO(response.text)
records = []
for record in jsonl_records:
record_dict = json.loads(record)
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
jsonl_records = super().extract_records(response=response)
for record_dict in jsonl_records:
record_dict_common_fields = {}
for field in self.common_fields:
record_dict_common_fields[field] = record_dict.pop(field, None)

records.append({**record_dict_common_fields, "data": record_dict})

return records
yield {**record_dict_common_fields, "data": record_dict}
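The hunk above replaces a loop that collected every reshaped event in a list with a generator that yields records one at a time, which appears to be the core of the OOM fix. A minimal, self-contained sketch of that reshaping follows. It assumes the records already arrive as an iterable of dicts; the helper name `split_common_fields` is hypothetical and not part of the connector.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping

# Same field names as EventsRecordExtractor.common_fields in the diff.
COMMON_FIELDS = ("itblInternal", "_type", "createdAt", "email")


def split_common_fields(records: Iterable[Mapping[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: lazily yield one reshaped record at a time."""
    for record in records:
        remaining = dict(record)  # copy so the caller's mapping is not mutated
        common = {field: remaining.pop(field, None) for field in COMMON_FIELDS}
        # Everything that is not a common field is nested under "data".
        yield {**common, "data": remaining}
```

Because nothing is accumulated, memory use stays bounded by one record as long as the caller also consumes the iterator lazily; wrapping the result in `list(...)`, as the updated unit test does, gives up that benefit and is only appropriate for small inputs.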
Original file line number Diff line number Diff line change
@@ -67,6 +67,8 @@ streams:
type: DeclarativeStream
retriever:
type: SimpleRetriever
decoder:
type: IterableDecoder
paginator:
type: NoPagination
requester:
@@ -87,9 +89,8 @@ streams:
record_selector:
type: RecordSelector
extractor:
class_name: source_iterable.components.ListUsersRecordExtractor
field_path:
- getUsers
type: DpathExtractor
field_path: []
partition_router:
- type: SubstreamPartitionRouter
parent_stream_configs:
@@ -139,6 +140,12 @@ streams:
- path:
- listId
value: "{{ stream_slice.list_id }}"
- path:
- email
value: "{{ record.record }}"
- type: RemoveFields
field_pointers:
- - record
- name: campaigns
type: DeclarativeStream
retriever:
@@ -262,6 +269,8 @@ streams:
type: DeclarativeStream
retriever:
type: SimpleRetriever
decoder:
type: JsonlDecoder
paginator:
type: NoPagination
requester:
@@ -284,9 +293,8 @@ streams:
record_selector:
type: RecordSelector
extractor:
class_name: source_iterable.components.XJsonRecordExtractor
field_path:
- users
type: DpathExtractor
field_path: []
partition_router: []
primary_key: []
incremental_sync:
@@ -318,6 +326,8 @@ streams:
primary_key: []
retriever:
type: SimpleRetriever
decoder:
type: JsonlDecoder
requester:
type: HttpRequester
url_base: https://api.iterable.com/api/
@@ -338,15 +348,14 @@ streams:
type: RecordSelector
extractor:
class_name: source_iterable.components.EventsRecordExtractor
field_path:
- events
field_path: []
paginator:
type: NoPagination
partition_router:
- type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: email
parent_key: record
request_option:
inject_into: request_parameter
type: RequestOption
@@ -357,6 +366,8 @@ streams:
type: DeclarativeStream
retriever:
type: SimpleRetriever
decoder:
type: IterableDecoder
paginator:
type: NoPagination
requester:
@@ -377,9 +388,8 @@ streams:
record_selector:
type: RecordSelector
extractor:
class_name: source_iterable.components.ListUsersRecordExtractor
field_path:
- getUsers
type: DpathExtractor
field_path: []
partition_router:
- type: SubstreamPartitionRouter
parent_stream_configs:
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pendulum.datetime import DateTime
from requests import HTTPError, codes
from requests import HTTPError
from requests.exceptions import ChunkedEncodingError
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
from source_iterable.utils import dateutil_parse
@@ -106,7 +106,7 @@ def should_retry(self, response: requests.Response) -> bool:
if self._slice_retry < 3:
return True
return False
return super().should_retry(response)
return response.status_code == 429 or 500 <= response.status_code < 600

def read_records(
self,
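The replacement line in `should_retry` spells out the retry condition directly instead of delegating to `super().should_retry(response)`; the diff does not state why. As a rough illustration, the same predicate can be isolated and checked on its own. The function name `is_retryable_status` is hypothetical and does not exist in the connector.

```python
def is_retryable_status(status_code: int) -> bool:
    """Hypothetical helper mirroring the new return expression in should_retry."""
    # Retry on rate limiting (429) and on any 5xx server error.
    return status_code == 429 or 500 <= status_code < 600
```

Under this condition client errors such as 401 are not retried, which is consistent with `test_stream_stops_on_401` expecting the read to fail rather than loop.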
Original file line number Diff line number Diff line change
@@ -4,8 +4,6 @@

import dateutil.parser
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


def dateutil_parse(text):
Original file line number Diff line number Diff line change
@@ -1,64 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import io

import json
from io import StringIO
from unittest.mock import Mock
import requests
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonlDecoder
from source_iterable.components import EventsRecordExtractor

import pytest
from source_iterable.components import EventsRecordExtractor, ListUsersRecordExtractor, XJsonRecordExtractor


@pytest.fixture
def mock_response():
mock_response = Mock()
return mock_response


def test_list_users_extraction(mock_response):
mock_response.iter_lines.return_value = [b'user1@example.com', b'user2@example.com']

extractor = ListUsersRecordExtractor(
field_path=["getUsers"],
config={},
parameters={},
)
records = extractor.extract_records(mock_response)

assert len(records) == 2
assert records[0]["email"] == "user1@example.com"
assert records[1]["email"] == "user2@example.com"


def test_xjson_extraction(mock_response):
mock_response.iter_lines.return_value = [
b'{"id": 1, "name": "Alice"}',
b'{"id": 2, "name": "Bob"}'
]

extractor = XJsonRecordExtractor(
field_path=["users"],
config={},
parameters={},
)
records = extractor.extract_records(mock_response)

assert len(records) == 2
assert records[0] == {"id": 1, "name": "Alice"}
assert records[1] == {"id": 2, "name": "Bob"}


def test_events_extraction(mock_response):
mock_response.text = '{"itblInternal": 1, "_type": "event", "createdAt": "2024-03-21", "email": "user@example.com", "data": {"event_type": "click"}}\n' \
'{"_type": "event", "createdAt": "2024-03-22", "data": {"event_type": "purchase"}}'
def test_events_extraction():
mock_response = requests.Response()
mock_response.raw = io.BytesIO(b'{"itblInternal": 1, "_type": "event", "createdAt": "2024-03-21", "email": "user@example.com", "data": {"event_type": "click"}}\n{"_type": "event", "createdAt": "2024-03-22", "data": {"event_type": "purchase"}}')

extractor = EventsRecordExtractor(
field_path=["events"],
field_path=[],
decoder=JsonlDecoder(parameters={}),
config={},
parameters={},
)
records = extractor.extract_records(mock_response)
records = list(extractor.extract_records(mock_response))

assert len(records) == 2
assert records[0] == {'_type': 'event', 'createdAt': '2024-03-21', 'data': {'data': {'event_type': 'click'}}, 'email': 'user@example.com', 'itblInternal': 1}
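The updated test feeds the extractor a raw byte stream holding two newline-delimited JSON documents and lets a `JsonlDecoder` split them. A stdlib-only sketch of that line-by-line decoding idea is shown below. It is not the CDK's `JsonlDecoder` implementation, and the name `iter_jsonl` is hypothetical.

```python
import io
import json
from typing import Any, BinaryIO, Dict, Iterator


def iter_jsonl(stream: BinaryIO) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for a JSONL decoder: parse one line at a time."""
    for raw_line in stream:
        line = raw_line.strip()
        if line:  # skip blank lines between documents
            yield json.loads(line)
```

Iterating the stream directly means only the current line is held in memory, in contrast to the removed `StringIO(response.text)` approach, which materialised the whole response body first.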
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import datetime, timedelta
from datetime import datetime

import freezegun
import pendulum
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@

import json

import pendulum
import pytest
import requests
import responses
Original file line number Diff line number Diff line change
@@ -7,19 +7,11 @@
import pytest
import requests
import responses
from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.types import StreamSlice
from source_iterable.source import SourceIterable
from source_iterable.streams import (
Campaigns,
CampaignsMetrics,
IterableExportEventsStreamAdjustableRange,
IterableExportStreamAdjustableRange,
IterableExportStreamRanged,
IterableStream,
Templates,
)
from source_iterable.streams import Campaigns, CampaignsMetrics, Templates
from source_iterable.utils import dateutil_parse


@@ -42,7 +34,7 @@ def test_stream_stops_on_401(config):
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={}, status=401)
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=2", json={}, status=401)
slices = 0
with pytest.raises(ReadException):
with pytest.raises(AirbyteTracedException):
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
slices += 1
_ = list(users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
35 changes: 18 additions & 17 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
@@ -82,23 +82,24 @@ The Iterable source connector supports the following [sync modes](https://docs.a
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.5.11 | 2024-07-20 | [42228](https://github.com/airbytehq/airbyte/pull/42228) | Update dependencies |
| 0.5.10 | 2024-07-13 | [41684](https://github.com/airbytehq/airbyte/pull/41684) | Update dependencies |
| 0.5.9 | 2024-07-10 | [41401](https://github.com/airbytehq/airbyte/pull/41401) | Update dependencies |
| 0.5.8 | 2024-07-09 | [41293](https://github.com/airbytehq/airbyte/pull/41293) | Update dependencies |
| 0.5.7 | 2024-07-06 | [40811](https://github.com/airbytehq/airbyte/pull/40811) | Update dependencies |
| 0.5.6 | 2024-06-25 | [40362](https://github.com/airbytehq/airbyte/pull/40362) | Update dependencies |
| 0.5.5 | 2024-06-22 | [40080](https://github.com/airbytehq/airbyte/pull/40080) | Update dependencies |
| 0.5.4 | 2024-06-17 | [39382](https://github.com/airbytehq/airbyte/pull/39382) | Refactor state handling for Python incremental streams |
| 0.5.3 | 2024-06-05 | [39142](https://github.com/airbytehq/airbyte/pull/39142) | Updated the `CDK` version to `0.89.0` to fix OOM |
| 0.5.2 | 2024-06-04 | [39077](https://github.com/airbytehq/airbyte/pull/39077) | [autopull] Upgrade base image to v1.2.1 |
| 0.5.1 | 2024-04-24 | [36645](https://github.com/airbytehq/airbyte/pull/36645) | Schema descriptions and CDK 0.80.0 |
| 0.5.0 | 2024-03-18 | [36231](https://github.com/airbytehq/airbyte/pull/36231) | Migrate connector to low-code |
| 0.4.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |
| 0.3.0 | 2024-02-20 | [35465](https://github.com/airbytehq/airbyte/pull/35465) | Per-error reporting and continue sync on stream failures |
| 0.2.2 | 2024-02-12 | [35150](https://github.com/airbytehq/airbyte/pull/35150) | Manage dependencies with Poetry. |
| 0.2.1 | 2024-01-12 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | prepare for airbyte-lib |
| :------ | :--------- | :------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.6.0 | 2024-07-20 | [41983](https://github.com/airbytehq/airbyte/pull/41983) | Fix OOM errors; update CDK to v3 |
| 0.5.11 | 2024-07-20 | [42228](https://github.com/airbytehq/airbyte/pull/42228) | Update dependencies |
| 0.5.10 | 2024-07-13 | [41684](https://github.com/airbytehq/airbyte/pull/41684) | Update dependencies |
| 0.5.9 | 2024-07-10 | [41401](https://github.com/airbytehq/airbyte/pull/41401) | Update dependencies |
| 0.5.8 | 2024-07-09 | [41293](https://github.com/airbytehq/airbyte/pull/41293) | Update dependencies |
| 0.5.7 | 2024-07-06 | [40811](https://github.com/airbytehq/airbyte/pull/40811) | Update dependencies |
| 0.5.6 | 2024-06-25 | [40362](https://github.com/airbytehq/airbyte/pull/40362) | Update dependencies |
| 0.5.5 | 2024-06-22 | [40080](https://github.com/airbytehq/airbyte/pull/40080) | Update dependencies |
| 0.5.4 | 2024-06-17 | [39382](https://github.com/airbytehq/airbyte/pull/39382) | Refactor state handling for Python incremental streams |
| 0.5.3 | 2024-06-05 | [39142](https://github.com/airbytehq/airbyte/pull/39142) | Updated the `CDK` version to `0.89.0` to fix OOM |
| 0.5.2 | 2024-06-04 | [39077](https://github.com/airbytehq/airbyte/pull/39077) | [autopull] Upgrade base image to v1.2.1 |
| 0.5.1 | 2024-04-24 | [36645](https://github.com/airbytehq/airbyte/pull/36645) | Schema descriptions and CDK 0.80.0 |
| 0.5.0 | 2024-03-18 | [36231](https://github.com/airbytehq/airbyte/pull/36231) | Migrate connector to low-code |
| 0.4.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |
| 0.3.0 | 2024-02-20 | [35465](https://github.com/airbytehq/airbyte/pull/35465) | Per-error reporting and continue sync on stream failures |
| 0.2.2 | 2024-02-12 | [35150](https://github.com/airbytehq/airbyte/pull/35150) | Manage dependencies with Poetry. |
| 0.2.1 | 2024-01-12 | [1234](https://github.com/airbytehq/airbyte/pull/1234) | prepare for airbyte-lib |
| 0.2.0 | 2023-09-29 | [28457](https://github.com/airbytehq/airbyte/pull/30931) | Added `userId` to `email_bounce`, `email_click`, `email_complaint`, `email_open`, `email_send` `email_send_skip`, `email_subscribe`, `email_unsubscribe`, `events` streams |
| 0.1.31 | 2023-12-06 | [33106](https://github.com/airbytehq/airbyte/pull/33106) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.1.30 | 2023-07-19 | [28457](https://github.com/airbytehq/airbyte/pull/28457) | Fixed TypeError for StreamSlice in debug mode |