Add duckdb build/concepts and use SQLGlot to convert BigQuery SQL into other dialects #1689

Merged: 47 commits, Feb 20, 2024
Changes from 1 commit

Commits (47)
463609d
First "working" version (all SQL runs except for a few pivot concepts…
Apr 26, 2023
b7fffcc
Stripped dead code
Apr 26, 2023
ecbfe4d
Simplify pathing, strip more dead code
Apr 26, 2023
0e02dad
Align with shell script version
Apr 26, 2023
10e7f4e
Add requirements.txt
Apr 26, 2023
17ac9f4
Pulled sql out of .sh plus minor related mods
Apr 27, 2023
f777f46
Add table creation/loading
Apr 27, 2023
dc53b7b
Schema support (option) to mirror psql version
Apr 27, 2023
c52d421
Another chunk of dead code
Apr 27, 2023
2d3ef83
Big bug in DatetimeDiff implementation!
Apr 28, 2023
a173fd0
Adding missing fluid_balance views
Apr 28, 2023
f10eeb4
missed deletion
Apr 28, 2023
3aefb39
Rename in line with Postgres
Apr 28, 2023
0645b6c
Adding indexes
Apr 28, 2023
06a7422
Added checks
Apr 28, 2023
212f8bd
Updating README.md
Apr 28, 2023
f7a9729
Missed file rename
Apr 29, 2023
d4b5426
Move fake CHARTEVENTS PK to indexes script -- this may fail on machin…
SphtKr May 1, 2023
6bdcf14
Fixed outright errors
May 2, 2023
71e8d54
Experimental option to use integer or fractional DATETIME_DIFF function
May 8, 2023
2854015
simplify parse to use DATETIME_TRUNC and use unique alias
alistairewj Nov 24, 2023
4166507
explicitly cast seconds to integer
alistairewj Nov 24, 2023
b918099
move duckdb concept file to new python package folder
alistairewj Nov 24, 2023
62f4f59
tidy up readme and update to v2.2
alistairewj Nov 24, 2023
c79355a
init python package to support converting SQL scripts across dialects
alistairewj Nov 24, 2023
ae7a4c0
add step to test mimic_utils
alistairewj Nov 24, 2023
24de255
remove unneeded duckdb concept files
alistairewj Nov 24, 2023
f2f9148
move sqlglot monkey patching to individual modules
alistairewj Nov 24, 2023
3b133f8
reorganize classes to top
alistairewj Nov 27, 2023
12bdf7e
init duckdb transforms
alistairewj Nov 27, 2023
0654e16
add mimic_utils module name to import
alistairewj Nov 27, 2023
084a15f
add duckdb to transpilation
alistairewj Nov 27, 2023
7d54f86
remove semi-colon
alistairewj Nov 27, 2023
dcb8cc0
explicitly cast upper limit of generate series as an integer
alistairewj Nov 27, 2023
eec813b
add derived schema name to scripts by default
alistairewj Nov 27, 2023
1bcceb0
rename subfolder as it contains sqlglot transformations
alistairewj Nov 27, 2023
1ff7f57
update import
alistairewj Nov 27, 2023
9d2d4dc
refactor mimic-iv SQL queries into subfolders of concepts
alistairewj Dec 18, 2023
24d177b
formatting fixes by sqlfluff
alistairewj Dec 18, 2023
7487bbe
update postgres concepts with new transpile method
alistairewj Dec 19, 2023
51c6c4f
add duckdb concepts for mimic-iv
alistairewj Dec 19, 2023
7bb12b8
update README for mimic-iv
alistairewj Dec 23, 2023
9b083be
add help text for the mimic_utils entry points
alistairewj Jan 6, 2024
f1f07b8
switch to hatchling backend and only include mimic_utils package
alistairewj Jan 6, 2024
ad08bae
readme for pypi package
alistairewj Jan 6, 2024
f826074
various typo fixes and clearer language
alistairewj Feb 20, 2024
1dfa41c
add setup python to workflow
alistairewj Feb 20, 2024
init python package to support converting SQL scripts across dialects
alistairewj committed Dec 18, 2023

Verified: this commit was signed with the committer's verified signature (alistairewj, Alistair Johnson).
commit c79355a12778fb784eea63dee0090e145aa73609
41 changes: 41 additions & 0 deletions pyproject.toml
@@ -0,0 +1,41 @@
[build-system]
requires = ["setuptools>=61.0", "setuptools_scm[toml]>=6.2"]
build-backend = "setuptools.build_meta"

[project]
name = "mimic_utils"
version = "1.0.0"
authors = [
    { name="Alistair Johnson", email="aewj@mit.edu" },
]
description = "Utilities to support analyzing the MIMIC database(s)"
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.8"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: Apache Software License",
    "Operating System :: OS Independent",
]
dependencies = [
    "sqlglot",
    "pandas",
    "numpy",
]

[project.scripts]
mimic_utils = "mimic_utils.__main__:main"

[project.urls]
"Homepage" = "https://github.com/MIT-LCP/mimic-code"
"Bug Tracker" = "https://github.com/MIT-LCP/mimic-code/issues"

[tool.setuptools]
include-package-data = true

[tool.setuptools.packages.find]
where = ["src"]
include = ["mimic_utils"]

[tool.setuptools_scm]
# this section is required to enable the scm plugin
Empty file added src/mimic_utils/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions src/mimic_utils/__main__.py
@@ -0,0 +1,31 @@
from argparse import ArgumentParser

from .transpile import transpile_file, transpile_folder

def main():
    parser = ArgumentParser(description="Convert SQL to different dialects.")
    subparsers = parser.add_subparsers()

    file_parser = subparsers.add_parser('convert_file', help='Transpile a single SQL file.')
    file_parser.add_argument("source_file", help="Source file.")
    file_parser.add_argument("destination_file", help="Destination file.")
    file_parser.add_argument("--source_dialect", choices=["bigquery", "postgres", "duckdb"], default='bigquery', help="SQL dialect to transpile.")
    file_parser.add_argument("--destination_dialect", choices=["postgres", "duckdb"], default='postgres', help="SQL dialect to transpile.")
    file_parser.set_defaults(func=transpile_file)

    folder_parser = subparsers.add_parser('convert_folder', help='Transpile all SQL files in a folder.')
    folder_parser.add_argument("source_folder", help="Source folder.")
    folder_parser.add_argument("destination_folder", help="Destination folder.")
    folder_parser.add_argument("--source_dialect", choices=["bigquery", "postgres", "duckdb"], default='bigquery', help="SQL dialect to transpile.")
    folder_parser.add_argument("--destination_dialect", choices=["bigquery", "postgres", "duckdb"], default="postgres", help="SQL dialect to transpile.")
    folder_parser.set_defaults(func=transpile_folder)

    args = parser.parse_args()
    # pop func from args
    args = vars(args)
    func = args.pop("func")
    func(**args)


if __name__ == '__main__':
main()
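As a rough usage sketch (assuming the package has been installed so the `mimic_utils` console script from pyproject.toml is on the PATH; the file names below are placeholders), the same conversion can be run from the shell or from Python:

# shell: mimic_utils convert_file icustay_detail.sql icustay_detail.pg.sql --destination_dialect postgres
from mimic_utils.transpile import transpile_file

# transpile one BigQuery-dialect concept script into the Postgres dialect
transpile_file(
    "icustay_detail.sql",       # source file (BigQuery SQL), illustrative name
    "icustay_detail.pg.sql",    # destination file for the transpiled SQL
    source_dialect="bigquery",
    destination_dialect="postgres",
)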
554 changes: 179 additions & 375 deletions src/mimic_utils/transpile.py
@@ -1,122 +1,15 @@
import cProfile

import sys
import os
import re
import argparse

import duckdb
import datetime
from pathlib import Path
from typing import Union

#import sqlparse
import sqlglot
import sqlglot.dialects.bigquery
import sqlglot.dialects.duckdb
from sqlglot import exp, generator, parser, tokens, transforms
import sqlglot.dialects.postgres
from sqlglot import Expression, exp, select
from sqlglot.helper import seq_get

from pprint import pprint

concept_name_map = {
'icustay_times': {"path": "demographics/icustay_times.sql"},
'icustay_hours': {"path": "icustay_hours.sql", "db": "duckdb"},
'echo_data': {"path": "echo_data.sql"},
'code_status': {"path": "code_status.sql"},
'weight_durations': {"path": "durations/weight_durations.sql"},
'rrt': {"path": "rrt.sql"},
'heightweight': {"path": "demographics/heightweight.sql"},
'icustay_detail': {"path": "demographics/icustay_detail.sql"},

'ventilation_classification': {"path": "durations/ventilation_classification.sql"},
'ventilation_durations': {"path": "durations/ventilation_durations.sql"},
'crrt_durations': {"path": "durations/crrt_durations.sql"},
'adenosine_durations': {"path": "durations/adenosine_durations.sql"},
'dobutamine_durations': {"path": "durations/dobutamine_durations.sql"},
'dopamine_durations': {"path": "durations/dopamine_durations.sql"},
'epinephrine_durations': {"path": "durations/epinephrine_durations.sql"},
'isuprel_durations': {"path": "durations/isuprel_durations.sql"},
'milrinone_durations': {"path": "durations/milrinone_durations.sql"},
'norepinephrine_durations': {"path": "durations/norepinephrine_durations.sql"},
'phenylephrine_durations': {"path": "durations/phenylephrine_durations.sql"},
'vasopressin_durations': {"path": "durations/vasopressin_durations.sql"},
'vasopressor_durations': {"path": "durations/vasopressor_durations.sql"},

'dobutamine_dose': {"path": "durations/dobutamine_dose.sql"},
'dopamine_dose': {"path": "durations/dopamine_dose.sql"},
'epinephrine_dose': {"path": "durations/epinephrine_dose.sql"},
'norepinephrine_dose': {"path": "durations/norepinephrine_dose.sql"},
'phenylephrine_dose': {"path": "durations/phenylephrine_dose.sql"},
'vasopressin_dose': {"path": "durations/vasopressin_dose.sql"},

'pivoted_vital': {"path": "pivot/pivoted_vital.sql"},
'pivoted_uo': {"path": "pivot/pivoted_uo.sql"},
'pivoted_rrt': {"path": "pivot/pivoted_rrt.sql"},
'pivoted_lab': {"path": "pivot/pivoted_lab.sql"},
'pivoted_invasive_lines': {"path": "pivot/pivoted_invasive_lines.sql"},
'pivoted_icp': {"path": "pivot/pivoted_icp.sql"},
'pivoted_height': {"path": "pivot/pivoted_height.sql"},
'pivoted_gcs': {"path": "pivot/pivoted_gcs.sql"},
'pivoted_fio2': {"path": "pivot/pivoted_fio2.sql"},
'pivoted_bg': {"path": "pivot/pivoted_bg.sql"},
# pivoted_bg_art must be run after pivoted_bg
'pivoted_bg_art': {"path": "pivot/pivoted_bg_art.sql"},
# Difficult error here, the original query seems to reference something non-existent...
# the `pivot` queries are omitted from the Postgres version... we may have to do the same?
# pivoted oasis depends on icustay_hours in demographics
#'pivoted_oasis': {"path": "pivot/pivoted_oasis.sql"},
# Another puzzling error here, duckdb doesn't like something on the `WITH` line!
# pivoted sofa depends on many above pivoted views, ventilation_durations, and dose queries
#'pivoted_sofa': {"path": "pivot/pivoted_sofa.sql"},

'elixhauser_ahrq_v37': {"path": "comorbidity/elixhauser_ahrq_v37.sql"},
'elixhauser_ahrq_v37_no_drg': {"path": "comorbidity/elixhauser_ahrq_v37_no_drg.sql"},
'elixhauser_quan': {"path": "comorbidity/elixhauser_quan.sql"},
'elixhauser_score_ahrq': {"path": "comorbidity/elixhauser_score_ahrq.sql"},
'elixhauser_score_quan': {"path": "comorbidity/elixhauser_score_quan.sql"},

'blood_gas_first_day': {"path": "firstday/blood_gas_first_day.sql"},
'blood_gas_first_day_arterial': {"path": "firstday/blood_gas_first_day_arterial.sql"},
'gcs_first_day': {"path": "firstday/gcs_first_day.sql"},
'labs_first_day': {"path": "firstday/labs_first_day.sql"},
'rrt_first_day': {"path": "firstday/rrt_first_day.sql"},
'urine_output_first_day': {"path": "firstday/urine_output_first_day.sql"},
'ventilation_first_day': {"path": "firstday/ventilation_first_day.sql"},
'vitals_first_day': {"path": "firstday/vitals_first_day.sql"},
'weight_first_day': {"path": "firstday/weight_first_day.sql"},

'urine_output': {"path": "fluid_balance/urine_output.sql"},
'colloid_bolus': {"path": "fluid_balance/colloid_bolus.sql"},
'crystalloid_bolus': {"path": "fluid_balance/crystalloid_bolus.sql"},
'ffp_transfusion': {"path": "fluid_balance/ffp_transfusion.sql"},
'rbc_transfusion': {"path": "fluid_balance/rbc_transfusion.sql"},

'angus': {"path": "sepsis/angus.sql"},
'martin': {"path": "sepsis/martin.sql"},
'explicit': {"path": "sepsis/explicit.sql"},

'ccs_dx': {"path": "diagnosis/ccs_dx.sql", "schema": None}, # explicit None means default schema not schema_name

'kdigo_creatinine': {"path": "organfailure/kdigo_creatinine.sql"},
'kdigo_uo': {"path": "organfailure/kdigo_uo.sql"},
'kdigo_stages': {"path": "organfailure/kdigo_stages.sql"},
'kdigo_stages_7day': {"path": "organfailure/kdigo_stages_7day.sql"},
'kdigo_stages_48hr': {"path": "organfailure/kdigo_stages_48hr.sql"},
'meld': {"path": "organfailure/meld.sql"},

'oasis': {"path": "severityscores/oasis.sql"},
'sofa': {"path": "severityscores/sofa.sql"},
'saps': {"path": "severityscores/saps.sql"},
'sapsii': {"path": "severityscores/sapsii.sql"},
'apsiii': {"path": "severityscores/apsiii.sql"},
'lods': {"path": "severityscores/lods.sql"},
'sirs': {"path": "severityscores/sirs.sql"},

}

# This will contain all the table/view names to put in a namespace...
tables_in_schema = set()

# BigQuery monkey patches
#=== BigQuery monkey patches
sqlglot.dialects.bigquery.BigQuery.Parser.FUNCTIONS["PARSE_DATETIME"] = lambda args: exp.StrToTime(
    this=seq_get(args, 1), format=seq_get(args, 0)
)
@@ -125,276 +18,187 @@
)
sqlglot.dialects.bigquery.BigQuery.Parser.STRICT_CAST = False
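# Illustrative sketch (assumes the PARSE_DATETIME registration above has been applied;
# the query text is an example only): the BigQuery call PARSE_DATETIME(format, string)
# now parses into a generic exp.StrToTime node, with the argument order swapped so that
# other dialects can render it with their own string-to-time functions.
example_tree = sqlglot.parse_one(
    "SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%S', charttime) FROM chartevents",
    read="bigquery",
)
assert example_tree.find(exp.StrToTime) is not None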

# DuckDB monkey patches
def duckdb_date_sub_sql(self, expression):
    #print("CALLING duckdb._date_sub")
    this = self.sql(expression, "this")
    unit = self.sql(expression, "unit") or "DAY" # .strip("'")
    return f"{this} - {self.sql(exp.Interval(this=expression.expression, unit=unit))}"
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DatetimeSub] = duckdb_date_sub_sql
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DatetimeAdd] = sqlglot.dialects.duckdb._date_add

_unit_ms_conversion_factor_map = {
'SECOND': 1e6,
'MINUTE': 60.0*1e6,
'HOUR': 3600.0*1e6,
'DAY': 24*3600.0*1e6,
'YEAR': 365.242*24*3600.0*1e6,
}
def duckdb_date_diff_whole_sql(self, expression):
#print("CALLING duckdb._date_diff")
#=== PSQL monkey patches
# DATETIME_ADD / DATETIME_SUB -> quote the integer
def date_arithmetic_sql(self: Expression, expression: Expression, operator: str):
"""Render DATE_ADD and DATE_SUB functions as a addition or subtraction of an interval."""
this = self.sql(expression, "this")
unit = self.sql(expression, "unit") or "DAY"
# DuckDB DATE_DIFF operand order is start_time, end_time--not like end_time - start_time!
return f"DATE_DIFF('{unit}', {self.sql(expression.expression)}, {this})"
def duckdb_date_diff_frac_sql(self, expression):
this = self.sql(expression, "this")
mfactor = _unit_ms_conversion_factor_map[self.sql(expression, "unit").upper() or "DAY"]
# DuckDB DATE_DIFF operand order is start_time, end_time--not like end_time - start_time!
return f"DATE_DIFF('microseconds', {self.sql(expression.expression)}, {this})/{mfactor:.1f}"
# only one of these will be used, set later based on arguments!
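# Illustrative sketch of the difference between the two DuckDB DATE_DIFF renderers above;
# the fractional renderer is installed here only for the example, mirroring what main()
# does below based on its arguments, and the query/column names are made up:
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DatetimeDiff] = duckdb_date_diff_frac_sql
print(sqlglot.transpile("SELECT DATETIME_DIFF(outtime, intime, HOUR) FROM icustays",
                        read="bigquery", write="duckdb")[0])
# expected shape: DATE_DIFF('microseconds', intime, outtime)/3600000000.0
# (the whole-unit renderer would instead give DATE_DIFF('HOUR', intime, outtime);
# 3600000000.0 is _unit_ms_conversion_factor_map['HOUR'], i.e. 3600.0 * 1e6)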

# This may not be strictly necessary because the views work without
# it IF you `use` the schema first... but making them fully qualified
# makes them work regardless of the current schema.
def _duckdb_rewrite_schema(sql: str, schema: str):
    parsed = sqlglot.parse_one(sql, read=sqlglot.dialects.DuckDB)
    for table in parsed.find_all(exp.Table):
        for identifier in table.find_all(exp.Identifier):
            if identifier.this.lower() in tables_in_schema:
                sql = sql.replace('"'+identifier.this+'"', schema+'.'+identifier.this.lower())
    # The below (unfinished) causes problems because some munging of functions
    # occurs in the output. The above approach is kludgy, but works and limits
    # the blast radius of potential problems regexping SQL.
    """
    def transformer(node):
        if isinstance(node, exp.Table): #and node.name == "a":
            for id in node.find_all(exp.Identifier):
                if id.this.lower() in tables_in_schema:
                    id.this = schema + '.' + id.this.lower()
                    #print(id)
            return node
        return node
    transformed_tree = parsed.transform(transformer)
    sql = transformed_tree.sql(dialect=sqlglot.dialects.DuckDB)
    """
    return sql


def _make_duckdb_query_bigquery(qname: str, qfile: str, conn, schema: str = None):
_multischema_trunc_re = re.compile("\"physionet-data\.mimiciii_\w+\.")
# for psql, we need to quote the number
interval_exp = expression.expression
if isinstance(interval_exp, exp.Literal):
interval_exp = exp.Literal(this=expression.expression.this, is_string=True)
return f"{this} {operator} {self.sql(exp.Interval(this=interval_exp, unit=unit))}"

#TODO: better answer here? should only hit ccs_dx.sql!
_too_many_backslashes_re = re.compile("\\\\([\[\.\]])")

with open(qfile, "r") as fp:
sql = fp.read()
sql = re.sub(_too_many_backslashes_re, '\\$1', sql)
try:
sql_list = sqlglot.transpile(sql, read="bigquery", write="duckdb", pretty=True)
except Exception as e:
print(sql)
raise e
for st in sql_list:
sql = re.sub(_multischema_trunc_re, "\"", st)

if schema is not None:
sql = _duckdb_rewrite_schema(sql, schema)

conn.execute(f"DROP VIEW IF EXISTS {qname}")
try:
conn.execute(f"CREATE VIEW {qname} AS " + sql)
except Exception as e:
print(sql)
#print(repr(sqlglot.parse_one(sql)))
raise e
print(f"CREATED VIEW {qname}")

#print()


def _make_duckdb_query_duckdb(qname: str, qfile: str, conn, schema: str = None):
    with open(qfile, "r") as fp:
        sql = fp.read()

    if schema is not None:
        sql = _duckdb_rewrite_schema(sql, schema)

    try:
        conn.execute(f"CREATE OR REPLACE VIEW {qname} AS " + sql)
    except Exception as e:
        print(sql)
        raise e
    print(f"CREATED VIEW {qname}")

# if the interval number is an expression, we multiply it by an interval instead
# e.g. if it is CAST(column AS INT), it becomes CAST(column AS INT) * INTERVAL '1' HOUR
one_interval = exp.Interval(
this=exp.Literal(this="1", is_string=True),
unit=unit
)
return f"{this} {operator} {self.sql(exp.Mul(this=interval_exp, expression=one_interval))}"
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DatetimeSub] = lambda self, expression: date_arithmetic_sql(self, expression, "-")
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DatetimeAdd] = lambda self, expression: date_arithmetic_sql(self, expression, "+")
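# Minimal sketch (assumes the two transforms just registered; the query is illustrative):
# interval arithmetic written for BigQuery comes out as plain +/- INTERVAL arithmetic in
# Postgres, with literal amounts quoted.
print(sqlglot.transpile(
    "SELECT DATETIME_SUB(charttime, INTERVAL 1 HOUR) FROM chartevents",
    read="bigquery", write="postgres",
)[0])
# expected shape: SELECT charttime - INTERVAL '1' HOUR FROM chartevents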

# DATETIME_DIFF / DATE_DIFF -> use EXTRACT(EPOCH ...) with a custom conversion factor
_unit_second_conversion_factor_map = {
    'SECOND': 1,
    'MINUTE': 60.0,
    'HOUR': 3600.0,
    'DAY': 24*3600.0,
    'YEAR': 365.242*24*3600.0,
}
def date_diff_sql(self: Expression, expression: Expression):
    this = self.sql(expression, "this")
    mfactor = _unit_second_conversion_factor_map[self.sql(expression, "unit").upper() or "DAY"]
    return f"EXTRACT(EPOCH FROM {this} - {self.sql(expression.expression)}) / {mfactor:.1f}"

def main() -> int:
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DatetimeDiff] = date_diff_sql
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DateDiff] = date_diff_sql
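# Minimal sketch (assumes the DATETIME_DIFF transform above; names are illustrative):
# a BigQuery difference in DAY units becomes an epoch subtraction divided by 86400.0.
print(sqlglot.transpile(
    "SELECT DATETIME_DIFF(outtime, intime, DAY) AS los FROM icustays",
    read="bigquery", write="postgres",
)[0])
# expected shape: SELECT EXTRACT(EPOCH FROM outtime - intime) / 86400.0 AS los FROM icustays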

parser = argparse.ArgumentParser(
prog='buildmimic_duckdb',
description='Creates the MIMIC-III database in DuckDB and optionally the concepts views.',
# DATE_TRUNC -> quote the unit part
def date_trunc_sql(self: Expression, expression: Expression):
    this = self.sql(expression, "this")
    unit = self.sql(expression, "unit") or "DAY"
    return f"DATE_TRUNC('{unit}', {this})"
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DateTrunc] = date_trunc_sql
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.DatetimeTrunc] = date_trunc_sql
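# Minimal sketch (assumes the DATE_TRUNC transform above; query text is illustrative):
# the truncation unit gets quoted for Postgres.
print(sqlglot.transpile(
    "SELECT DATETIME_TRUNC(charttime, DAY) FROM chartevents",
    read="bigquery", write="postgres",
)[0])
# expected shape: SELECT DATE_TRUNC('DAY', charttime) FROM chartevents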

# DATETIME: allow passing either a DATE directly, or multiple arguments
# there isn't a class for the DATETIME function, so we have to create it ourselves,
# and recast anonymous functions with the name "datetime" to this class
class DateTime(exp.Func):
    arg_types = {"this": False, "zone": False, "expressions": False}
    is_var_len_args = True

def datetime_sql(self: Expression, expression: Expression):
    # https://cloud.google.com/bigquery/docs/reference/standard-sql/datetime_functions#datetime
    # BigQuery supports three overloaded arguments to DATETIME, but we will only accept
    # (1) the version which accepts integer valued arguments
    # (2) the version which accepts a DATE directly (no optional 2nd argument allowed)
    if not isinstance(expression.expressions, list):
        raise NotImplementedError("Transpile only supports DATETIME(date) OR DATETIME(year, month, day, hour, minute, second)")
    if len(expression.expressions) == 1:
        # handle the case where we are passing a DATE directly
        return f"CAST({self.sql(expression.expressions[0])} AS TIMESTAMP)"

    if len(expression.expressions) != 6:
        raise NotImplementedError("Transpile only supports DATETIME(date) OR DATETIME(year, month, day, hour, minute, second)")

    # we will now map the args for passing to the TO_TIMESTAMP(string, format) PSQL function
    args = [self.sql(arg) for arg in expression.expressions]
    # pad the arguments with zeros
    args = [f"TO_CHAR({arg}, '{'0000' if i == 0 else '00'}')" for i, arg in enumerate(args)]
    # concatenate the arguments
    args = " || ".join(args)
    # convert the concatenated string to a timestamp
    return f"TO_TIMESTAMP({args}, 'yyyymmddHH24MISS')"
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[DateTime] = datetime_sql
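# Sketch of the two DATETIME() shapes handled above (assumption: the anonymous DATETIME
# call is first rewritten to the DateTime class by transpile_query, defined further down;
# table/column names are illustrative):
#   DATETIME(dod)                  -> CAST(dod AS TIMESTAMP)
#   DATETIME(2150, 1, 1, 0, 0, 0)  -> TO_TIMESTAMP(TO_CHAR(2150, '0000') || ... , 'yyyymmddHH24MISS')
print(transpile_query("SELECT DATETIME(dod) FROM patients", "bigquery", "postgres"))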

# GENERATE_ARRAY(exp1, exp2) -> convert to ARRAY(SELECT * FROM generate_series(exp1, exp2))
# https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions#generate_array
# https://www.postgresql.org/docs/current/functions-srf.html
class GenerateArray(exp.Func):
    arg_types = {"this": False, "expressions": False}

class GenerateSeries(exp.Func):
    arg_types = {"this": False, "expressions": False}

def generate_array_sql(self: Expression, expression: Expression):
# first create a select statement which selects from generate_series
select_statement = select("*").from_(
GenerateSeries(
expressions=[
expression.expressions[0],
expression.expressions[1],
],
)
parser.add_argument('mimic_data_dir', help="directory that contains csv.tar.gz or csv files")
parser.add_argument('output_db', help="filename for duckdb file (default: mimic3.db)", nargs='?', default="./mimic3.db")
parser.add_argument('--mimic-code-root', help="location of the mimic-code repo (used to find concepts SQL)", default='../../../')
parser.add_argument('--make-concepts', help="generate the concepts views", action="store_true")
parser.add_argument('--skip-tables', help="don't create schema or load data (they must already exist)", action="store_true")
parser.add_argument('--skip-indexes', help="don't create indexes (implied by --skip-tables)", action="store_true")
parser.add_argument('--schema-name', help="put all object (except ccs_dx) into a schema (like the PostgreSQL version)", default=None)
parser.add_argument('--integer-datetime-diff', help="EXPERIMENTAL: calculate integer DATETIME_DIFF results (like BigQuery) for e.g. icustay_detail.los_icu", action="store_true")
args = parser.parse_args()
output_db = args.output_db
mimic_data_dir = args.mimic_data_dir
make_concepts = args.make_concepts
mimic_code_root = args.mimic_code_root
skip_tables = args.skip_tables
skip_indexes = args.skip_indexes
integer_datetime_diff = args.integer_datetime_diff
#TODO: validate schema_name is valid identifier
schema_name = args.schema_name

#EXPERIMENTAL! May be removed.
if integer_datetime_diff:
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DatetimeDiff] = duckdb_date_diff_whole_sql
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DateDiff] = duckdb_date_diff_whole_sql
else:
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DatetimeDiff] = duckdb_date_diff_frac_sql
sqlglot.dialects.duckdb.DuckDB.Generator.TRANSFORMS[exp.DateDiff] = duckdb_date_diff_frac_sql


if not skip_tables:

connection = duckdb.connect(output_db)
print("Connected to duckdb...")

try:
schema_prequel = ""
if schema_name is not None:
connection.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name};")
connection.execute(f"USE {schema_name};")
schema_prequel = f"USE {schema_name};"

print("Creating tables...")
)

# now convert the select statement to an array
return f"ARRAY({self.sql(select_statement)})"
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[GenerateArray] = generate_array_sql
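# Sketch (assumes the GENERATE_ARRAY rewrite above, applied through transpile_query
# defined below; the query is illustrative):
print(transpile_query("SELECT GENERATE_ARRAY(0, 23) AS hrs", "bigquery", "postgres"))
# expected shape: SELECT ARRAY(SELECT * FROM GENERATE_SERIES(0, 23)) AS hrs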

# we need to prevent the wrapping of the table alias in brackets for UNNEST
# e.g. UNNEST(array) AS (alias) -> UNNEST(array) AS alias
def unnest_sql(self: Expression, expression: Expression):
    alias = self.sql(expression, "alias")
    # remove the brackets
    if alias.startswith("(") and alias.endswith(")"):
        alias = alias[1:-1]
    sql_text = expression.sql()
    # substitute the alias
    sql_text = sql_text.replace(f' AS {self.sql(expression, "alias")}', f' AS {alias}')
    return sql_text
sqlglot.dialects.postgres.Postgres.Generator.TRANSFORMS[exp.Unnest] = unnest_sql
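# Sketch (assumes the UNNEST transform above, via transpile_query defined below; query
# text is illustrative): the alias keeps its bare form, i.e. "AS hr" rather than "AS (hr)".
print(transpile_query(
    "SELECT hr FROM UNNEST(GENERATE_ARRAY(0, 23)) AS hr",
    "bigquery", "postgres",
))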

def transpile_query(query: str, source_dialect: str="bigquery", destination_dialect: str="postgres"):
    """
    Transpiles the SQL file from BigQuery to the specified dialect.
    """
    sql_parsed = sqlglot.parse_one(query, read=source_dialect)

    # Remove "physionet-data" as the catalog name
    catalog_to_remove = 'physionet-data'
    for table in sql_parsed.find_all(exp.Table):
        if table.catalog == catalog_to_remove:
            table.args['catalog'] = None
        elif table.this.name.startswith(catalog_to_remove):
            table.args['this'].args['this'] = table.this.name.replace(catalog_to_remove + '.', '')
            # sqlglot wants to output the schema/table as a single quoted identifier
            # so here we remove the quoting
            table.args['this'] = sqlglot.expressions.to_identifier(
                name=table.args['this'].args['this'],
                quoted=False
            )

    if source_dialect == 'bigquery':
        # BigQuery has a few functions which are not in sqlglot, so we have
        # created classes for them, and this loop replaces the anonymous functions
        # with the named functions
        for anon_function in sql_parsed.find_all(exp.Anonymous):
            if anon_function.this == 'DATETIME':
                named_function = DateTime(
                    **anon_function.args,
                )
                anon_function.replace(named_function)
            elif anon_function.this == 'GENERATE_ARRAY':
                named_function = GenerateArray(
                    **anon_function.args,
                )
                anon_function.replace(named_function)

    # convert back to sql
    transpiled_query = sql_parsed.sql(dialect=destination_dialect, pretty=True)

with open(os.path.join(mimic_code_root, 'mimic-iii','buildmimic','duckdb','duckdb_add_tables.sql'), 'r') as fp:
sql = fp.read()
connection.execute(sql)

print("Loading data...")

for f in os.listdir(mimic_data_dir):
m = re.match(r'^(.*)\.csv(\.gz)*', f)
if m is not None:
tablename = m.group(1).lower()
tables_in_schema.add(tablename)
tablename = tablename if schema_name is None else schema_name+'.'+tablename
print(f" {tablename}")
connection.execute(f"COPY {tablename} from '{os.path.join(mimic_data_dir,m.group(0))}' (FORMAT CSV, DELIMITER ',', HEADER);")

if not skip_indexes:
print("Adding indexes...")

with open(os.path.join(mimic_code_root, 'mimic-iii','buildmimic','duckdb','duckdb_add_indexes.sql'), 'r') as fp:
sql = fp.read()
connection.execute(sql)
return transpiled_query
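# End-to-end sketch of transpile_query (table and column names are illustrative, and the
# exact pretty-printed layout depends on the sqlglot version):
example_bq = (
    "SELECT subject_id, DATETIME_DIFF(outtime, intime, MINUTE) AS los_minutes "
    "FROM `physionet-data.mimiciii_clinical.icustays`"
)
print(transpile_query(example_bq, source_dialect="bigquery", destination_dialect="postgres"))
# expected: the physionet-data catalog is stripped (FROM mimiciii_clinical.icustays) and
# the difference is rendered as EXTRACT(EPOCH FROM outtime - intime) / 60.0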

print("Running checks...")
def transpile_file(source_file: Union[str, os.PathLike], destination_file: Union[str, os.PathLike], source_dialect: str="bigquery", destination_dialect: str="postgres"):
    """
    Reads an SQL file in from file, transpiles it, and outputs it to file.
    """
    with open(source_file, "r") as read_file:
        sql_query = read_file.read()

with open(os.path.join(mimic_code_root, 'mimic-iii','buildmimic','duckdb','duckdb_checks.sql'), 'r') as fp:
sql = fp.read()
result = connection.execute(sql).fetchall()
for row in result:
print(f"{row[0]}: {row[2]} records ({row[1]} expected) - {row[3]}")

except Exception as error:
print("Failed setting up database: ", error)
raise error
finally:
if connection:
connection.close()
print("duckdb connection is closed")

#TODO: If both --schema-name and --skip-tables are specified, we won't have
# populated tables_in_schema with the data table names... so the views won't
# work... So, here, read the tables already in the destination schema from
# the DB and add those tablenames to tables_in_schema?

if make_concepts:
connection = duckdb.connect(output_db)
print("Connected to duckdb...")

print("Creating tables...")

# ccs_dx is an outlier...this is adapted from the BigQuery version...
ccs_multi_dx_create = """
DROP TABLE IF EXISTS ccs_multi_dx;
CREATE TABLE ccs_multi_dx
(
icd9_code CHAR(5) NOT NULL,
-- CCS levels and names based on position in hierarchy
ccs_level1 VARCHAR(10),
ccs_group1 VARCHAR(100),
ccs_level2 VARCHAR(10),
ccs_group2 VARCHAR(100),
ccs_level3 VARCHAR(10),
ccs_group3 VARCHAR(100),
ccs_level4 VARCHAR(10),
ccs_group4 VARCHAR(100)
);
"""

print("Loading data...")
try:

connection.execute(f"USE main;")

connection.execute(ccs_multi_dx_create)
csvgz_path = os.path.join(mimic_code_root, 'mimic-iii','concepts_postgres','diagnosis','ccs_multi_dx.csv.gz')
connection.execute(f"COPY ccs_multi_dx from '{csvgz_path}' (FORMAT CSV, DELIMITER ',', HEADER);")

except Exception as error:
print("Failed to setup ccs_multi_dx: ", error)
raise error
finally:
if connection:
connection.close()
print("duckdb connection is closed")

connection = duckdb.connect(output_db)

print("Creating views...")
try:

if schema_name is not None:
connection.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name};")
connection.execute(f"USE {schema_name}")

for key in concept_name_map:
if schema_name is not None:
if "schema" not in concept_name_map[key]:
tables_in_schema.add(key.lower())

#cProfile.run('...')
#print(f"Making view {key}...")
db = concept_name_map[key].get("db", "bigquery")
if db == "duckdb":
qpath = os.path.join(mimic_code_root, 'mimic-iii', 'buildmimic', 'duckdb', 'concepts', concept_name_map[key]['path'])
_make_duckdb_query_duckdb(key, qpath, connection, schema=concept_name_map[key].get('schema', schema_name))
elif db == "bigquery":
qpath = os.path.join(mimic_code_root, 'mimic-iii', 'concepts', concept_name_map[key]['path'])
_make_duckdb_query_bigquery(key, qpath, connection, schema=schema_name)

except Exception as error:
print("Failed to execute translated SQL: ", error)
raise error
finally:
if connection:
connection.close()
print("duckdb connection is closed")

if __name__ == '__main__':
sys.exit(main())



    transpiled_query = transpile_query(sql_query, source_dialect, destination_dialect)
    # add "create" statement based on the file stem
    transpiled_query = (
        "-- THIS SCRIPT IS AUTOMATICALLY GENERATED. DO NOT EDIT IT DIRECTLY.\n"
        f"DROP TABLE IF EXISTS {Path(source_file).stem}; "
        f"CREATE TABLE {Path(source_file).stem} AS\n"
    ) + transpiled_query

    with open(destination_file, "w") as write_file:
        write_file.write(transpiled_query)

def transpile_folder(source_folder: Union[str, os.PathLike], destination_folder: Union[str, os.PathLike], source_dialect: str="bigquery", destination_dialect: str="postgres"):
    """
    Transpiles each file in the folder from BigQuery to the specified dialect.
    """
    source_folder = Path(source_folder).resolve()
    for filename in source_folder.rglob("*.sql"):
        source_file = filename
        destination_file = Path(destination_folder).resolve() / filename.relative_to(source_folder)
        destination_file.parent.mkdir(parents=True, exist_ok=True)

        transpile_file(source_file, destination_file, source_dialect, destination_dialect)
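
# Folder-level sketch; the paths are hypothetical stand-ins for the repository's concepts
# folders (the destination tree mirrors the source tree, one output file per source file):
transpile_folder(
    "mimic-iv/concepts",             # BigQuery-dialect concept scripts
    "mimic-iv/concepts_postgres",    # transpiled copies
    source_dialect="bigquery",
    destination_dialect="postgres",
)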