Skip to content

Commit

Permalink
Merge pull request #2 from gyk-jane/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
gyk-jane authored Sep 24, 2024
2 parents f7347fa + 3eab713 commit 0697408
Show file tree
Hide file tree
Showing 25 changed files with 287 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "suttacentral"]
path = suttacentral
url = https://github.com/suttacentral/suttacentral.git
[submodule "sc-data"]
path = sc-data
url = https://github.com/suttacentral/sc-data
Empty file removed etl_scripts/extract/__init__.py
Empty file.
4 changes: 0 additions & 4 deletions etl_scripts/extract/api_fetch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import requests
import pandas as pd
from prefect import flow, task

@task(log_prints=True)
def get_menu_data(uid: str) -> dict:
"""
Retrieves menu data from SuttaCentral API based on the provided uid.
Expand All @@ -24,7 +21,6 @@ def get_menu_data(uid: str) -> dict:
menu_data = None
return menu_data

@task(log_prints=True)
def get_suttaplex(basket: str) -> None:
"""
Retrieves suttaplex data from the SuttaCentral API for given basket.
Expand Down
16 changes: 3 additions & 13 deletions etl_scripts/extract/arangodb_fetch.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import gzip
import json
import shutil
import subprocess
from prefect import task, flow
from prefect import task

@task(log_prints=True)
def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dump') -> None:
"""Export arangodb data from arangodump command inside Docker container.
Expand All @@ -31,7 +29,7 @@ def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dum
subprocess.run(arangodump_cmd, check=True)

# Copy dump from container to project's data_dumps folder
local_output_path = '/Users/janekim/Developer/tipitaka_db/data_dump'
local_output_path = 'data_dump'
docker_cp_cmd = [
"docker", "cp", f"sc-arangodb:{output_dir}", local_output_path
]
Expand All @@ -43,7 +41,6 @@ def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dum
except subprocess.CalledProcessError as e:
print(f'Error during ArangoDB data dump: {e}')

@task(log_prints=True)
def extract_gz_file(input_gz_path: str, collection: str) -> list:
"""
Extract a .gz file in-memory and return the JSON content as a list of dictionaries.
Expand Down Expand Up @@ -80,12 +77,5 @@ def extract_gz_file(input_gz_path: str, collection: str) -> list:
except Exception as e:
print(f"Error extracting {input_gz_path}: {e}")
return None

if __name__ == "__main__":
html_in = 'data_dump/arangodb-dump/html_text_8a00c848c7b3360945795d3bc52ebe88.data.json.gz'
sc_bilara_in = 'data_dump/arangodb-dump/sc_bilara_texts_ede6cd7605f17ff53d131a783fb228e9.data.json.gz'
export_arangodb_data()
extract_gz_file(html_in)
extract_gz_file(sc_bilara_in)
print('successful')


73 changes: 73 additions & 0 deletions etl_scripts/generate_to_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import sqlite3
import etl_scripts.util as util

def generate_to_sqlite(sqlite_filepath: str='data_dump/PaliCanon.db',
schema: str='dev_erd'):
"""Copies schema (and it's tables) from PostgreSQL to a sqlite PaliCanon.db
Args:
sqlite_filepath (str, optional): File path of target (sqlite db).
Defaults to 'data_dump/PaliCanon.db'.
schema (str, optional): Schema from PostreSQL db to be copied.
Defaults to 'dev_erd'.
"""
# Connect to postgres db
pg_conn = util.connect_to_db()

# Connect to sqlite db
sqlite_conn = sqlite3.connect(sqlite_filepath)

# Get all schema tables from postgres db
pg_cur = pg_conn.cursor()
pg_cur.execute(f"""
select table_name
from information_schema.tables
where table_schema = {schema}
""")
tables = pg_cur.fetchall()

# Loop through all tables and copy data from postgres to sqlite
for table in tables:
table_name = table[0]

# Create same table in sqlite
pg_cur.execute(f"""
select column_name, data_type
from information_schema.columns
where table_name = '{table_name}'
and table_schema = '{schema}'
""")
columns = pg_cur.fetchall()

column_defs = []
for column_name, data_type in columns:
sqlite_type = 'TEXT'
if data_type in ['integer', 'bigint']:
sqlite_type = 'INTEGER'
elif data_type in ['numeric', 'real', 'double precision']:
sqlite_type = 'REAL'
column_defs.append(f'{column_name} {sqlite_type}')

