diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index b4e02fecb..4f6cedee0 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -25,7 +25,7 @@ concurrency: jobs: cdk_changes: name: Get Changes - runs-on: ubuntu-24.04 + runs-on: ubuntu-22.04 permissions: statuses: write pull-requests: read @@ -62,7 +62,7 @@ jobs: # Forked PRs are handled by the community_ci.yml workflow # If the condition is not met the job will be skipped (it will not fail) # runs-on: connector-test-large - runs-on: ubuntu-24.04 + runs-on: ubuntu-22.04 timeout-minutes: 360 # 6 hours strategy: fail-fast: false @@ -96,6 +96,8 @@ jobs: name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})" permissions: checks: write + contents: write # Required for creating commit statuses + pull-requests: read steps: - name: Abort if extra not changed (${{matrix.cdk_extra}}) id: no_changes @@ -127,6 +129,22 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.10" + # Create initial pending status for test report + - name: Create Pending Test Report Status + if: steps.no_changes.outputs.status != 'cancelled' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="pending" \ + -f description="Running connector tests..." 
\ + -f context="${{ matrix.connector }} Test Report" + - name: Test Connector if: steps.no_changes.outputs.status != 'cancelled' timeout-minutes: 90 @@ -173,6 +191,39 @@ jobs: echo "success=${success}" >> $GITHUB_OUTPUT echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT + # Update the test report status with results + - name: Update Test Report Status + if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome == 'success' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="${{ steps.evaluate_output.outputs.success == 'true' && 'success' || 'failure' }}" \ + -f target_url="${{ steps.evaluate_output.outputs.html_report_url }}" \ + -f description="Click Details to view the test report" \ + -f context="${{ matrix.connector }} Test Report" + + # Create failure status if report generation failed + - name: Create Report Generation Failed Status + if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome != 'success' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="failure" \ + -f description="Failed to run connector tests." 
\ + -f context="${{ matrix.connector }} Test Report" + # Upload the job output to the artifacts - name: Upload Job Output id: upload_job_output diff --git a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 4c1eacce5..76a16e141 100644 --- a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -222,6 +222,8 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_params( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -244,6 +246,8 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -266,6 +270,8 @@ def get_request_body_data( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Union[Mapping[str, Any], str]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping stream_state=stream_state, 
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -288,6 +294,8 @@ def get_request_body_json( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: if stream_slice: + if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: + self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping stream_state=stream_state, stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), @@ -303,21 +311,6 @@ def get_request_body_json( raise ValueError("A partition needs to be provided in order to get request body json") def should_be_synced(self, record: Record) -> bool: - if ( - record.associated_slice - and self._to_partition_key(record.associated_slice.partition) - not in self._cursor_per_partition - ): - partition_state = ( - self._state_to_migrate_from - if self._state_to_migrate_from - else self._NO_CURSOR_STATE - ) - cursor = self._create_cursor(partition_state) - - self._cursor_per_partition[ - self._to_partition_key(record.associated_slice.partition) - ] = cursor return self._get_cursor(record).should_be_synced( self._convert_record_to_cursor_record(record) ) @@ -356,8 +349,32 @@ def _get_cursor(self, record: Record) -> DeclarativeCursor: ) partition_key = self._to_partition_key(record.associated_slice.partition) if partition_key not in self._cursor_per_partition: - raise ValueError( - "Invalid state as stream slices that are emitted should refer to an existing cursor" - ) + self._create_cursor_for_partition(partition_key) cursor = self._cursor_per_partition[partition_key] return cursor + + def _create_cursor_for_partition(self, partition_key: str) -> None: + """ + Dynamically creates and initializes a cursor for the specified partition. + + This method is required for `ConcurrentPerPartitionCursor`. 
For concurrent cursors, + stream_slices is executed only for the concurrent cursor, so cursors per partition + are not created for the declarative cursor. This method ensures that a cursor is available + to create requests for the specified partition. The cursor is initialized + with the state to migrate from when that state is set and non-empty, and + otherwise with `_NO_CURSOR_STATE`. + + Note: + This is a temporary workaround and should be removed once the declarative cursor + is decoupled from the concurrent cursor implementation. + + Args: + partition_key (str): The unique identifier for the partition for which the cursor + needs to be created. + """ + partition_state = ( + self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE + ) + cursor = self._create_cursor(partition_state) + + self._cursor_per_partition[partition_key] = cursor diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c16476dae..172946f17 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2407,7 +2407,7 @@ def create_simple_retriever( if ( not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor - ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor): + ): # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods). # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement # their own RequestOptionsProvider. 
However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 1c7bb6961..ae558c634 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None: if not parent_state and incremental_dependency: # Attempt to retrieve child state - substream_state = list(stream_state.values()) - substream_state = substream_state[0] if substream_state else {} # type: ignore [assignment] # Incorrect type for assignment + substream_state_values = list(stream_state.values()) + substream_state = substream_state_values[0] if substream_state_values else {} + # Filter out dict and list values (global and per-partition state formats), because the state is passed to the parent stream in the format {cursor_field: substream_state} + if isinstance(substream_state, (list, dict)): + substream_state = {} + parent_state = {} # Copy child state to parent streams with incremental dependencies diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 8862600d5..d1502c218 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -2326,3 +2326,550 @@ def test_incremental_error( expected_records, expected_state, ) + + +SUBSTREAM_REQUEST_OPTIONS_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + 
"username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + "type": "DefaultPaginator", + "page_size_option": { + "type": "RequestOption", + "field_name": "per_page", + "inject_into": "request_parameter", + }, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": { + "inject_into": "request_parameter", + "field_name": "start_time", + "type": "RequestOption", + }, + }, + "posts_stream": { + "type": "DeclarativeStream", + "name": "posts", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "title": {"type": "string"}, + "content": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": 
"/community/posts", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "posts", + "path": "community/posts", + "data_path": "posts", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts_comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, + "record_filter": { + "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + }, + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/posts_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + "request_option": { + "inject_into": "request_parameter", + "type": "RequestOption", + "field_name": "post_id", + }, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ 
parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts_comments", + "data_path": "comments", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comment_votes_stream": { + "type": "DeclarativeStream", + "name": "post_comment_votes", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "comment_id": {"type": "integer"}, + "vote": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts_comments_votes", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/post_comments_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + "extra_fields": [["updated_at"]], + "request_option": { + "inject_into": "request_parameter", + "type": "RequestOption", + "field_name": "comment_id", + }, + } + ], + }, + }, + "transformations": [ + { + "type": "AddFields", + "fields": [ + { + "path": ["comment_updated_at"], + "value_type": "string", + "value": "{{ stream_slice.extra_fields['updated_at'] }}", + }, + ], + }, + ], + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "post_comment_votes", + "path": "community/posts_comments_votes", + "data_path": "votes", + "cursor_field": "created_at", + "primary_key": "id", + }, + }, + }, + "streams": [ 
+ {"$ref": "#/definitions/posts_stream"}, + {"$ref": "#/definitions/post_comments_stream"}, + {"$ref": "#/definitions/post_comment_votes_stream"}, + ], + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + "spec": { + "type": "Spec", + "documentation_url": "https://airbyte.com/#yaml-from-manifest", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "required": ["credentials", "start_date"], + "additionalProperties": False, + "properties": { + "credentials": { + "type": "object", + "required": ["email", "api_token"], + "properties": { + "email": { + "type": "string", + "title": "Email", + "description": "The email for authentication.", + }, + "api_token": { + "type": "string", + "airbyte_secret": True, + "title": "API Token", + "description": "The API token for authentication.", + }, + }, + }, + "start_date": { + "type": "string", + "format": "date-time", + "title": "Start Date", + "description": "The date from which to start syncing data.", + }, + }, + }, + }, +} + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_REQUEST_OPTIONS_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts" + f"?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + 
"https://api.example.com/community/posts_comments?per_page=100&post_id=1", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ], + "next_page": "https://api.example.com/community/posts_comments?per_page=100&post_id=1&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=1&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes?per_page=100&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + "created_at": VOTE_100_CREATED_AT, + } + ], + "next_page": ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&page=2&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&page=2&comment_id=10&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": [{"id": 101, "comment_id": 10, "created_at": VOTE_101_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&comment_id=11&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts_comments_votes?" 
+ f"per_page=100&comment_id=12&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=2", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": "https://api.example.com/community/posts_comments?per_page=100&post_id=2&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=2&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts_comments_votes" + f"?per_page=100&comment_id=20&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 200, "comment_id": 20, "created_at": VOTE_200_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts_comments_votes?" + f"per_page=100&comment_id=21&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts_comments?per_page=100&post_id=3", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts_comments_votes?" 
+ f"per_page=100&comment_id=30&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "lookback_window": 86400, + }, + # Expected state + { + "state": {"created_at": VOTE_100_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest" + "parent_state": { + "posts": {"updated_at": POST_1_UPDATED_AT} + }, # post 1 is the latest + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, 
"parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + }, + ), + ], +) +def test_incremental_substream_request_options_provider( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental syncing for a stream that uses request options provider from parent stream config. 
+ """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + "post_comment_votes", + initial_state, + expected_records, + expected_state, + ) diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 5814805e2..52306d348 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type(): _ = [s for s in partition_router.stream_slices()] +@pytest.mark.parametrize( + "initial_state, expected_parent_state", + [ + # Case 1: Empty initial state, no parent state expected + ({}, {}), + # Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor` + ( + {"updated_at": "2023-05-27T00:00:00Z"}, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + # Case 3: Initial state with global `state`, no migration expected + ( + {"state": {"updated": "2023-05-27T00:00:00Z"}}, + {}, + ), + # Case 4: Initial state with per-partition `states`, no migration expected + ( + { + "states": [ + { + "partition": { + "issue_id": "10012", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10019", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10000", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + ] + }, + {}, + ), + # Case 5: Initial state with `parent_state`, existing parent state persists + ( + { + "parent_state": { + "parent_stream_name1": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + 
}, + }, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + ], + ids=[ + "empty_initial_state", + "initial_state_no_parent_legacy_state", + "initial_state_no_parent_global_state", + "initial_state_no_parent_per_partition_state", + "initial_state_with_parent_state", + ], +) +def test_set_initial_state(initial_state, expected_parent_state): + """ + Test the `set_initial_state` method of SubstreamPartitionRouter. + + This test verifies that the method correctly handles different initial state formats + and sets the appropriate parent stream state. + """ + parent_stream = MockStream( + slices=[{}], + records=[], + name="parent_stream_name1", + cursor_field="parent_stream_cursor", + ) + parent_stream.state = {} + parent_stream_config = ParentStreamConfig( + stream=parent_stream, + parent_key="id", + partition_field="parent_stream_id", + parameters={}, + config={}, + incremental_dependency=True, + ) + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[parent_stream_config], + parameters={}, + config={}, + ) + + partition_router.set_initial_state(initial_state) + + # Assert the state of the parent stream + assert parent_stream.state == expected_parent_state, ( + f"Unexpected parent state. Initial state: {initial_state}, " + f"Expected: {expected_parent_state}, Got: {parent_stream.state}" + ) + + @pytest.mark.parametrize( "parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data", [