Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Candidatestable #60

Merged
merged 14 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion winterdrp/catalog/base_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def __init__(self,
max_time_ms: float = 10000,
*args,
**kwargs):
super(BaseKowalskiXMatch, self).__init__(*args,**kwargs)
super(BaseKowalskiXMatch, self).__init__(*args, **kwargs)
self.max_time_ms = max_time_ms
self.kowalski = kowalski

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
CREATE TABLE IF NOT EXISTS candidates (
candid INT PRIMARY KEY,
candid BIGINT PRIMARY KEY,
name VARCHAR(15),
ra REAL,
dec REAL,
fwhm REAL,
jd REAL,
fid INT,
diffimg VARCHAR(255),
sciimg VARCHAR(255),
refimg VARCHAR(255),
psfmag REAL,
diffimname VARCHAR(255),
sciimname VARCHAR(255),
refimname VARCHAR(255),
magpsf REAL,
sigmapsf REAL,
chipsf REAL,
aimage REAL,
Expand Down
43 changes: 38 additions & 5 deletions winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
from winterdrp.processors.reference import Reference
from winterdrp.processors.zogy.zogy import ZOGY, ZOGYPrepare
from winterdrp.processors.candidates.candidate_detector import DetectCandidates
import numpy as np
from astropy.io import fits, ascii
import os
from astropy.time import Time
import logging

from winterdrp.processors.alert_packets.avro_alert import AvroPacketMaker
Expand All @@ -23,7 +21,10 @@
from winterdrp.catalog.kowalski import TMASS, PS1
from winterdrp.processors.xmatch import XMatch
from winterdrp.pipelines.wirc.wirc_pipeline import load_raw_wirc_image

from winterdrp.processors.database.database_importer import DatabaseHistoryImporter
from winterdrp.processors.database.postgres import get_colnames_from_schema
from winterdrp.processors.candidates.namer import CandidateNamer
from winterdrp.processors.database.database_exporter import DatabaseDataframeExporter
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -104,14 +105,16 @@ class WircImsubPipeline(Pipeline):
]
batch_split_keys = ["UTSHUT"]

