Skip to content

Commit

Permalink
dbt clone (#7881)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthew McKnight <matthew.mcknight@dbtlabs.com>
  • Loading branch information
aranke and McKnight-42 authored Jun 29, 2023
1 parent 1af94de commit 5c7aa7f
Show file tree
Hide file tree
Showing 20 changed files with 726 additions and 83 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230616-104849.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: dbt clone
time: 2023-06-16T10:48:49.079961-05:00
custom:
Author: jtcohen6 aranke McKnight-42
Issue: "7258"
1 change: 1 addition & 0 deletions core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def command_args(command: CliCommand) -> ArgsList:
CMD_DICT: Dict[CliCommand, ClickCommand] = {
CliCommand.BUILD: cli.build,
CliCommand.CLEAN: cli.clean,
CliCommand.CLONE: cli.clone,
CliCommand.COMPILE: cli.compile,
CliCommand.DOCS_GENERATE: cli.docs_generate,
CliCommand.DOCS_SERVE: cli.docs_serve,
Expand Down
38 changes: 38 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
from dbt.task.clean import CleanTask
from dbt.task.clone import CloneTask
from dbt.task.compile import CompileTask
from dbt.task.debug import DebugTask
from dbt.task.deps import DepsTask
Expand Down Expand Up @@ -607,6 +608,43 @@ def retry(ctx, **kwargs):
return results, success


# dbt clone
@cli.command("clone")
@click.pass_context
@p.defer_state
@p.exclude
@p.full_refresh
@p.profile
@p.profiles_dir
@p.project_dir
@p.resource_type
@p.select
@p.selector
@p.state  # required
@p.target
@p.target_path
@p.threads
@p.vars
@p.version_check
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
@requires.postflight
def clone(ctx, **kwargs):
    """Create clones of selected nodes based on their location in the manifest provided to --state."""
    # NOTE(review): ctx.obj is presumably populated by the @requires.* decorators
    # above -- confirm against dbt.cli.requires. kwargs is accepted but not read here.
    shared = ctx.obj
    clone_task = CloneTask(
        shared["flags"],
        shared["runtime_config"],
        shared["manifest"],
    )

    # Run first, then let the task judge its own results; callers receive both.
    outcome = clone_task.run()
    return outcome, clone_task.interpret_results(outcome)


# dbt run operation
@cli.command("run-operation")
@click.pass_context
Expand Down
1 change: 1 addition & 0 deletions core/dbt/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Command(Enum):
BUILD = "build"
CLEAN = "clean"
COMPILE = "compile"
CLONE = "clone"
DOCS_GENERATE = "generate"
DOCS_SERVE = "serve"
DEBUG = "debug"
Expand Down
42 changes: 33 additions & 9 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,20 +1378,30 @@ def post_hooks(self) -> List[Dict[str, Any]]:
@contextproperty
def sql(self) -> Optional[str]:
# only doing this in sql model for backward compatible
if (
getattr(self.model, "extra_ctes_injected", None)
and self.model.language == ModelLanguage.sql # type: ignore[union-attr]
):
# TODO CT-211
return self.model.compiled_code # type: ignore[union-attr]
return None
if self.model.language == ModelLanguage.sql: # type: ignore[union-attr]
# If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod
# relation
if getattr(self.model, "defer_relation", None):
# TODO https://github.com/dbt-labs/dbt-core/issues/7976
return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}" # type: ignore[union-attr]
elif getattr(self.model, "extra_ctes_injected", None):
# TODO CT-211
return self.model.compiled_code # type: ignore[union-attr]
else:
return None
else:
return None

@contextproperty
def compiled_code(self) -> Optional[str]:
if getattr(self.model, "extra_ctes_injected", None):
if getattr(self.model, "defer_relation", None):
# TODO https://github.com/dbt-labs/dbt-core/issues/7976
return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}" # type: ignore[union-attr]
elif getattr(self.model, "extra_ctes_injected", None):
# TODO CT-211
return self.model.compiled_code # type: ignore[union-attr]
return None
else:
return None

