Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting invocation mode when using ExecutionMode.VIRTUALENV #1023

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,18 @@ class ExecutionConfig:
project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.invocation_mode and self.execution_mode != ExecutionMode.LOCAL:
raise CosmosValueError("ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL.")
if self.invocation_mode and self.execution_mode not in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
raise CosmosValueError(
"ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV."
)
if self.execution_mode == ExecutionMode.VIRTUALENV:
if self.invocation_mode == InvocationMode.DBT_RUNNER:
raise CosmosValueError(
"InvocationMode.DBT_RUNNER has not been implemented for ExecutionMode.VIRTUALENV"
)
elif self.invocation_mode is None:
logger.debug(
"Defaulting to InvocationMode.SUBPROCESS as it is the only supported invocation mode for ExecutionMode.VIRTUALENV"
)
self.invocation_mode = InvocationMode.SUBPROCESS
self.project_path = Path(dbt_project_path) if dbt_project_path else None
1 change: 1 addition & 0 deletions docs/getting_started/execution-modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Some drawbacks of this approach:

- It is slower than ``local`` because it creates a new Python virtual environment for each Cosmos dbt task run.
- If dbt is unavailable in the Airflow scheduler, the default ``LoadMode.DBT_LS`` will not work. In this scenario, users must use a `parsing method <parsing-methods.html>`_ that does not rely on dbt, such as ``LoadMode.MANIFEST``.
- Only ``InvocationMode.SUBPROCESS`` is supported currently, attempt to use ``InvocationMode.DBT_RUNNER`` will raise error.

Example of how to use:

Expand Down
35 changes: 27 additions & 8 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,34 @@ def test_render_config_env_vars_deprecated():


@pytest.mark.parametrize(
"execution_mode, expectation",
"execution_mode, invocation_mode, expectation",
[
(ExecutionMode.LOCAL, does_not_raise()),
(ExecutionMode.VIRTUALENV, pytest.raises(CosmosValueError)),
(ExecutionMode.KUBERNETES, pytest.raises(CosmosValueError)),
(ExecutionMode.DOCKER, pytest.raises(CosmosValueError)),
(ExecutionMode.AZURE_CONTAINER_INSTANCE, pytest.raises(CosmosValueError)),
(ExecutionMode.LOCAL, InvocationMode.DBT_RUNNER, does_not_raise()),
(ExecutionMode.LOCAL, InvocationMode.SUBPROCESS, does_not_raise()),
(ExecutionMode.LOCAL, None, does_not_raise()),
(ExecutionMode.VIRTUALENV, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
(ExecutionMode.VIRTUALENV, InvocationMode.SUBPROCESS, does_not_raise()),
(ExecutionMode.VIRTUALENV, None, does_not_raise()),
(ExecutionMode.KUBERNETES, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
(ExecutionMode.DOCKER, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
(ExecutionMode.AZURE_CONTAINER_INSTANCE, InvocationMode.DBT_RUNNER, pytest.raises(CosmosValueError)),
],
)
def test_execution_config_with_invocation_option(execution_mode, expectation):
def test_execution_config_with_invocation_option(execution_mode, invocation_mode, expectation):
with expectation:
ExecutionConfig(execution_mode=execution_mode, invocation_mode=InvocationMode.DBT_RUNNER)
ExecutionConfig(execution_mode=execution_mode, invocation_mode=invocation_mode)


@pytest.mark.parametrize(
"execution_mode, expected_invocation_mode",
[
(ExecutionMode.LOCAL, None),
(ExecutionMode.VIRTUALENV, InvocationMode.SUBPROCESS),
(ExecutionMode.KUBERNETES, None),
(ExecutionMode.DOCKER, None),
(ExecutionMode.AZURE_CONTAINER_INSTANCE, None),
],
)
def test_execution_config_default_config(execution_mode, expected_invocation_mode):
execution_config = ExecutionConfig(execution_mode=execution_mode)
assert execution_config.invocation_mode == expected_invocation_mode
Loading