From 4166f262356bdd778caeaf4ed6938f1dbf333d56 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Thu, 21 Mar 2024 17:20:26 +0530 Subject: [PATCH 1/3] updated union table operations and added it to merge operations --- dbt_automation/assets/operations.template.yml | 61 +++++++++++++++---- dbt_automation/operations/mergeoperations.py | 5 ++ dbt_automation/operations/mergetables.py | 51 +++++++++------- 3 files changed, 84 insertions(+), 33 deletions(-) diff --git a/dbt_automation/assets/operations.template.yml b/dbt_automation/assets/operations.template.yml index 48270a7..310ba31 100644 --- a/dbt_automation/assets/operations.template.yml +++ b/dbt_automation/assets/operations.template.yml @@ -15,20 +15,34 @@ operations: output_name: dest_schema: input: - - input_type: <"source" or "model"> - input_name: - source_name: - - input_type: <"source" or "model"> - input_name: - source_name: - - input_type: <"source" or "model"> - input_name: - source_name: + input_type: <"source" or "model"> + input_name: + source_name: source_columns: - - - - ... + other_inputs: + - input: + input_type: <"source" or "model" table2> + input_name: + source_name: + source_columns: + - + - + - + - .. + - input: + input_type: <"source" or "model"> + input_name: + source_name: + source_columns: + - + - + - + - ... + - type: syncsources config: source_name: @@ -480,8 +494,6 @@ operations: - - - - dest_schema: - output_name: join_type: <"inner" or "left"> join_on: key1: @@ -589,4 +601,29 @@ operations: is_col: output_column_name: sql_snippet: - ``` \ No newline at end of file + - type: unionall + config: + source_columns: + - + - + - + - ... + other_inputs: + - input: + input_type: <"source" or "model" table2> + input_name: + source_name: + source_columns: + - + - + - + - .. + - input: + input_type: <"source" or "model"> + input_name: + source_name: + source_columns: + - + - + - + - ... \ No newline at end of file diff --git a/dbt_automation/operations/mergeoperations.py b/dbt_automation/operations/mergeoperations.py index 997a3b8..b21e910 100644 --- a/dbt_automation/operations/mergeoperations.py +++ b/dbt_automation/operations/mergeoperations.py @@ -21,6 +21,7 @@ from dbt_automation.operations.aggregate import aggregate_dbt_sql from dbt_automation.operations.casewhen import casewhen_dbt_sql from dbt_automation.operations.flattenjson import flattenjson_dbt_sql +from dbt_automation.operations.mergetables import union_tables, union_tables_sql def merge_operations_sql( @@ -116,6 +117,10 @@ def merge_operations_sql( op_select_statement, out_cols = flattenjson_dbt_sql( operation["config"], warehouse ) + elif operation["type"] == "unionall": + op_select_statement, out_cols = union_tables_sql( + operation["config"], warehouse + ) output_cols = out_cols diff --git a/dbt_automation/operations/mergetables.py b/dbt_automation/operations/mergetables.py index 84d19eb..9d1d0ed 100644 --- a/dbt_automation/operations/mergetables.py +++ b/dbt_automation/operations/mergetables.py @@ -10,6 +10,7 @@ from dbt_automation.utils.dbtproject import dbtProject from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface from dbt_automation.utils.tableutils import source_or_ref +from dbt_automation.utils.columnutils import quote_constvalue basicConfig(level=INFO) logger = getLogger() @@ -17,53 +18,61 @@ # pylint:disable=unused-argument,logging-fstring-interpolation def union_tables_sql(config, warehouse: WarehouseInterface): - """Generates SQL code for unioning tables using the dbt_utils union_relations macro.""" - input_arr = config["input_arr"] - dest_schema = config["dest_schema"] - source_columns = config["source_columns"] + """ + Generates SQL code for unioning tables using the dbt_utils union_relations macro. + NOTE: THIS OPERATION WONT WORK WITH CTES AS OFF NOW + """ + input_tables = [ + {"input": config["input"], "source_columns": config["source_columns"]} + ] + config.get("other_inputs", []) names = set() - for input in input_arr: - name = source_or_ref(**input) + for table in input_tables: + name = source_or_ref(**table["input"]) if name in names: logger.error("This appears more than once: %s", name) raise ValueError("Duplicate inputs found") names.add(name) relations = "[" - for input in input_arr: - relations += f"{source_or_ref(**input)}," + for table in input_tables: + relations += f"{source_or_ref(**table['input'])}," relations = relations[:-1] relations += "]" dbt_code = "" - if config["input_arr"][0]["input_type"] != "cte": - dbt_code += f"{{{{ config(materialized='table',schema='{dest_schema}') }}}}\n" - - include_cols = [f'"{col}"' for col in source_columns] + output_cols = set() + for table in input_tables: + output_cols.update(table["source_columns"]) # pylint:disable=consider-using-f-string dbt_code += "{{ dbt_utils.union_relations(" - dbt_code += f"relations={relations} , " + f"include=[{','.join(include_cols)}]" + dbt_code += ( + f"relations={relations} , " + + f"include=[{','.join([quote_constvalue(col, 'postgres') for col in list(output_cols)])}]" + ) dbt_code += ")}}" - return dbt_code, source_columns + return dbt_code, list(output_cols) def union_tables(config, warehouse: WarehouseInterface, project_dir): """Generates a dbt model which uses the dbt_utils union_relations macro to union tables.""" + dest_schema = config["dest_schema"] output_model_name = config["output_name"] + dbt_sql = "" + if config["input"] != "cte": + dbt_sql = ( + "{{ config(materialized='table', schema='" + config["dest_schema"] + "') }}" + ) - union_code, output_cols = union_tables_sql(config, warehouse) + select_statement, output_cols = union_tables_sql(config, warehouse) + dbt_sql += "\n" + select_statement dbtproject = dbtProject(project_dir) - dbtproject.ensure_models_dir(config["dest_schema"]) + dbtproject.ensure_models_dir(dest_schema) - model_sql_path = dbtproject.write_model( - config["dest_schema"], - output_model_name, - union_code, - ) + model_sql_path = dbtproject.write_model(dest_schema, output_model_name, dbt_sql) return model_sql_path, output_cols From dd9b568eb60396571f872b6de68981c00ed751b5 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Thu, 21 Mar 2024 17:22:57 +0530 Subject: [PATCH 2/3] updated test case for union op --- tests/warehouse/test_bigquery_ops.py | 29 ++++++++++++++++++---------- tests/warehouse/test_postgres_ops.py | 21 +++++++++++--------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/tests/warehouse/test_bigquery_ops.py b/tests/warehouse/test_bigquery_ops.py index 5acd190..c597542 100644 --- a/tests/warehouse/test_bigquery_ops.py +++ b/tests/warehouse/test_bigquery_ops.py @@ -671,19 +671,28 @@ def test_mergetables(self): config = { "dest_schema": "pytest_intermediate", "output_name": output_name, - "input_arr": [ - { - "input_type": "model", - "input_name": "_airbyte_raw_Sheet1", - "source_name": None, - }, + "source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"], + "input": { + "input_type": "model", + "input_name": "_airbyte_raw_Sheet1", + "source_name": None, + }, + "other_inputs": [ { - "input_type": "model", - "input_name": "_airbyte_raw_Sheet2", - "source_name": None, + "input": { + "input_type": "model", + "input_name": "_airbyte_raw_Sheet2", + "source_name": None, + }, + "source_columns": [ + "NGO", + "Month", + "measure1", + "measure2", + "Indicator", + ], }, ], - "source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"], } union_tables( diff --git a/tests/warehouse/test_postgres_ops.py b/tests/warehouse/test_postgres_ops.py index 67a0068..de9b08f 100644 --- a/tests/warehouse/test_postgres_ops.py +++ b/tests/warehouse/test_postgres_ops.py @@ -683,16 +683,19 @@ def test_mergetables(self): "dest_schema": "pytest_intermediate", "output_name": output_name, "source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"], - "input_arr": [ - { - "input_type": "model", - "input_name": "_airbyte_raw_Sheet1", - "source_name": None, - }, + "input": { + "input_type": "model", + "input_name": "_airbyte_raw_Sheet1", + "source_name": None, + }, + "other_inputs": [ { - "input_type": "model", - "input_name": "_airbyte_raw_Sheet2", - "source_name": None, + "input": { + "input_type": "model", + "input_name": "_airbyte_raw_Sheet2", + "source_name": None, + }, + "source_columns": ["NGO", "Month", "measure1", "measure2", "Indicator"], }, ], } From 2785a8cb92ee867470bd43f7db7c28ab1220869f Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Thu, 21 Mar 2024 23:11:54 +0800 Subject: [PATCH 3/3] comment --- dbt_automation/operations/mergetables.py | 30 +++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/dbt_automation/operations/mergetables.py b/dbt_automation/operations/mergetables.py index 9d1d0ed..f72ee99 100644 --- a/dbt_automation/operations/mergetables.py +++ b/dbt_automation/operations/mergetables.py @@ -3,7 +3,6 @@ import os import argparse -from collections import Counter from logging import basicConfig, getLogger, INFO from dotenv import load_dotenv @@ -15,6 +14,35 @@ basicConfig(level=INFO) logger = getLogger() +# sql, len_output_set = mergetables.union_tables_sql({ +# "input": { +# "input_type": "source", +# "source_name": "pytest_intermediate", +# "input_name": "arithmetic_add", +# }, +# "source_columns": [ +# "NGO", "Month", "measure1", "measure2" +# ], +# "other_inputs": [ +# { +# "input": { +# "input_type": "source", +# "source_name": "pytest_intermediate", +# "input_name": "arithmetic_div", +# }, +# "source_columns": [ +# "NGO", "Month", "measure1", +# ], +# } +# ], +# }, wc_client) + +# {{ dbt_utils.union_relations(relations=[ +# source('pytest_intermediate', 'arithmetic_add'), +# source('pytest_intermediate', 'arithmetic_div') +# ] , include=['NGO','measure2','Month','measure1']) +# }} + # pylint:disable=unused-argument,logging-fstring-interpolation def union_tables_sql(config, warehouse: WarehouseInterface):