Skip to content

Commit

Permalink
lintify fritzsend
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdstein committed Dec 8, 2022
1 parent 63a7819 commit 7803e0e
Showing 3 changed files with 71 additions and 63 deletions.
27 changes: 15 additions & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion winterdrp/processors/base_processor.py
Original file line number Diff line number Diff line change
@@ -385,6 +385,6 @@ def _apply(self, batch: SourceBatch) -> SourceBatch:

def _apply_to_candidates(
self,
source_list: SourceBatch,
batch: SourceBatch,
) -> SourceBatch:
raise NotImplementedError
105 changes: 55 additions & 50 deletions winterdrp/processors/send_to_fritz.py
Original file line number Diff line number Diff line change
@@ -40,17 +40,20 @@ def __init__(self, *args, **kwargs):
del kwargs["timeout"]
super().__init__(*args, **kwargs)

def send(self, request, **kwargs):
def send(self, request, *args, **kwargs):
try:
timeout = kwargs.get("timeout")
if timeout is None:
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)
return super().send(request, *args, **kwargs)
except AttributeError:
kwargs["timeout"] = DEFAULT_TIMEOUT


class SendToFritz(BaseDataframeProcessor):

base_key = "fritzsender"

def __init__(
self,
base_name: str,
@@ -60,9 +63,8 @@ def __init__(
stream_id: int,
update_thumbnails: bool = True,
protocol: str = "http",
**kwargs,
):
super(SendToFritz, self).__init__(**kwargs)
super().__init__()
self.token = None
self.group_ids = group_ids
self.base_name = base_name
@@ -77,7 +79,7 @@ def __init__(
self.session = requests.Session()
self.session_headers = {
"Authorization": f"token {self._get_fritz_token()}",
"User-Agent": f"winterdrp",
"User-Agent": "winterdrp",
}

retries = Retry(
@@ -96,10 +98,11 @@ def _get_fritz_token():

if token_fritz is None:
err = (
"No Fritz token specified. Run 'export FRITZ_TOKEN=<token>' to set. "
"The Fritz token will need to be specified manually for Fritz API queries."
"No Fritz token specified. Run 'export FRITZ_TOKEN=<token>' to "
"set. The Fritz token will need to be specified manually "
"for Fritz API queries."
)
logger.warning(err)
logger.error(err)
raise ValueError(err)

return token_fritz
@@ -124,8 +127,8 @@ def _get_author_id():
"No Fritz author id specified. Run 'export FRITZ_AUTHID=<id>' to set. "
"Author id needs to be specified for updates sent by Fritz API queries."
)
logger.warning(err)
raise ValueError
logger.error(err)
raise ValueError(err)

return authid_fritz

@@ -147,10 +150,7 @@ def read_input_df(df: pd.DataFrame):
candidate = {}
for key in df.keys():
try:
if (
type(df.iloc[i].get(key)) is str
or type(df.iloc[i].get(key)) is list
):
if isinstance(df.iloc[i].get(key), (list, str)):
candidate[key] = df.iloc[i].get(key)
else:
# change to native python type
@@ -230,9 +230,11 @@ def alert_post_source(self, alert: dict, group_ids: list[int] = None):
f"Saved {alert['objectId']} {alert['candid']} as a Source on SkyPortal"
)
else:
logger.error(
f"Failed to save {alert['objectId']} {alert['candid']} as a Source on SkyPortal"
err = (
f"Failed to save {alert['objectId']} {alert['candid']} "
f"as a Source on SkyPortal"
)
logger.error(err)
logger.error(response.json())

def alert_post_candidate(self, alert):
@@ -261,7 +263,8 @@ def alert_post_candidate(self, alert):
)
else:
logger.error(
f"Failed to post {alert['objectId']} {alert['candid']} metadata to SkyPortal"
f"Failed to post {alert['objectId']} {alert['candid']} "
f"metadata to SkyPortal"
)
logger.error(response.json())

@@ -278,16 +281,12 @@ def make_thumbnail(self, alert, skyportal_type: str, alert_packet_type: str):
alert = deepcopy(alert)
cutout_data = alert[f"cutout{alert_packet_type}"]

with gzip.open(io.BytesIO(cutout_data), "rb") as f:
with fits.open(io.BytesIO(f.read()), ignore_missing_simple=True) as hdu:
with gzip.open(io.BytesIO(cutout_data), "rb") as cutout:
with fits.open(
io.BytesIO(cutout.read()), ignore_missing_simple=True
) as hdu:
image_data = hdu[0].data

# Survey-specific transformations to get North up and West on the right
# if self.instrument == "ZTF":
# image_data = np.flipud(image_data)
# elif self.instrument == "PGIR":
# image_data = np.rot90(np.fliplr(image_data), 3)

buff = io.BytesIO()
plt.close("all")
fig = plt.figure()
@@ -343,22 +342,26 @@ def alert_post_thumbnails(self, alert):
("sub", "Difference"),
]:
logger.debug(
f"Making {instrument_type} thumbnail for {alert['objectId']} {alert['candid']}",
f"Making {instrument_type} thumbnail for {alert['objectId']} "
f"{alert['candid']}",
)
thumb = self.make_thumbnail(alert, ttype, instrument_type)

logger.debug(
f"Posting {instrument_type} thumbnail for {alert['objectId']} {alert['candid']} to SkyPortal",
f"Posting {instrument_type} thumbnail for {alert['objectId']} "
f"{alert['candid']} to SkyPortal",
)
response = self.api("POST", "https://fritz.science/api/thumbnail", thumb)