create_table_sql = f'create table {table_name} ({', '.join(column_defs)})'
sqlite_conn.execute(create_table_sql)

# Fetch data from postgres
pg_cur.execute(f'select * from {schema}."{table_name}"')
rows = pg_cur.fetchall()

# Insert data into sqlite
insert_sql = f"""
insert into {table_name}
values ({', '.join(['?' for _ in range(len(columns))])})
"""
sqlite_conn.executemany(insert_sql, rows)

sqlite_conn.commit()
pg_cur.close()
pg_conn.close()
sqlite_conn.close()

print("Data transferred to SQLite")

if __name__ == '__main__':
generate_to_sqlite()
Empty file removed etl_scripts/load/__init__.py
Empty file.
27 changes: 10 additions & 17 deletions etl_scripts/load/arangodb_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import subprocess
import platform
import time
import os
from prefect import task, flow

@task(log_prints=True)
def is_docker_running():
"""Check if Docker daemon is running."""
try:
Expand All @@ -13,7 +13,6 @@ def is_docker_running():
except subprocess.CalledProcessError:
return False

@task(log_prints=True)
def start_docker():
"""Start Docker depending on the OS."""
system = platform.system()
Expand All @@ -40,8 +39,8 @@ def start_docker():
print(f"Error starting Docker: {e}")
return False

def start_suttacentral():
"""Start suttacentral service."""
def start_suttacentral_docker():
"""Start suttacentral Docker service."""
if not is_docker_running():
start_docker()

Expand All @@ -52,8 +51,8 @@ def start_suttacentral():
except subprocess.CalledProcessError as e:
print(f"Error starting suttacentral service: {e}")

@task(log_prints=True)
def start_sc_arangodb():
"""Start sc_arangodb service"""
if not is_docker_running():
start_docker()

Expand All @@ -65,14 +64,11 @@ def start_sc_arangodb():
except subprocess.CalledProcessError as e:
print(f"Error starting sc-arangodb service: {e}")

@task(log_prints=True)
def pull_suttacentral_repo():
"""Pull latest suttacentral github repo"""
subprocess.run(["git", "checkout", "main"], cwd='suttacentral', check=True)
subprocess.run(["git", "pull", "origin", "main"], cwd='suttacentral', check=True)
print("SuttaCentral repository updated.")
def pull_submodules():
"""Update all submodules to their latest commits"""
subprocess.run(["git", "submodule", "update", "--remote", "--merge"], check=True)
print("All submodules updated.")

@task(log_prints=True)
def refresh_arangodb():
"""Pull the latest updates and refresh ArangoDB."""
try:
Expand All @@ -83,10 +79,10 @@ def refresh_arangodb():
return

# Start the suttacentral service if it's not running
start_suttacentral()
start_suttacentral_docker()

# Load new data into ArangoDB
bash_script_path = '/Users/janekim/Developer/tipitaka_db/etl_scripts/util/run_suttacentral.sh'
bash_script_path = 'etl_scripts/util/run_suttacentral.sh'
subprocess.run([bash_script_path], cwd='suttacentral', check=True, shell=True)
print("Bash script executed successfully.")
print("Waiting for Docker containers to initialize...")
Expand All @@ -99,6 +95,3 @@ def refresh_arangodb():

except subprocess.CalledProcessError as e:
print(f"Error during data refresh: {e}")

if __name__ == "__main__":
refresh_arangodb()
6 changes: 3 additions & 3 deletions etl_scripts/load/load.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import json
from prefect import task

