Merge branch 'lazebnyi/add-array-items-handling-to-dynamic-schemas' of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-array-items-handling-to-dynamic-schemas
lazebnyi committed Jan 24, 2025
2 parents 0ee84d5 + 736bf28 commit 85262e2
Showing 6 changed files with 751 additions and 23 deletions.
55 changes: 53 additions & 2 deletions .github/workflows/connector-tests.yml
@@ -25,7 +25,7 @@ concurrency:
jobs:
cdk_changes:
name: Get Changes
-    runs-on: ubuntu-24.04
+    runs-on: ubuntu-22.04
permissions:
statuses: write
pull-requests: read
@@ -62,7 +62,7 @@ jobs:
# Forked PRs are handled by the community_ci.yml workflow
# If the condition is not met the job will be skipped (it will not fail)
# runs-on: connector-test-large
-    runs-on: ubuntu-24.04
+    runs-on: ubuntu-22.04
timeout-minutes: 360 # 6 hours
strategy:
fail-fast: false
@@ -96,6 +96,8 @@ jobs:
name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})"
permissions:
checks: write
+      contents: write # Required for creating commit statuses
+      pull-requests: read
steps:
- name: Abort if extra not changed (${{matrix.cdk_extra}})
id: no_changes
@@ -127,6 +129,22 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: "3.10"
+      # Create initial pending status for test report
+      - name: Create Pending Test Report Status
+        if: steps.no_changes.outputs.status != 'cancelled'
+        env:
+          GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
+        run: |
+          HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
+          gh api \
+            --method POST \
+            -H "Accept: application/vnd.github+json" \
+            -H "X-GitHub-Api-Version: 2022-11-28" \
+            repos/${{ github.repository }}/statuses/$HEAD_SHA \
+            -f state="pending" \
+            -f description="Running connector tests..." \
+            -f context="${{ matrix.connector }} Test Report"
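This step and the two status steps later in the job all wrap the same GitHub REST endpoint, `POST /repos/{owner}/{repo}/statuses/{sha}`, via the `gh` CLI. A rough Python equivalent of the pending call, as a sketch only — the `requests` library, the `GH_TOKEN` variable name, and the repo/SHA/context values are illustrative assumptions, not part of the workflow:

```python
# Sketch of the "pending" commit status call made by the step above.
# Assumes a GH_TOKEN env var with permission to write commit statuses;
# the repo, sha, and context arguments below are placeholders.
import os

import requests


def create_pending_status(repo: str, sha: str, context: str) -> None:
    response = requests.post(
        f"https://api.github.com/repos/{repo}/statuses/{sha}",
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {os.environ['GH_TOKEN']}",
            "X-GitHub-Api-Version": "2022-11-28",
        },
        json={
            "state": "pending",
            "description": "Running connector tests...",
            "context": f"{context} Test Report",
        },
    )
    response.raise_for_status()


create_pending_status("airbytehq/airbyte-python-cdk", "<head-sha>", "source-example")
```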
- name: Test Connector
if: steps.no_changes.outputs.status != 'cancelled'
timeout-minutes: 90
@@ -173,6 +191,39 @@ jobs:
echo "success=${success}" >> $GITHUB_OUTPUT
echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT
+      # Update the test report status with results
+      - name: Update Test Report Status
+        if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome == 'success'
+        env:
+          GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
+        run: |
+          HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
+          gh api \
+            --method POST \
+            -H "Accept: application/vnd.github+json" \
+            -H "X-GitHub-Api-Version: 2022-11-28" \
+            repos/${{ github.repository }}/statuses/$HEAD_SHA \
+            -f state="${{ steps.evaluate_output.outputs.success == 'true' && 'success' || 'failure' }}" \
+            -f target_url="${{ steps.evaluate_output.outputs.html_report_url }}" \
+            -f description="Click Details to view the test report" \
+            -f context="${{ matrix.connector }} Test Report"
+      # Create failure status if report generation failed
+      - name: Create Report Generation Failed Status
+        if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome != 'success'
+        env:
+          GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
+        run: |
+          HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
+          gh api \
+            --method POST \
+            -H "Accept: application/vnd.github+json" \
+            -H "X-GitHub-Api-Version: 2022-11-28" \
+            repos/${{ github.repository }}/statuses/$HEAD_SHA \
+            -f state="failure" \
+            -f description="Failed to run connector tests." \
+            -f context="${{ matrix.connector }} Test Report"
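The `state` expression in the update step uses the GitHub Actions `cond && a || b` idiom, which behaves like a ternary as long as `a` is truthy. Together, the three steps implement a pending → success/failure lifecycle for the connector's Test Report status. A compact sketch of the final-state decision (function and parameter names are illustrative, not from the workflow):

```python
# Mirrors the two result steps above: if report evaluation itself failed,
# the failure step posts "failure"; otherwise the update step posts
# "success" or "failure" based on the evaluate_output step's output.
def final_state(evaluate_outcome: str, success_output: str) -> str:
    if evaluate_outcome != "success":
        return "failure"  # report generation failed
    return "success" if success_output == "true" else "failure"


assert final_state("success", "true") == "success"
assert final_state("success", "false") == "failure"
assert final_state("failure", "") == "failure"
```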
# Upload the job output to the artifacts
- name: Upload Job Output
id: upload_job_output
53 changes: 35 additions & 18 deletions airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
@@ -222,6 +222,8 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
+            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
+                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_params( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
@@ -244,6 +246,8 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
+            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
+                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
@@ -266,6 +270,8 @@ def get_request_body_data(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
if stream_slice:
+            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
+                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
@@ -288,6 +294,8 @@ def get_request_body_json(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
if stream_slice:
+            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
+                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping
stream_state=stream_state,
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
@@ -303,21 +311,6 @@ def get_request_body_json(
raise ValueError("A partition needs to be provided in order to get request body json")

def should_be_synced(self, record: Record) -> bool:
-        if (
-            record.associated_slice
-            and self._to_partition_key(record.associated_slice.partition)
-            not in self._cursor_per_partition
-        ):
-            partition_state = (
-                self._state_to_migrate_from
-                if self._state_to_migrate_from
-                else self._NO_CURSOR_STATE
-            )
-            cursor = self._create_cursor(partition_state)
-
-            self._cursor_per_partition[
-                self._to_partition_key(record.associated_slice.partition)
-            ] = cursor
return self._get_cursor(record).should_be_synced(
self._convert_record_to_cursor_record(record)
)
@@ -356,8 +349,32 @@ def _get_cursor(self, record: Record) -> DeclarativeCursor:
)
partition_key = self._to_partition_key(record.associated_slice.partition)
if partition_key not in self._cursor_per_partition:
-            raise ValueError(
-                "Invalid state as stream slices that are emitted should refer to an existing cursor"
-            )
+            self._create_cursor_for_partition(partition_key)
cursor = self._cursor_per_partition[partition_key]
return cursor

+    def _create_cursor_for_partition(self, partition_key: str) -> None:
+        """
+        Dynamically creates and initializes a cursor for the specified partition.
+
+        This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors,
+        stream_slices is executed only for the concurrent cursor, so cursors per partition
+        are not created for the declarative cursor. This method ensures that a cursor is available
+        to create requests for the specified partition. The cursor is initialized
+        with the per-partition state if present in the initial state, or with the global state
+        adjusted by the lookback window, or with the state to migrate from.
+
+        Note:
+            This is a temporary workaround and should be removed once the declarative cursor
+            is decoupled from the concurrent cursor implementation.
+
+        Args:
+            partition_key (str): The unique identifier for the partition for which the cursor
+                needs to be created.
+        """
+        partition_state = (
+            self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
+        )
+        cursor = self._create_cursor(partition_state)
+
+        self._cursor_per_partition[partition_key] = cursor
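The recurring guard added to the request methods above, plus this helper, amounts to lazy initialization: a cursor for an unseen partition is created on first access instead of raising. A stripped-down illustration of the pattern — the class and state shapes here are simplified stand-ins, not the real `PerPartitionCursor` internals:

```python
from typing import Any, Dict, Mapping, Optional


class LazyPartitionCursors:
    """Toy model of the lazy per-partition cursor creation in the diff above."""

    def __init__(self, state_to_migrate_from: Optional[Mapping[str, Any]] = None) -> None:
        self._cursor_per_partition: Dict[str, Dict[str, Any]] = {}
        self._state_to_migrate_from = state_to_migrate_from

    def _create_cursor_for_partition(self, partition_key: str) -> None:
        # Seed the new cursor with the migrated state if present, else empty state.
        partition_state = self._state_to_migrate_from or {}
        self._cursor_per_partition[partition_key] = dict(partition_state)

    def get_cursor(self, partition_key: str) -> Dict[str, Any]:
        # Create on first access instead of raising, as _get_cursor now does.
        if partition_key not in self._cursor_per_partition:
            self._create_cursor_for_partition(partition_key)
        return self._cursor_per_partition[partition_key]


cursors = LazyPartitionCursors(state_to_migrate_from={"updated_at": "2025-01-01"})
assert cursors.get_cursor('{"board_id": 1}') == {"updated_at": "2025-01-01"}
```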
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Expand Up @@ -2407,7 +2407,7 @@ def create_simple_retriever(
if (
not isinstance(stream_slicer, DatetimeBasedCursor)
or type(stream_slicer) is not DatetimeBasedCursor
-        ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor):
+        ):
# Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
# Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
# their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's
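With the `PerPartitionWithGlobalCursor` clause dropped, the remaining condition is subtle: `not isinstance(x, DatetimeBasedCursor) or type(x) is not DatetimeBasedCursor` is False only when `x` is exactly a `DatetimeBasedCursor`, so subclasses (e.g. custom cursor components) still enter this branch. A minimal sketch of that distinction, using stand-in classes rather than the real CDK types:

```python
class DatetimeBasedCursor:  # stand-in for the real CDK class
    pass


class CustomDatetimeCursor(DatetimeBasedCursor):  # stand-in custom component
    pass


def takes_branch(stream_slicer: object) -> bool:
    # False only for an exact DatetimeBasedCursor instance.
    return (
        not isinstance(stream_slicer, DatetimeBasedCursor)
        or type(stream_slicer) is not DatetimeBasedCursor
    )


assert takes_branch(CustomDatetimeCursor())     # subclass: branch taken
assert not takes_branch(DatetimeBasedCursor())  # exact type: branch skipped
assert takes_branch(object())                   # unrelated slicer: branch taken
```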
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
@@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:

if not parent_state and incremental_dependency:
# Attempt to retrieve child state
-            substream_state = list(stream_state.values())
-            substream_state = substream_state[0] if substream_state else {}  # type: ignore [assignment] # Incorrect type for assignment
+            substream_state_values = list(stream_state.values())
+            substream_state = substream_state_values[0] if substream_state_values else {}
+            # Filter out per-partition state, since the state is passed to the parent stream in the format {cursor_field: substream_state}
+            if isinstance(substream_state, (list, dict)):
+                substream_state = {}

parent_state = {}

# Copy child state to parent streams with incremental dependencies
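The new guard separates a scalar child cursor value, which can be forwarded to parent streams as `{cursor_field: value}`, from nested per-partition state, which has no single value to forward. A minimal sketch of the filtering under assumed state shapes (the function name is illustrative):

```python
from typing import Any, Mapping


def extract_forwardable_child_state(stream_state: Mapping[str, Any]) -> Any:
    values = list(stream_state.values())
    substream_state = values[0] if values else {}
    # A nested list/dict indicates per-partition state: there is no single
    # cursor value to pass to the parent stream, so fall back to empty.
    if isinstance(substream_state, (list, dict)):
        return {}
    return substream_state


# Global child state: the scalar cursor value is forwarded.
assert extract_forwardable_child_state({"updated_at": "2025-01-24"}) == "2025-01-24"
# Per-partition child state: filtered out.
assert extract_forwardable_child_state({"states": [{"cursor": {}}]}) == {}
```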