@contextproperty
def database(self) -> str:
Expand Down Expand Up @@ -1436,6 +1446,20 @@ def this(self) -> Optional[RelationProxy]:
return None
return self.db_wrapper.Relation.create_from(self.config, self.model)

@contextproperty
def defer_relation(self) -> Optional[RelationProxy]:
    """
    Relation for the counterpart of this node recorded in a --state
    artifact (its "production" version), for commands that attach that
    information to the node.

    Returns None when the model carries no (or a falsy) `defer_relation`.
    """
    stateful_other = getattr(self.model, "defer_relation", None)
    if not stateful_other:
        return None
    # Build the Relation from the deferred node's own metadata, not from `self.model`.
    return self.db_wrapper.Relation.create_from_node(
        self.config, stateful_other  # type: ignore
    )


# This is called by '_context_for', used in 'render_with_context'
def generate_parser_model_context(
Expand Down
12 changes: 7 additions & 5 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ManifestNode,
Metric,
ModelNode,
RelationalNode,
DeferRelation,
ResultNode,
SemanticModel,
SourceDefinition,
Expand Down Expand Up @@ -1218,8 +1218,10 @@ def add_from_artifact(
for unique_id, node in other.nodes.items():
current = self.nodes.get(unique_id)
if current and (node.resource_type in refables and not node.is_ephemeral):
state_relation = RelationalNode(node.database, node.schema, node.alias)
self.nodes[unique_id] = current.replace(state_relation=state_relation)
defer_relation = DeferRelation(
node.database, node.schema, node.alias, node.relation_name
)
self.nodes[unique_id] = current.replace(defer_relation=defer_relation)

# Methods that were formerly in ParseResult

Expand Down Expand Up @@ -1440,8 +1442,8 @@ def __post_serialize__(self, dct):
for unique_id, node in dct["nodes"].items():
if "config_call_dict" in node:
del node["config_call_dict"]
if "state_relation" in node:
del node["state_relation"]
if "defer_relation" in node:
del node["defer_relation"]
return dct


Expand Down
20 changes: 5 additions & 15 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ def add_macro(self, value: str):


@dataclass
class RelationalNode(HasRelationMetadata):
class DeferRelation(HasRelationMetadata):
alias: str
relation_name: Optional[str]

@property
def identifier(self):
Expand All @@ -275,17 +276,6 @@ def add_node(self, value: str):
self.nodes.append(value)


@dataclass
class StateRelation(dbtClassMixin):
alias: str
database: Optional[str]
schema: str

@property
def identifier(self):
return self.alias


@dataclass
class ParsedNodeMandatory(GraphNode, HasRelationMetadata, Replaceable):
alias: str
Expand Down Expand Up @@ -577,7 +567,7 @@ class ModelNode(CompiledNode):
version: Optional[NodeVersion] = None
latest_version: Optional[NodeVersion] = None
deprecation_date: Optional[datetime] = None
state_relation: Optional[StateRelation] = None
defer_relation: Optional[DeferRelation] = None

@classmethod
def from_args(cls, args: ModelNodeArgs) -> "ModelNode":
Expand Down Expand Up @@ -796,7 +786,7 @@ class SeedNode(ParsedNode): # No SQLDefaults!
# and we need the root_path to load the seed later
root_path: Optional[str] = None
depends_on: MacroDependsOn = field(default_factory=MacroDependsOn)
state_relation: Optional[StateRelation] = None
defer_relation: Optional[DeferRelation] = None

def same_seeds(self, other: "SeedNode") -> bool:
# for seeds, we check the hashes. If the hashes are different types,
Expand Down Expand Up @@ -991,7 +981,7 @@ class IntermediateSnapshotNode(CompiledNode):
class SnapshotNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Snapshot]})
config: SnapshotConfig
state_relation: Optional[StateRelation] = None
defer_relation: Optional[DeferRelation] = None


