[feature] support for dbt clone or dbt build? DbtCloneOperator #878
Hey @SPTKL! Great to see you back with another interesting challenge! Hope you've been doing well. Here's a step-by-step guide to creating a `DbtCloneOperator`, following the mixin-plus-execution-mode pattern the existing Cosmos operators use.
Here's an example implementation based on the provided context:

```python
from __future__ import annotations

from typing import Sequence

from airflow.utils.context import Context

from cosmos.operators.kubernetes import DbtKubernetesBaseOperator


class DbtCloneMixin:
    """Mixin for the dbt clone command."""

    template_fields: Sequence[str] = ("clone_flags",)

    def build_clone_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> tuple[list[str], dict]:
        """Build the dbt clone command."""
        cmd = [self.dbt_executable_path, "clone"]  # Assuming 'dbt clone' is the command.
        env_vars = self.build_env_vars(context)
        if cmd_flags:
            cmd.extend(cmd_flags)
        return cmd, env_vars


class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """Executes a dbt clone command in a Kubernetes Pod."""

    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtCloneMixin.template_fields  # type: ignore[operator]

    def execute(self, context: Context) -> None:
        cmd, env_vars = self.build_clone_cmd(context=context, cmd_flags=self.cmd_flags)
        self.build_and_run_cmd(context=context, cmd_flags=cmd)
```

To modify your process to sequentially run, test, and clone each model, you can define an Airflow DAG that uses the `DbtRunKubernetesOperator`, the `DbtTestKubernetesOperator`, and the new `DbtCloneKubernetesOperator`. Here's a simplified example of how you might set up the DAG:

```python
from airflow import DAG
from airflow.utils.dates import days_ago

from cosmos import ProfileConfig
from cosmos.operators.kubernetes import (
    DbtCloneKubernetesOperator,
    DbtRunKubernetesOperator,
    DbtTestKubernetesOperator,
)

with DAG("dbt_workflow", start_date=days_ago(1), schedule_interval=None) as dag:
    run_model = DbtRunKubernetesOperator(
        task_id="run_model",
        profile_config=ProfileConfig(profile_name="staging"),
        # Additional parameters as needed
    )
    test_model = DbtTestKubernetesOperator(
        task_id="test_model",
        profile_config=ProfileConfig(profile_name="staging"),
        # Additional parameters as needed
    )
    clone_model = DbtCloneKubernetesOperator(
        task_id="clone_model",
        profile_config=ProfileConfig(profile_name="production"),
        # Additional parameters as needed
    )

    run_model >> test_model >> clone_model
```

This example assumes that you have a `ProfileConfig` prepared for each environment. For more details, you can refer to the existing operators in the Cosmos codebase.
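As a rough sketch of what those per-environment profiles might look like (the profile name, target names, and `profiles.yml` path below are placeholders, not values from this thread):

```python
from cosmos import ProfileConfig

# Hypothetical per-environment configs: the same profiles.yml, selecting
# different targets for staging and production.
staging_profile = ProfileConfig(
    profile_name="my_project",
    target_name="staging",
    profiles_yml_filepath="/usr/local/airflow/dbt/profiles.yml",
)
production_profile = ProfileConfig(
    profile_name="my_project",
    target_name="prod",
    profiles_yml_filepath="/usr/local/airflow/dbt/profiles.yml",
)
```

Passing `staging_profile` to the run/test tasks and `production_profile` to the clone task mirrors the different-targets setup described in the original request.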
Hi @SPTKL, this is a very useful feature. I'm still learning about the `dbt clone` command myself. Your feature request has similarities with #903. One of the challenges is that we may need to introduce Cosmos intermediate storage, so that independent tasks, potentially running in different worker nodes/containers/VMs, could share the build artifacts needed by `dbt clone`. Would you be interested in contributing to Cosmos in this area?
Yes, I'm interested!
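For concreteness, one way such intermediate storage could work is to push the dbt `target/` artifacts to object storage after the build and pull them back down before the clone, so that `dbt clone --state` can read them. Below is a minimal sketch assuming S3 via Airflow's `S3Hook`; the bucket name, key layout, and helper functions are hypothetical, not an existing Cosmos API:

```python
import os
from pathlib import Path

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

ARTIFACT_BUCKET = "my-dbt-artifacts"  # hypothetical bucket


def upload_state(run_id: str, target_dir: str = "target") -> None:
    """Upload the dbt target/ directory (manifest.json etc.) after `dbt build`."""
    hook = S3Hook(aws_conn_id="aws_default")
    for path in Path(target_dir).rglob("*"):
        if path.is_file():
            hook.load_file(
                filename=str(path),
                key=f"{run_id}/{path.relative_to(target_dir)}",
                bucket_name=ARTIFACT_BUCKET,
                replace=True,
            )


def download_state(run_id: str, dest_dir: str = "state") -> str:
    """Fetch the artifacts before the clone, so `dbt clone --state state/` works."""
    hook = S3Hook(aws_conn_id="aws_default")
    os.makedirs(dest_dir, exist_ok=True)
    for key in hook.list_keys(bucket_name=ARTIFACT_BUCKET, prefix=f"{run_id}/"):
        local_path = Path(dest_dir) / Path(key).relative_to(run_id)
        local_path.parent.mkdir(parents=True, exist_ok=True)
        hook.get_key(key, bucket_name=ARTIFACT_BUCKET).download_file(str(local_path))
    return dest_dir
```

In a real implementation these helpers would presumably live inside Cosmos and be wired into the operators, rather than called by hand from DAG code.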
Hi there, our CI/CD runs "write-audit-publish"; basically we do:

1. a `dbt build` command to run and test models in staging
2. a `dbt clone` command to clone the models created in step 1 to a production environment

Can we implement a `DbtCloneOperator`? If so, what's the recommended place? Any suggestions on how to implement it?

Also, given the flexibility of cosmos, say we want to change our process to:

1. `dbt run` the model in staging
2. `dbt test` the model in staging
3. `dbt clone` the model from staging to production

Ideally, in the DAG, each model should have its own task group with 4 tasks: run, test, snapshot, clone. That's impossible in the dbt CLI because `clone` is a separate command (we get run, test, and snapshot in `dbt build`, but not clone). What's the recommended path for implementing something like this? I know it's difficult because, given our current implementation, the environments are configured using different targets with different dbt profiles.
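To illustrate the per-model task-group shape described above: a minimal sketch, assuming the proposed `DbtCloneKubernetesOperator` exists and reusing the hypothetical `staging_profile`/`production_profile` from the earlier `ProfileConfig` sketch. The model names and the `select` argument are illustrative placeholders:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from cosmos.operators.kubernetes import (
    DbtRunKubernetesOperator,
    DbtSnapshotKubernetesOperator,
    DbtTestKubernetesOperator,
)

# DbtCloneKubernetesOperator is the operator proposed in this issue; it does
# not exist in Cosmos yet, so this sketch is illustrative only.

with DAG("dbt_wap", start_date=days_ago(1), schedule_interval=None) as dag:
    for model in ["orders", "customers"]:  # placeholder model names
        with TaskGroup(group_id=f"{model}_wap"):
            run = DbtRunKubernetesOperator(
                task_id="run",
                select=[model],  # assumed per-model selector argument
                profile_config=staging_profile,
            )
            test = DbtTestKubernetesOperator(
                task_id="test",
                select=[model],
                profile_config=staging_profile,
            )
            snapshot = DbtSnapshotKubernetesOperator(
                task_id="snapshot",
                profile_config=staging_profile,
            )
            clone = DbtCloneKubernetesOperator(  # proposed operator
                task_id="clone",
                select=[model],
                profile_config=production_profile,
            )
            run >> test >> snapshot >> clone
```

Each model gets its own `run >> test >> snapshot >> clone` group, which is exactly the granularity `dbt build` alone cannot express, since `clone` is a separate command.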