Merge pull request #2 from DalgoT4D/sensible-defaults
defaults for various filenames
fatchat authored Sep 17, 2023
2 parents 494c5f7 + 3e495f6 commit 10b03e9
Showing 3 changed files with 71 additions and 87 deletions.
15 changes: 5 additions & 10 deletions README.md
@@ -5,20 +5,15 @@ Example usage:

1.

-python scaffolddbt.py --project-name shridbt \
-    --project-dir ../shridbt
+python scaffolddbt.py --project-dir ../shridbt \
+    --project-name shridbt

2.

-python syncsources.py --schema staging \
-    --source-name shrikobo \
-    --output-sources-file ../shridbt/models/staging/sources.yml
+python syncsources.py --project-dir ../shridbt \
+    --source-name shrikobo

3.

-python mknormalized.py --project-dir ../shridbt/ \
-    --input-schema staging \
-    --output-schema intermediate \
-    --output-schema-schemafile schema.yml \
-    --sources-file sources.yml
+python mknormalized.py --project-dir ../shridbt/

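With these defaults the three scripts agree on file locations by convention instead of flags. A minimal sketch of the paths they now assume, reconstructed from the two scripts below (the ../shridbt project dir is just the README's example):

    from pathlib import Path

    project_dir = Path("../shridbt")
    # written by syncsources.py, read by mknormalized.py (schema defaults to "staging")
    sources_yml = project_dir / "models" / "staging" / "sources.yml"
    # written by mknormalized.py (--output-schema defaults to "intermediate")
    models_yml = project_dir / "models" / "intermediate" / "models.yml"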

100 changes: 51 additions & 49 deletions mknormalized.py
@@ -3,6 +3,7 @@
import argparse
from logging import basicConfig, getLogger, INFO
from string import Template
+from pathlib import Path
import yaml


@@ -13,24 +14,9 @@
parser = argparse.ArgumentParser()
parser.add_argument("--project-dir", required=True)
parser.add_argument("--input-schema", default="staging", help="e.g. staging")
parser.add_argument("--output-schema", required=True, help="e.g. intermediate")
parser.add_argument("--output-schema-schemafile", required=True, help="e.g. schema.yml")
parser.add_argument(
"--sources-file",
required=True,
help="filename must be relative to <project-dir>/models/<input-schema>/",
)
parser.add_argument("--output-schema", default="intermediate", help="e.g. intermediate")
args = parser.parse_args()

-# ================================================================================
-# create the output directory
-table_normalized_dbtmodel_dir = (
-    f"{args.project_dir}/models/{args.output_schema}/normalized/"
-)
-if not os.path.exists(table_normalized_dbtmodel_dir):
-    os.makedirs(table_normalized_dbtmodel_dir)
-    logger.info("created directory %s", table_normalized_dbtmodel_dir)


# ================================================================================
NORMALIZED_MODEL_TEMPLATE = Template(
@@ -52,67 +38,83 @@
)


-def mk_normalized_dbtmode(source_name: str, table_name: str) -> str:
+def mk_normalized_dbtmodel(source_name: str, table_name: str) -> str:
    """creates a .sql dbt model"""
    return NORMALIZED_MODEL_TEMPLATE.substitute(
        {"source_name": source_name, "table_name": table_name}
    )
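The template body itself is collapsed in this view, so the SQL below is a hypothetical stand-in; the sketch only illustrates the string.Template substitution that mk_normalized_dbtmodel performs:

    from string import Template

    # hypothetical body -- the real NORMALIZED_MODEL_TEMPLATE is folded out of this diff
    DEMO_TEMPLATE = Template("select * from {{ source('$source_name', '$table_name') }}")

    print(DEMO_TEMPLATE.substitute({"source_name": "shrikobo", "table_name": "households"}))
    # select * from {{ source('shrikobo', 'households') }}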


# ================================================================================
-def process_source(src: dict, output_schema: str):
+def process_source(src: dict, output_schema: str, output_dir: str):
    """iterates through the tables in a source and creates their _normalized models"""
+    models = []
    for table in src["tables"]:
        logger.info(
            "[process_source] source_name=%s table_name=%s", src["name"], table["name"]
        )
-        table_normalized_dbtmodel = mk_normalized_dbtmode(src["name"], table["name"])
+        table_normalized_dbtmodel = mk_normalized_dbtmodel(src["name"], table["name"])

-        table_normalized_dbtmodel_filename = (
-            table_normalized_dbtmodel_dir + src["name"] + "_normalized.sql"
-        )
+        table_normalized_dbtmodel_filename = Path(output_dir) / (src["name"] + ".sql")

        logger.info("writing %s", table_normalized_dbtmodel_filename)
        with open(table_normalized_dbtmodel_filename, "w", encoding="utf-8") as outfile:
            outfile.write(table_normalized_dbtmodel)
-            outfile.close()

-    return {
-        "name": src["name"],
-        "description": "",
-        "+schema": output_schema,
-        "columns": [
-            {
-                "name": "_airbyte_ab_id",
-                "description": "",
-                "tests": ["unique", "not_null"],
-            }
-        ],
-    }
+        models.append(
+            {
+                "name": table["name"],
+                "description": "",
+                "+schema": output_schema,
+                "columns": [
+                    {
+                        "name": "_airbyte_ab_id",
+                        "description": "",
+                        "tests": ["unique", "not_null"],
+                    }
+                ],
+            }
+        )
+
+    return models

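With the default output schema of "intermediate" and a hypothetical table named households, each entry appended above is later rendered by the yaml.safe_dump call at the bottom of the script roughly as:

    - name: households
      description: ''
      +schema: intermediate
      columns:
      - name: _airbyte_ab_id
        description: ''
        tests:
        - unique
        - not_null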

# ================================================================================
+def get_source(filename: str, input_schema: str) -> dict:
+    """read the config file containing `sources` keys and return the source
+    matching the input schema"""
+    with open(filename, "r", encoding="utf-8") as sources_file:
+        sources = yaml.safe_load(sources_file)
+
+    for src in sources["sources"]:
+        if src["schema"] == input_schema:
+            return src
+
+    return None


# ================================================================================
-sources_filename = f"{args.project_dir}/models/{args.input_schema}/{args.sources_file}"
+# create the output directory
+output_schema_dir = Path(args.project_dir) / "models" / args.output_schema
+if not os.path.exists(output_schema_dir):
+    os.makedirs(output_schema_dir)
+    logger.info("created directory %s", output_schema_dir)
+
+sources_filename = Path(args.project_dir) / "models" / args.input_schema / "sources.yml"
+models_filename = Path(args.project_dir) / "models" / args.output_schema / "models.yml"

-output_schema_schema = {
+output_config = {
    "version": 2,
    "models": [],
}

-with open(sources_filename, "r", encoding="utf-8") as sources_file:
-    sources = yaml.safe_load(sources_file)
-
-for source in sources["sources"]:
-    if source["schema"] == "staging":
-        output_schema_schema["models"].append(
-            process_source(source, args.output_schema)
-        )
+source = get_source(sources_filename, args.input_schema)
+if source:
+    output_config["models"] = process_source(
+        source, args.output_schema, output_schema_dir
+    )

-output_schema_schemafilename = (
-    f"{args.project_dir}/models/{args.output_schema}/{args.output_schema_schemafile}"
-)
-logger.info("writing %s", output_schema_schemafilename)
-with open(
-    output_schema_schemafilename, "w", encoding="utf-8"
-) as output_schema_schemafile:
-    yaml.safe_dump(output_schema_schema, output_schema_schemafile, sort_keys=False)
+logger.info("writing %s", models_filename)
+with open(models_filename, "w", encoding="utf-8") as models_file:
+    yaml.safe_dump(output_config, models_file, sort_keys=False)
43 changes: 15 additions & 28 deletions syncsources.py
@@ -2,6 +2,7 @@
import os
import argparse
from logging import basicConfig, getLogger, INFO
+from pathlib import Path
import yaml
import psycopg2

@@ -14,17 +15,15 @@

parser = argparse.ArgumentParser(
    """
-Generates a source.yml containing exactly one source
+Generates a sources.yml configuration containing exactly one source
That source will have one or more tables
Ref: https://docs.getdbt.com/reference/source-properties
Database connection parameters are read from syncsources.env
"""
)
-parser.add_argument("--schema", required=True, help="e.g. staging")
+parser.add_argument("--project-dir", required=True)
parser.add_argument("--source-name", required=True)
-parser.add_argument("--input-sources-file")
-parser.add_argument(
-    "--output-sources-file",
-    help="can be the same as input-sources-file, will overwrite",
-)
+parser.add_argument("--schema", default="staging", help="e.g. staging")
args = parser.parse_args()


@@ -132,12 +131,7 @@ def get_connection():


# ================================================================================
-def make_source_definitions(
-    source_name: str,
-    input_schema: str,
-    existing_source_definitions_file=None,
-    output_sources_file=None,
-):
+def make_source_definitions(source_name: str, input_schema: str, sources_file: str):
    """
    reads tables from the input_schema to create a dbt sources.yml
    uses the metadata from the existing source definitions, if any
@@ -157,11 +151,9 @@ def make_source_definitions(
    dbsourcedefinitions = mksourcedefinition(source_name, input_schema, tablenames)
    logger.info("read sources from database schema %s", input_schema)

-    if existing_source_definitions_file:
-        filesourcedefinitions = readsourcedefinitions(existing_source_definitions_file)
-        logger.info(
-            "read existing source defs from %s", existing_source_definitions_file
-        )
+    if Path(sources_file).exists():
+        filesourcedefinitions = readsourcedefinitions(sources_file)
+        logger.info("read existing source defs from %s", sources_file)

    else:
        filesourcedefinitions = {"version": 2, "sources": []}
@@ -170,17 +162,12 @@ def make_source_definitions(
        filesourcedefinitions, dbsourcedefinitions
    )
    logger.info("created (new) source definitions")
-    if output_sources_file:
-        with open(output_sources_file, "w", encoding="utf-8") as outfile:
-            yaml.safe_dump(merged_definitions, outfile, sort_keys=False)
-        logger.info("wrote source definitions to %s", output_sources_file)
-    else:
-        logger.info("sources to be written to file:")
-        logger.info(merged_definitions)
+    with open(sources_file, "w", encoding="utf-8") as outfile:
+        yaml.safe_dump(merged_definitions, outfile, sort_keys=False)
+    logger.info("wrote source definitions to %s", sources_file)


# ================================================================================
if __name__ == "__main__":
-    make_source_definitions(
-        args.source_name, args.schema, args.input_sources_file, args.output_sources_file
-    )
+    sources_filename = Path(args.project_dir) / "models" / args.schema / "sources.yml"
+    make_source_definitions(args.source_name, args.schema, sources_filename)
