Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiconfigs #68

Merged
merged 3 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions tests/test_summer_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

test_data_dir = get_test_data_dir()

test_pipeline = [
test_configuration = [
ImageLoader(
input_img_dir=test_data_dir,
input_sub_dir="raw",
Expand Down Expand Up @@ -97,7 +97,10 @@
"ZP_8.0_nstars": 43
}

pipeline = SummerPipeline(pipeline_configuration=test_pipeline, night="20220402")
test_config_name = "test"

pipeline = SummerPipeline(night="20220402", selected_configurations=[test_config_name])
pipeline.add_configuration(configuration_name=test_config_name, configuration=test_configuration)


class TestSummerPipeline(unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions tests/test_wirc_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_cal_path(name: str):
return os.path.join(test_data_dir, f"wirc/cals/test_{name}.fits")


test_pipeline = [
test_configuration = [
ImageLoader(
input_img_dir=test_data_dir,
input_sub_dir="raw",
Expand Down Expand Up @@ -91,7 +91,8 @@ def get_cal_path(name: str):
PhotCalibrator(ref_catalog_generator=wirc_photometric_catalog_generator)
]

pipeline = WircPipeline(pipeline_configuration=test_pipeline, night="20210330")
pipeline = WircPipeline(night="20210330", selected_configurations="test")
pipeline.add_configuration(configuration_name="test", configuration=test_configuration)


class TestWircPipeline(unittest.TestCase):
Expand Down
101 changes: 65 additions & 36 deletions winterdrp/pipelines/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import copy
from winterdrp.paths import saturate_key
from winterdrp.errors import ErrorStack
from winterdrp.processors.base_processor import BaseProcessor

logger = logging.getLogger(__name__)

Expand All @@ -17,7 +18,7 @@ class Pipeline:
name = None

@property
def pipeline_configurations(self):
def all_pipeline_configurations(self):
raise NotImplementedError()

@property
Expand All @@ -30,23 +31,15 @@ def non_linear_level(self):

def __init__(
self,
pipeline_configuration: str | list = None,
selected_configurations: str | list[str] = None,
night: int | str = "",
):

self.night_sub_dir = os.path.join(self.name, night)

self.processors = self.load_pipeline_configuration(pipeline_configuration)

self.configure_processors(sub_dir=self.night_sub_dir)

for i, (processor) in enumerate(self.processors):

logger.debug(f"Initialising processor {processor.__class__}")
processor.set_preceding_steps(previous_steps=self.processors[:i])
processor.check_prerequisites()

logger.debug("Pipeline initialisation complete.")
if not isinstance(selected_configurations, list):
selected_configurations = [selected_configurations]
self.selected_configurations = selected_configurations
self.processors = list()

@classmethod
def __init_subclass__(cls, **kwargs):
Expand All @@ -58,47 +51,83 @@ def __init_subclass__(cls, **kwargs):
raise ValueError(err)
cls.pipelines[cls.name] = cls

def configure_processors(
def load_pipeline_configuration(
self,
sub_dir: str = ""
configuration: str = None,
):
for processor in self.processors:
return copy.copy(self.all_pipeline_configurations[configuration])

@staticmethod
def configure_processors(
processors: list[BaseProcessor],
sub_dir: str = ""
) -> list[BaseProcessor]:

for processor in processors:
processor.set_night(night_sub_dir=sub_dir)
return processors

def add_configuration(
self,
configuration_name: str,
configuration: str | list[BaseProcessor]
):
self.all_pipeline_configurations[configuration_name] = configuration

def set_configuration(
self,
new_configuration: str = None
):
logger.debug(f"Setting pipeline configuration to {new_configuration}.")

processors = self.load_pipeline_configuration(new_configuration)
processors = self.configure_processors(processors, sub_dir=self.night_sub_dir)
for i, (processor) in enumerate(processors):
logger.debug(f"Initialising processor {processor.__class__}")
processor.set_preceding_steps(previous_steps=processors[:i])
processor.check_prerequisites()
logger.debug("Pipeline initialisation complete.")
self.processors = processors

@staticmethod
def download_raw_images_for_night(
night: str | int
):
raise NotImplemented

def load_pipeline_configuration(
self,
configuration: str | list = None,
):
if isinstance(configuration, str | None):
return copy.copy(self.pipeline_configurations[configuration])
else:
return copy.copy(configuration)

def reduce_images(
self,
batches: list[list[list[np.ndarray], list[astropy.io.fits.header]]],
output_error_path: str = None,
catch_all_errors: bool = False
catch_all_errors: bool = False,
selected_configurations: str | list[str] = None
):
err_stack = ErrorStack()

for i, processor in enumerate(self.processors):
logger.debug(f"Applying '{processor.__class__}' processor to {len(batches)} batches. "
f"(Step {i+1}/{len(self.processors)})")
if selected_configurations is None:
selected_configurations = self.selected_configurations

if not isinstance(selected_configurations, list):
selected_configurations = [selected_configurations]

for j, configuration in enumerate(selected_configurations):

logger.info(f"Using pipeline configuration {configuration} "
f"({j}/{len(selected_configurations)})")

self.set_configuration(configuration)

for i, processor in enumerate(self.processors):
logger.debug(f"Applying '{processor.__class__}' processor to {len(batches)} batches. "
f"(Step {i+1}/{len(self.processors)})")

batches, new_err_stack = processor.base_apply(
batches
)
err_stack += new_err_stack
batches, new_err_stack = processor.base_apply(
batches
)
err_stack += new_err_stack

if np.logical_and(not catch_all_errors, len(err_stack.reports) > 0):
raise err_stack.reports[0].error
if np.logical_and(not catch_all_errors, len(err_stack.reports) > 0):
raise err_stack.reports[0].error

err_stack.summarise_error_stack(output_path=output_error_path)
return batches, err_stack
Expand Down
2 changes: 1 addition & 1 deletion winterdrp/pipelines/summer/summer_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def load_raw_summer_image(
class SummerPipeline(Pipeline):
name = pipeline_name

pipeline_configurations = {
all_pipeline_configurations = {
None: [
ImageLoader(
load_image=load_raw_summer_image
Expand Down
2 changes: 1 addition & 1 deletion winterdrp/pipelines/wirc/wirc_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class WircPipeline(Pipeline):

batch_split_keys = ["OBJECT", "FILTER"]

pipeline_configurations = {
all_pipeline_configurations = {
None: test_pipeline
}

Expand Down
2 changes: 1 addition & 1 deletion winterdrp/pipelines/wirc_imsub/wirc_imsub_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class WircImsubPipeline(Pipeline):

candidates_db_columns = get_colnames_from_schema('winterdrp/pipelines/wirc_imsub/wirc_imsub_files/schema'
'/candidates.sql')
pipeline_configurations = {
all_pipeline_configurations = {
None: [
ImageLoader(
input_sub_dir="raw",
Expand Down