candidates_db_columns = get_colnames_from_schema('winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema'
'/candidates.sql')
pipeline_configurations = {
None: [
ImageLoader(
input_sub_dir="raw",
load_image=load_raw_wirc_image
),
# ImageBatcher(split_key='UTSHUT'),
ImageSelector((base_name_key, "ZTF21aagppzg_J_stack_1_20210330.fits")),
ImageSelector((base_name_key, "ZTF21aagppzg_J_stack_1_20210702.fits")),
Reference(
ref_image_generator=wirc_reference_image_generator,
ref_swarp_resampler=wirc_reference_image_resampler,
Expand Down Expand Up @@ -150,13 +153,43 @@ class WircImsubPipeline(Pipeline):
num_stars=3,
search_radius_arcsec=30
),
# History(),
DataframeWriter(output_dir_name='kowalski'),
DatabaseHistoryImporter(
xmatch_radius_arcsec=2,
time_field_name='jd',
history_duration_days=500,
db_name="wirc",
db_user=os.environ.get('DB_USER'),
db_pwd=os.environ.get('DB_PWD'),
db_table='candidates',
db_output_columns=candidates_db_columns,
schema_path='winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql',
q3c=False
),
CandidateNamer(
db_name='wirc',
cand_table_name='candidates',
base_name='WIRC',
name_start='aaaaa',
xmatch_radius_arcsec=2
),
DatabaseDataframeExporter(
db_name='wirc',
db_table='candidates',
schema_path='winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema/candidates.sql'
)
# EdgeCandidatesMask(edge_boundary_size=100)
# FilterCandidates(),
# AvroPacketMaker(output_sub_dir="avro",
# base_name="WNTR",
# broadcast=False,
# save_local=False),
SendToFritz(output_sub_dir="test")
# SendToFritz(output_sub_dir="test")

# base_name="WNTR",
# broadcast=False,
# save_local=False),
# SendToFritz(output_sub_dir="test")
]
}
7 changes: 3 additions & 4 deletions winterdrp/processors/candidates/candidate_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ def generate_candidates_table(self, scorr_catalog_name, sci_resamp_imagename, re
display_ref_ims.append(display_ref_bit)
display_diff_ims.append(display_diff_bit)

det_srcs['SciBitIm'] = display_sci_ims
det_srcs['RefBitIm'] = display_ref_ims
det_srcs['DiffBitIm'] = display_diff_ims
det_srcs['scibitim'] = display_sci_ims
det_srcs['refbitim'] = display_ref_ims
det_srcs['diffbitim'] = display_diff_ims

diff_zp = float(fits.getval(diff_filename, 'TMC_ZP'))
det_srcs['magzpsci'] = diff_zp
Expand Down Expand Up @@ -160,7 +160,6 @@ def generate_candidates_table(self, scorr_catalog_name, sci_resamp_imagename, re
det_srcs['programid'] = fits.getval(sci_resamp_imagename, 'PROGID')
det_srcs['fid'] = fits.getval(sci_resamp_imagename, 'FILTERID')
det_srcs['candid'] = np.array(det_srcs['jd']*100, dtype=int)*10000 + np.arange(len(det_srcs))
det_srcs['name'] = ''
det_srcs = det_srcs.to_pandas()

return det_srcs
Expand Down
131 changes: 129 additions & 2 deletions winterdrp/processors/candidates/namer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,139 @@
import logging
import os
import logging
from winterdrp.processors.base_processor import BaseDataframeProcessor
import numpy as np
import pandas as pd
from winterdrp.processors.database.postgres import execute_query, xmatch_import_db
from astropy.time import Time

logger = logging.getLogger(__name__)


class CandidateNamer(BaseDataframeProcessor):
def __init__(self,
xmatch_radius_arcsec,
db_name: str,
base_name: str,
xmatch_radius_arcsec: float,
name_start: str = 'aaaaa',
cand_table_name: str = 'candidates',
db_user: str = os.environ.get('DB_USER'),
db_pwd: str = os.environ.get('DB_PWD'),
db_name_field: str = 'name',
db_order_field: str = 'candid',
date_field: str = 'jd',
*args,
**kwargs):
super(CandidateNamer, self).__init__(*args, **kwargs)
self.db_name = db_name
self.db_user = db_user
self.db_pwd = db_pwd
self.cand_table_name = cand_table_name
self.db_name_field = db_name_field
self.db_order_field = db_order_field
self.base_name = base_name
self.name_start = name_start
self.date_field = date_field
self.xmatch_radius_arcsec = xmatch_radius_arcsec
pass

@staticmethod
def increment_string(string: str):
'''

Parameters
----------
string

Returns
-------
An incremented string, eg. aaa -> aab, aaz -> aba, azz -> baa, zzz-> aaaa
'''
charpos = len(string) - 1
# will iteratively try to increment characters starting from the last
inctrue = False
newstring = ''
while charpos >= 0:
cref = string[charpos]
if inctrue:
newstring = cref + newstring
charpos -= 1
continue
creford = ord(cref)
# increment each character, if at 'z', increment the next one
if creford + 1 > 122:
newstring = 'a' + newstring
if charpos == 0:
newstring = 'a' + newstring
else:
nextchar = chr(creford + 1)
newstring = nextchar + newstring
inctrue = True
charpos -= 1
continue

return newstring

def get_next_name(self, candjd, lastname=None):
candyear = Time(candjd, format='jd').datetime.year % 1000
if lastname is None:
query = f"""SELECT name FROM {self.cand_table_name} ORDER BY {self.db_order_field} desc LIMIT 1;"""
res = execute_query(query, db_name=self.db_name, db_user=self.db_user, password=self.db_pwd)
if len(res) == 0:
name = self.base_name + str(candyear) + self.name_start
return name
else:
lastname = res[0][0]
logger.debug(res)
lastyear = int(lastname[len(self.base_name):len(self.base_name) + 2])
if candyear != lastyear:
name = self.base_name + str(candyear) + self.name_start
else:
lastname_letters = lastname[len(self.base_name) + 2:]
newname_letters = self.increment_string(lastname_letters)
name = self.base_name + str(candyear) + newname_letters
logger.info(name)

return name


def is_detected_previously(self, ra, dec):
name = xmatch_import_db(db_name=self.db_name,
db_user=self.db_user,
db_password=self.db_pwd,
db_table=self.cand_table_name,
db_output_columns=[self.db_name_field],
num_limit=1,
ra=ra,
dec=dec,
xmatch_radius_arcsec=self.xmatch_radius_arcsec,
db_query_columns=[],
db_comparison_types=[],
output_alias_map=[],
db_accepted_values=[]
)
logger.info(name)
return len(name)>0, name

def _apply_to_candidates(
self,
candidate_table: pd.DataFrame,
) -> pd.DataFrame:
logger.info(candidate_table)
names = []
lastname = None
for ind in range(len(candidate_table)):
cand = candidate_table.loc[ind]
if len(cand['prv_candidates']) > 0:
cand_name = cand['prv_candidates'][0][self.db_name_field]

else:
prv_det, prv_name=self.is_detected_previously(cand['ra'], cand['dec'])
if prv_det:
cand_name = prv_name[0]
else:
cand_name = self.get_next_name(cand[self.date_field], lastname=lastname)
lastname = cand_name
names.append(cand_name)
candidate_table[self.db_name_field] = names
logger.info(candidate_table)
return candidate_table
19 changes: 10 additions & 9 deletions winterdrp/processors/database/base_database_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def __init__(
db_name: str,
db_table: str,
schema_path: str,
db_user: str = os.environ.get('PG_DEFAULT_USER'),
db_password: str = os.environ.get('PG_DEFAULT_PWD'),
db_user: str = os.environ.get('DB_USER'),
db_password: str = os.environ.get('DB_PWD'),
full_setup: bool = False,
schema_dir: str = None,
*args,
Expand All @@ -41,17 +41,17 @@ def db_exists(self):

def make_db(self):
create_db(
db_name=self.db_name,
db_user=self.db_user,
password=self.db_password
db_name=self.db_name
)

def user_exists(self):
return check_if_user_exists(self.db_user)

@staticmethod
def make_user():
return create_new_user
def make_user(self):
return create_new_user(
new_db_user=self.db_user,
new_password=self.db_password
)

def grant_privileges(self):
return grant_privileges(self.db_name, self.db_user)
Expand All @@ -76,7 +76,8 @@ def apply(self, batch):
self.set_up_databases()
self.db_check = True

super(BaseDatabaseProcessor, self).apply(batch)
batch = super(BaseDatabaseProcessor, self).apply(batch)
return batch

def set_up_databases(self):

Expand Down
Loading