Skip to content

Commit

Permalink
[dagster-dbt] Upgrade dagster-dbt to be compatible with dbt 1.5.x (da…
Browse files Browse the repository at this point in the history
…gster-io#13907)

## Summary & Motivation

Fixes: dagster-io#13898

The changes:

- `--no-use-color` is deprecated (`--no-use-colors` is allowed on all
versions 1.0+)
- The dbt selection string parsing logic relies on global state being
set (I know, I know...), and the way that this global state is set has
been changed (previously, they were attributes directly on the module,
now they're all wrapped into a single GLOBAL_FLAGS object)
- Nasty hack to deal with dbt-core shifting from `argparse` to `click`.
We were using the argument parsing functionality to parse dbt Cloud job
commands, so I found a somewhat similar entrypoint to work with.
- Ensure that the current working directory when executing dbt commands
is the project directory. This handles the issue linked below, where dbt
now writes output files to the cwd rather than the project directory.
This is not an issue if the cwd *is* the project directory ;)
- Update some tests to be a bit more flexible to subtly different output
formats (generally this is just testing stuff in the DbtCliOutput
object, which we don't particularly expect or need to be very stable)
- Some errors got reworded, so tests needed to be updated

In general, the `select_unique_ids_from_manifest` function access a lot
of things it potentially shouldn't touch from the dbt internals, and
there's no guarantee similar issues don't crop up in the future. The
hope is that there is eventually a dbt-native python entrypoint for this
functionality, at which point in time we can remove all this ugly stuff.

## How I Tested These Changes

Added a dbt_15X tox env, which is pinned to the specific rcs that were
available to me. Once the real `dbt 1.5.x` comes out, this should be
unpinned

---------

Co-authored-by: Rex Ledesma <rex@elementl.com>
  • Loading branch information
2 people authored and takeru911 committed Apr 29, 2023
1 parent d630dc0 commit a5befa9
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
pytest_tox_factors=[
"dbt_13X",
"dbt_14X",
"dbt_15X",
],
),
PackageSpec(
Expand Down
3 changes: 1 addition & 2 deletions examples/assets_smoke_test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
"dagster-pandas",
"dagster-dbt",
"pandas",
"dbt-core",
"dbt-snowflake",
"dbt-snowflake>=1.2",
"dagster-snowflake",
"dagster-snowflake-pandas",
],
Expand Down
5 changes: 2 additions & 3 deletions examples/project_fully_featured/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
"dagster-pyspark",
"dagster-slack",
"dagster-postgres",
"dbt-core",
"dbt-duckdb",
"dbt-snowflake",
"dbt-duckdb>=1.2",
"dbt-snowflake>=1.2",
"duckdb!=0.3.3, <= 6.0.0", # missing wheels
"mock",
# DataFrames were not written to Snowflake, causing errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def _events_for_structured_json_line(
output_name=output_name_fn(compiled_node_info),
metadata=metadata,
)
elif node_resource_type == "test" and runtime_node_info.get("node_finished_at") is not None:
elif node_resource_type == "test" and runtime_node_info.get("node_finished_at"):
upstream_unique_ids = (
manifest_json["nodes"][unique_id].get("depends_on", {}).get("nodes", [])
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _create_command_list(
if warn_error:
prefix += ["--warn-error"]
if json_log_format:
prefix += ["--no-use-color", "--log-format", "json"]
prefix += ["--no-use-colors", "--log-format", "json"]
if debug:
prefix += ["--debug"]

Expand Down Expand Up @@ -117,6 +117,7 @@ def _core_execute_cli(
command_list: Sequence[str],
ignore_handled_error: bool,
json_log_format: bool,
project_dir: str,
) -> Iterator[Union[DbtCliEvent, int]]:
"""Runs a dbt command in a subprocess and yields parsed output line by line."""
# Execute the dbt CLI command in a subprocess.
Expand All @@ -130,6 +131,7 @@ def _core_execute_cli(
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=passenv,
cwd=project_dir if os.path.exists(project_dir) else None,
)
for raw_line in process.stdout or []:
line = raw_line.decode().strip()
Expand Down Expand Up @@ -180,6 +182,7 @@ def execute_cli_stream(
command_list=command_list,
json_log_format=json_log_format,
ignore_handled_error=ignore_handled_error,
project_dir=flags_dict["project-dir"],
):
if isinstance(event, int):
return_code = event
Expand Down Expand Up @@ -227,6 +230,7 @@ def execute_cli(
command_list=command_list,
json_log_format=json_log_format,
ignore_handled_error=ignore_handled_error,
project_dir=flags_dict["project-dir"],
):
if isinstance(event, int):
return_code = event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from dagster._core.definitions.metadata import MetadataUserInput
from dagster._core.execution.context.init import build_init_resource_context
from dagster._utils.backcompat import experimental_arg_warning
from dbt.main import parse_args as dbt_parse_args

from dagster_dbt.asset_utils import (
default_asset_key_fn,
Expand Down Expand Up @@ -103,7 +102,21 @@ def build_definitions(

@staticmethod
def parse_dbt_command(dbt_command: str) -> Namespace:
return dbt_parse_args(args=shlex.split(dbt_command)[1:])
args = shlex.split(dbt_command)[1:]
try:
from dbt.cli.flags import ( # pyright: ignore [reportMissingImports]
Flags,
args_to_context,
)

# nasty hack to get dbt to parse the args
# dbt >= 1.5.0 requires that profiles-dir is set to an existing directory
return Namespace(**vars(Flags(args_to_context(args + ["--profiles-dir", "."]))))
except ImportError:
# dbt < 1.5.0 compat
from dbt.main import parse_args

return parse_args(args=args)

@staticmethod
def get_job_materialization_command_step(execute_steps: List[str]) -> int:
Expand Down Expand Up @@ -132,8 +145,11 @@ def get_compile_filters(parsed_args: Namespace) -> List[str]:
if excluded_models:
dbt_compile_options.append(f"--exclude {' '.join(excluded_models)}")

if parsed_args.selector_name:
dbt_compile_options.append(f"--selector {parsed_args.selector_name}")
selector = getattr(parsed_args, "selector_name", None) or getattr(
parsed_args, "selector", None
)
if selector:
dbt_compile_options.append(f"--selector {selector}")

return dbt_compile_options

Expand Down Expand Up @@ -179,8 +195,7 @@ def _compile_dbt_cloud_job(self, dbt_cloud_job: Mapping[str, Any]) -> Tuple[int,
#
# Since we're only doing this to generate the dependency structure, just use an arbitrary
# partition key (e.g. the last one) to retrieve the partition variable.
dbt_vars = json.loads(parsed_args.vars or "{}")
if dbt_vars:
if parsed_args.vars and parsed_args.vars != "{}":
raise DagsterDbtCloudJobInvariantViolationError(
f"The dbt Cloud job '{dbt_cloud_job['name']}' ({dbt_cloud_job['id']}) must not have"
" variables defined from `--vars` in its `dbt run` or `dbt build` command."
Expand Down
9 changes: 7 additions & 2 deletions python_modules/libraries/dagster-dbt/dagster_dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ def select_unique_ids_from_manifest(
manifest_json: Optional[Mapping[str, Any]] = None,
) -> AbstractSet[str]:
"""Method to apply a selection string to an existing manifest.json file."""
import dbt.flags as flags
import dbt.graph.cli as graph_cli
import dbt.graph.selector as graph_selector
from dbt.contracts.graph.manifest import Manifest, WritableManifest
Expand Down Expand Up @@ -300,7 +299,13 @@ def __getattr__(self, item):
graph = graph_selector.Graph(DiGraph(incoming_graph_data=child_map))

# create a parsed selection from the select string
flags.INDIRECT_SELECTION = IndirectSelection.Eager
try:
from dbt.flags import GLOBAL_FLAGS
except ImportError:
# dbt < 1.5.0 compat
import dbt.flags as GLOBAL_FLAGS
setattr(GLOBAL_FLAGS, "INDIRECT_SELECTION", IndirectSelection.Eager)
setattr(GLOBAL_FLAGS, "WARN_ERROR", True)
parsed_spec: SelectionSpec = graph_cli.parse_union([select], True)

if exclude:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ def my_dbt_solid(context):

context = get_dbt_solid_context(test_project_dir, dbt_config_dir)
dbt_result = my_dbt_solid(context)
assert len(dbt_result.raw_output.split("\n\n")) == 25
# slightly different output formats between dbt versions result in raw log output shifting
expected_results = 25
raw_lines = len(dbt_result.raw_output.split("\n\n"))
assert raw_lines == expected_results or raw_lines == expected_results + 2


def test_ls_resource_type(test_project_dir, dbt_config_dir):
Expand All @@ -60,7 +63,10 @@ def my_dbt_solid(context):

context = get_dbt_solid_context(test_project_dir, dbt_config_dir)
dbt_result = my_dbt_solid(context)
assert len(dbt_result.raw_output.split("\n\n")) == 6
# slightly different output formats between dbt versions result in raw log output shifting
expected_results = 6
raw_lines = len(dbt_result.raw_output.split("\n\n"))
assert raw_lines == expected_results or raw_lines == expected_results + 2


def test_test(dbt_seed, test_project_dir, dbt_config_dir):
Expand Down Expand Up @@ -147,7 +153,10 @@ def my_dbt_solid(context):
context = get_dbt_solid_context(test_project_dir, dbt_config_dir)
dbt_result = my_dbt_solid(context)
assert len(dbt_result.result["results"]) == 4
assert json.loads(dbt_result.result["args"]["vars"]) == my_vars

# vars can be stored as a dict or a json-encoded string depending on dbt version
result_vars = dbt_result.result["args"]["vars"]
assert result_vars == my_vars or json.loads(result_vars) == my_vars


def test_models_and_extra_run(dbt_seed, test_project_dir, dbt_config_dir):
Expand All @@ -161,7 +170,9 @@ def my_dbt_solid(context):
dbt_result = my_dbt_solid(context)

assert len(dbt_result.result["results"]) == 2
assert json.loads(dbt_result.result["args"]["vars"]) == my_vars
# vars can be stored as a dict or a json-encoded string depending on dbt version
result_vars = dbt_result.result["args"]["vars"]
assert result_vars == my_vars or json.loads(result_vars) == my_vars


def test_exclude_run(dbt_seed, test_project_dir, dbt_config_dir):
Expand All @@ -175,7 +186,9 @@ def my_dbt_solid(context):
dbt_result = my_dbt_solid(context)

assert len(dbt_result.result["results"]) == 3
assert json.loads(dbt_result.result["args"]["vars"]) == my_vars
# vars can be stored as a dict or a json-encoded string depending on dbt version
result_vars = dbt_result.result["args"]["vars"]
assert result_vars == my_vars or json.loads(result_vars) == my_vars


def test_merged_extra_flags_run(dbt_seed, test_project_dir, dbt_config_dir):
Expand All @@ -192,4 +205,6 @@ def my_dbt_solid(context):
assert len(dbt_result.result["results"]) == 4
for key in my_vars.keys():
assert key in dbt_result.command
assert json.loads(dbt_result.result["args"]["vars"]) == my_vars
# vars can be stored as a dict or a json-encoded string depending on dbt version
result_vars = dbt_result.result["args"]["vars"]
assert result_vars == my_vars or json.loads(result_vars) == my_vars
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def foo():


def test_dbt_ls_fail_fast():
with pytest.raises(DagsterDbtCliFatalRuntimeError, match="Invalid --project-dir flag."):
with pytest.raises(DagsterDbtCliFatalRuntimeError, match=r"Invalid.*--project-dir"):
load_assets_from_dbt_project("bad_project_dir", "bad_config_dir")


Expand Down Expand Up @@ -758,7 +758,7 @@ def test_dbt_selections(
@pytest.mark.parametrize(
"select,error_match",
[
("tag:nonexist", "No dbt models match"),
("tag:nonexist", r"(No dbt models match|does not match any nodes)"),
("asjdlhalskujh:z", "not a valid method name"),
],
)
Expand Down
5 changes: 4 additions & 1 deletion python_modules/libraries/dagster-dbt/tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py{39,38,37,36}-{unix,windows}-{dbt_13X,dbt_14X}
envlist = py{39,38,37,36}-{unix,windows}-{dbt_13X,dbt_14X,dbt_15X}
skipsdist = true

[testenv]
Expand All @@ -13,10 +13,13 @@ deps =
dbt_13X: dbt-duckdb==1.3.*
dbt_14X: dbt-core==1.4.*
dbt_14X: dbt-duckdb==1.4.*
dbt_15X: dbt-core==1.5.*
dbt_15X: dbt-duckdb==1.5.*
-e .[test]
allowlist_externals =
/bin/bash
commands =
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster -e dagit'
dbt_13X: pytest -c ../../../pyproject.toml -vv {posargs}
dbt_14X: pytest -c ../../../pyproject.toml -vv {posargs}
dbt_15X: pytest -c ../../../pyproject.toml -vv {posargs}

0 comments on commit a5befa9

Please sign in to comment.