Skip to content

Commit

Permalink
Unify error reports and propagate processing history to sourcelists (#…
Browse files Browse the repository at this point in the history
…218)

* Propagate proc history

* return, not raise...
  • Loading branch information
robertdstein authored Nov 30, 2022
1 parent 15ead58 commit 525cccd
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 43 deletions.
9 changes: 8 additions & 1 deletion winterdrp/data/base_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Type
from pathlib import Path
from winterdrp.paths import raw_img_key, base_name_key

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -105,7 +106,7 @@ def __iter__(self):
return self._datalist.__iter__()


class DataBatch (PseudoList):
class DataBatch(PseudoList):

@property
def data_type(self) -> Type[DataBlock]:
Expand All @@ -117,6 +118,12 @@ def __init__(self, batch: list[DataBlock] | DataBlock = None):
def get_batch(self) -> list[DataBlock]:
return self.get_data_list()

def get_raw_image_names(self) -> list[str]:
img_list = []
for data_block in self.get_batch():
img_list += [Path(x).name for x in data_block.get_raw_img_list()]
return img_list


class Dataset (PseudoList):

Expand Down
74 changes: 36 additions & 38 deletions winterdrp/processors/base_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,19 @@ class NoCandidatesError(ProcessorError):
pass


class BaseProcessor:
class BaseDPU:

def base_apply(
self,
dataset: Dataset
) -> tuple[Dataset, ErrorStack]:
raise NotImplementedError()

def generate_error_report(self, exception: Exception, batch: DataBatch) -> ErrorReport:
return ErrorReport(exception, self.__module__, batch.get_raw_image_names())


class BaseProcessor(BaseDPU):

@property
def base_key(self):
Expand Down Expand Up @@ -101,11 +113,26 @@ def base_apply(
return dataset, err_stack

def apply(self, batch: DataBatch):
raise NotImplementedError
batch = self._apply(batch)
batch = self._update_processing_history(batch)
return batch

def generate_error_report(self, exception: Exception, batch: DataBatch) -> ErrorReport:
def _apply(self, batch: DataBatch):
raise NotImplementedError

def _update_processing_history(
self,
batch: DataBatch,
) -> DataBatch:
for i, data_block in enumerate(batch):
data_block[proc_history_key] += self.base_key + ","
data_block['REDUCER'] = getpass.getuser()
data_block['REDMACH'] = socket.gethostname()
data_block['REDTIME'] = str(datetime.datetime.now())
data_block["REDSOFT"] = package_name
batch[i] = data_block
return batch


class CleanupProcessor(BaseProcessor, ABC):

Expand Down Expand Up @@ -159,50 +186,25 @@ def save_mask(
return mask_path

@staticmethod
def get_hash(image: Image):
def get_hash(image: ImageBatch):
key = "".join(sorted([x[base_name_key] + x[proc_history_key] for x in image]))
return hashlib.sha1(key.encode()).hexdigest()

def image_batch_error_report(self, exception: Exception, batch: ImageBatch):
contents = []
for image in batch:
contents += [Path(x).name for x in image.get_raw_img_list()]
return ErrorReport(exception, self.__module__, contents)


class BaseImageProcessor(BaseProcessor, ImageHandler, ABC):

def apply(
def _apply(
self,
batch: ImageBatch
) -> ImageBatch:
batch = self._apply_to_images(batch)
batch = self._update_processing_history(batch)

return batch
return self._apply_to_images(batch)

def _apply_to_images(
self,
batch: ImageBatch,
) -> ImageBatch:
raise NotImplementedError

def _update_processing_history(
self,
batch: ImageBatch,
) -> ImageBatch:
for i, image in enumerate(batch):
image[proc_history_key] += self.base_key + ","
image['REDUCER'] = getpass.getuser()
image['REDMACH'] = socket.gethostname()
image['REDTIME'] = str(datetime.datetime.now())
image["REDSOFT"] = package_name
batch[i] = image
return batch

def generate_error_report(self, exception: Exception, batch: ImageBatch) -> ErrorReport:
return self.image_batch_error_report(exception, batch)


class ProcessorWithCache(BaseImageProcessor, ABC):

Expand Down Expand Up @@ -291,7 +293,7 @@ def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls.base_key] = cls

def apply(self, batch: ImageBatch) -> SourceBatch:
def _apply(self, batch: ImageBatch) -> SourceBatch:
source_batch = self._apply_to_images(batch)

if len(source_batch) == 0:
Expand All @@ -312,18 +314,14 @@ def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls.base_key] = cls

def apply(
def _apply(
self,
batch: SourceBatch
) -> SourceBatch:
return batch
return self._apply_to_candidates(batch)

def _apply_to_candidates(
self,
source_list: SourceBatch,
) -> SourceBatch:
raise NotImplementedError

def generate_error_report(self, exception: Exception, batch: SourceBatch):
contents = batch[base_name_key]
return ErrorReport(exception, self.__module__, contents)
8 changes: 4 additions & 4 deletions winterdrp/processors/candidates/candidate_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from astropy.io import fits
from winterdrp.processors.astromatic.sextractor.sourceextractor import run_sextractor_dual
from winterdrp.utils.ldac_tools import get_table_from_ldac
from winterdrp.paths import get_output_dir, raw_img_key, base_name_key
from winterdrp.paths import get_output_dir, core_fields, base_name_key
import io
import gzip
import os
Expand Down Expand Up @@ -184,7 +184,7 @@ def _apply_to_images(
diff_psf_path = os.path.join(self.get_sub_output_dir(), image["DIFFPSF"])
diff_unc_path = os.path.join(self.get_sub_output_dir(), image["DIFFUNC"])

scorr_mask_path = os.path.join(self.get_sub_output_dir(),image["SCORMASK"])
scorr_mask_path = os.path.join(self.get_sub_output_dir(), image["SCORMASK"])
cands_catalog_name = diff_image_path.replace('.fits', '.dets')
cands_catalog_name, _ = run_sextractor_dual(
det_image=scorr_image_path,
Expand All @@ -199,7 +199,7 @@ def _apply_to_images(
gain=1.0
)

sci_image_path = os.path.join(self.get_sub_output_dir(), image['BASENAME'])
sci_image_path = os.path.join(self.get_sub_output_dir(), image[base_name_key])
ref_image_path = os.path.join(self.get_sub_output_dir(), image['REFIMG'])
cands_table = self.generate_candidates_table(
scorr_catalog_name=cands_catalog_name,
Expand All @@ -218,7 +218,7 @@ def _apply_to_images(

metadata = dict()

for key in [raw_img_key, base_name_key]:
for key in core_fields:
metadata[key] = image[key]

all_cands.append(SourceTable(cands_table, metadata=metadata))
Expand Down

0 comments on commit 525cccd

Please sign in to comment.