Skip to content

Commit

Permalink
Merge pull request #94 from DalgoT4D/union-op
Browse files Browse the repository at this point in the history
updated union table operations and added it to merge operations
  • Loading branch information
fatchat authored Mar 21, 2024
2 parents 663b829 + 2785a8c commit 43c9696
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 53 deletions.
61 changes: 49 additions & 12 deletions dbt_automation/assets/operations.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,34 @@ operations:
output_name: <name of the output model>
dest_schema: <enter your destination/output schema>
input:
- input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
- input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
- input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
source_columns:
- <column name>
- <column name>
- <column name>
- ...
other_inputs:
- input:
input_type: <"source" or "model" table2>
input_name: <name of source table or ref model table2>
source_name: <name of the source defined in source.yml; will be null for type "model" table2>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
- input:
input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
source_columns:
- <column name>
- <column name>
- <column name>
- ...

- type: syncsources
config:
source_name: <top level name of the source in sources.yml file in dbt project. all tables will go under here>
Expand Down Expand Up @@ -480,8 +494,6 @@ operations:
- <column name>
- <column name>
- <column name>
dest_schema: <destination schema>
output_name: <name of the output model>
join_type: <"inner" or "left">
join_on:
key1: <colname of table1>
Expand Down Expand Up @@ -589,4 +601,29 @@ operations:
is_col: <boolean>
output_column_name: <output column name>
sql_snippet: <custom sql snippet of CASE WHEN END AS 'output_col_name'>
```
- type: unionall
config:
source_columns:
- <column name>
- <column name>
- <column name>
- ...
other_inputs:
- input:
input_type: <"source" or "model" table2>
input_name: <name of source table or ref model table2>
source_name: <name of the source defined in source.yml; will be null for type "model" table2>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
- input:
input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
source_columns:
- <column name>
- <column name>
- <column name>
- ...
5 changes: 5 additions & 0 deletions dbt_automation/operations/mergeoperations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dbt_automation.operations.aggregate import aggregate_dbt_sql
from dbt_automation.operations.casewhen import casewhen_dbt_sql
from dbt_automation.operations.flattenjson import flattenjson_dbt_sql
from dbt_automation.operations.mergetables import union_tables, union_tables_sql


def merge_operations_sql(
Expand Down Expand Up @@ -116,6 +117,10 @@ def merge_operations_sql(
op_select_statement, out_cols = flattenjson_dbt_sql(
operation["config"], warehouse
)
elif operation["type"] == "unionall":
op_select_statement, out_cols = union_tables_sql(
operation["config"], warehouse
)

output_cols = out_cols

Expand Down
81 changes: 59 additions & 22 deletions dbt_automation/operations/mergetables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,104 @@
import os

import argparse
from collections import Counter
from logging import basicConfig, getLogger, INFO
from dotenv import load_dotenv

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.utils.tableutils import source_or_ref
from dbt_automation.utils.columnutils import quote_constvalue

basicConfig(level=INFO)
logger = getLogger()

# sql, len_output_set = mergetables.union_tables_sql({
# "input": {
# "input_type": "source",
# "source_name": "pytest_intermediate",
# "input_name": "arithmetic_add",
# },
# "source_columns": [
# "NGO", "Month", "measure1", "measure2"
# ],
# "other_inputs": [
# {
# "input": {
# "input_type": "source",
# "source_name": "pytest_intermediate",
# "input_name": "arithmetic_div",
# },
# "source_columns": [
# "NGO", "Month", "measure1",
# ],
# }
# ],
# }, wc_client)

# {{ dbt_utils.union_relations(relations=[
# source('pytest_intermediate', 'arithmetic_add'),
# source('pytest_intermediate', 'arithmetic_div')
# ] , include=['NGO','measure2','Month','measure1'])
# }}


# pylint:disable=unused-argument,logging-fstring-interpolation
def union_tables_sql(config, warehouse: WarehouseInterface):
"""Generates SQL code for unioning tables using the dbt_utils union_relations macro."""
input_arr = config["input_arr"]
dest_schema = config["dest_schema"]
source_columns = config["source_columns"]
"""
Generates SQL code for unioning tables using the dbt_utils union_relations macro.
NOTE: THIS OPERATION WON'T WORK WITH CTES AS OF NOW
"""
input_tables = [
{"input": config["input"], "source_columns": config["source_columns"]}
] + config.get("other_inputs", [])

names = set()
for input in input_arr:
name = source_or_ref(**input)
for table in input_tables:
name = source_or_ref(**table["input"])
if name in names:
logger.error("This appears more than once: %s", name)
raise ValueError("Duplicate inputs found")
names.add(name)

relations = "["
for input in input_arr:
relations += f"{source_or_ref(**input)},"
for table in input_tables:
relations += f"{source_or_ref(**table['input'])},"
relations = relations[:-1]
relations += "]"
dbt_code = ""

if config["input_arr"][0]["input_type"] != "cte":
dbt_code += f"{{{{ config(materialized='table',schema='{dest_schema}') }}}}\n"

include_cols = [f'"{col}"' for col in source_columns]
output_cols = set()
for table in input_tables:
output_cols.update(table["source_columns"])

# pylint:disable=consider-using-f-string
dbt_code += "{{ dbt_utils.union_relations("
dbt_code += f"relations={relations} , " + f"include=[{','.join(include_cols)}]"
dbt_code += (
f"relations={relations} , "
+ f"include=[{','.join([quote_constvalue(col, 'postgres') for col in list(output_cols)])}]"
)
dbt_code += ")}}"

return dbt_code, source_columns
return dbt_code, list(output_cols)


def union_tables(config, warehouse: WarehouseInterface, project_dir):
"""Generates a dbt model which uses the dbt_utils union_relations macro to union tables."""
dest_schema = config["dest_schema"]
output_model_name = config["output_name"]
dbt_sql = ""
if config["input"] != "cte":
dbt_sql = (
"{{ config(materialized='table', schema='" + config["dest_schema"] + "') }}"
)

union_code, output_cols = union_tables_sql(config, warehouse)
select_statement, output_cols = union_tables_sql(config, warehouse)
dbt_sql += "\n" + select_statement

dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(config["dest_schema"])
dbtproject.ensure_models_dir(dest_schema)

model_sql_path = dbtproject.write_model(
config["dest_schema"],
output_model_name,
union_code,
)
model_sql_path = dbtproject.write_model(dest_schema, output_model_name, dbt_sql)

return model_sql_path, output_cols

Expand Down
29 changes: 19 additions & 10 deletions tests/warehouse/test_bigquery_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,19 +671,28 @@ def test_mergetables(self):
config = {
"dest_schema": "pytest_intermediate",
"output_name": output_name,
"input_arr": [
{
"input_type": "model",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"],
"input": {
"input_type": "model",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"other_inputs": [
{
"input_type": "model",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
"input": {
"input_type": "model",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
},
"source_columns": [
"NGO",
"Month",
"measure1",
"measure2",
"Indicator",
],
},
],
"source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"],
}

union_tables(
Expand Down
21 changes: 12 additions & 9 deletions tests/warehouse/test_postgres_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,16 +683,19 @@ def test_mergetables(self):
"dest_schema": "pytest_intermediate",
"output_name": output_name,
"source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"],
"input_arr": [
{
"input_type": "model",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"input": {
"input_type": "model",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"other_inputs": [
{
"input_type": "model",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
"input": {
"input_type": "model",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
},
"source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"],
},
],
}
Expand Down

0 comments on commit 43c9696

Please sign in to comment.