# ====================================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{# Dispatching entry point: resolves `can_clone_table` through adapter.dispatch
   (macro namespace 'dbt') and returns whatever the resolved macro returns. #}
{% macro can_clone_table() %}
    {%- set dispatched_macro = adapter.dispatch('can_clone_table', 'dbt') -%}
    {{ return(dispatched_macro()) }}
{% endmacro %}

{# Default implementation behind the `can_clone_table` dispatcher: always
   returns False. The clone materialization only takes its table-clone branch
   when this value is truthy; otherwise it builds a view. #}
{% macro default__can_clone_table() %}
{{ return(False) }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{#
  `clone` materialization (default adapter).

  Flow, in order:
    1. No `defer_relation` for this node       -> log and return no relations.
    2. Target already exists, no full refresh  -> log and return no relations.
    3. The deferred relation is an existing table and can_clone_table() is truthy
                                               -> create-or-replace a table clone,
                                                  then apply grants and persist docs.
    4. Otherwise                               -> delegate to the view materialization.

  Returns a dict of the form {'relations': [...]}.
#}
{%- materialization clone, default -%}

  {%- set relations = {'relations': []} -%}

  {#- Read the model's `grants` config so that apply_grants() in the table-clone
      branch receives a defined value; without this the name is undefined there. -#}
  {%- set grant_config = config.get('grants') -%}

  {%- if not defer_relation -%}
      -- nothing to do
      {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }}
      {{ return(relations) }}
  {%- endif -%}

  {%- set existing_relation = load_cached_relation(this) -%}

  {%- if existing_relation and not flags.FULL_REFRESH -%}
      -- noop!
      {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }}
      {{ return(relations) }}
  {%- endif -%}

  {%- set other_existing_relation = load_cached_relation(defer_relation) -%}

  -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table
  -- Otherwise, this will be a view

  {% set can_clone_table = can_clone_table() %}

  {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%}

      {%- set target_relation = this.incorporate(type='table') -%}
      {#- A non-table object at the target is dropped before the
          `create or replace` below runs. -#}
      {% if existing_relation is not none and not existing_relation.is_table %}
        {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }}
        {{ drop_relation_if_exists(existing_relation) }}
      {% endif %}

      -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace'
      {% call statement('main') %}
          {{ create_or_replace_clone(target_relation, defer_relation) }}
      {% endcall %}

      {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
      {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
      {% do persist_docs(target_relation, model) %}

      {{ return({'relations': [target_relation]}) }}

  {%- else -%}

      {%- set target_relation = this.incorporate(type='view') -%}

      -- reuse the view materialization
      -- TODO: support actual dispatch for materialization macros
      -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799
      {#- Prefer the adapter-specific view materialization when one is present
          in the context; fall back to the default one otherwise. -#}
      {% set search_name = "materialization_view_" ~ adapter.type() %}
      {% if not search_name in context %}
          {% set search_name = "materialization_view_default" %}
      {% endif %}
      {% set materialization_macro = context[search_name] %}
      {% set relations = materialization_macro() %}
      {{ return(relations) }}

  {%- endif -%}

{%- endmaterialization -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{# Dispatching entry point: resolves `create_or_replace_clone` through
   adapter.dispatch (macro namespace 'dbt') and returns the resolved macro's
   result for the given pair of relations. #}
{% macro create_or_replace_clone(this_relation, defer_relation) %}
    {%- set dispatched_macro = adapter.dispatch('create_or_replace_clone', 'dbt') -%}
    {{ return(dispatched_macro(this_relation, defer_relation)) }}
{% endmacro %}

{# Default implementation behind the `create_or_replace_clone` dispatcher:
   renders a single `create or replace table <this_relation> clone <defer_relation>`
   statement. It renders SQL text only; the caller decides where to execute it. #}
{% macro default__create_or_replace_clone(this_relation, defer_relation) %}
create or replace table {{ this_relation }} clone {{ defer_relation }}
{% endmacro %}
Loading

0 comments on commit 5c7aa7f

Please sign in to comment.