defaults for various filenames #2

Merged · 1 commit · Sep 17, 2023
README.md — 15 changes: 5 additions & 10 deletions

@@ -5,20 +5,15 @@ Example usage:

 1.
 
-python scaffolddbt.py --project-name shridbt \
-    --project-dir ../shridbt
+python scaffolddbt.py --project-dir ../shridbt \
+    --project-name shridbt
 2.
 
-python syncsources.py --schema staging \
-    --source-name shrikobo \
-    --output-sources-file ../shridbt/models/staging/sources.yml
+python syncsources.py --project-dir ../shridbt \
+    --source-name shrikobo
 
 3.
 
-python mknormalized.py --project-dir ../shridbt/ \
-    --input-schema staging \
-    --output-schema intermediate \
-    --output-schema-schemafile schema.yml \
-    --sources-file sources.yml
+python mknormalized.py --project-dir ../shridbt/
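Taken together, the new defaults mean step 2 needs only --project-dir and --source-name, and step 3 needs only --project-dir; every filename is derived from the project directory. A minimal sketch of the resulting path conventions, using the example project name from above:

    # Sketch of the path conventions this PR hard-codes.
    from pathlib import Path

    project_dir = Path("../shridbt")

    # syncsources.py now always writes here (merging if the file exists):
    sources_yml = project_dir / "models" / "staging" / "sources.yml"

    # mknormalized.py reads that sources.yml, then writes its models here:
    models_yml = project_dir / "models" / "intermediate" / "models.yml"

    print(sources_yml)  # ../shridbt/models/staging/sources.yml
    print(models_yml)   # ../shridbt/models/intermediate/models.yml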


mknormalized.py — 100 changes: 51 additions & 49 deletions

@@ -3,6 +3,7 @@
 import argparse
 from logging import basicConfig, getLogger, INFO
 from string import Template
+from pathlib import Path
 import yaml


@@ -13,24 +14,9 @@
 parser = argparse.ArgumentParser()
 parser.add_argument("--project-dir", required=True)
 parser.add_argument("--input-schema", default="staging", help="e.g. staging")
-parser.add_argument("--output-schema", required=True, help="e.g. intermediate")
-parser.add_argument("--output-schema-schemafile", required=True, help="e.g. schema.yml")
-parser.add_argument(
-    "--sources-file",
-    required=True,
-    help="filename must be relative to <project-dir>/models/<input-schema>/",
-)
+parser.add_argument("--output-schema", default="intermediate", help="e.g. intermediate")
 args = parser.parse_args()
 
-# ================================================================================
-# create the output directory
-table_normalized_dbtmodel_dir = (
-    f"{args.project_dir}/models/{args.output_schema}/normalized/"
-)
-if not os.path.exists(table_normalized_dbtmodel_dir):
-    os.makedirs(table_normalized_dbtmodel_dir)
-    logger.info("created directory %s", table_normalized_dbtmodel_dir)
 
 # ================================================================================
 NORMALIZED_MODEL_TEMPLATE = Template(
@@ -52,67 +38,83 @@
 )
 
 
-def mk_normalized_dbtmode(source_name: str, table_name: str) -> str:
+def mk_normalized_dbtmodel(source_name: str, table_name: str) -> str:
     """creates a .sql dbt model"""
     return NORMALIZED_MODEL_TEMPLATE.substitute(
         {"source_name": source_name, "table_name": table_name}
     )
 
 
 # ================================================================================
-def process_source(src: dict, output_schema: str):
+def process_source(src: dict, output_schema: str, output_dir: str):
     """iterates through the tables in a source and creates their _normalized models"""
+    models = []
     for table in src["tables"]:
         logger.info(
             "[process_source] source_name=%s table_name=%s", src["name"], table["name"]
         )
-        table_normalized_dbtmodel = mk_normalized_dbtmode(src["name"], table["name"])
+        table_normalized_dbtmodel = mk_normalized_dbtmodel(src["name"], table["name"])
 
-        table_normalized_dbtmodel_filename = (
-            table_normalized_dbtmodel_dir + src["name"] + "_normalized.sql"
-        )
+        table_normalized_dbtmodel_filename = Path(output_dir) / (src["name"] + ".sql")
 
         logger.info("writing %s", table_normalized_dbtmodel_filename)
         with open(table_normalized_dbtmodel_filename, "w", encoding="utf-8") as outfile:
             outfile.write(table_normalized_dbtmodel)
-            outfile.close()
 
-    return {
-        "name": src["name"],
-        "description": "",
-        "+schema": output_schema,
-        "columns": [
-            {
-                "name": "_airbyte_ab_id",
-                "description": "",
-                "tests": ["unique", "not_null"],
-            }
-        ],
-    }
+        models.append(
+            {
+                "name": table["name"],
+                "description": "",
+                "+schema": output_schema,
+                "columns": [
+                    {
+                        "name": "_airbyte_ab_id",
+                        "description": "",
+                        "tests": ["unique", "not_null"],
+                    }
+                ],
+            }
+        )
+
+    return models
 
 
 # ================================================================================
+def get_source(filename: str, input_schema: str) -> dict:
+    """read the config file containing `sources` keys and return the source
+    matching the input schema"""
+    with open(filename, "r", encoding="utf-8") as sources_file:
+        sources = yaml.safe_load(sources_file)
+
+    for src in sources["sources"]:
+        if src["schema"] == input_schema:
+            return src
+
+    return None
+
+
+# ================================================================================
+# create the output directory
+output_schema_dir = Path(args.project_dir) / "models" / args.output_schema
+if not os.path.exists(output_schema_dir):
+    os.makedirs(output_schema_dir)
+    logger.info("created directory %s", output_schema_dir)
+
-sources_filename = f"{args.project_dir}/models/{args.input_schema}/{args.sources_file}"
+sources_filename = Path(args.project_dir) / "models" / args.input_schema / "sources.yml"
+models_filename = Path(args.project_dir) / "models" / args.output_schema / "models.yml"
 
-output_schema_schema = {
+output_config = {
     "version": 2,
     "models": [],
 }
 
-with open(sources_filename, "r", encoding="utf-8") as sources_file:
-    sources = yaml.safe_load(sources_file)
-
-for source in sources["sources"]:
-    if source["schema"] == "staging":
-        output_schema_schema["models"].append(
-            process_source(source, args.output_schema)
-        )
+source = get_source(sources_filename, args.input_schema)
+if source:
+    output_config["models"] = process_source(
+        source, args.output_schema, output_schema_dir
+    )
 
-output_schema_schemafilename = (
-    f"{args.project_dir}/models/{args.output_schema}/{args.output_schema_schemafile}"
-)
-logger.info("writing %s", output_schema_schemafilename)
-with open(
-    output_schema_schemafilename, "w", encoding="utf-8"
-) as output_schema_schemafile:
-    yaml.safe_dump(output_schema_schema, output_schema_schemafile, sort_keys=False)
+logger.info("writing %s", models_filename)
+with open(models_filename, "w", encoding="utf-8") as models_file:
+    yaml.safe_dump(output_config, models_file, sort_keys=False)
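For reference, a hypothetical run of the new process_source() over a source with a single table would append an entry like the following to models.yml; the table name here is invented for illustration:

    # Illustrative only -- "customers" is a made-up table name. This mirrors
    # the dict that process_source() appends for each table in the diff above.
    import yaml

    entry = {
        "name": "customers",        # table["name"]
        "description": "",
        "+schema": "intermediate",  # the new --output-schema default
        "columns": [
            {
                "name": "_airbyte_ab_id",
                "description": "",
                "tests": ["unique", "not_null"],
            }
        ],
    }
    print(yaml.safe_dump({"version": 2, "models": [entry]}, sort_keys=False))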
syncsources.py — 43 changes: 15 additions & 28 deletions

@@ -2,6 +2,7 @@
 import os
 import argparse
 from logging import basicConfig, getLogger, INFO
+from pathlib import Path
 import yaml
 import psycopg2

@@ -14,17 +15,15 @@
 
 parser = argparse.ArgumentParser(
     """
-    Generates a source.yml containing exactly one source
+    Generates a sources.yml configuration containing exactly one source
     That source will have one or more tables
     Ref: https://docs.getdbt.com/reference/source-properties
     Database connection parameters are read from syncsources.env
     """
 )
-parser.add_argument("--schema", required=True, help="e.g. staging")
+parser.add_argument("--project-dir", required=True)
 parser.add_argument("--source-name", required=True)
-parser.add_argument("--input-sources-file")
-parser.add_argument(
-    "--output-sources-file",
-    help="can be the same as input-sources-file, will overwrite",
-)
+parser.add_argument("--schema", default="staging", help="e.g. staging")
 args = parser.parse_args()


@@ -132,12 +131,7 @@ def get_connection():
 
 
 # ================================================================================
-def make_source_definitions(
-    source_name: str,
-    input_schema: str,
-    existing_source_definitions_file=None,
-    output_sources_file=None,
-):
+def make_source_definitions(source_name: str, input_schema: str, sources_file: str):
     """
     reads tables from the input_schema to create a dbt sources.yml
     uses the metadata from the existing source definitions, if any
@@ -157,11 +151,9 @@
     dbsourcedefinitions = mksourcedefinition(source_name, input_schema, tablenames)
     logger.info("read sources from database schema %s", input_schema)
 
-    if existing_source_definitions_file:
-        filesourcedefinitions = readsourcedefinitions(existing_source_definitions_file)
-        logger.info(
-            "read existing source defs from %s", existing_source_definitions_file
-        )
+    if Path(sources_file).exists():
+        filesourcedefinitions = readsourcedefinitions(sources_file)
+        logger.info("read existing source defs from %s", sources_file)
 
     else:
         filesourcedefinitions = {"version": 2, "sources": []}
@@ -170,17 +162,12 @@
         filesourcedefinitions, dbsourcedefinitions
     )
     logger.info("created (new) source definitions")
-    if output_sources_file:
-        with open(output_sources_file, "w", encoding="utf-8") as outfile:
-            yaml.safe_dump(merged_definitions, outfile, sort_keys=False)
-        logger.info("wrote source definitions to %s", output_sources_file)
-    else:
-        logger.info("sources to be written to file:")
-        logger.info(merged_definitions)
+    with open(sources_file, "w", encoding="utf-8") as outfile:
+        yaml.safe_dump(merged_definitions, outfile, sort_keys=False)
+    logger.info("wrote source definitions to %s", sources_file)
 
 
 # ================================================================================
 if __name__ == "__main__":
-    make_source_definitions(
-        args.source_name, args.schema, args.input_sources_file, args.output_sources_file
-    )
+    sources_filename = Path(args.project_dir) / "models" / args.schema / "sources.yml"
+    make_source_definitions(args.source_name, args.schema, sources_filename)
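The --input-sources-file and --output-sources-file flags could be dropped because one derived path now serves as both input and output. A minimal sketch of the resulting read-modify-write behavior, reusing the example project from the README:

    # First run: the file does not exist, so definitions come from the
    # database schema alone. Later runs: the existing file is read, merged
    # with the current database tables, and overwritten in place.
    from pathlib import Path

    sources_file = Path("../shridbt/models/staging/sources.yml")
    if sources_file.exists():
        print("merging with existing definitions in", sources_file)
    else:
        print("creating", sources_file, "from the database schema")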