Merge pull request #132 from winter-telescope/errortrack
Postprocess Step
robertdstein authored Sep 2, 2022
2 parents 72757ea + 2f7ac50 commit fe0c90f
Showing 9 changed files with 242 additions and 46 deletions.
44 changes: 39 additions & 5 deletions winterdrp/__main__.py
@@ -10,6 +10,7 @@
from winterdrp.monitor.base_monitor import Monitor
from winterdrp.paths import base_raw_dir
from datetime import datetime
from winterdrp.processors.utils import ImageLoader

logger = logging.getLogger(__name__)

@@ -33,7 +34,13 @@
parser.add_argument(
"-c",
"--config",
default="default",
default=None,
help="Pipeline configuration to be used"
)
parser.add_argument(
"-pc",
"--postprocessconfig",
default=None,
help="Pipeline configuration to be used for postprocessing"
)
parser.add_argument(
@@ -112,13 +119,18 @@
if night is None:
night = str(datetime.now()).split(" ")[0].replace("-", "")

config = args.config
if config is None:
config = "realtime"

monitor = Monitor(
pipeline=args.pipeline,
night=night,
realtime_configurations=["realtime"],
realtime_configurations=config,
postprocess_configurations=args.postprocessconfig.split(",") if args.postprocessconfig is not None else None,
log_level=args.level,
max_wait_hours=args.maxwaithours,
email_wait_hours=args.emailwaithours,
final_postprocess_hours=args.maxwaithours,
midway_postprocess_hours=args.emailwaithours,
email_sender=args.emailsender,
email_recipients=email_recipients,
raw_dir=args.rawdir
@@ -145,13 +157,35 @@
if night is None:
night = str(ln).split(" ")[0].replace("-", "")

config = args.config
if config is None:
config = "default"

pipe = get_pipeline(
args.pipeline,
selected_configurations=args.config,
selected_configurations=config,
night=night,
)

batches, errorstack = pipe.reduce_images([[[], []]], catch_all_errors=True)
if args.postprocessconfig is not None:
post_config = [x for x in pipe.set_configuration(config) if isinstance(x, ImageLoader)][:1]
post_config += pipe.postprocess_configuration(
errorstack=errorstack,
selected_configurations=args.postprocessconfig.split(",")
)

protected_key = "_new_postprocess"

pipe.add_configuration(protected_key, post_config)
pipe.set_configuration(protected_key)

_, new_errorstack = pipe.reduce_images(
batches=[[[], []]],
selected_configurations=protected_key,
catch_all_errors=True
)
errorstack += new_errorstack

print(errorstack.summarise_error_stack(verbose=False))

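For illustration, a minimal standalone sketch of the argument handling this file now performs: the new -pc/--postprocessconfig value is split on commas, and a missing -c/--config falls back to "realtime" (monitor mode) or "default" (batch mode). The argparse namespace is simulated here and the configuration names are placeholders, not real configurations.

from types import SimpleNamespace

# Simulated parsed arguments; "imsub,candidates" is a placeholder value.
args = SimpleNamespace(config=None, postprocessconfig="imsub,candidates")

config = args.config
if config is None:
    config = "realtime"  # the monitor branch falls back to "realtime"

# Comma-separated postprocess configurations become a list, or None if unset.
postprocess = args.postprocessconfig.split(",") if args.postprocessconfig is not None else None

print(config, postprocess)  # realtime ['imsub', 'candidates']
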
9 changes: 7 additions & 2 deletions winterdrp/errors/__init__.py
@@ -91,6 +91,9 @@ def __add__(self, other):
self.add_report(report)
return self

def get_all_reports(self) -> list[ErrorReport]:
return self.reports + self.noncritical_reports

def summarise_error_stack(
self,
output_path=None,
@@ -106,7 +109,7 @@
f"raised by winterdrp. \n" \
f"An additional {len(self.noncritical_reports)} non-critical errors were raised. \n" \

all_reports = self.reports + self.noncritical_reports
all_reports = self.get_all_reports()

if len(all_reports) > 0:

@@ -127,12 +130,14 @@
matching_errors = [x for x in all_reports if x.get_error_message() == error_type]

img_paths = []
error_name = None
for x in matching_errors:
img_paths += x.contents
if error_name is None:
error_name = x.get_error_name()
img_paths = list(set(img_paths))

line = error_type.split('\n')[0]
error_name = error_type.split('\n')[1].split("raise ")[1].split("(")[0]
summary += f"Found {error_lines.count(error_type)} counts of error {error_name}, " \
f"affecting {len(img_paths)} images: \n{line}.\n \n"

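For illustration, a short sketch of why the summary now takes the error name from ErrorReport.get_error_name() rather than re-parsing the traceback text: the old string-splitting assumed the second line of the message contains a literal "raise Name(" pattern. The traceback strings below are contrived stand-ins, not real winterdrp output.

# Case the old parsing handled:
error_type = "Could not process image\n    raise ProcessorError('Could not process image')"
old_name = error_type.split("\n")[1].split("raise ")[1].split("(")[0]
print(old_name)  # ProcessorError

# Case that breaks it, e.g. when the failing frame is not the raising line:
error_type = "Could not process image\n    return self._process(batch)"
try:
    error_type.split("\n")[1].split("raise ")[1].split("(")[0]
except IndexError:
    print("old parsing fails; asking the report via get_error_name() does not")
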
106 changes: 81 additions & 25 deletions winterdrp/monitor/base_monitor.py
@@ -5,7 +5,7 @@
import time
import sys
from watchdog.observers import Observer
from winterdrp.pipelines import get_pipeline
from winterdrp.pipelines import get_pipeline, PipelineConfigError
from winterdrp.errors import ErrorStack
from winterdrp.utils.send_email import send_gmail
from winterdrp.paths import get_output_path, raw_img_dir, raw_img_sub_dir
@@ -20,6 +20,9 @@
from warnings import catch_warnings
import warnings
from astropy.utils.exceptions import AstropyUserWarning
from winterdrp.processors.utils.image_loader import ImageLoader
from winterdrp.processors.csvlog import CSVLog


logger = logging.getLogger(__name__)

@@ -43,10 +46,11 @@ def __init__(
pipeline: str,
cal_requirements: list[CalRequirement] = None,
realtime_configurations: str | list[str] = "default",
postprocess_configurations: str | list[str] = None,
email_sender: str = None,
email_recipients: str | list = None,
email_wait_hours: float = 24.,
max_wait_hours: float = 48.,
midway_postprocess_hours: float = 16.,
final_postprocess_hours: float = 48.,
log_level: str = "INFO",
raw_dir: str = raw_img_sub_dir
):
@@ -59,9 +63,12 @@
realtime_configurations = [realtime_configurations]
self.realtime_configurations = realtime_configurations

self.postprocess_configurations = postprocess_configurations

self.pipeline = get_pipeline(pipeline, night=night, selected_configurations=realtime_configurations)

self.raw_image_directory = Path(raw_img_dir(sub_dir=self.pipeline.night_sub_dir, img_sub_dir=raw_dir))
self.sub_dir = raw_dir

if not self.raw_image_directory.exists():
for x in self.raw_image_directory.parents[::-1]:
@@ -79,21 +86,20 @@
logger.error(err)
raise ValueError(err)

self.max_wait_hours = float(max_wait_hours) * u.hour
logger.info(f"Will terminate after {max_wait_hours} hours.")
self.final_postprocess_hours = float(final_postprocess_hours) * u.hour
logger.info(f"Will terminate after {final_postprocess_hours} hours.")
self.t_start = Time.now()

self.email_wait_hours = float(email_wait_hours) * u.hour

if np.sum(check_email) == 2:
self.midway_postprocess_hours = float(midway_postprocess_hours) * u.hour

if self.email_wait_hours > self.max_wait_hours:
logger.warning(f"Email was set to {self.email_wait_hours}, "
f"but the monitor has a shorter termination period of {self.max_wait_hours}. "
f"Setting email to 95% of max wait.")
self.email_wait_hours = 0.95 * self.max_wait_hours
if self.midway_postprocess_hours > self.final_postprocess_hours:
logger.warning(f"Midway postprocessing was set to {self.midway_postprocess_hours}, "
f"but the monitor has a shorter termination period of {self.final_postprocess_hours}. "
f"Setting to to 95% of max wait.")
self.midway_postprocess_hours = 0.95 * self.final_postprocess_hours

logger.info(f"Will send an email summary after {self.email_wait_hours} hours.")
if np.sum(check_email) == 2:
logger.info(f"Will send an email summary after {self.midway_postprocess_hours} hours.")
self.email_info = (email_sender, email_recipients)
self.email_to_send = True
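For illustration, a minimal standalone version of the elapsed-time checks used later in process_realtime and process_load_queue: an astropy TimeDelta compares directly against a Quantity in hours, which is why the thresholds above are stored as float(...) * u.hour.

from astropy.time import Time
import astropy.units as u

t_start = Time.now()
midway_postprocess_hours = 16. * u.hour
final_postprocess_hours = 48. * u.hour

elapsed = Time.now() - t_start  # an astropy TimeDelta
if elapsed > final_postprocess_hours:
    print("monitor would stop waiting for new images")
elif elapsed > midway_postprocess_hours:
    print("midway postprocess and summary email would run")
else:
    print(f"only {elapsed.to(u.hour):.4f} elapsed so far")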

@@ -102,7 +108,10 @@
self.email_info = None
self.email_to_send = False

self.processed_science = []
self.midway_postprocess_complete = False
self.latest_csv_log = None

self.processed_science_images = []

# default to "pipeline default cal requirements"

@@ -125,7 +134,7 @@ def summarise_errors(
):

error_summary = errorstack.summarise_error_stack(verbose=False)
summary = f"Processed a total of {len(self.processed_science)} science images. \n\n" + error_summary + " \n"
summary = f"Processed a total of {len(self.processed_science_images)} science images. \n\n" + error_summary + " \n"

logger.info(f"Writing error log to {self.error_path}")
errorstack.summarise_error_stack(verbose=True, output_path=self.error_path)
@@ -136,12 +145,18 @@

subject = f"{self.pipeline_name}: Summary for night {self.night}"

attachments = [self.log_path, self.error_path]

# Send the latest CSV log if there is one
if self.latest_csv_log is not None:
attachments.append(self.latest_csv_log)

send_gmail(
email_sender=sender,
email_recipients=recipients,
email_subject=subject,
email_text=summary,
attachments=[self.log_path, self.error_path]
attachments=attachments
)
else:
print(summary)
@@ -216,17 +231,55 @@ def process_realtime(self):
observer.start()

try:
while (Time.now() - self.t_start) < self.max_wait_hours:
while (Time.now() - self.t_start) < self.final_postprocess_hours:
time.sleep(2)
finally:
logger.info(f"No longer waiting for new images.")
observer.stop()
observer.join()
self.update_error_log()
self.postprocess()

def update_error_log(self):
self.errorstack.summarise_error_stack(verbose=True, output_path=self.error_path)

def postprocess(self):
self.update_error_log()

logger.info("Running postprocess steps")

if self.postprocess_configurations is not None:

postprocess_config = [ImageLoader(
load_image=self.pipeline.load_raw_image,
input_sub_dir=self.sub_dir,
input_img_dir=str(Path(self.raw_image_directory)).split(self.pipeline_name)[0]
)]

postprocess_config += self.pipeline.postprocess_configuration(
errorstack=self.errorstack,
processed_images=[os.path.basename(x) for x in self.processed_science_images],
selected_configurations=self.postprocess_configurations
)

protected_key = "_monitor"
while protected_key in self.pipeline.all_pipeline_configurations.keys():
protected_key += "_2"

self.pipeline.add_configuration(protected_key, postprocess_config)
self.pipeline.set_configuration(protected_key)

for processor in self.pipeline.all_pipeline_configurations[protected_key]:
if isinstance(processor, CSVLog):
self.latest_csv_log = processor.get_output_path()

_, errorstack = self.pipeline.reduce_images(
batches=[[[], []]],
selected_configurations=protected_key,
catch_all_errors=True
)
self.errorstack += errorstack
self.update_error_log()

def process_load_queue(self, q):
'''This is the worker thread function. It is run as a daemon
thread that only exits when the main thread ends.
@@ -237,11 +290,14 @@ def process_load_queue(self, q):
'''
while True:

if self.email_to_send:
if Time.now() - self.t_start > self.email_wait_hours:
logger.info(f"More than {self.email_wait_hours} hours have elapsed. Sending summary email.")
self.summarise_errors(errorstack=self.errorstack)
self.email_to_send = False
if Time.now() - self.t_start > self.midway_postprocess_hours:
if not self.midway_postprocess_complete:
logger.info("Postprocess time")
self.postprocess()
if self.email_to_send:
logger.info(f"More than {self.midway_postprocess_hours} hours have elapsed. Sending summary email.")
self.summarise_errors(errorstack=self.errorstack)
self.midway_postprocess_complete = True

if not q.empty():
event = q.get()
@@ -287,7 +343,7 @@ def process_load_queue(self, q):
selected_configurations=self.realtime_configurations,
catch_all_errors=True
)
self.processed_science.append(event.src_path)
self.processed_science_images.append(event.src_path)
self.errorstack += errorstack
self.update_error_log()

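For illustration, a standalone sketch of the collision-free key logic that postprocess() uses before registering its configuration; the dictionary below stands in for pipeline.all_pipeline_configurations and the names are placeholders.

# Stand-in for pipeline.all_pipeline_configurations:
all_pipeline_configurations = {"default": [], "realtime": [], "_monitor": []}

protected_key = "_monitor"
while protected_key in all_pipeline_configurations.keys():
    protected_key += "_2"

print(protected_key)  # _monitor_2, the first key not already taken

The configuration registered under that key is an ImageLoader re-reading the night's raw images, followed by the processors returned by pipeline.postprocess_configuration(...), and is then executed with pipeline.reduce_images(batches=[[[], []]], selected_configurations=protected_key, catch_all_errors=True).
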
7 changes: 6 additions & 1 deletion winterdrp/pipelines/__init__.py
@@ -1,13 +1,18 @@
import logging
from winterdrp.errors import ProcessorError
from winterdrp.pipelines.base_pipeline import Pipeline
from winterdrp.pipelines.wirc.wirc_pipeline import WircPipeline
from winterdrp.pipelines.summer.summer_pipeline import SummerPipeline
from winterdrp.pipelines.wirc_imsub.wirc_imsub_pipeline import WircImsubPipeline

logger = logging.getLogger(__name__)


# Convention: lowercase names

class PipelineConfigError(ProcessorError, KeyError):
pass


def get_pipeline(instrument, selected_configurations=None, *args, **kwargs):

@@ -17,6 +22,6 @@ def get_pipeline(instrument, selected_configurations=None, *args, **kwargs):
except KeyError:
err = f"Unrecognised pipeline {instrument}. Available pipelines are: {Pipeline.pipelines.keys()}"
logger.error(err)
raise KeyError(err)
raise PipelineConfigError(err)

return pipeline(selected_configurations=selected_configurations, *args, **kwargs)
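For illustration, a minimal sketch of why the new exception inherits from both ProcessorError and KeyError: callers that previously caught the bare KeyError raised by get_pipeline() keep working, while the error-tracking machinery can treat it as a ProcessorError. ProcessorError is stubbed here rather than imported.

class ProcessorError(Exception):
    pass

class PipelineConfigError(ProcessorError, KeyError):
    pass

try:
    raise PipelineConfigError("Unrecognised pipeline nonexistent")
except KeyError as exc:
    # Existing KeyError handlers still catch the new exception type.
    print(f"caught: {exc!r}")
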
26 changes: 25 additions & 1 deletion winterdrp/pipelines/base_pipeline.py
@@ -7,6 +7,7 @@
from winterdrp.paths import saturate_key
from winterdrp.errors import ErrorStack
from winterdrp.processors.base_processor import BaseProcessor
from winterdrp.processors.utils.error_annotator import ErrorStackAnnotator

logger = logging.getLogger(__name__)

@@ -151,4 +152,27 @@ def set_saturation(
if "SKMEDSUB" in header.keys():
saturation_level -= header['SKMEDSUB']
header.append((saturate_key, saturation_level, 'Saturation level'), end=True)
return header
return header

def postprocess_configuration(
self,
errorstack: ErrorStack,
selected_configurations: str | list[str],
processed_images: list[str] = None
) -> list[BaseProcessor]:

cleanup_config = [
ErrorStackAnnotator(
errorstack=errorstack,
processed_images=processed_images),
]

if isinstance(selected_configurations, str):
cleanup_config += self.all_pipeline_configurations[selected_configurations]
else:
for config in selected_configurations:
cleanup_config += self.all_pipeline_configurations[config]

return cleanup_config
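For illustration, a hedged usage sketch mirroring how __main__.py and base_monitor.py call the new method; the pipeline name, night, configuration names and file name are placeholders and not guaranteed to exist.

from winterdrp.pipelines import get_pipeline

pipe = get_pipeline("summer", selected_configurations="default", night="20220901")
batches, errorstack = pipe.reduce_images([[[], []]], catch_all_errors=True)

# Build a postprocess chain: an ErrorStackAnnotator carrying the accumulated
# errors, followed by the processors of each selected configuration.
post_config = pipe.postprocess_configuration(
    errorstack=errorstack,
    processed_images=["example_image_0001.fits"],  # placeholder file name
    selected_configurations=["imsub"],             # placeholder configuration
)

pipe.add_configuration("_postprocess_example", post_config)
pipe.set_configuration("_postprocess_example")

_, new_errorstack = pipe.reduce_images(
    batches=[[[], []]],
    selected_configurations="_postprocess_example",
    catch_all_errors=True,
)
errorstack += new_errorstack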

