Add PrefectDbtRunner #16971

Merged
kevingrismore merged 12 commits into main from add-dbt-core-runner
Feb 7, 2025

Conversation

kevingrismore (Contributor) commented Feb 4, 2025

This PR adds PrefectDbtRunner, a new primary interface for running dbt Core from Prefect.

Example usage:

from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    runner = PrefectDbtRunner(
        settings=PrefectDbtSettings(project_dir="test", profiles_dir="examples/run_dbt")
    )
    runner.invoke(["run"])


if __name__ == "__main__":
    run_dbt()

Because this will replace the functionality of everything in prefect_dbt.cli, any import from that path will emit the following warning:

"prefect_dbt.cli is deprecated and will be removed in a future release. Please use prefect_dbt.core instead."

Improved

Logging

The previous implementation assumed the presence of a Prefect run logger, ignored debug logs, and treated all other log levels as INFO.

The PrefectDbtRunner translates all log levels from dbt to standard Python logging levels, and logs exclusively to a Prefect run logger or directly to the terminal depending on the context in which it was invoked.
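As a rough illustration of the level translation described above, here is a minimal sketch using only the standard library. The lowercase level names ("debug", "test", "info", "warn", "error") and the choice to treat "test" as DEBUG are assumptions made for this example, not the runner's confirmed mapping.

```python
import logging

# Assumed dbt level names mapped to stdlib logging levels.
# Treating "test" as DEBUG is a choice made for this sketch.
DBT_TO_PYTHON_LEVEL = {
    "debug": logging.DEBUG,
    "test": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARNING,
    "error": logging.ERROR,
}


def to_python_level(dbt_level: str) -> int:
    """Translate a dbt level name to a logging level, falling back to INFO."""
    return DBT_TO_PYTHON_LEVEL.get(dbt_level.lower(), logging.INFO)


def log_dbt_message(logger: logging.Logger, dbt_level: str, message: str) -> None:
    """Forward a dbt message to the given logger at the translated level."""
    logger.log(to_python_level(dbt_level), message)
```

The logger passed to log_dbt_message could be a Prefect run logger inside a flow or task, or a plain logging.getLogger(...) instance otherwise.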

Failure handling

In the previous implementation, a failed dbt run was handled by returning a Failed state with the dbt exception in the state message. However, exceptions from RunResults in dbt are primarily an indication of failure, whereas the RunExecutionResult contains more detailed messages about status and failure reason.

The PrefectDbtRunner raises an exception when a dbt run fails, including a standardized message with detailed failure information. It also offers raise_on_failure, which controls whether that exception is raised when a dbt run fails, since a failing test does not always constitute a failing project as a whole.
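The raise_on_failure behavior can be pictured with a small stand-alone sketch. NodeResult, FAILURE_STATUSES, summarize_failures, and check_results are hypothetical names invented for this example; they are not part of prefect-dbt or dbt, and the message format is only illustrative.

```python
from dataclasses import dataclass


@dataclass
class NodeResult:
    """Hypothetical stand-in for a per-node dbt result."""
    name: str
    status: str
    message: str = ""


# Assumed set of statuses that count as failures in this sketch.
FAILURE_STATUSES = {"error", "fail"}


def summarize_failures(results: list[NodeResult]) -> str:
    """Build one line per failed node; empty string if nothing failed."""
    failed = [r for r in results if r.status in FAILURE_STATUSES]
    return "\n".join(f"{r.name}: {r.status} ({r.message})" for r in failed)


def check_results(results: list[NodeResult], raise_on_failure: bool = True) -> str:
    """Raise with the failure summary, or return it when raising is disabled."""
    summary = summarize_failures(results)
    if summary and raise_on_failure:
        raise RuntimeError(f"dbt invocation failed:\n{summary}")
    return summary
```

With raise_on_failure=False the caller receives the summary and can decide for itself whether, say, a single failing test should fail the whole flow.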

New

PrefectDbtSettings

Added in #16834

Using Pydantic's BaseSettings, this class automatically detects a common set of DBT_-prefixed env vars upon instantiation. Creating a PrefectDbtRunner without passing in a PrefectDbtSettings instance will construct one by default, detecting dbt config from the environment.
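To make the env-var detection concrete, here is a standard-library approximation of the idea. The real class is built on Pydantic's BaseSettings; DbtSettingsSketch and its fallback defaults are assumptions for illustration only. DBT_PROJECT_DIR and DBT_PROFILES_DIR are used as the two example variables.

```python
import os
from dataclasses import dataclass, field
from pathlib import Path


def _env_path(name: str, default: Path) -> Path:
    """Read a path from the environment, falling back to a default."""
    value = os.environ.get(name)
    return Path(value) if value else default


@dataclass
class DbtSettingsSketch:
    """Hypothetical stand-in; fields are resolved from the environment at instantiation."""
    project_dir: Path = field(
        default_factory=lambda: _env_path("DBT_PROJECT_DIR", Path.cwd())
    )
    profiles_dir: Path = field(
        default_factory=lambda: _env_path("DBT_PROFILES_DIR", Path.home() / ".dbt")
    )
```

Explicit arguments take precedence over the environment here, which mirrors how settings passed to the runner would override detected values.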

profiles.yml Templating

Added in #16889

When invoking dbt commands through the PrefectDbtRunner, templated references to Prefect blocks and Prefect variables in your workspace are resolved at runtime. This lets users commit their profiles.yml to their dbt project repositories without exposing secrets, and allows dbt target configs to be mapped to workspaces.

For example, set a dbt_target Prefect variable to "stg" in a staging workspace and to "prod" in a production workspace. A dbt project executed with the PrefectDbtRunner will then target the staging database when run in the staging workspace and the production database when run in the production workspace, with no env vars or other environment-specific configuration to manage.

test:
  outputs:
    stg:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: duckdb
      path: prod.duckdb
      threads: 4
      password: "{{ prefect.blocks.secret.my-password }}"

  target: "{{ prefect.variables.dbt_target }}"
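The placeholder resolution in the profile above can be sketched with a regular expression and plain dictionaries. This is not the library's implementation: resolve_placeholders and the two lookup dictionaries are hypothetical, standing in for calls that would fetch Prefect variables and blocks from the workspace.

```python
import re

# Matches "{{ prefect.variables.NAME }}" and "{{ prefect.blocks.TYPE.NAME }}".
PLACEHOLDER = re.compile(r"\{\{\s*prefect\.(variables|blocks)\.([\w.\-]+)\s*\}\}")


def resolve_placeholders(text: str, variables: dict, blocks: dict) -> str:
    """Replace each placeholder with its value; raises KeyError if one is missing."""

    def _lookup(match: re.Match) -> str:
        kind, key = match.group(1), match.group(2)
        source = variables if kind == "variables" else blocks
        return str(source[key])

    return PLACEHOLDER.sub(_lookup, text)
```

Calling it with variables={"dbt_target": "stg"} turns the target line into target: "stg", so the same committed file selects a different output per workspace.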

Events

A Prefect event is emitted each time a Node Finished event occurs in dbt. The emitted event contains a dbt.node.status field on its primary resource, recording the final state of the node's execution. This enables prefect-dbt users to create automations that fire conditionally on the state of individual nodes in their dbt DAGs, kicking off downstream jobs after particular table updates or alerting on specific test failures.

Node Finished event emission can be disabled for a resource in your dbt project by setting

prefect:
  emit_node_events: False

in your dbt resource's config.
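A small sketch of how the opt-out might gate event emission. build_node_event_resource is a hypothetical helper, the resource id format is invented, and the config is modeled as a plain dictionary; only the dbt.node.status field and the emit_node_events flag come from the description above.

```python
from typing import Any, Optional


def build_node_event_resource(
    node_id: str, status: str, node_config: dict[str, Any]
) -> Optional[dict[str, str]]:
    """Return the event's primary resource, or None when the node opted out."""
    prefect_config = node_config.get("prefect", {})
    # Emission is on unless the resource config explicitly disables it.
    if not prefect_config.get("emit_node_events", True):
        return None
    return {
        "prefect.resource.id": f"dbt.{node_id}",  # hypothetical id format
        "dbt.node.status": status,
    }
```

An automation could then match on the dbt.node.status value of the resource, for example to alert only when a specific test finishes with a failing status.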

Lineage (Prefect Cloud only)

By default, the PrefectDbtRunner emits events describing the lineage graph of your dbt DAG.

The dbt resource types that constitute Prefect lineage resources are limited to NODE_TYPES_TO_EMIT_LINEAGE, which currently includes models, seeds, and snapshots.

If invoked in a flow run, each node executed during the run will appear as a resource on the flow run page in Prefect Cloud. Clicking "View graph" on a resource will display a lineage graph with several degrees of upstream and downstream resources pre-populated.

A resource in your dbt project can be omitted from the lineage graph by setting

prefect:
  emit_lineage_events: False

in your dbt resource's config.
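The two lineage filters described above (resource type and per-resource opt-out) can be combined in a few lines. LINEAGE_NODE_TYPES is a stand-in for NODE_TYPES_TO_EMIT_LINEAGE with assumed string values, and the node dictionaries are a simplified, hypothetical shape rather than dbt's manifest objects.

```python
from typing import Any

# Stand-in for NODE_TYPES_TO_EMIT_LINEAGE; the string values are assumed.
LINEAGE_NODE_TYPES = {"model", "seed", "snapshot"}


def lineage_node_ids(nodes: list[dict[str, Any]]) -> list[str]:
    """Return ids of nodes that are eligible for lineage and have not opted out."""
    selected = []
    for node in nodes:
        if node["resource_type"] not in LINEAGE_NODE_TYPES:
            continue
        prefect_config = node.get("config", {}).get("prefect", {})
        if prefect_config.get("emit_lineage_events", True):
            selected.append(node["unique_id"])
    return selected
```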

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • If this pull request removes docs files, it includes redirect settings in mint.json.
  • If this pull request adds functions or classes, it includes helpful docstrings.

@kevingrismore kevingrismore added the integrations label (Related to integrations with other services) Feb 4, 2025
@kevingrismore kevingrismore marked this pull request as ready for review February 5, 2025 00:07
@kevingrismore kevingrismore changed the title from "add PrefectDbtRunner" to "Add PrefectDbtRunner" Feb 5, 2025
payload=event_data,
)

def _get_dbt_event_msg(self, event: EventMsg) -> str:
Collaborator

kinda question the need for a full method for this however private

Collaborator

i think i had suggested this pattern earlier to isolate upstream type warts

Contributor Author

yep that's exactly what I'm doing


self.manifest = res.result

async def ainvoke(self, args: list[str], **kwargs: Any):
Collaborator

double checking this accepts a single positional argument called args that's a list of str and not several positional args that are each a string?

Contributor Author

it's a direct args passthrough to the underlying invoke from dbt. It may not feel the best but is at least decently documented, so we can refer people there.

Collaborator

@zzstoatzz zzstoatzz left a comment

new primary interface

looking good @kevingrismore! can we add some color here on how this sits in relation to today's prefect-dbt? i.e. what's entirely new vs. similar but different

Contributor Author

kevingrismore commented Feb 5, 2025

new primary interface

looking good @kevingrismore! can we add some color here on how this sits in relation to today's prefect-dbt? i.e. what's entirely new vs. similar but different

Updated the description @zzstoatzz

@kevingrismore
Contributor Author

I did something stupid with rebase 😭 hang on

@kevingrismore kevingrismore merged commit b5012c4 into main Feb 7, 2025
15 checks passed
@kevingrismore kevingrismore deleted the add-dbt-core-runner branch February 7, 2025 21:46
Labels
integrations (Related to integrations with other services), migration

3 participants