Skip to content

Commit

Permalink
Merge branch 'main' into lazebnyi/add-cocurency-to-partial-declarativ…
Browse files Browse the repository at this point in the history
…e-sources
  • Loading branch information
lazebnyi authored Feb 20, 2025
2 parents 7bec79b + e1a182c commit 052d5dd
Show file tree
Hide file tree
Showing 113 changed files with 6,720 additions and 2,361 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/connector-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
# cdk_extra: n/a
# TODO: These are manifest connectors and won't work as expected until we
# add `--use-local-cdk` support for manifest connectors.
- connector: source-the-guardian-api
- connector: source-amplitude
cdk_extra: n/a
- connector: source-pokeapi
cdk_extra: n/a
Expand Down Expand Up @@ -128,7 +128,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.11"
# Create initial pending status for test report
- name: Create Pending Test Report Status
if: steps.no_changes.outputs.status != 'cancelled'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pypi_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ jobs:
(github.event_name == 'push' &&
startsWith(github.ref, 'refs/tags/v')
) || github.event.inputs.publish_to_pypi == 'true'
uses: pypa/gh-action-pypi-publish@v1.12.3
uses: pypa/gh-action-pypi-publish@v1.12.4

publish_sdm:
name: Publish SDM to DockerHub
Expand Down
28 changes: 16 additions & 12 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import dataclasses

from dataclasses import asdict, dataclass, field
from typing import Any, List, Mapping

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
Expand All @@ -32,11 +33,11 @@
MAX_RECORDS_KEY = "max_records"


@dataclasses.dataclass
@dataclass
class TestReadLimits:
max_records: int = dataclasses.field(default=DEFAULT_MAXIMUM_RECORDS)
max_pages_per_slice: int = dataclasses.field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
max_slices: int = dataclasses.field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)


def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
Expand Down Expand Up @@ -73,17 +74,20 @@ def read_stream(
limits: TestReadLimits,
) -> AirbyteMessage:
try:
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
stream_name = configured_catalog.streams[
0
].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(
test_read_handler = TestReader(
limits.max_pages_per_slice, limits.max_slices, limits.max_records
)
# The connector builder only supports a single stream
stream_name = configured_catalog.streams[0].stream.name

stream_read = test_read_handler.run_test_read(
source, config, configured_catalog, state, limits.max_records
)

return AirbyteMessage(
type=MessageType.RECORD,
record=AirbyteRecordMessage(
data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
),
)
except Exception as exc:
Expand Down
Loading

0 comments on commit 052d5dd

Please sign in to comment.