Skip to content

Commit

Permalink
Merge pull request #80 from DalgoT4D/72-support-for-join-operations
Browse files Browse the repository at this point in the history
72 support for join operations
  • Loading branch information
fatchat authored Mar 9, 2024
2 parents 62b5fe0 + b34842d commit c41a163
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 53 deletions.
61 changes: 43 additions & 18 deletions dbt_automation/assets/operations.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -206,30 +206,31 @@ operations:

- type: join
config:
input_arr:
- input:
input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
source_columns:
- <column name>
- <column name>
- <column name>
- input:
input_type: <"source" or "model">
input_name: <name of source table or ref model>
source_name: <name of the source defined in source.yml; will be null for type "model">
source_columns:
- <column name>
- <column name>
- <column name>
input:
input_type: <"source" or "model" of table1>
input_name: <name of source table or ref model table1>
source_name: <name of the source defined in source.yml; will be null for type "model" table1>
source_columns:
- <column name>
- <column name>
- <column name>
dest_schema: <destination schema>
output_name: <name of the output model>
join_type: <"inner" or "left">
join_on:
key1: <colname of table1>
key2: <colname of table2>
compare_with: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
other_inputs:
- input:
input_type: <"source" or "model" table2>
input_name: <name of source table or ref model table2>
source_name: <name of the source defined in source.yml; will be null for type "model" table2>
source_columns:
- <column name>
- <column name>
- <column name>
seq: < its 1 for the above input ; will help in mergeoperation chaininig to figure out if we want to do a left or a right join>


- type: mergeoperations
Expand Down Expand Up @@ -351,4 +352,28 @@ operations:
- find: <old string to be replaced>
replace: <new string to be replaced with>
- find: <old string to be replaced>
replace: <new string to be replaced with>
replace: <new string to be replaced with>
- type: join
config:
source_columns:
- <column name>
- <column name>
- <column name>
dest_schema: <destination schema>
output_name: <name of the output model>
join_type: <"inner" or "left">
join_on:
key1: <colname of table1>
key2: <colname of table2>
compare_with: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
other_inputs:
- input:
input_type: <"source" or "model" table2>
input_name: <name of source table or ref model table2>
source_name: <name of the source defined in source.yml; will be null for type "model" table2>
source_columns:
- <column name>
- <column name>
- <column name>
seq: < its 1 for the above input ; will help in mergeoperation chaininig to figure out if we want to do a left or a right join>

80 changes: 45 additions & 35 deletions dbt_automation/operations/joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,22 @@
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.utils.tableutils import source_or_ref


# sql, len_output_set = joins.joins_sql({
# "input_arr": [
# {
# "input": {
# "input_type": "source",
# "source_name": "pytest_intermediate",
# "input_name": "arithmetic_add",
# },
# "source_columns": ["measure1", "measure2"]
# },
# "input": {
# "input_type": "source",
# "source_name": "pytest_intermediate",
# "input_name": "arithmetic_add",
# },
# "source_columns": ["measure1", "measure2"],
# "other_inputs": [
# {
# "input": {
# "input_type": "source",
# "source_name": "pytest_intermediate",
# "input_name": "arithmetic_div",
# },
# "source_columns": ["Indicator", "measure2"],
# "source_columns": ["Indicator", "measure2", "Month"],
# "seq": 1
# },
# ],
# "join_type": "inner",
Expand All @@ -35,26 +33,31 @@
# },
# "dest_schema": "joined",
# }, wc_client)

# gives sql =

# SELECT "t1"."measure1",
# "t1"."measure2",
# "t2"."Indicator",
# "t2"."measure2" AS "measure2_2"
# FROM {{source('pytest_intermediate', 'arithmetic_add')}} t1
# INNER JOIN {{source('pytest_intermediate', 'arithmetic_div')}} t2
# ON "t1"."NGO" = "t2"."NGO"

# and len_output_set = 4 (columns)
#
# then sql is
#
# SELECT "t1"."measure1",
# "t1"."measure2",
# "t2"."Indicator",
# "t2"."measure2" AS "measure2_2",
# "t2"."Month"
# FROM {{source('pytest_intermediate', 'arithmetic_add')}} t1
# INNER JOIN {{source('pytest_intermediate', 'arithmetic_div')}} t2
# ON "t1"."NGO" = "t2"."NGO"
#
# and len_output_set = 5 (columns)


def joins_sql(
config: dict,
warehouse: WarehouseInterface,
):
"""Given a regex and a column name, extract the regex from the column."""
input_tables = config.get("input_arr", [])

input_tables = [
{"input": config["input"], "source_columns": config["source_columns"], "seq": 1}
] + config.get("other_inputs", [])
input_tables.sort(key=lambda x: x["seq"])
join_type: str = config.get("join_type", "")
join_on = config.get("join_on", {})

Expand Down Expand Up @@ -86,21 +89,28 @@ def joins_sql(

dbt_code = dbt_code[:-2]

select_from = source_or_ref(**input_tables[0]["input"])
if input_tables[0]["input"]["input_type"] == "cte":
table1 = input_tables[0]["input"]
select_from = source_or_ref(**table1)
if table1["input_type"] == "cte":
dbt_code += "\n FROM " + select_from + " " + aliases[0] + "\n"
else:
dbt_code += "\n FROM " + "{{" + select_from + "}}" + " " + aliases[0] + "\n"

# join
dbt_code += (
f" {join_type.upper()} JOIN "
+ "{{"
+ source_or_ref(**input_tables[1]["input"])
+ "}}"
+ f" {aliases[1]}"
+ "\n"
)
table2 = input_tables[1]["input"]
join_with = source_or_ref(**table2)
if table2["input_type"] == "cte":
dbt_code += f" {join_type.upper()} JOIN " + join_with + " " + aliases[1] + "\n"
else:
dbt_code += (
f" {join_type.upper()} JOIN "
+ "{{"
+ join_with
+ "}}"
+ " "
+ aliases[1]
+ "\n"
)

dbt_code += f" ON {quote_columnname(aliases[0], warehouse.name)}.{quote_columnname(join_on['key1'], warehouse.name)}"
dbt_code += f" {join_on['compare_with']} {quote_columnname(aliases[1], warehouse.name)}.{quote_columnname(join_on['key2'], warehouse.name)}\n"
Expand All @@ -115,7 +125,7 @@ def join(config: dict, warehouse: WarehouseInterface, project_dir: str):
dest_schema = config["dest_schema"]
output_model_name = config["output_name"]
dbt_sql = ""
if config["input_arr"][0]["input"]["input_type"] != "cte":
if config["input"] != "cte":
dbt_sql = (
"{{ config(materialized='table', schema='" + config["dest_schema"] + "') }}"
)
Expand Down
3 changes: 3 additions & 0 deletions dbt_automation/operations/mergeoperations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.operations.castdatatypes import cast_datatypes_sql
from dbt_automation.operations.replace import replace_dbt_sql
from dbt_automation.operations.joins import joins_sql
from dbt_automation.utils.tableutils import source_or_ref


Expand Down Expand Up @@ -89,6 +90,8 @@ def merge_operations_sql(
op_select_statement, out_cols = replace_dbt_sql(
operation["config"], warehouse
)
elif operation["type"] == "join":
op_select_statement, out_cols = joins_sql(operation["config"], warehouse)

output_cols = out_cols

Expand Down

0 comments on commit c41a163

Please sign in to comment.