if response.json()["status"] == "success":
logger.debug(
f"Posted {alert['objectId']} {alert['candid']} {instrument_type} cutout to SkyPortal"
f"Posted {alert['objectId']} {alert['candid']} "
f"{instrument_type} cutout to SkyPortal"
)
else:
logger.error(
f"Failed to post {alert['objectId']} {alert['candid']} {instrument_type} cutout to SkyPortal"
f"Failed to post {alert['objectId']} {alert['candid']} "
f"{instrument_type} cutout to SkyPortal"
)
logger.error(response.json())

@@ -373,8 +376,7 @@ def upload_thumbnail(self, alert):
"""
fritz_to_cand = {"new": "SciBitIm", "ref": "RefBitIm", "sub": "DiffBitIm"}

for fritz_key in fritz_to_cand.keys():
cand_key = fritz_to_cand[fritz_key]
for fritz_key, cand_key in fritz_to_cand.items():
cutout = alert[cand_key]

buffer = io.BytesIO()
@@ -407,11 +409,13 @@ def upload_thumbnail(self, alert):

if response.json()["status"] == "success":
logger.debug(
f"Posted {alert['objectId']} {alert['candid']} {cand_key} cutout to SkyPortal"
f"Posted {alert['objectId']} {alert['candid']} "
f"{cand_key} cutout to SkyPortal"
)
else:
logger.error(
f"Failed to post {alert['objectId']} {alert['candid']} {cand_key} cutout to SkyPortal"
f"Failed to post {alert['objectId']} {alert['candid']} "
f"{cand_key} cutout to SkyPortal"
)
logger.error(response.json())

@@ -423,9 +427,7 @@ def make_photometry(self, alert, jd_start: float = None):
:param alert: candidate dictionary
:param jd_start: date from which to start photometry from
"""
# print(alert.keys())
alert = deepcopy(alert)
# df_candidate = pd.DataFrame(alert["candidate"], index=[0])
top_level = [
"schemavsn",
"publisher",
@@ -463,13 +465,9 @@ def make_photometry(self, alert, jd_start: float = None):
df_light_curve["magsys"] = "ab"
df_light_curve["mjd"] = df_light_curve["jd"] - 2400000.5

df_light_curve["mjd"] = df_light_curve["mjd"].apply(lambda x: np.float64(x))
df_light_curve["magpsf"] = df_light_curve["magpsf"].apply(
lambda x: np.float32(x)
)
df_light_curve["sigmapsf"] = df_light_curve["sigmapsf"].apply(
lambda x: np.float32(x)
)
df_light_curve["mjd"] = df_light_curve["mjd"].astype(np.float64)
df_light_curve["magpsf"] = df_light_curve["magpsf"].astype(np.float32)
df_light_curve["sigmapsf"] = df_light_curve["sigmapsf"].astype(np.float32)

df_light_curve = (
df_light_curve.drop_duplicates(subset=["mjd", "magpsf"])
@@ -560,11 +558,13 @@ def alert_put_photometry(self, alert):
)
if response.json()["status"] == "success":
logger.debug(
f"Posted {alert['objectId']} photometry stream_id={self.stream_id} to SkyPortal"
f"Posted {alert['objectId']} photometry stream_id={self.stream_id} "
f"to SkyPortal"
)
else:
logger.error(
f"Failed to post {alert['objectId']} photometry stream_id={self.stream_id} to SkyPortal"
f"Failed to post {alert['objectId']} photometry "
f"stream_id={self.stream_id} to SkyPortal"
)
logger.error(response.json())

@@ -630,7 +630,8 @@ def alert_put_annotation(self, alert):
}

logger.debug(
f"Putting annotation for {alert['objectId']} {alert['candid']} to SkyPortal",
f"Putting annotation for {alert['objectId']} {alert['candid']} "
f"to SkyPortal",
)
response = self.api(
"PUT",
@@ -644,10 +645,13 @@ def alert_put_annotation(self, alert):
)
else:
logger.error(
f"Failed to post updated {alert['objectId']} annotation to SkyPortal"
f"Failed to post updated {alert['objectId']} annotation "
f"to SkyPortal"
)
logger.error(response.json())

return None

def alert_skyportal_manager(self, alert):
"""Posts alerts to SkyPortal if criteria is met
@@ -661,7 +665,8 @@ def alert_skyportal_manager(self, alert):
)
is_candidate = response.status_code == 200
logger.debug(
f"{alert['objectId']} {'is' if is_candidate else 'is not'} candidate in SkyPortal"
f"{alert['objectId']} {'is' if is_candidate else 'is not'} "
f"candidate in SkyPortal"
)

# check if source exists in SkyPortal
@@ -733,14 +738,14 @@ def alert_skyportal_manager(self, alert):
logger.debug(f'SendToFritz Manager complete for {alert["objectId"]}')

def make_alert(self, cand_table):
t0 = time.time()
t_0 = time.time()
all_cands = self.read_input_df(cand_table)
num_cands = len(all_cands)

for cand in all_cands:
self.alert_skyportal_manager(cand)

t1 = time.time()
t_1 = time.time()
logger.info(
f"Took {(t1 - t0):.2f} seconds to Fritz process {num_cands} candidates."
f"Took {(t_1 - t_0):.2f} seconds to Fritz process {num_cands} candidates."
)

0 comments on commit 7803e0e

Please sign in to comment.