@task(log_prints=True)
def generate_create_sql(json_data, schema, table_name) -> str:
def generate_create_sql(json_data: json, schema: str, table_name: str) -> str:
"""
Generates a CREATE TABLE statement dynamically from all keys in the given JSON data.
Args:
json_data (json): JSON file from API get request
schema (string): Name of schema target table should be placed in
table_name (string): Name of target table in Postgres
Returns:
Expand Down Expand Up @@ -40,13 +40,13 @@ def generate_create_sql(json_data, schema, table_name) -> str:

return sql

@task(log_prints=True)
def insert_to_db(data, conn, schema, table_name) -> None:
"""Inserts data into PostgreSQL
Args:
data (json): Data to be ingested
conn (psycopg2 object): Connection to data warehouse
schema (string): Name of schema target table should be placed in
table_name (string): Target table name
"""
with conn.cursor() as cur:
Expand Down
Empty file.
29 changes: 21 additions & 8 deletions etl_scripts/orchestration/extract_and_load_flow.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
from pathlib import Path
from prefect import task, flow
from prefect import flow
from etl_scripts.util import connect_to_db
from etl_scripts.extract import api_fetch, arangodb_fetch
from etl_scripts.load.arangodb_helpers import start_suttacentral
from etl_scripts.load.arangodb_helpers import start_suttacentral_docker
from etl_scripts.load.load import generate_create_sql, insert_to_db


@flow(log_prints=True)
def extract_suttaplex_flow(schema: str, basket: str):
"""Extract suttaplex data from SuttaCentral API.
Args:
schema (str): PostgreSQL schema in which data should be placed
basket (str): Pali canon basket to be extracted
"""
conn = connect_to_db()

json_data = api_fetch.get_suttaplex(basket)
Expand All @@ -29,12 +35,16 @@ def extract_suttaplex_flow(schema: str, basket: str):
conn.close()

@flow(log_prints=True)
def extract_arangodb_flow(schema: str):
conn = connect_to_db()
start_suttacentral()
dump_directory = Path('/Users/janekim/Developer/tipitaka_db/data_dump/arangodb-dump')
def extract_arangodb_flow(schema: str, collections):
"""Extract data from arangodb backend db.
collections = ['sc_bilara_texts', 'html_text', 'super_nav_details_edges']
Args:
schema (str): PostgreSQL schema in which data should be placed
collections (str): Name of arangodb collection to be extracted
"""
conn = connect_to_db()
start_suttacentral_docker()
dump_directory = Path('data_dump/arangodb-dump')

for collection in collections:
# Find the file corresponding to the collection
Expand Down Expand Up @@ -66,10 +76,13 @@ def extract_arangodb_flow(schema: str):

@flow(log_prints=True)
def extract_and_load_flow():
"""Flow for extraction and loading of dev_raw tables
"""
collections = ['sc_bilara_texts', 'html_text', 'super_nav_details_edges']
schema = 'dev_raw'
extract_suttaplex_flow(schema, 'sutta')
extract_suttaplex_flow(schema, 'vinaya')
extract_suttaplex_flow(schema, 'abhidhamma')

extract_arangodb_flow(schema)
extract_arangodb_flow(schema, collections)

2 changes: 1 addition & 1 deletion etl_scripts/orchestration/main_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from prefect import flow
from etl_scripts.orchestration.extract_and_load_flow import extract_and_load_flow
from etl_scripts.orchestration.extract_and_load_flow import extract_and_load_flow, extract_arangodb_flow
from etl_scripts.orchestration.transform_flow import transform_stage_flow, transform_mart_flow

@flow(log_prints=True)
Expand Down
17 changes: 16 additions & 1 deletion etl_scripts/orchestration/transform_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@
from prefect import flow
from etl_scripts.util import get_postgres_data
from etl_scripts.transform.stage.generate_hierarchy import preprocess_graph, insert_graph_to_postgres
from etl_scripts.transform.stage.get_text_contents import update_translations_in_parallel

@flow(log_prints=True)
def stage_load_hierarchy_table():
"""Create and ingest hierarchical table to dev_stage
"""
edges = get_postgres_data('dev_raw', 'super_nav_details_edges_arangodb')
graph = preprocess_graph(edges)
insert_graph_to_postgres(graph)
print('graph_table created')


@flow(log_prints=True)
def run_dbt(dir: str):
"""Run dbt process to create tables in PostgreSQL.
Args:
dir (str): Directory to be run
"""
project_dir = 'pali_canon_dbt'

print('Current working directory:', os.getcwd())
Expand All @@ -24,10 +31,18 @@ def run_dbt(dir: str):

@flow(log_prints=True)
def transform_stage_flow():
"""Flow to run dbt and transform data to create stage tables
"""
run_dbt('stage')
stage_load_hierarchy_table()

os.chdir('..')
update_translations_in_parallel('dev_stage', 'html_text_arangodb')
update_translations_in_parallel('dev_stage', 'sc_bilara_texts_arangodb')

@flow(log_prints=True)
def transform_mart_flow():
"""Flow to run dbt for mart tables.
"""
run_dbt('mart.erd')

Empty file removed etl_scripts/transform/__init__.py
Empty file.
Empty file.
Loading

0 comments on commit 0697408

Please sign in to comment.