Skip to content

Commit

Permalink
Merge pull request #51 from DalgoT4D/warehouse-client-interface
Browse files Browse the repository at this point in the history
Interface for BigQuery and Postgres warehouses
  • Loading branch information
fatchat authored Dec 10, 2023
2 parents 5c61109 + ccb9359 commit 4476576
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 13 deletions.
4 changes: 3 additions & 1 deletion dbt_automation/operations/arithmetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

from logging import basicConfig, getLogger, INFO
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def arithmetic(config: dict, warehouse, project_dir: str):
def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""performs arithmetic operations: +/-/*//"""
output_name = config["output_name"]
input_model = config["input_name"]
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/castdatatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()
Expand All @@ -15,7 +17,7 @@


# pylint:disable=logging-fstring-interpolation
def cast_datatypes(config: dict, warehouse, project_dir: str):
def cast_datatypes(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""generates the model"""
dest_schema = config["dest_schema"]
output_name = config["output_name"]
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/coalescecolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def coalesce_columns(config: dict, warehouse, project_dir: str):
def coalesce_columns(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""coalesces columns"""
dest_schema = config["dest_schema"]
output_name = config["output_name"]
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/operations/concatcolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from logging import basicConfig, getLogger, INFO
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


def concat_columns(config: dict, warehouse, project_dir: str):
def concat_columns(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""This function generates dbt model to concat strings"""
logger.info("here in concat columns")
logger.info("testing")
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/droprenamecolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def drop_columns(config: dict, warehouse, project_dir: str):
def drop_columns(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""drops columns from a model"""
dest_schema = config["dest_schema"]
input_name = config["input_name"]
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/flattenairbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from dbt_automation.utils.dbtconfigs import mk_model_config
from dbt_automation.utils.columnutils import make_cleaned_column_names, dedup_list
from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=logging-fstring-interpolation,unused-argument
def flatten_operation(config: dict, warehouse, project_dir: str):
def flatten_operation(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""
This function does the flatten operation for all sources (raw tables) in the sources.yml.
By default, _airbyte_data field is used to flatten
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/flattenjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.columnutils import make_cleaned_column_names, dedup_list
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def flattenjson(config: dict, warehouse, project_dir: str):
def flattenjson(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""
source_schema: name of the input schema
input_name: name of the input model
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/operations/mergetables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from dotenv import load_dotenv

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def union_tables(config, warehouse, project_dir):
def union_tables(config, warehouse: WarehouseInterface, project_dir):
"""generates a dbt model which uses the dbt_utils union_relations macro to union tables"""
tablenames = config["tablenames"]
dest_schema = config["dest_schema"]
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/operations/regexextraction.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""extract from a regex"""
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


def regex_extraction(config: dict, warehouse, project_dir: str):
def regex_extraction(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""given a regex and a column name, extract the regex from the column"""
input_name = config["input_name"]
dest_schema = config["dest_schema"]
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/operations/scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import subprocess, sys

from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


def scaffold(config: dict, warehouse, project_dir: str):
def scaffold(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""scaffolds a dbt project"""
project_name = config["project_name"]
default_schema = config["default_schema"]
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/operations/syncsources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# pylint:disable=wrong-import-position
from dbt_automation.utils.sourceschemas import mksourcedefinition
from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.utils.dbtsources import (
readsourcedefinitions,
merge_sourcedefinitions,
Expand All @@ -21,7 +22,7 @@
logger = getLogger()


def sync_sources(config, warehouse, project_dir):
def sync_sources(config, warehouse: WarehouseInterface, project_dir):
"""
reads tables from the input_schema to create a dbt sources.yml
uses the metadata from the existing source definitions, if any
Expand Down
3 changes: 2 additions & 1 deletion dbt_automation/utils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
import json
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface

basicConfig(level=INFO)
logger = getLogger()


class BigQueryClient:
class BigQueryClient(WarehouseInterface):
"""a bigquery client that can be used as a context manager"""

def __init__(self, conn_info=None, location=None):
Expand Down
Empty file.
59 changes: 59 additions & 0 deletions dbt_automation/utils/interfaces/warehouse_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from abc import ABC, abstractmethod


class WarehouseInterface(ABC):
    """Contract shared by every warehouse client (e.g. BigQuery, Postgres).

    Concrete clients must implement each method below; instantiating a
    subclass that leaves any of them unimplemented raises ``TypeError``.
    """

    @abstractmethod
    def execute(self, statement: str):
        """Run a SQL *statement* against the warehouse and return its result."""

    @abstractmethod
    def get_tables(self, schema: str):
        """List the tables available in *schema*."""

    @abstractmethod
    def get_schemas(self):
        """List the schemas available in the warehouse."""

    @abstractmethod
    def get_table_data(self, schema: str, table: str, limit: int):
        """Fetch up to *limit* rows from ``schema.table``."""

    @abstractmethod
    def get_table_columns(self, schema: str, table: str):
        """List the columns of ``schema.table``."""

    @abstractmethod
    def get_columnspec(self, schema: str, table_id: str):
        """Describe the column specification of *table_id* within *schema*."""

    @abstractmethod
    def get_json_columnspec(self, schema: str, table: str, column: str):
        """Describe the keys found inside the JSON *column* of ``schema.table``."""

    @abstractmethod
    def ensure_schema(self, schema: str):
        """Create *schema* if it does not already exist."""

    @abstractmethod
    def ensure_table(self, schema: str, table: str, columns: list):
        """Create ``schema.table`` with *columns* if it does not already exist."""

    @abstractmethod
    def drop_table(self, schema: str, table: str):
        """Drop ``schema.table``."""

    @abstractmethod
    def insert_row(self, schema: str, table: str, row: dict):
        """Insert a single *row* (column -> value mapping) into ``schema.table``."""

    @abstractmethod
    def json_extract_op(self, json_column: str, json_field: str, sql_column: str):
        """Build the warehouse-specific expression that extracts *json_field*
        from *json_column*, aliased as *sql_column*."""

    @abstractmethod
    def close(self):
        """Release the underlying warehouse connection."""

    @abstractmethod
    def generate_profiles_yaml_dbt(self, project_name, default_schema):
        """Build the dbt ``profiles.yml`` contents for this warehouse."""
4 changes: 3 additions & 1 deletion dbt_automation/utils/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
from logging import basicConfig, getLogger, INFO
import psycopg2
import os
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


basicConfig(level=INFO)
logger = getLogger()


class PostgresClient:
class PostgresClient(WarehouseInterface):
"""a postgres client that can be used as a context manager"""

@staticmethod
Expand Down

0 comments on commit 4476576

Please sign in to comment.