From f91cfddcf9262aa99ae7090bd145f942bed673f2 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:49:05 -0700 Subject: [PATCH 01/14] add function for xmatch and database import, refactor import_db to make it more modular --- winterdrp/processors/database/postgres.py | 118 ++++++++++++++++++++-- 1 file changed, 112 insertions(+), 6 deletions(-) diff --git a/winterdrp/processors/database/postgres.py b/winterdrp/processors/database/postgres.py index cda02c4e5..5c595a5ae 100644 --- a/winterdrp/processors/database/postgres.py +++ b/winterdrp/processors/database/postgres.py @@ -95,7 +95,6 @@ def check_if_user_exists( db_user: str = os.environ.get('PG_DEFAULT_USER'), password: str = os.environ.get('PG_DEFAULT_PWD') ) -> bool: - validate_credentials(db_user, password) with psycopg.connect(f"dbname=postgres user={db_user} password={password}") as conn: @@ -115,7 +114,6 @@ def check_if_db_exists( db_user: str = os.environ.get('PG_DEFAULT_USER'), password: str = os.environ.get('PG_DEFAULT_PWD') ) -> bool: - validate_credentials(db_user, password) with psycopg.connect(f"dbname=postgres user={db_user} password={password}") as conn: @@ -138,7 +136,6 @@ def check_if_table_exists( db_user: str = os.environ.get('PG_DEFAULT_USER'), password: str = os.environ.get('PG_DEFAULT_PWD') ) -> bool: - validate_credentials(db_user=db_user, password=password) with psycopg.connect(f"dbname={db_name} user={db_user} password={password}") as conn: @@ -275,15 +272,40 @@ def export_to_db( return primary_key, primary_key_values +def parse_constraints(db_query_columns, + db_comparison_types, + db_accepted_values + ): + assert len(db_comparison_types) == len(db_accepted_values) + assert np.isin(np.all(np.unique(db_comparison_types), ['=', '<', '>', 'between'])) + constraints = "" + for i, x in enumerate(db_query_columns): + if db_comparison_types[i] == 'between': + assert len(db_accepted_values[i]) == 2 + constraints += f"{x} between {db_accepted_values[i][0]} and {db_accepted_values[i][1]} AND " + else: + constraints += f"{x} {db_comparison_types[i]} {db_accepted_values[i]} AND " + + constraints = constraints[:-4] # strip the last AND + + return constraints + + +def parse_select(): + pass + + def import_from_db( db_name: str, db_table: str, db_query_columns: str | list[str], - db_accepted_values: str | int | float | list[str | float | int], + db_accepted_values: str | int | float | list[str | float | int | list], db_output_columns: str | list[str], output_alias_map: str | list[str], db_user: str = os.environ.get('PG_DEFAULT_USER'), password: str = os.environ.get('PG_DEFAULT_PWD'), + max_num_results: int = None, + db_comparison_types: list[str] = None ) -> list[dict]: """Query an SQL database with constraints, and return a list of dictionaries. One dictionary per entry returned from the query. @@ -325,15 +347,29 @@ def import_from_db( all_query_res = [] - constraints = " AND ".join([f"{x} = {db_accepted_values[i]}" for i, x in enumerate(db_query_columns)]) + if db_comparison_types is None: + db_comparison_types = ['='] * len(db_accepted_values) + assert len(db_comparison_types) == len(db_accepted_values) + assert np.isin(np.all(np.unique(db_comparison_types), ['=', '<', '>', 'between'])) + + constraints = parse_constraints(db_query_columns, + db_comparison_types, + db_accepted_values) + # constraints = " AND ".join([f"{x} {db_comparison_types[i]} {db_accepted_values[i]}" for i, x in enumerate( + # db_query_columns)]) with psycopg.connect(f"dbname={db_name} user={db_user} password={password}") as conn: conn.autocommit = True sql_query = f""" SELECT {', '.join(db_output_columns)} from {db_table} - WHERE {constraints}; + WHERE {constraints} """ + if max_num_results is not None: + sql_query += f" LIMIT {max_num_results}" + + sql_query += f";" + logger.debug(f"Query: {sql_query}") with conn.execute(sql_query) as cursor: @@ -351,3 +387,73 @@ def import_from_db( all_query_res.append(query_res) return all_query_res + + +def execute_query(sql_query, db_name, db_user, password): + with psycopg.connect(f"dbname={db_name} user={db_user} password={password}") as conn: + conn.autocommit = True + logger.debug(f"Query: {sql_query}") + + with conn.execute(sql_query) as cursor: + query_output = cursor.fetchall() + + return query_output + + +def xmatch_import_db(db_name: str, + db_table: str, + db_query_columns: str | list[str], + db_accepted_values: str | int | float | list[str | float | int], + db_output_columns: str | list[str], + output_alias_map: str | list[str], + ra: float, + dec: float, + xmatch_radius_arcsec: float, + ra_field_name: str = 'ra', + dec_field_name: str = 'dec', + query_dist=False, + q3c=False, + db_comparison_types: list[str] = None, + order_field_name: str = None, + order_ascending: bool = True, + num_limit: int = None, + db_user: str = os.environ.get('PG_DEFAULT_USER'), + db_password: str = os.environ.get('PG_DEFAULT_PWD'), + ) -> list[dict]: + xmatch_radius_deg = xmatch_radius_arcsec / 3600.0 + constraints = f"""q3c_radial_query({ra_field_name},{dec_field_name},{ra},{dec},{xmatch_radius_deg}) """ \ + + parse_constraints(db_query_columns, + db_comparison_types, + db_accepted_values) + + select = f"""{', '.join(db_output_columns)}""" + if query_dist: + if q3c: + select = f"""q3c_dist({ra_field_name},{dec_field_name},{ra},{dec}) AS xdist,""" + select + else: + select = f"""{ra_field_name} - ra AS xdist,""" + select + + query = f"""SELECT {select} FROM {db_table} WHERE {constraints}""" + order_seq = ["asc", "desc"][np.sum(order_ascending)] + if order_field_name is not None: + query += f""" ORDER BY {order_field_name}""" + if num_limit is not None: + query += f""" LIMIT {num_limit}""" + + query += ";" + + query_output = execute_query(query, db_name, db_user, db_password) + all_query_res = [] + for entry in query_output: + if not query_dist: + assert len(entry) == len(db_output_columns) + else: + assert len(entry) == len(db_output_columns) + 1 + query_res = dict() + for i, key in enumerate(output_alias_map): + query_res[key] = entry[i] + if query_dist: + query_res['dist'] = entry['xdist'] + all_query_res.append(query_res) + return all_query_res + From c6a31c6d9a9c494d1430cc5763ed467f33e31d7c Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:50:00 -0700 Subject: [PATCH 02/14] add function to get column names from sql schema --- winterdrp/processors/database/postgres.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/winterdrp/processors/database/postgres.py b/winterdrp/processors/database/postgres.py index 5c595a5ae..9df5b262e 100644 --- a/winterdrp/processors/database/postgres.py +++ b/winterdrp/processors/database/postgres.py @@ -457,3 +457,12 @@ def xmatch_import_db(db_name: str, all_query_res.append(query_res) return all_query_res +def get_colnames_from_schema(schema_file): + with open(schema_file,'r') as f: + dat = f.read() + dat = dat.split(');')[0] + dat = dat.split('\n')[1:-1] + pkstrip = [x.strip(',').split('PRIMARY KEY')[0].strip() for x in dat] + fkstrip = [x.strip(',').split('FOREIGN KEY')[0].strip() for x in pkstrip] + colnames = [x.split(' ')[0] for x in fkstrip] + return colnames \ No newline at end of file From 9b5dcb2d3fd41dad833cfc6551dfb384fee516f1 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:51:26 -0700 Subject: [PATCH 03/14] change name of aperture photometry function --- .../processors/photometry/aperture_photometry.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/winterdrp/processors/photometry/aperture_photometry.py b/winterdrp/processors/photometry/aperture_photometry.py index cc99bd5bb..c67783eec 100644 --- a/winterdrp/processors/photometry/aperture_photometry.py +++ b/winterdrp/processors/photometry/aperture_photometry.py @@ -1,7 +1,6 @@ from winterdrp.processors.base_processor import BaseDataframeProcessor import pandas as pd import numpy as np -from astropy.io import fits from winterdrp.processors.photometry.utils import make_cutouts import matplotlib.pyplot as plt from astropy.stats import sigma_clipped_stats @@ -29,8 +28,8 @@ def __init__(self, self.col_suffix_list = self.aper_diameters @staticmethod - def aperture_photometry(diff_cutout, diff_unc_cutout, aper_diameter, bkg_in_diameter, bkg_out_diameter, - plot=False): + def aper_photometry(diff_cutout, diff_unc_cutout, aper_diameter, bkg_in_diameter, bkg_out_diameter, + plot=False): # w = WCS(header) # x,y = w.all_world2pix(ra,dec,0) @@ -94,22 +93,22 @@ def _apply_to_candidates( for cand_ind in range(len(candidate_table)): row = candidate_table.iloc[cand_ind] - ximage, yimage = int(row['X_IMAGE'])-1, int(row['Y_IMAGE'])-1 + ximage, yimage = int(row['X_IMAGE']) - 1, int(row['Y_IMAGE']) - 1 diff_filename = row['diffimname'] diff_psf_filename = row['diffpsfname'] diff_unc_filename = row['diffuncname'] diff_cutout = make_cutouts(diff_filename, (ximage, yimage), self.cutout_size_aper_phot) diff_unc_cutout = make_cutouts(diff_unc_filename, (ximage, yimage), self.cutout_size_aper_phot) - flux, fluxunc = self.aperture_photometry(diff_cutout, diff_unc_cutout, aper_diam, bkg_in_diameter, - bkg_out_diameter) + flux, fluxunc = self.aper_photometry(diff_cutout, diff_unc_cutout, aper_diam, bkg_in_diameter, + bkg_out_diameter) fluxes.append(flux) fluxuncs.append(fluxunc) candidate_table[f'fluxap{suffix}'] = fluxes candidate_table[f'fluxuncap{suffix}'] = fluxuncs candidate_table[f'magap{suffix}'] = candidate_table['magzpsci'] - \ - 2.5 * np.log10(candidate_table[f'fluxap{suffix}']) + 2.5 * np.log10(candidate_table[f'fluxap{suffix}']) candidate_table[f'sigmagap{suffix}'] = 1.086 * candidate_table[f'fluxuncap{suffix}'] \ - / candidate_table[f'fluxap{suffix}'] + / candidate_table[f'fluxap{suffix}'] return candidate_table From 86772bfcb548ef5bb9c523ff7ee3119d3700ef90 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:52:47 -0700 Subject: [PATCH 04/14] first pass for xmatch and history queries to tables on a local db --- .../processors/database/database_importer.py | 158 +++++++++++++++++- 1 file changed, 155 insertions(+), 3 deletions(-) diff --git a/winterdrp/processors/database/database_importer.py b/winterdrp/processors/database/database_importer.py index d743c8186..3ba284f60 100644 --- a/winterdrp/processors/database/database_importer.py +++ b/winterdrp/processors/database/database_importer.py @@ -9,13 +9,12 @@ from winterdrp.processors.base_processor import BaseImageProcessor, BaseDataframeProcessor import logging from winterdrp.processors.database.base_database_processor import BaseDatabaseProcessor, DataBaseError -from winterdrp.processors.database.postgres import import_from_db +from winterdrp.processors.database.postgres import import_from_db, xmatch_import_db logger = logging.getLogger(__name__) class BaseDatabaseImporter(BaseDatabaseProcessor, ABC): - base_key = "dbimporter" def __init__( @@ -105,6 +104,159 @@ def get_constraints(self, header) -> list[str]: return accepted_values +def update_dataframe_with_single_match( + candidate_table: pd.DataFrame, + results: list[dict] +) -> pd.DataFrame: + for res in results: + assert len(res) < 1 + + keys = results[0].keys() + for key in keys: + candidate_table[key] = [x[0][key] for x in results] + + return candidate_table + + class DatabaseDataframeImporter(BaseDatabaseImporter, BaseDataframeProcessor): - pass + def __init__(self, + db_output_columns: str | list[str], + output_alias_map: str | list[str] = None, + update_dataframe: Callable[ + [pd.DataFrame, list[list[dict]]], pd.DataFrame] = update_dataframe_with_single_match, + max_num_results: int = None, + *args, **kwargs): + self.db_output_columns = db_output_columns + self.output_alias_map = output_alias_map + self.update_dataframe = update_dataframe + self.max_num_results = max_num_results + super(DatabaseDataframeImporter, self).__init__(*args, **kwargs) + + def _apply_to_candidates( + self, + candidate_table: pd.DataFrame, + ) -> pd.DataFrame: + results = [] + for ind in range(len(candidate_table)): + cand = candidate_table.loc[ind] + query_columns, comparison_values, comparison_types = self.get_constraints(cand) + res = import_from_db( + db_name=self.db_name, + db_table=self.db_table, + db_query_columns=query_columns, + db_accepted_values=comparison_values, + db_output_columns=self.db_output_columns, + output_alias_map=self.output_alias_map, + db_user=self.db_user, + password=self.db_password, + max_num_results=self.max_num_results + ) + + results.append(res) + new_df = self.update_dataframe(candidate_table, results) + return new_df + + def get_constraints(self, cand): + raise NotImplementedError + + +def no_additional_constraints(cand): + return [], [], [] + + +def update_xmatch_dataframe( + candidate_table: pd.DataFrame, + results: list[list[dict]]) -> pd.DataFrame: + assert len(results) == len(candidate_table) + keys = results[0][0].keys() + for num in range(len(results[0])): + for key in keys: + candidate_table[f"{key}{num + 1}"] = [x[num][key] for x in results] + return candidate_table + + +class DatabaseXMatchImporter(DatabaseDataframeImporter): + def __init__(self, + xmatch_radius_arcsec: float, + user_defined_constraints: Callable[[pd.DataFrame], tuple] = no_additional_constraints, + update_dataframe: Callable[ + [pd.DataFrame, list[list[dict]]], pd.DataFrame] = update_xmatch_dataframe, + ra_field_name: str = 'ra', + dec_field_name: str = 'dec', + order_field_name: str = None, + order_ascending: bool = False, + q3c: bool = False, + query_dist: bool = False, + *args, **kwargs): + super(DatabaseXMatchImporter, self).__init__(*args, **kwargs) + self.xmatch_radius_arcsec = xmatch_radius_arcsec + self.ra_field_name = ra_field_name + self.dec_field_name = dec_field_name + self.q3c = q3c + self.user_defined_constraints = user_defined_constraints + self.order_field_name = order_field_name + self.order_ascending = order_ascending + self.query_dist = query_dist + + def get_constraints(self, cand): + query_columns, comparison_types, accepted_values = self.user_defined_constraints(cand) + return query_columns, comparison_types, accepted_values + + def _apply_to_candidates( + self, + candidate_table: pd.DataFrame, + ) -> pd.DataFrame: + results = [] + for ind in range(len(candidate_table)): + cand = candidate_table.loc[ind] + query_columns, comparison_types, accepted_values = self.get_constraints(cand) + res = xmatch_import_db(db_name=self.db_name, + db_table=self.db_table, + db_user=self.db_user, + db_password=self.db_password, + db_output_columns=self.db_output_columns, + output_alias_map=self.output_alias_map, + db_query_columns=query_columns, + db_comparison_types=comparison_types, + db_accepted_values=accepted_values, + ra=cand[self.ra_field_name], + dec=cand[self.dec_field_name], + xmatch_radius_arcsec=self.xmatch_radius_arcsec, + query_dist=self.query_dist, + q3c=self.q3c, + order_field_name=self.order_field_name, + order_ascending=self.order_ascending, + num_limit=self.max_num_results, + ) + results.append(res) + + new_table = self.update_dataframe(candidate_table, results) + return new_table + + +def update_history_dataframe( + candidate_table: pd.DataFrame, + results: list[list[dict]] +): + assert len(results) == len(candidate_table) + candidate_table['prv_candidates'] = results + return candidate_table + + +class DatabaseHistoryImporter(DatabaseXMatchImporter): + def __init__(self, + history_duration_days: float, + time_field_name: str = 'jd', + update_dataframe: Callable[ + [pd.DataFrame, list[list[dict]]], pd.DataFrame] = update_history_dataframe, + *args, **kwargs): + super(DatabaseHistoryImporter, self).__init__(update_dataframe=update_dataframe, *args, **kwargs) + self.history_duration_days = history_duration_days + self.time_field_name = time_field_name + def get_constraints(self, cand): + query_columns, comparison_types, accepted_values = self.user_defined_constraints(cand) + query_columns.append(self.time_field_name) + comparison_types.append('>') + accepted_values.append(cand[self.time_field_name] - self.history_duration_days) + return query_columns, comparison_types, accepted_values From 3c56c9872301daf3e3aa292f5fafaf6ff8ff3f91 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:54:12 -0700 Subject: [PATCH 05/14] fix formatting --- winterdrp/processors/xmatch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/winterdrp/processors/xmatch.py b/winterdrp/processors/xmatch.py index 6df47f867..c80f67000 100644 --- a/winterdrp/processors/xmatch.py +++ b/winterdrp/processors/xmatch.py @@ -41,7 +41,6 @@ def _apply_to_candidates( if catalog.projection[k] == 1: available_projection_keys += [k] - for key in available_projection_keys: for num in range(self.num_stars): colname = catalog.column_names[key] From 0f7b9c4df2d23731e99472f8f52bfe7e765c684c Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 02:55:25 -0700 Subject: [PATCH 06/14] reformat --- winterdrp/catalog/base_catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/winterdrp/catalog/base_catalog.py b/winterdrp/catalog/base_catalog.py index ec0578d45..fdde50a22 100644 --- a/winterdrp/catalog/base_catalog.py +++ b/winterdrp/catalog/base_catalog.py @@ -129,7 +129,7 @@ def __init__(self, max_time_ms: float = 10000, *args, **kwargs): - super(BaseKowalskiXMatch, self).__init__(*args,**kwargs) + super(BaseKowalskiXMatch, self).__init__(*args, **kwargs) self.max_time_ms = max_time_ms self.kowalski = kowalski From f22ea5e36a503a2beabbdc2b33661379874b15b8 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:10:01 -0700 Subject: [PATCH 07/14] Fixed pg_default_user and db_user issue, fixed bug that apply was not returning anything --- .../database/base_database_processor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/winterdrp/processors/database/base_database_processor.py b/winterdrp/processors/database/base_database_processor.py index 33cda1d2c..f6a102abf 100644 --- a/winterdrp/processors/database/base_database_processor.py +++ b/winterdrp/processors/database/base_database_processor.py @@ -17,8 +17,8 @@ def __init__( db_name: str, db_table: str, schema_path: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - db_password: str = os.environ.get('PG_DEFAULT_PWD'), + db_user: str = os.environ.get('DB_USER'), + db_password: str = os.environ.get('DB_PWD'), full_setup: bool = False, schema_dir: str = None, *args, @@ -41,17 +41,17 @@ def db_exists(self): def make_db(self): create_db( - db_name=self.db_name, - db_user=self.db_user, - password=self.db_password + db_name=self.db_name ) def user_exists(self): return check_if_user_exists(self.db_user) - @staticmethod - def make_user(): - return create_new_user + def make_user(self): + return create_new_user( + new_db_user=self.db_user, + new_password=self.db_password + ) def grant_privileges(self): return grant_privileges(self.db_name, self.db_user) @@ -76,7 +76,8 @@ def apply(self, batch): self.set_up_databases() self.db_check = True - super(BaseDatabaseProcessor, self).apply(batch) + batch = super(BaseDatabaseProcessor, self).apply(batch) + return batch def set_up_databases(self): From 2d6ff713c90271732beff85eb8a9360fb1339e1c Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:10:33 -0700 Subject: [PATCH 08/14] all df columns in lower case --- winterdrp/processors/candidates/candidate_detector.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/winterdrp/processors/candidates/candidate_detector.py b/winterdrp/processors/candidates/candidate_detector.py index 8aa57d12d..773a255e1 100644 --- a/winterdrp/processors/candidates/candidate_detector.py +++ b/winterdrp/processors/candidates/candidate_detector.py @@ -130,9 +130,9 @@ def generate_candidates_table(self, scorr_catalog_name, sci_resamp_imagename, re display_ref_ims.append(display_ref_bit) display_diff_ims.append(display_diff_bit) - det_srcs['SciBitIm'] = display_sci_ims - det_srcs['RefBitIm'] = display_ref_ims - det_srcs['DiffBitIm'] = display_diff_ims + det_srcs['scibitim'] = display_sci_ims + det_srcs['refbitim'] = display_ref_ims + det_srcs['diffbitim'] = display_diff_ims diff_zp = float(fits.getval(diff_filename, 'TMC_ZP')) det_srcs['magzpsci'] = diff_zp @@ -160,7 +160,6 @@ def generate_candidates_table(self, scorr_catalog_name, sci_resamp_imagename, re det_srcs['programid'] = fits.getval(sci_resamp_imagename, 'PROGID') det_srcs['fid'] = fits.getval(sci_resamp_imagename, 'FILTERID') det_srcs['candid'] = np.array(det_srcs['jd']*100, dtype=int)*10000 + np.arange(len(det_srcs)) - det_srcs['name'] = '' det_srcs = det_srcs.to_pandas() return det_srcs From 612900f99d351cd7ac32a8834ac6fd81e3e3469c Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:11:33 -0700 Subject: [PATCH 09/14] fixed update_dataframe initialization --- winterdrp/processors/database/database_importer.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/winterdrp/processors/database/database_importer.py b/winterdrp/processors/database/database_importer.py index 3ba284f60..f5e14db5a 100644 --- a/winterdrp/processors/database/database_importer.py +++ b/winterdrp/processors/database/database_importer.py @@ -175,7 +175,7 @@ def update_xmatch_dataframe( return candidate_table -class DatabaseXMatchImporter(DatabaseDataframeImporter): +class DatabaseXMatchImporter(DatabaseDataframeImporter, BaseDataframeProcessor): def __init__(self, xmatch_radius_arcsec: float, user_defined_constraints: Callable[[pd.DataFrame], tuple] = no_additional_constraints, @@ -197,6 +197,7 @@ def __init__(self, self.order_field_name = order_field_name self.order_ascending = order_ascending self.query_dist = query_dist + self.update_dataframe = update_dataframe def get_constraints(self, cand): query_columns, comparison_types, accepted_values = self.user_defined_constraints(cand) @@ -250,9 +251,11 @@ def __init__(self, update_dataframe: Callable[ [pd.DataFrame, list[list[dict]]], pd.DataFrame] = update_history_dataframe, *args, **kwargs): - super(DatabaseHistoryImporter, self).__init__(update_dataframe=update_dataframe, *args, **kwargs) + super(DatabaseHistoryImporter, self).__init__(*args, **kwargs) self.history_duration_days = history_duration_days self.time_field_name = time_field_name + self.update_dataframe = update_dataframe + logger.info(f'Update db is {self.update_dataframe}') def get_constraints(self, cand): query_columns, comparison_types, accepted_values = self.user_defined_constraints(cand) From c4b7d2a3646c35eabfa338598c403858c35fc18d Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:12:33 -0700 Subject: [PATCH 10/14] fix pg_default_user, db_user issue, change export db to skip the serial db columns, not primary columns --- winterdrp/processors/database/postgres.py | 112 +++++++++++++++------- 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/winterdrp/processors/database/postgres.py b/winterdrp/processors/database/postgres.py index 9df5b262e..9c8b03914 100644 --- a/winterdrp/processors/database/postgres.py +++ b/winterdrp/processors/database/postgres.py @@ -17,29 +17,41 @@ class DataBaseError(ProcessorError): def validate_credentials( db_user: str, - password: str + password: str, + admin=False ): + if db_user is None: - err = "'db_user' is set as None. Please pass a db_user as an argument, " \ - "or set the environment variable 'PG_DEFAULT_USER'. Using " + user = 'db_user' + env_user_var = 'DB_USER' + if admin: + user = 'admin_db_user' + env_user_var = 'PG_DEFAULT_USER' + err = f"'{user}' is set as None. Please pass a db_user as an argument, " \ + f"or set the environment variable '{env_user_var}'. Using " logger.warning(err) raise DataBaseError(err) if password is None: - err = "'password' is set as None. Please pass a password as an argument, " \ - "or set the environment variable 'PG_DEFAULT_PWD'." + pwd = 'password' + env_pwd_var = 'DB_PWD' + if admin: + pwd = 'db_admin_password' + env_pwd_var = 'PG_DEFAULT_PWD' + err = f"'{pwd}' is set as None. Please pass a password as an argument, " \ + f"or set the environment variable '{env_pwd_var}'." logger.error(err) raise DataBaseError(err) def create_db( db_name: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') ): - validate_credentials(db_user=db_user, password=password) + admin_user = os.environ.get('PG_DEFAULT_USER') + admin_password = os.environ.get('PG_DEFAULT_PWD') + validate_credentials(db_user=admin_user, password=admin_password) - with psycopg.connect(f"dbname=postgres user={db_user} password={password}") as conn: + with psycopg.connect(f"dbname=postgres user={admin_user} password={admin_password}") as conn: conn.autocommit = True sql = f'''CREATE database {db_name}''' conn.execute(sql) @@ -49,8 +61,8 @@ def create_db( def create_table( schema_path: str, db_name: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') + db_user: str, + password: str ): validate_credentials(db_user, password) @@ -59,20 +71,22 @@ def create_table( with open(schema_path, "r") as f: conn.execute(f.read()) + logger.info(f'Created table from schema path {schema_path}') + def create_new_user( new_db_user: str, new_password: str ): - default_user = os.environ.get('PG_DEFAULT_USER') - default_password = os.environ.get('PG_DEFAULT_PWD') + admin_user = os.environ.get('PG_DEFAULT_USER') + admin_password = os.environ.get('PG_DEFAULT_PWD') validate_credentials(new_db_user, new_password) - validate_credentials(db_user=default_user, password=default_password) + validate_credentials(db_user=admin_user, password=admin_password,admin=True) - with psycopg.connect(f"dbname=postgres user={default_user} password={default_password}") as conn: + with psycopg.connect(f"dbname=postgres user={admin_user} password={admin_password}") as conn: conn.autocommit = True - command = f'''CREATE ROLE {new_db_user} WITH password '{new_password}';''' + command = f'''CREATE ROLE {new_db_user} WITH password '{new_password}' LOGIN;''' conn.execute(command) @@ -80,11 +94,11 @@ def grant_privileges( db_name: str, db_user: str ): - default_user = os.environ.get('PG_DEFAULT_USER') - default_password = os.environ.get('PG_DEFAULT_PWD') - validate_credentials(default_user, default_password) + admin_user = os.environ.get('PG_DEFAULT_USER') + admin_password = os.environ.get('PG_DEFAULT_PWD') + validate_credentials(admin_user, admin_password, admin=True) - with psycopg.connect(f"dbname=postgres user={default_user} password={default_password}") as conn: + with psycopg.connect(f"dbname=postgres user={admin_user} password={admin_password}") as conn: conn.autocommit = True command = f'''GRANT ALL PRIVILEGES ON DATABASE {db_name} TO {db_user};''' conn.execute(command) @@ -133,23 +147,23 @@ def check_if_db_exists( def check_if_table_exists( db_name: str, db_table: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') + db_user: str , + password: str ) -> bool: validate_credentials(db_user=db_user, password=password) with psycopg.connect(f"dbname={db_name} user={db_user} password={password}") as conn: conn.autocommit = True - command = '''SELECT datname FROM pg_database;''' + command = '''SELECT table_name FROM information_schema.tables WHERE table_schema='public';''' data = conn.execute(command).fetchall() - existing_db_names = [x[0] for x in data] - logger.debug(f"Found the following databases: {existing_db_names}") + existing_table_names = [x[0] for x in data] + logger.debug(f"Found the following tables: {existing_table_names}") - db_exist_bool = db_name in existing_db_names - logger.info(f"Database table '{db_table}' {['does not exist', 'already exists'][db_exist_bool]}") + table_exist_bool = db_table in existing_table_names + logger.info(f"Database table '{db_table}' {['does not exist', 'already exists'][table_exist_bool]}") - return db_exist_bool + return table_exist_bool def get_foreign_tables_list( @@ -239,10 +253,14 @@ def export_to_db( with conn.execute(sql_query) as cursor: primary_key = [x[0] for x in cursor.fetchall()] - + sequences = [x[0] for x in conn.execute(f"SELECT c.relname FROM pg_class c WHERE c.relkind = 'S';").fetchall()] + seq_tables = np.array([x.split('_')[0] for x in sequences]) + seq_columns = np.array([x.split('_')[1] for x in sequences]) + serial_keys = seq_columns[(seq_tables==db_table)] + logger.debug(serial_keys) colnames = [ desc[0] for desc in conn.execute(f"SELECT * FROM {db_table} LIMIT 1").description - if desc[0] not in primary_key + if desc[0] not in serial_keys ] logger.debug(primary_key) logger.debug(colnames) @@ -253,8 +271,8 @@ def export_to_db( txt = txt.replace(char, '') for c in colnames: - logger.debug(f"{c}, {value_dict[c.upper()]}") - txt += f"'{str(value_dict[c.upper()])}', " + logger.debug(f"{c}, {value_dict[c]}") + txt += f"'{str(value_dict[c])}', " txt = txt + ') ' txt = txt.replace(', )', ')') @@ -277,7 +295,7 @@ def parse_constraints(db_query_columns, db_accepted_values ): assert len(db_comparison_types) == len(db_accepted_values) - assert np.isin(np.all(np.unique(db_comparison_types), ['=', '<', '>', 'between'])) + assert np.all(np.isin(np.unique(db_comparison_types), ['=', '<', '>', 'between'])) constraints = "" for i, x in enumerate(db_query_columns): if db_comparison_types[i] == 'between': @@ -420,11 +438,29 @@ def xmatch_import_db(db_name: str, db_user: str = os.environ.get('PG_DEFAULT_USER'), db_password: str = os.environ.get('PG_DEFAULT_PWD'), ) -> list[dict]: + + if output_alias_map is None: + output_alias_map = {} + for col in db_output_columns: + output_alias_map[col] = col + xmatch_radius_deg = xmatch_radius_arcsec / 3600.0 - constraints = f"""q3c_radial_query({ra_field_name},{dec_field_name},{ra},{dec},{xmatch_radius_deg}) """ \ - + parse_constraints(db_query_columns, - db_comparison_types, - db_accepted_values) + + if q3c: + constraints = f"""q3c_radial_query({ra_field_name},{dec_field_name},{ra},{dec},{xmatch_radius_deg}) """ \ + + else: + ra_min = ra - xmatch_radius_deg + ra_max = ra + xmatch_radius_deg + dec_min = dec - xmatch_radius_deg + dec_max = dec + xmatch_radius_deg + constraints = f""" {ra_field_name} between {ra_min} and {ra_max} AND {dec_field_name} between {dec_min} and {dec_max} """ + + parsed_constraints = parse_constraints(db_query_columns, + db_comparison_types, + db_accepted_values) + if len(parsed_constraints) > 0: + constraints += f"""AND {constraints}""" select = f"""{', '.join(db_output_columns)}""" if query_dist: @@ -444,6 +480,7 @@ def xmatch_import_db(db_name: str, query_output = execute_query(query, db_name, db_user, db_password) all_query_res = [] + for entry in query_output: if not query_dist: assert len(entry) == len(db_output_columns) @@ -457,6 +494,7 @@ def xmatch_import_db(db_name: str, all_query_res.append(query_res) return all_query_res + def get_colnames_from_schema(schema_file): with open(schema_file,'r') as f: dat = f.read() From 90fa1a1330bdaa2f1c109f6f06b3cf545b542750 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:12:58 -0700 Subject: [PATCH 11/14] working candidate namer --- winterdrp/processors/candidates/namer.py | 131 ++++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/winterdrp/processors/candidates/namer.py b/winterdrp/processors/candidates/namer.py index 842c7c869..a9af4ebfa 100644 --- a/winterdrp/processors/candidates/namer.py +++ b/winterdrp/processors/candidates/namer.py @@ -1,12 +1,139 @@ +import logging +import os +import logging from winterdrp.processors.base_processor import BaseDataframeProcessor import numpy as np +import pandas as pd +from winterdrp.processors.database.postgres import execute_query, xmatch_import_db +from astropy.time import Time + +logger = logging.getLogger(__name__) class CandidateNamer(BaseDataframeProcessor): def __init__(self, - xmatch_radius_arcsec, + db_name: str, + base_name: str, + xmatch_radius_arcsec: float, + name_start: str = 'aaaaa', + cand_table_name: str = 'candidates', + db_user: str = os.environ.get('DB_USER'), + db_pwd: str = os.environ.get('DB_PWD'), + db_name_field: str = 'name', + db_order_field: str = 'candid', + date_field: str = 'jd', *args, **kwargs): super(CandidateNamer, self).__init__(*args, **kwargs) + self.db_name = db_name + self.db_user = db_user + self.db_pwd = db_pwd + self.cand_table_name = cand_table_name + self.db_name_field = db_name_field + self.db_order_field = db_order_field + self.base_name = base_name + self.name_start = name_start + self.date_field = date_field self.xmatch_radius_arcsec = xmatch_radius_arcsec - pass + + @staticmethod + def increment_string(string: str): + ''' + + Parameters + ---------- + string + + Returns + ------- + An incremented string, eg. aaa -> aab, aaz -> aba, azz -> baa, zzz-> aaaa + ''' + charpos = len(string) - 1 + # will iteratively try to increment characters starting from the last + inctrue = False + newstring = '' + while charpos >= 0: + cref = string[charpos] + if inctrue: + newstring = cref + newstring + charpos -= 1 + continue + creford = ord(cref) + # increment each character, if at 'z', increment the next one + if creford + 1 > 122: + newstring = 'a' + newstring + if charpos == 0: + newstring = 'a' + newstring + else: + nextchar = chr(creford + 1) + newstring = nextchar + newstring + inctrue = True + charpos -= 1 + continue + + return newstring + + def get_next_name(self, candjd, lastname=None): + candyear = Time(candjd, format='jd').datetime.year % 1000 + if lastname is None: + query = f"""SELECT name FROM {self.cand_table_name} ORDER BY {self.db_order_field} desc LIMIT 1;""" + res = execute_query(query, db_name=self.db_name, db_user=self.db_user, password=self.db_pwd) + if len(res) == 0: + name = self.base_name + str(candyear) + self.name_start + return name + else: + lastname = res[0][0] + logger.debug(res) + lastyear = int(lastname[len(self.base_name):len(self.base_name) + 2]) + if candyear != lastyear: + name = self.base_name + str(candyear) + self.name_start + else: + lastname_letters = lastname[len(self.base_name) + 2:] + newname_letters = self.increment_string(lastname_letters) + name = self.base_name + str(candyear) + newname_letters + logger.info(name) + + return name + + + def is_detected_previously(self, ra, dec): + name = xmatch_import_db(db_name=self.db_name, + db_user=self.db_user, + db_password=self.db_pwd, + db_table=self.cand_table_name, + db_output_columns=[self.db_name_field], + num_limit=1, + ra=ra, + dec=dec, + xmatch_radius_arcsec=self.xmatch_radius_arcsec, + db_query_columns=[], + db_comparison_types=[], + output_alias_map=[], + db_accepted_values=[] + ) + logger.info(name) + return len(name)>0, name + + def _apply_to_candidates( + self, + candidate_table: pd.DataFrame, + ) -> pd.DataFrame: + logger.info(candidate_table) + names = [] + lastname = None + for ind in range(len(candidate_table)): + cand = candidate_table.loc[ind] + if len(cand['prv_candidates']) > 0: + cand_name = cand['prv_candidates'][0][self.db_name_field] + + else: + prv_det, prv_name=self.is_detected_previously(cand['ra'], cand['dec']) + if prv_det: + cand_name = prv_name[0] + else: + cand_name = self.get_next_name(cand[self.date_field], lastname=lastname) + lastname = cand_name + names.append(cand_name) + candidate_table[self.db_name_field] = names + logger.info(candidate_table) + return candidate_table From 78220e546438c5953cbe48e5875a652b88059a00 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:13:18 -0700 Subject: [PATCH 12/14] change columns to match dataframe --- .../wirc_imsub/wirc_imsub_files/schema/candidates.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql b/winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql index 6d17718e3..c6fc18304 100644 --- a/winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql +++ b/winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql @@ -1,15 +1,15 @@ CREATE TABLE IF NOT EXISTS candidates ( - candid INT PRIMARY KEY, + candid BIGINT PRIMARY KEY, name VARCHAR(15), ra REAL, dec REAL, fwhm REAL, jd REAL, fid INT, - diffimg VARCHAR(255), - sciimg VARCHAR(255), - refimg VARCHAR(255), - psfmag REAL, + diffimname VARCHAR(255), + sciimname VARCHAR(255), + refimname VARCHAR(255), + magpsf REAL, sigmapsf REAL, chipsf REAL, aimage REAL, From 00e36f80d8421673a333e77e2561270cc95b7783 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Sun, 31 Jul 2022 09:14:07 -0700 Subject: [PATCH 13/14] pipeline with candidates table, history queries and naming --- .../wirc_imsub/wirc_imsub_pipeline.py | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py b/winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py index 55cb9c697..48de1e115 100644 --- a/winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py +++ b/winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py @@ -6,10 +6,8 @@ from winterdrp.processors.reference import Reference from winterdrp.processors.zogy.zogy import ZOGY, ZOGYPrepare from winterdrp.processors.candidates.candidate_detector import DetectCandidates -import numpy as np from astropy.io import fits, ascii import os -from astropy.time import Time import logging from winterdrp.processors.alert_packets.avro_alert import AvroPacketMaker @@ -23,7 +21,10 @@ from winterdrp.catalog.kowalski import TMASS, PS1 from winterdrp.processors.xmatch import XMatch from winterdrp.pipelines.wirc.wirc_pipeline import load_raw_wirc_image - +from winterdrp.processors.database.database_importer import DatabaseHistoryImporter +from winterdrp.processors.database.postgres import get_colnames_from_schema +from winterdrp.processors.candidates.namer import CandidateNamer +from winterdrp.processors.database.database_exporter import DatabaseDataframeExporter logger = logging.getLogger(__name__) @@ -104,6 +105,8 @@ class WircImsubPipeline(Pipeline): ] batch_split_keys = ["UTSHUT"] + candidates_db_columns = get_colnames_from_schema('winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema' + '/candidates.sql') pipeline_configurations = { None: [ ImageLoader( @@ -111,7 +114,7 @@ class WircImsubPipeline(Pipeline): load_image=load_raw_wirc_image ), # ImageBatcher(split_key='UTSHUT'), - ImageSelector((base_name_key, "ZTF21aagppzg_J_stack_1_20210330.fits")), + ImageSelector((base_name_key, "ZTF21aagppzg_J_stack_1_20210702.fits")), Reference( ref_image_generator=wirc_reference_image_generator, ref_swarp_resampler=wirc_reference_image_resampler, @@ -150,13 +153,43 @@ class WircImsubPipeline(Pipeline): num_stars=3, search_radius_arcsec=30 ), + # History(), DataframeWriter(output_dir_name='kowalski'), + DatabaseHistoryImporter( + xmatch_radius_arcsec=2, + time_field_name='jd', + history_duration_days=500, + db_name="wirc", + db_user=os.environ.get('DB_USER'), + db_pwd=os.environ.get('DB_PWD'), + db_table='candidates', + db_output_columns=candidates_db_columns, + schema_path='winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql', + q3c=False + ), + CandidateNamer( + db_name='wirc', + cand_table_name='candidates', + base_name='WIRC', + name_start='aaaaa', + xmatch_radius_arcsec=2 + ), + DatabaseDataframeExporter( + db_name='wirc', + db_table='candidates', + schema_path='winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql' + ) # EdgeCandidatesMask(edge_boundary_size=100) # FilterCandidates(), # AvroPacketMaker(output_sub_dir="avro", # base_name="WNTR", # broadcast=False, # save_local=False), - SendToFritz(output_sub_dir="test") + # SendToFritz(output_sub_dir="test") + + # base_name="WNTR", + # broadcast=False, + # save_local=False), + # SendToFritz(output_sub_dir="test") ] } From 640204a4c8635f18bc12ebdcc52ab60fc0786e77 Mon Sep 17 00:00:00 2001 From: viraj21197 Date: Mon, 1 Aug 2022 13:45:49 -0700 Subject: [PATCH 14/14] change pg_default to pg_admin for clarity --- winterdrp/processors/database/postgres.py | 42 ++++++++++++----------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/winterdrp/processors/database/postgres.py b/winterdrp/processors/database/postgres.py index 9c8b03914..802caf2ca 100644 --- a/winterdrp/processors/database/postgres.py +++ b/winterdrp/processors/database/postgres.py @@ -10,6 +10,8 @@ schema_dir = os.path.join(os.path.dirname(__file__), "schema") +pg_admin_user_key = 'PG_ADMIN_USER' +pg_admin_pwd_key = 'PG_ADMIN_PWD' class DataBaseError(ProcessorError): pass @@ -26,7 +28,7 @@ def validate_credentials( env_user_var = 'DB_USER' if admin: user = 'admin_db_user' - env_user_var = 'PG_DEFAULT_USER' + env_user_var = pg_admin_user_key err = f"'{user}' is set as None. Please pass a db_user as an argument, " \ f"or set the environment variable '{env_user_var}'. Using " logger.warning(err) @@ -37,7 +39,7 @@ def validate_credentials( env_pwd_var = 'DB_PWD' if admin: pwd = 'db_admin_password' - env_pwd_var = 'PG_DEFAULT_PWD' + env_pwd_var = pg_admin_pwd_key err = f"'{pwd}' is set as None. Please pass a password as an argument, " \ f"or set the environment variable '{env_pwd_var}'." logger.error(err) @@ -47,8 +49,8 @@ def validate_credentials( def create_db( db_name: str, ): - admin_user = os.environ.get('PG_DEFAULT_USER') - admin_password = os.environ.get('PG_DEFAULT_PWD') + admin_user = os.environ.get(pg_admin_user_key) + admin_password = os.environ.get(pg_admin_pwd_key) validate_credentials(db_user=admin_user, password=admin_password) with psycopg.connect(f"dbname=postgres user={admin_user} password={admin_password}") as conn: @@ -78,8 +80,8 @@ def create_new_user( new_db_user: str, new_password: str ): - admin_user = os.environ.get('PG_DEFAULT_USER') - admin_password = os.environ.get('PG_DEFAULT_PWD') + admin_user = os.environ.get(pg_admin_user_key) + admin_password = os.environ.get(pg_admin_pwd_key) validate_credentials(new_db_user, new_password) validate_credentials(db_user=admin_user, password=admin_password,admin=True) @@ -94,8 +96,8 @@ def grant_privileges( db_name: str, db_user: str ): - admin_user = os.environ.get('PG_DEFAULT_USER') - admin_password = os.environ.get('PG_DEFAULT_PWD') + admin_user = os.environ.get(pg_admin_user_key) + admin_password = os.environ.get(pg_admin_pwd_key) validate_credentials(admin_user, admin_password, admin=True) with psycopg.connect(f"dbname=postgres user={admin_user} password={admin_password}") as conn: @@ -106,8 +108,8 @@ def grant_privileges( def check_if_user_exists( user_name: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') + db_user: str = os.environ.get(pg_admin_user_key), + password: str = os.environ.get(pg_admin_pwd_key) ) -> bool: validate_credentials(db_user, password) @@ -125,8 +127,8 @@ def check_if_user_exists( def check_if_db_exists( db_name: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') + db_user: str = os.environ.get(pg_admin_user_key), + password: str = os.environ.get(pg_admin_pwd_key) ) -> bool: validate_credentials(db_user, password) @@ -219,8 +221,8 @@ def get_ordered_schema_list( def create_tables_from_schema( schema_dir: str, db_name: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD') + db_user: str = os.environ.get(pg_admin_user_key), + password: str = os.environ.get(pg_admin_pwd_key) ): schema_files = glob(f'{schema_dir}/*.sql') ordered_schema_files = get_ordered_schema_list(schema_files) @@ -233,8 +235,8 @@ def export_to_db( value_dict: dict | astropy.io.fits.Header, db_name: str, db_table: str, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD'), + db_user: str = os.environ.get(pg_admin_user_key), + password: str = os.environ.get(pg_admin_pwd_key), ) -> tuple[str, list]: with psycopg.connect(f"dbname={db_name} user={db_user} password={password}") as conn: conn.autocommit = True @@ -320,8 +322,8 @@ def import_from_db( db_accepted_values: str | int | float | list[str | float | int | list], db_output_columns: str | list[str], output_alias_map: str | list[str], - db_user: str = os.environ.get('PG_DEFAULT_USER'), - password: str = os.environ.get('PG_DEFAULT_PWD'), + db_user: str = os.environ.get(pg_admin_user_key), + password: str = os.environ.get(pg_admin_pwd_key), max_num_results: int = None, db_comparison_types: list[str] = None ) -> list[dict]: @@ -435,8 +437,8 @@ def xmatch_import_db(db_name: str, order_field_name: str = None, order_ascending: bool = True, num_limit: int = None, - db_user: str = os.environ.get('PG_DEFAULT_USER'), - db_password: str = os.environ.get('PG_DEFAULT_PWD'), + db_user: str = os.environ.get(pg_admin_user_key), + db_password: str = os.environ.get(pg_admin_pwd_key), ) -> list[dict]: if output_alias_map is None: