Skip to content

Commit

Permalink
Merge branch 'main' into brian/async_retriever_refactor_to_stream_slicer
Browse files Browse the repository at this point in the history
  • Loading branch information
brianjlai committed Dec 13, 2024
2 parents 6cbd61a + 216cd43 commit ec98aa3
Show file tree
Hide file tree
Showing 21 changed files with 1,528 additions and 151 deletions.
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ updates:
commit-message:
prefix: "ci(deps): "
schedule:
interval: daily
interval: monthly
labels:
- ci
groups:
Expand Down
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
FROM docker.io/airbyte/python-connector-base:3.0.0@sha256:1a0845ff2b30eafa793c6eee4e8f4283c2e52e1bbd44eed6cb9e9abd5d34d844

WORKDIR /airbyte/integration_code

Expand All @@ -23,6 +23,10 @@ RUN mkdir -p source_declarative_manifest \
# Remove unnecessary build files
RUN rm -rf dist/ pyproject.toml poetry.lock README.md

# Set ownership of /airbyte to the non-root airbyte user and group (1000:1000)
RUN chown -R 1000:1000 /airbyte

# Set the entrypoint
ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
USER airbyte
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __new__( # type: ignore[misc]
try:
selected_key = str(
dpath.get(
config, # type: ignore [arg-type] # Dpath wants mutable mapping but doesn't need it.
config, # type: ignore[arg-type] # Dpath wants mutable mapping but doesn't need it.
authenticator_selection_path,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
# By default, we defer to a value of 1 which represents running a connector using the Concurrent CDK engine on only one thread.
SINGLE_THREADED_CONCURRENCY_LEVEL = 1
# By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock
# because it has hit the limit of futures but not partition reader is consuming them.
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2

def __init__(
self,
Expand Down Expand Up @@ -107,8 +108,8 @@ def __init__(
concurrency_level // 2, 1
) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
else:
concurrency_level = self.SINGLE_THREADED_CONCURRENCY_LEVEL
initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL
concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

self._concurrent_source = ConcurrentSource.create(
num_workers=concurrency_level,
Expand Down
209 changes: 171 additions & 38 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ definitions:
additionalProperties: true
ConcurrencyLevel:
title: Concurrency Level
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time.
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will processed at the same time. Note that a value of 1 could create deadlock if a stream has a very high number of partitions.
type: object
required:
- default_concurrency
Expand Down Expand Up @@ -1218,6 +1218,7 @@ definitions:
title: Schema Loader
description: Component used to retrieve the schema for the current stream.
anyOf:
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
Expand Down Expand Up @@ -1684,6 +1685,92 @@ definitions:
$parameters:
type: object
additionalProperties: true
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
type: object
required:
- target_type
- current_type
properties:
target_type:
anyOf:
- type: string
- type: array
items:
type: string
current_type:
anyOf:
- type: string
- type: array
items:
type: string
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
type: object
required:
- key_pointer
properties:
type:
type: string
enum: [SchemaTypeIdentifier]
schema_pointer:
title: Schema Path
description: List of nested fields defining the schema field path to extract. Defaults to [].
type: array
default: []
items:
- type: string
interpolation_context:
- config
key_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the field key to extract.
type: array
items:
- type: string
interpolation_context:
- config
type_pointer:
title: Type Path
description: List of potentially nested fields describing the full path of the field type to extract.
type: array
items:
- type: string
interpolation_context:
- config
types_mapping:
type: array
items:
- "$ref": "#/definitions/TypesMap"
$parameters:
type: object
additionalProperties: true
DynamicSchemaLoader:
title: Dynamic Schema Loader
description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
type: object
required:
- type
- retriever
- schema_type_identifier
properties:
type:
type: string
enum: [DynamicSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down Expand Up @@ -2043,65 +2130,61 @@ definitions:
- extract_output
properties:
consent_url:
title: DeclarativeOAuth Consent URL
title: Consent URL
type: string
description: |-
The DeclarativeOAuth Specific string URL string template to initiate the authentication.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- consent_url: https://domain.host.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}
- consent_url: https://endpoint.host.com/oauth2/authorize?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{scope_key}={urlEncoder:{{scope_key}}}&{state_key}={{state_key}}&subdomain={subdomain}
- https://domain.host.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}
- https://endpoint.host.com/oauth2/authorize?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{scope_key}={urlEncoder:{{scope_key}}}&{state_key}={{state_key}}&subdomain={subdomain}
scope:
title: (Optional) DeclarativeOAuth Scope
title: Scopes
type: string
description: |-
The DeclarativeOAuth Specific string of the scopes needed to be grant for authenticated user.
examples:
- scope: user:read user:read_orders workspaces:read
- user:read user:read_orders workspaces:read
access_token_url:
title: DeclarativeOAuth Access Token URL
title: Access Token URL
type: string
description: |-
The DeclarativeOAuth Specific URL templated string to obtain the `access_token`, `refresh_token` etc.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- access_token_url: https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
- https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
access_token_headers:
title: (Optional) DeclarativeOAuth Access Token Headers
title: Access Token Headers
type: object
additionalProperties: true
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- access_token_headers:
{
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
- {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
access_token_params:
title: (Optional) DeclarativeOAuth Access Token Query Params (Json Encoded)
title: Access Token Query Params (Json Encoded)
type: object
additionalProperties: true
description: |-
The DeclarativeOAuth Specific optional query parameters to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
When this property is provided, the query params will be encoded as `Json` and included in the outgoing API request.
examples:
- access_token_params:
{
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}",
}
- {
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}",
}
extract_output:
title: DeclarativeOAuth Extract Output
title: Extract Output
type: array
items:
type: string
description: |-
The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.
examples:
- extract_output: ["access_token", "refresh_token", "other_field"]
- ["access_token", "refresh_token", "other_field"]
state:
title: (Optional) DeclarativeOAuth Configurable State Query Param
title: Configurable State Query Param
type: object
additionalProperties: true
required:
Expand All @@ -2116,49 +2199,49 @@ definitions:
max:
type: integer
examples:
- state: { "min": 7, "max": 128 }
- { "min": 7, "max": 128 }
client_id_key:
title: (Optional) DeclarativeOAuth Client ID Key Override
title: Client ID Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `client_id` key name, if required by data-provider.
examples:
- client_id_key: "my_custom_client_id_key_name"
- "my_custom_client_id_key_name"
client_secret_key:
title: (Optional) DeclarativeOAuth Client Secret Key Override
title: Client Secret Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `client_secret` key name, if required by data-provider.
examples:
- client_secret_key: "my_custom_client_secret_key_name"
- "my_custom_client_secret_key_name"
scope_key:
title: (Optional) DeclarativeOAuth Scope Key Override
title: Scopes Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `scope` key name, if required by data-provider.
examples:
- scope_key: "my_custom_scope_key_key_name"
- "my_custom_scope_key_key_name"
state_key:
title: (Optional) DeclarativeOAuth State Key Override
title: State Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.
examples:
- state_key: "my_custom_state_key_key_name"
- "my_custom_state_key_key_name"
auth_code_key:
title: (Optional) DeclarativeOAuth Auth Code Key Override
title: Auth Code Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.
examples:
- auth_code_key: "my_custom_auth_code_key_name"
- "my_custom_auth_code_key_name"
redirect_uri_key:
title: (Optional) DeclarativeOAuth Redirect URI Key Override
title: Redirect URI Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `redirect_uri` key name to something like `callback_uri`, if required by data-provider.
examples:
- redirect_uri_key: "my_custom_redirect_uri_key_name"
- "my_custom_redirect_uri_key_name"
complete_oauth_output_specification:
title: "OAuth output specification"
description: |-
Expand Down Expand Up @@ -2928,8 +3011,10 @@ definitions:
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", 1, "name"]
- ["data", "{{ components_values.name }}"]
- ["data", "*", "record"]
- ["*", "**", "name"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
Expand Down Expand Up @@ -2974,6 +3059,52 @@ definitions:
- type
- retriever
- components_mapping
StreamConfig:
title: Stream Config
description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config.
type: object
required:
- type
- configs_pointer
properties:
type:
type: string
enum: [StreamConfig]
configs_pointer:
title: Configs Pointer
description: A list of potentially nested fields indicating the full path in source config file where streams configs located.
type: array
items:
- type: string
interpolation_context:
- parameters
examples:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
$parameters:
type: object
additionalProperties: true
ConfigComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Resolves and populates stream templates with components fetched from the source config.
properties:
type:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- stream_config
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
Expand All @@ -2988,7 +3119,9 @@ definitions:
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
anyOf:
- "$ref": "#/definitions/HttpComponentsResolver"
- "$ref": "#/definitions/ConfigComponentsResolver"
required:
- type
- stream_template
Expand Down
Loading

0 comments on commit ec98aa3

Please sign in to comment.