Skip to content

Commit

Permalink
[fix] Update Airbyte assets to avoid name collisions (#24058)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Fixes
[DS-417](https://linear.app/dagster-labs/issue/DS-417/avoid-name-collisions-for-dagster-airbyte-assets)

This PR updates the Airbyte assets naming template to include the full
connection ID. Previously, we were truncating the connection ID to keep
only the first 5 characters in the asset name, which created name
collisions for some users.

## How I Tested These Changes

Updated tests + BK

## Changelog [New | Bug | Docs]

Bug:

[dagster-airbyte] Update the Airbyte asset naming template to include
the full connection ID, avoiding name collisions.
  • Loading branch information
maximearmstrong authored Sep 10, 2024
1 parent 9c5fa62 commit f8216a0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def _build_airbyte_assets_from_metadata(
io_manager_key = cast(Optional[str], metadata["io_manager_key"])

@multi_asset(
name=f"airbyte_sync_{connection_id[:5]}",
name=f"airbyte_sync_{connection_id.replace('-', '_')}",
deps=list((assets_defn_meta.keys_by_input_name or {}).values()),
outs={
k: AssetOut(
Expand Down Expand Up @@ -301,7 +301,7 @@ def build_airbyte_assets(
internal_deps[table] = set(upstream_deps) if upstream_deps else set()

@multi_asset(
name=f"airbyte_sync_{connection_id[:5]}",
name=f"airbyte_sync_{connection_id.replace('-', '_')}",
deps=upstream_deps,
outs=outputs,
internal_asset_deps=internal_deps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ def test_assets(schema_prefix, auto_materialize_policy, monkeypatch):
destination_tables = ["foo", "bar"]
if schema_prefix:
destination_tables = [schema_prefix + t for t in destination_tables]
connection_id = "12345"
ab_assets = build_airbyte_assets(
"12345",
connection_id=connection_id,
destination_tables=destination_tables,
asset_key_prefix=["some", "prefix"],
auto_materialize_policy=auto_materialize_policy,
)
ab_assets_name = f"airbyte_sync_{connection_id.replace('-', '_')}"

assert ab_assets[0].keys == {AssetKey(["some", "prefix", t]) for t in destination_tables}
assert len(ab_assets[0].op.output_defs) == 2
Expand Down Expand Up @@ -82,7 +84,7 @@ def test_assets(schema_prefix, auto_materialize_policy, monkeypatch):

materializations = [
event.event_specific_data.materialization
for event in res.events_for_node("airbyte_sync_12345")
for event in res.events_for_node(ab_assets_name)
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == 3
Expand Down Expand Up @@ -125,15 +127,17 @@ def test_assets_with_normalization(
destination_tables = [schema_prefix + t for t in destination_tables]

bar_normalization_tables = {schema_prefix + "bar_baz", schema_prefix + "bar_qux"}
connection_id = "12345"
ab_assets = build_airbyte_assets(
"12345",
connection_id=connection_id,
destination_tables=destination_tables,
normalization_tables={destination_tables[1]: bar_normalization_tables},
asset_key_prefix=["some", "prefix"],
deps=[AssetKey(source_asset)] if source_asset else None,
freshness_policy=freshness_policy,
auto_materialize_policy=auto_materialize_policy,
)
ab_assets_name = f"airbyte_sync_{connection_id.replace('-', '_')}"

assert all(spec.freshness_policy == freshness_policy for spec in ab_assets[0].specs)

Expand Down Expand Up @@ -182,7 +186,7 @@ def test_assets_with_normalization(

materializations = [
event.event_specific_data.materialization
for event in res.events_for_node("airbyte_sync_12345")
for event in res.events_for_node(ab_assets_name)
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == 5
Expand Down Expand Up @@ -213,13 +217,15 @@ def test_assets_cloud() -> None:
)
ab_url = ab_resource.api_base_url

connection_id = "12345"
ab_assets = build_airbyte_assets(
"12345",
connection_id=connection_id,
destination_tables=["foo", "bar"],
normalization_tables={"bar": {"bar_baz", "bar_qux"}},
asset_key_prefix=["some", "prefix"],
group_name="foo",
)
ab_assets_name = f"airbyte_sync_{connection_id.replace('-', '_')}"

with responses.RequestsMock() as rsps:
rsps.add(
Expand Down Expand Up @@ -251,7 +257,7 @@ def test_assets_cloud() -> None:

materializations = [
event.event_specific_data.materialization
for event in res.events_for_node("airbyte_sync_12345")
for event in res.events_for_node(ab_assets_name)
if event.event_type_value == "ASSET_MATERIALIZATION"
and isinstance(event.event_specific_data, StepMaterializationData)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def downstream_asset(dagster_tags):

materializations = [
event.event_specific_data.materialization # type: ignore[attr-defined]
for event in res.events_for_node("airbyte_sync_87b7f")
for event in res.events_for_node("airbyte_sync_87b7fe85_a22c_420e_8d74_b30e7ede77df")
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == len(tables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def test_load_from_project(

materializations = [
event.event_specific_data.materialization
for event in res.events_for_node("airbyte_sync_87b7f")
for event in res.events_for_node("airbyte_sync_87b7fe85_a22c_420e_8d74_b30e7ede77df")
if event.event_type_value == "ASSET_MATERIALIZATION"
]
assert len(materializations) == len(tables)
Expand Down

0 comments on commit f8216a0

Please sign in to comment.