From 3e495f648f9ec2e4a35814ba6dadaa7a807d6693 Mon Sep 17 00:00:00 2001
From: Rohit Chatterjee
Date: Sun, 17 Sep 2023 13:53:30 +0530
Subject: [PATCH] defaults for various filenames

bug fix: one model per table in the output

---
 README.md       |  15 +++-----
 mknormalized.py | 100 ++++++++++++++++++++++++------------------------
 syncsources.py  |  43 ++++++++-------------
 3 files changed, 71 insertions(+), 87 deletions(-)

diff --git a/README.md b/README.md
index d1ab204..0d1c5a6 100644
--- a/README.md
+++ b/README.md
@@ -5,20 +5,15 @@ Example usage:
 
 1.
 
-    python scaffolddbt.py --project-name shridbt \
-    --project-dir ../shridbt
+    python scaffolddbt.py --project-dir ../shridbt \
+    --project-name shridbt
 
 2.
 
-    python syncsources.py --schema staging \
-    --source-name shrikobo \
-    --output-sources-file ../shridbt/models/staging/sources.yml
+    python syncsources.py --project-dir ../shridbt \
+    --source-name shrikobo
 
 3.
 
-    python mknormalized.py --project-dir ../shridbt/ \
-    --input-schema staging \
-    --output-schema intermediate \
-    --output-schema-schemafile schema.yml \
-    --sources-file sources.yml
+    python mknormalized.py --project-dir ../shridbt/
 
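Both scripts now locate their inputs and outputs by convention under `--project-dir`, which is what lets the `--output-sources-file`, `--sources-file`, and `--output-schema-schemafile` flags disappear. A minimal sketch of the shared path convention, reusing the README's example values (`shridbt`, `staging`, `intermediate`):

```python
from pathlib import Path

# example values taken from the README above
project_dir = Path("../shridbt")

# syncsources.py reads and (over)writes the source definitions here
sources_yml = project_dir / "models" / "staging" / "sources.yml"

# mknormalized.py writes one <table>.sql model per table into this
# directory, plus a models.yml describing them
models_yml = project_dir / "models" / "intermediate" / "models.yml"

print(sources_yml)  # ../shridbt/models/staging/sources.yml
print(models_yml)   # ../shridbt/models/intermediate/models.yml
```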
intermediate") args = parser.parse_args() -# ================================================================================ -# create the output directory -table_normalized_dbtmodel_dir = ( - f"{args.project_dir}/models/{args.output_schema}/normalized/" -) -if not os.path.exists(table_normalized_dbtmodel_dir): - os.makedirs(table_normalized_dbtmodel_dir) - logger.info("created directory %s", table_normalized_dbtmodel_dir) - # ================================================================================ NORMALIZED_MODEL_TEMPLATE = Template( @@ -52,7 +38,7 @@ ) -def mk_normalized_dbtmode(source_name: str, table_name: str) -> str: +def mk_normalized_dbtmodel(source_name: str, table_name: str) -> str: """creates a .sql dbt model""" return NORMALIZED_MODEL_TEMPLATE.substitute( {"source_name": source_name, "table_name": table_name} @@ -60,59 +46,75 @@ def mk_normalized_dbtmode(source_name: str, table_name: str) -> str: # ================================================================================ -def process_source(src: dict, output_schema: str): +def process_source(src: dict, output_schema: str, output_dir: str): """iterates through the tables in a source and creates their _normalized models""" + models = [] for table in src["tables"]: logger.info( "[process_source] source_name=%s table_name=%s", src["name"], table["name"] ) - table_normalized_dbtmodel = mk_normalized_dbtmode(src["name"], table["name"]) + table_normalized_dbtmodel = mk_normalized_dbtmodel(src["name"], table["name"]) - table_normalized_dbtmodel_filename = ( - table_normalized_dbtmodel_dir + src["name"] + "_normalized.sql" - ) + table_normalized_dbtmodel_filename = Path(output_dir) / (src["name"] + ".sql") logger.info("writing %s", table_normalized_dbtmodel_filename) with open(table_normalized_dbtmodel_filename, "w", encoding="utf-8") as outfile: outfile.write(table_normalized_dbtmodel) outfile.close() - return { - "name": src["name"], - "description": "", - "+schema": output_schema, - "columns": [ + models.append( { - "name": "_airbyte_ab_id", + "name": table["name"], "description": "", - "tests": ["unique", "not_null"], + "+schema": output_schema, + "columns": [ + { + "name": "_airbyte_ab_id", + "description": "", + "tests": ["unique", "not_null"], + } + ], } - ], - } + ) + + return models + + +# ================================================================================ +def get_source(filename: str, input_schema: str) -> dict: + """read the config file containing `sources` keys and return the source + matching the input schema""" + with open(filename, "r", encoding="utf-8") as sources_file: + sources = yaml.safe_load(sources_file) + + for src in sources["sources"]: + if src["schema"] == input_schema: + return src + + return None # ================================================================================ -sources_filename = f"{args.project_dir}/models/{args.input_schema}/{args.sources_file}" +# create the output directory +output_schema_dir = Path(args.project_dir) / "models" / args.output_schema +if not os.path.exists(output_schema_dir): + os.makedirs(output_schema_dir) + logger.info("created directory %s", output_schema_dir) -output_schema_schema = { +sources_filename = Path(args.project_dir) / "models" / args.input_schema / "sources.yml" +models_filename = Path(args.project_dir) / "models" / args.output_schema / "models.yml" + +output_config = { "version": 2, "models": [], } -with open(sources_filename, "r", encoding="utf-8") as sources_file: - sources = yaml.safe_load(sources_file) - - for source in 
sources["sources"]: - if source["schema"] == "staging": - output_schema_schema["models"].append( - process_source(source, args.output_schema) - ) +source = get_source(sources_filename, args.input_schema) +if source: + output_config["models"] = process_source( + source, args.output_schema, output_schema_dir + ) -output_schema_schemafilename = ( - f"{args.project_dir}/models/{args.output_schema}/{args.output_schema_schemafile}" -) -logger.info("writing %s", output_schema_schemafilename) -with open( - output_schema_schemafilename, "w", encoding="utf-8" -) as output_schema_schemafile: - yaml.safe_dump(output_schema_schema, output_schema_schemafile, sort_keys=False) + logger.info("writing %s", models_filename) + with open(models_filename, "w", encoding="utf-8") as models_file: + yaml.safe_dump(output_config, models_file, sort_keys=False) diff --git a/syncsources.py b/syncsources.py index 3f744d0..d814138 100644 --- a/syncsources.py +++ b/syncsources.py @@ -2,6 +2,7 @@ import os import argparse from logging import basicConfig, getLogger, INFO +from pathlib import Path import yaml import psycopg2 @@ -14,17 +15,15 @@ parser = argparse.ArgumentParser( """ -Generates a source.yml containing exactly one source +Generates a sources.yml configuration containing exactly one source That source will have one or more tables +Ref: https://docs.getdbt.com/reference/source-properties +Database connection parameters are read from syncsources.env """ ) -parser.add_argument("--schema", required=True, help="e.g. staging") +parser.add_argument("--project-dir", required=True) parser.add_argument("--source-name", required=True) -parser.add_argument("--input-sources-file") -parser.add_argument( - "--output-sources-file", - help="can be the same as input-sources-file, will overwrite", -) +parser.add_argument("--schema", default="staging", help="e.g. 
staging") args = parser.parse_args() @@ -132,12 +131,7 @@ def get_connection(): # ================================================================================ -def make_source_definitions( - source_name: str, - input_schema: str, - existing_source_definitions_file=None, - output_sources_file=None, -): +def make_source_definitions(source_name: str, input_schema: str, sources_file: str): """ reads tables from the input_schema to create a dbt sources.yml uses the metadata from the existing source definitions, if any @@ -157,11 +151,9 @@ def make_source_definitions( dbsourcedefinitions = mksourcedefinition(source_name, input_schema, tablenames) logger.info("read sources from database schema %s", input_schema) - if existing_source_definitions_file: - filesourcedefinitions = readsourcedefinitions(existing_source_definitions_file) - logger.info( - "read existing source defs from %s", existing_source_definitions_file - ) + if Path(sources_file).exists(): + filesourcedefinitions = readsourcedefinitions(sources_file) + logger.info("read existing source defs from %s", sources_file) else: filesourcedefinitions = {"version": 2, "sources": []} @@ -170,17 +162,12 @@ def make_source_definitions( filesourcedefinitions, dbsourcedefinitions ) logger.info("created (new) source definitions") - if output_sources_file: - with open(output_sources_file, "w", encoding="utf-8") as outfile: - yaml.safe_dump(merged_definitions, outfile, sort_keys=False) - logger.info("wrote source definitions to %s", output_sources_file) - else: - logger.info("sources to be written to file:") - logger.info(merged_definitions) + with open(sources_file, "w", encoding="utf-8") as outfile: + yaml.safe_dump(merged_definitions, outfile, sort_keys=False) + logger.info("wrote source definitions to %s", sources_file) # ================================================================================ if __name__ == "__main__": - make_source_definitions( - args.source_name, args.schema, args.input_sources_file, args.output_sources_file - ) + sources_filename = Path(args.project_dir) / "models" / args.schema / "sources.yml" + make_source_definitions(args.source_name, args.schema, sources_filename)