From ab1c17a297e193685c92c564666d92b15775ee71 Mon Sep 17 00:00:00 2001 From: Matt Jaquiery Date: Wed, 28 Aug 2024 07:42:16 +0100 Subject: [PATCH] feat: plugins also enabled `black` linting --- pyproject.toml | 6 +- src/galv_harvester/__about__.py | 2 +- src/galv_harvester/api.py | 63 +++-- src/galv_harvester/harvest.py | 251 +++++++++++------- src/galv_harvester/parse/arbin.py | 16 +- .../parse/biologic_input_file.py | 43 +-- .../parse/delimited_input_file.py | 35 ++- src/galv_harvester/parse/exceptions.py | 7 +- src/galv_harvester/parse/input_file.py | 10 +- src/galv_harvester/parse/ivium_input_file.py | 136 +++++----- src/galv_harvester/parse/maccor_input_file.py | 109 ++++---- src/galv_harvester/plugins.py | 54 ++++ src/galv_harvester/run.py | 75 +++--- src/galv_harvester/settings.py | 69 +++-- src/galv_harvester/start.py | 176 ++++++++---- 15 files changed, 628 insertions(+), 424 deletions(-) create mode 100644 src/galv_harvester/plugins.py diff --git a/pyproject.toml b/pyproject.toml index 781c6da..01821c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,9 +44,9 @@ dependencies = [ ] [project.urls] -Documentation = "https://github.com/unknown/galv-harvester#readme" -Issues = "https://github.com/unknown/galv-harvester/issues" -Source = "https://github.com/unknown/galv-harvester" +Documentation = "https://github.com/galv-team/galv-harvester#readme" +Issues = "https://github.com/galv-team/galv-harvester/issues" +Source = "https://github.com/galv-team/galv-harvester" [project.scripts] galv-harvester = "galv_harvester.start:click_wrapper" diff --git a/src/galv_harvester/__about__.py b/src/galv_harvester/__about__.py index 3277f64..848258f 100644 --- a/src/galv_harvester/__about__.py +++ b/src/galv_harvester/__about__.py @@ -1 +1 @@ -VERSION = "1.0.0" +VERSION = "1.2.0" diff --git a/src/galv_harvester/api.py b/src/galv_harvester/api.py index 880ff33..3d6d515 100644 --- a/src/galv_harvester/api.py +++ b/src/galv_harvester/api.py @@ -7,7 +7,13 @@ from . 
import settings from .utils import NpEncoder import requests -from .settings import get_setting, get_settings, get_settings_file, get_logger, update_envvars +from .settings import ( + get_setting, + get_settings, + get_settings_file, + get_logger, + update_envvars, +) import time logger = get_logger(__file__) @@ -18,29 +24,30 @@ class StorageError(RuntimeError): def report_harvest_result( - path, - monitored_path_id: str, - content=None, - error: BaseException = None, - **kwargs # passed to requests.post + path, + monitored_path_id: str, + content=None, + error: BaseException = None, + **kwargs, # passed to requests.post ): start = time.time() try: if error is not None: - data = {'status': settings.HARVESTER_STATUS_ERROR, 'error': f"{error.__class__.__name__}: {error}"} + data = { + "status": settings.HARVESTER_STATUS_ERROR, + "error": f"{error.__class__.__name__}: {error}", + } else: - data = {'status': settings.HARVESTER_STATUS_SUCCESS, 'content': content} - data['path'] = path - data['monitored_path_id'] = monitored_path_id + data = {"status": settings.HARVESTER_STATUS_SUCCESS, "content": content} + data["path"] = path + data["monitored_path_id"] = monitored_path_id logger.debug(f"{get_setting('url')}report/; {json.dumps(data, cls=NpEncoder)}") out = requests.post( f"{get_setting('url')}report/", - headers={ - 'Authorization': f"Harvester {get_setting('api_key')}" - }, + headers={"Authorization": f"Harvester {get_setting('api_key')}"}, # encode then decode to ensure np values are converted to standard types and null bytes are removed - json=json.loads(json.dumps(data, cls=NpEncoder).replace('\\u0000', '')), - **kwargs + json=json.loads(json.dumps(data, cls=NpEncoder).replace("\\u0000", "")), + **kwargs, ) try: out.json() @@ -48,12 +55,16 @@ def report_harvest_result( error_text = out.text[:100].replace("\n", "\\n") if len(out.text) > 100: error_text += "..." 
- logger.error(f"Server returned invalid JSON (HTTP {out.status_code}): {error_text}") + logger.error( + f"Server returned invalid JSON (HTTP {out.status_code}): {error_text}" + ) return None if not out.ok: if out.status_code == 507: raise StorageError(out.json()) - logger.error(f"Server returned error (HTTP {out.status_code}): {out.json()}") + logger.error( + f"Server returned error (HTTP {out.status_code}): {out.json()}" + ) return None except StorageError as e: raise e @@ -67,9 +78,11 @@ def report_harvest_result( def update_config(): logger.info("Updating configuration from API") try: - url = get_setting('url') - key = get_setting('api_key') - result = requests.get(f"{url}config/", headers={'Authorization': f"Harvester {key}"}) + url = get_setting("url") + key = get_setting("api_key") + result = requests.get( + f"{url}config/", headers={"Authorization": f"Harvester {key}"} + ) if result.status_code == 200: dirty = False new = result.json() @@ -79,7 +92,9 @@ def update_config(): all_keys = [*new.keys(), *old.keys()] for key in all_keys: if key in old.keys() and key in new.keys(): - if json.dumps(old[key], cls=NpEncoder) == json.dumps(new[key], cls=NpEncoder): + if json.dumps(old[key], cls=NpEncoder) == json.dumps( + new[key], cls=NpEncoder + ): continue logger.info(f"Updating value for setting '{key}'") logger.info(f"Old value: {json.dumps(old[key], cls=NpEncoder)}") @@ -97,10 +112,12 @@ def update_config(): dirty = True if dirty: - with open(get_settings_file(), 'w+') as f: + with open(get_settings_file(), "w+") as f: json.dump(result.json(), f) update_envvars() else: - logger.error(f"Unable to fetch {url}config/ -- received HTTP {result.status_code}") + logger.error( + f"Unable to fetch {url}config/ -- received HTTP {result.status_code}" + ) except BaseException as e: logger.error(f"{e.__class__.__name__}: {e}") diff --git a/src/galv_harvester/harvest.py b/src/galv_harvester/harvest.py index 6836e13..db96b3f 100644 --- a/src/galv_harvester/harvest.py +++ b/src/galv_harvester/harvest.py @@ -31,20 +31,22 @@ from .api import report_harvest_result, StorageError from .__about__ import VERSION +from .plugins import get_parsers logger = settings.get_logger(__file__) class HarvestProcessor: - registered_input_files = [ + default_parsers = [ BiologicMprInputFile, IviumInputFile, MaccorInputFile, MaccorExcelInputFile, MaccorRawInputFile, ArbinCSVFile, - DelimitedInputFile # Should be last because it processes files line by line and accepts anything table-like + DelimitedInputFile, # Should be last because it processes files line by line and accepts anything table-like ] + parser_errors = {} @staticmethod def check_response(step: str, response): @@ -62,20 +64,24 @@ def __init__(self, file_path: str, monitored_path: dict): self.mapping = None self.file_path = file_path self.monitored_path = monitored_path - for input_file_cls in self.registered_input_files: + self.parser_classes = [*get_parsers(), *self.default_parsers] + for input_file_cls in self.default_parsers: try: - logger.debug('Tried input reader {}'.format(input_file_cls)) + logger.debug("Tried input reader {}".format(input_file_cls)) input_file = input_file_cls(file_path=file_path) except UnsupportedFileTypeError as e: - logger.debug('...failed with: ', type(e), e) + self.parser_errors[input_file_cls.__name__] = e + logger.debug("...failed with: ", type(e), e) continue except Exception as e: - logger.error(( - f"{input_file_cls.__name__} failed to import" - f" {file_path} with non-UnsupportedFileTypeError: {e}" - )) + logger.error( + ( 
+ f"{input_file_cls.__name__} failed to import" + f" {file_path} with non-UnsupportedFileTypeError: {e}" + ) + ) continue - logger.debug('...succeeded...') + logger.debug("...succeeded...") self.input_file = input_file self.parser = input_file_cls return @@ -102,7 +108,7 @@ def get_test_date(metadata): """ Get the test date from the metadata """ - return HarvestProcessor.serialize_datetime(metadata.get('Date of Test')) + return HarvestProcessor.serialize_datetime(metadata.get("Date of Test")) def harvest(self): """ @@ -112,14 +118,20 @@ def harvest(self): metadata_time = time.time() self._report_file_metadata() column_time = time.time() - logger.info(f"Metadata reported in {column_time - metadata_time:.2f} seconds") + logger.info( + f"Metadata reported in {column_time - metadata_time:.2f} seconds" + ) self._report_summary() if self.mapping is not None: data_prep_time = time.time() - logger.info(f"Column metadata reported in {data_prep_time - column_time:.2f} seconds") + logger.info( + f"Column metadata reported in {data_prep_time - column_time:.2f} seconds" + ) self._prepare_data() upload_time = time.time() - logger.info(f"Data prepared in {upload_time - data_prep_time:.2f} seconds") + logger.info( + f"Data prepared in {upload_time - data_prep_time:.2f} seconds" + ) self._upload_data() logger.info(f"Data uploaded in {time.time() - upload_time:.2f} seconds") self._delete_temp_files() @@ -133,17 +145,19 @@ def _report_file_metadata(self): core_metadata, extra_metadata = self.input_file.load_metadata() report = report_harvest_result( path=self.file_path, - monitored_path_id=self.monitored_path.get('id'), + monitored_path_id=self.monitored_path.get("id"), content={ - 'task': settings.HARVESTER_TASK_IMPORT, - 'stage': settings.HARVEST_STAGE_FILE_METADATA, - 'data': { - 'core_metadata': HarvestProcessor.serialize_datetime(core_metadata), - 'extra_metadata': HarvestProcessor.serialize_datetime(extra_metadata), - 'test_date': HarvestProcessor.get_test_date(core_metadata), - 'parser': self.input_file.__class__.__name__ - } - } + "task": settings.HARVESTER_TASK_IMPORT, + "stage": settings.HARVEST_STAGE_FILE_METADATA, + "data": { + "core_metadata": HarvestProcessor.serialize_datetime(core_metadata), + "extra_metadata": HarvestProcessor.serialize_datetime( + extra_metadata + ), + "test_date": HarvestProcessor.get_test_date(core_metadata), + "parser": self.input_file.__class__.__name__, + }, + }, ) HarvestProcessor.check_response("Report Metadata", report) @@ -156,7 +170,11 @@ def _report_summary(self): summary_data = [] iterator = self.input_file.load_data( self.file_path, - [c for c in self.input_file.column_info.keys() if self.input_file.column_info[c].get('has_data')] + [ + c + for c in self.input_file.column_info.keys() + if self.input_file.column_info[c].get("has_data") + ], ) for row in iterator: summary_data.append(row) @@ -168,28 +186,32 @@ def _report_summary(self): # Upload results report = report_harvest_result( path=self.file_path, - monitored_path_id=self.monitored_path.get('id'), + monitored_path_id=self.monitored_path.get("id"), content={ - 'task': settings.HARVESTER_TASK_IMPORT, - 'stage': settings.HARVEST_STAGE_DATA_SUMMARY, - 'data': summary.to_json() - } + "task": settings.HARVESTER_TASK_IMPORT, + "stage": settings.HARVEST_STAGE_DATA_SUMMARY, + "data": summary.to_json(), + }, ) HarvestProcessor.check_response("Report Column Metadata", report) - mapping_url = report.json()['mapping'] + mapping_url = report.json()["mapping"] if mapping_url is None: - logger.info("Mapping could not be 
automatically determined. Will revisit when user determines mapping.") + logger.info( + "Mapping could not be automatically determined. Will revisit when user determines mapping." + ) return mapping_request = requests.get( mapping_url, - headers={'Authorization': f"Harvester {settings.get_setting('api_key')}"} + headers={"Authorization": f"Harvester {settings.get_setting('api_key')}"}, ) HarvestProcessor.check_response("Get Mapping", mapping_request) - self.mapping = mapping_request.json().get('rendered_map') + self.mapping = mapping_request.json().get("rendered_map") if not isinstance(self.mapping, dict): if mapping_request: - logger.error(f"Server returned mapping request but no mapping was found") + logger.error( + f"Server returned mapping request but no mapping was found" + ) else: logger.info("Mapping could not be automatically determined") @@ -197,34 +219,43 @@ def _prepare_data(self): """ Read the data from the file and save it as a temporary .parquet file self.data_file """ + def remap(df, mapping): """ Remap the columns in the dataframe according to the mapping. """ columns = list(df.columns) for col_name, mapping in mapping.items(): - new_name = mapping.get('new_name') + new_name = mapping.get("new_name") if new_name in df.columns and new_name != col_name: - raise ValueError(f"New name '{new_name}' already exists in the dataframe") - if mapping['data_type'] in ["bool", "str"]: + raise ValueError( + f"New name '{new_name}' already exists in the dataframe" + ) + if mapping["data_type"] in ["bool", "str"]: df[col_name] = df[col_name].astype(mapping["data_type"]) - elif mapping['data_type'] == 'datetime64[ns]': + elif mapping["data_type"] == "datetime64[ns]": df[col_name] = pandas.to_datetime(df[col_name]) else: - if mapping['data_type'] == 'int': - df[col_name] = fastnumbers.try_forceint(df[col_name], map=list, on_fail=math.nan) + if mapping["data_type"] == "int": + df[col_name] = fastnumbers.try_forceint( + df[col_name], map=list, on_fail=math.nan + ) else: - df[col_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan) + df[col_name] = fastnumbers.try_float( + df[col_name], map=list, on_fail=math.nan + ) - addition = mapping.get('addition', 0) - multiplier = mapping.get('multiplier', 1) + addition = mapping.get("addition", 0) + multiplier = mapping.get("multiplier", 1) df[col_name] = df[col_name] + addition df[col_name] = df[col_name] * multiplier df.rename(columns={col_name: new_name}, inplace=True) columns.pop(columns.index(col_name)) # If there are any columns left, they are not in the mapping and should be converted to floats for col_name in columns: - df[col_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan) + df[col_name] = fastnumbers.try_float( + df[col_name], map=list, on_fail=math.nan + ) return df def partition_generator(generator, partition_line_count=100_000): @@ -241,30 +272,36 @@ def to_df(rows): stopping = True yield to_df(rows) - partition_line_count = self.monitored_path.get("max_partition_line_count", 100_000) + partition_line_count = self.monitored_path.get( + "max_partition_line_count", 100_000 + ) reader = self.input_file.load_data( self.file_path, - [c for c in self.input_file.column_info.keys() if self.input_file.column_info[c].get('has_data')] + [ + c + for c in self.input_file.column_info.keys() + if self.input_file.column_info[c].get("has_data") + ], ) data = dask.dataframe.from_map( pandas.DataFrame, - partition_generator(reader, partition_line_count=partition_line_count) + partition_generator(reader, 
partition_line_count=partition_line_count), ) # Create a plot of key data columns for identification purposes self._plot_png(data) # Save the data as parquet - self.data_file_name = os.path.join(tempfile.gettempdir(), f"{os.path.basename(self.file_path)}.parquet") + self.data_file_name = os.path.join( + tempfile.gettempdir(), f"{os.path.basename(self.file_path)}.parquet" + ) data.to_parquet( self.data_file_name, write_index=False, compute=True, - custom_metadata={ - 'galv-harvester-version': VERSION - } + custom_metadata={"galv-harvester-version": VERSION}, ) self.row_count = data.shape[0].compute() self.partition_count = data.npartitions @@ -274,17 +311,18 @@ def _plot_png(self, data): Create a plot of key data columns for identification purposes """ try: - self.png_file_name = os.path.join(tempfile.gettempdir(), f"{os.path.basename(self.file_path)}.png") + self.png_file_name = os.path.join( + tempfile.gettempdir(), f"{os.path.basename(self.file_path)}.png" + ) hd.shade.cmap = ["lightblue", "darkblue"] hv.extension("matplotlib") - hv.output(fig='png', backend="matplotlib") - dataset = hv.Dataset(data, 'ElapsedTime_s', ['Voltage_V', 'Current_A']) - layout = ( - dataset.to(hv.Curve, 'ElapsedTime_s', 'Voltage_V') + - dataset.to(hv.Curve, 'ElapsedTime_s', 'Current_A') + hv.output(fig="png", backend="matplotlib") + dataset = hv.Dataset(data, "ElapsedTime_s", ["Voltage_V", "Current_A"]) + layout = dataset.to(hv.Curve, "ElapsedTime_s", "Voltage_V") + dataset.to( + hv.Curve, "ElapsedTime_s", "Current_A" ) - layout.opts(hv.opts.Curve(framewise=True, aspect=4, sublabel_format='')) - hv.save(layout, self.png_file_name, fmt='png', dpi=300) + layout.opts(hv.opts.Curve(framewise=True, aspect=4, sublabel_format="")) + hv.save(layout, self.png_file_name, fmt="png", dpi=300) self.png_ok = True except Exception as e: logger.warning(f"Failed to create plot: {e}") @@ -295,7 +333,7 @@ def _upload_data(self): Upload the data to the server """ - def pad0(n, width=math.floor(self.partition_count/10) + 1): + def pad0(n, width=math.floor(self.partition_count / 10) + 1): return f"{n:0{width}d}" successes = 0 @@ -303,77 +341,86 @@ def pad0(n, width=math.floor(self.partition_count/10) + 1): for i in range(self.partition_count): filename = f"{os.path.splitext(os.path.basename(self.file_path))[0]}.part_{pad0(i)}.parquet" - with open(os.path.join(self.data_file_name, f"part.{i}.parquet"), 'rb') as f: - files = {'parquet_file': (filename, f)} + with open( + os.path.join(self.data_file_name, f"part.{i}.parquet"), "rb" + ) as f: + files = {"parquet_file": (filename, f)} report = report_harvest_result( path=self.file_path, - monitored_path_id=self.monitored_path.get('id'), + monitored_path_id=self.monitored_path.get("id"), # send data in a flat format to accompany file upload protocol. 
# Kinda hacky because it overwrites much of report_harvest_result's functionality data={ - 'format': 'flat', - 'status': settings.HARVESTER_STATUS_SUCCESS, - 'path': self.file_path, - 'monitored_path_id': self.monitored_path.get('id'), - 'task': settings.HARVESTER_TASK_IMPORT, - 'stage': settings.HARVEST_STAGE_UPLOAD_PARQUET, - 'total_row_count': self.row_count, - 'partition_number': i, - 'partition_count': self.partition_count, - 'filename': filename + "format": "flat", + "status": settings.HARVESTER_STATUS_SUCCESS, + "path": self.file_path, + "monitored_path_id": self.monitored_path.get("id"), + "task": settings.HARVESTER_TASK_IMPORT, + "stage": settings.HARVEST_STAGE_UPLOAD_PARQUET, + "total_row_count": self.row_count, + "partition_number": i, + "partition_count": self.partition_count, + "filename": filename, }, - files=files + files=files, ) if report is None: - errors[i] = (f"Failed to upload {filename} - API Error: no response from server") + errors[i] = ( + f"Failed to upload {filename} - API Error: no response from server" + ) elif not report.ok: try: - errors[i] = (f"Failed to upload {filename} - API responded with Error: {report.json()['error']}") + errors[i] = ( + f"Failed to upload {filename} - API responded with Error: {report.json()['error']}" + ) except BaseException: - errors[i] = f"Failed to upload {filename}. Received HTTP {report.status_code}" + errors[i] = ( + f"Failed to upload {filename}. Received HTTP {report.status_code}" + ) else: successes += 1 if successes == 0 and self.partition_count > 0: raise RuntimeError("API Error: failed to upload all partitions to server") if successes != self.partition_count: - logger.error(f"Data Upload - {successes} of {self.partition_count} partitions uploaded successfully") + logger.error( + f"Data Upload - {successes} of {self.partition_count} partitions uploaded successfully" + ) for filename, error in errors.items(): - logger.error(f"Data Upload - Partition {filename} failed with error: {error}") + logger.error( + f"Data Upload - Partition {filename} failed with error: {error}" + ) else: logger.info(f"Data Upload - {successes} partitions uploaded successfully") report_harvest_result( path=self.file_path, - monitored_path_id=self.monitored_path.get('id'), + monitored_path_id=self.monitored_path.get("id"), content={ - 'task': settings.HARVESTER_TASK_IMPORT, - 'stage': settings.HARVEST_STAGE_UPLOAD_COMPLETE, - 'data': { - 'successes': successes, - 'errors': errors - } - } + "task": settings.HARVESTER_TASK_IMPORT, + "stage": settings.HARVEST_STAGE_UPLOAD_COMPLETE, + "data": {"successes": successes, "errors": errors}, + }, ) if self.png_ok: - with open(self.png_file_name, 'rb') as f: - files = {'png_file': f} + with open(self.png_file_name, "rb") as f: + files = {"png_file": f} report = report_harvest_result( path=self.file_path, - monitored_path_id=self.monitored_path.get('id'), + monitored_path_id=self.monitored_path.get("id"), # send data in a flat format to accompany file upload protocol. 
# Kinda hacky because it overwrites much of report_harvest_result's functionality data={ - 'format': 'flat', - 'status': settings.HARVESTER_STATUS_SUCCESS, - 'path': self.file_path, - 'monitored_path_id': self.monitored_path.get('id'), - 'task': settings.HARVESTER_TASK_IMPORT, - 'stage': settings.HARVEST_STAGE_UPLOAD_PNG, - 'filename': os.path.basename(self.png_file_name) + "format": "flat", + "status": settings.HARVESTER_STATUS_SUCCESS, + "path": self.file_path, + "monitored_path_id": self.monitored_path.get("id"), + "task": settings.HARVESTER_TASK_IMPORT, + "stage": settings.HARVEST_STAGE_UPLOAD_PNG, + "filename": os.path.basename(self.png_file_name), }, - files=files + files=files, ) try: HarvestProcessor.check_response("Upload PNG", report) @@ -384,7 +431,7 @@ def _delete_temp_files(self): """ Delete temporary files created during the process """ - for attribute in ['data_file_name', 'png_file_name']: + for attribute in ["data_file_name", "png_file_name"]: if hasattr(self, attribute): filename = getattr(self, attribute) if os.path.exists(filename): @@ -394,7 +441,9 @@ def _delete_temp_files(self): else: os.remove(filename) except PermissionError: - logger.warning(f"Failed to delete {filename}. This will have to be manually deleted.") + logger.warning( + f"Failed to delete {filename}. This will have to be manually deleted." + ) def __del__(self): self._delete_temp_files() diff --git a/src/galv_harvester/parse/arbin.py b/src/galv_harvester/parse/arbin.py index 8384280..0a074e8 100644 --- a/src/galv_harvester/parse/arbin.py +++ b/src/galv_harvester/parse/arbin.py @@ -9,8 +9,8 @@ class ArbinCSVFile(InputFile): """ - A class for handling Arbin csv files. - DelimitedFileInput fails to pick these up because it breaks on the space in the datetime rather than on , + A class for handling Arbin csv files. 
+ DelimitedFileInput fails to pick these up because it breaks on the space in the datetime rather than on , """ def __init__(self, file_path, **kwargs): @@ -23,13 +23,17 @@ def __init__(self, file_path, **kwargs): :raises UnsupportedFileTypeError: if the file is not a supported type """ - with open(file_path, newline='', encoding='utf-8-sig') as csvfile: + with open(file_path, newline="", encoding="utf-8-sig") as csvfile: try: reader = csv.reader( - csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL + csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL ) self.header = next(reader) - assert [h.lower() for h in self.header[0:3]] == ["data_point", "date_time", "test_time(s)"] + assert [h.lower() for h in self.header[0:3]] == [ + "data_point", + "date_time", + "test_time(s)", + ] data = next(reader) assert len(self.header) == len(data) self.dialect = reader.dialect @@ -42,7 +46,7 @@ def __init__(self, file_path, **kwargs): def load_data(self, file_path, columns): column_names = self.header - with open(file_path, newline='', encoding='utf-8-sig') as csvfile: + with open(file_path, newline="", encoding="utf-8-sig") as csvfile: reader = csv.reader(csvfile, dialect=self.dialect) next(reader) for row in reader: diff --git a/src/galv_harvester/parse/biologic_input_file.py b/src/galv_harvester/parse/biologic_input_file.py index 4670e2f..f841b09 100644 --- a/src/galv_harvester/parse/biologic_input_file.py +++ b/src/galv_harvester/parse/biologic_input_file.py @@ -11,7 +11,7 @@ class BiologicMprInputFile(InputFile): """ - A class for handling input files + A class for handling input files """ def __init__(self, file_path, **kwargs): @@ -29,29 +29,32 @@ def load_data(self, file_path, columns): columns_of_interest.append(col_idx) for row in self.mpr_file.data: yield { - column_names[col_idx]: row[col_idx] - for col_idx in columns_of_interest + column_names[col_idx]: row[col_idx] for col_idx in columns_of_interest } def get_data_labels(self): - modes = self.mpr_file.get_flag('mode') - Ns_changes = self.mpr_file.get_flag('Ns changes') - Ns_index = self.mpr_file.data.dtype.names.index('Ns') + modes = self.mpr_file.get_flag("mode") + Ns_changes = self.mpr_file.get_flag("Ns changes") + Ns_index = self.mpr_file.data.dtype.names.index("Ns") mode_labels = { - 1: 'CC', - 2: 'CV', - 3: 'Rest', + 1: "CC", + 2: "CV", + 3: "Rest", } last_Ns_change = 1 column_names = self.mpr_file.data.dtype.names - time_col = next((i for i, c in enumerate(column_names) if c.startswith("time")), 3) - cont_col = next((i for i, c in enumerate(column_names) if c.startswith("control")), 4) + time_col = next( + (i for i, c in enumerate(column_names) if c.startswith("time")), 3 + ) + cont_col = next( + (i for i, c in enumerate(column_names) if c.startswith("control")), 4 + ) prev_time = 0 for i in range(len(self.mpr_file.data)): - last_mode = modes[i-1] - last_Ns = self.mpr_file.data[i-1][Ns_index] + last_mode = modes[i - 1] + last_Ns = self.mpr_file.data[i - 1][Ns_index] Ns_change = Ns_changes[i] if Ns_change: time = self.mpr_file.data[i][time_col] @@ -66,16 +69,22 @@ def get_data_labels(self): experiment_label = "Discharge " is_const_curr = mode_label.casefold() == "cc" - experiment_label += f"at {control} {'mA' if is_const_curr else 'V'} " + experiment_label += ( + f"at {control} {'mA' if is_const_curr else 'V'} " + ) experiment_label += f"for {time - prev_time} seconds" if last_mode in mode_labels: data_label = ( - f"Ns_{last_Ns}_{mode_label}", (last_Ns_change, i - 1), experiment_label + 
f"Ns_{last_Ns}_{mode_label}", + (last_Ns_change, i - 1), + experiment_label, ) else: data_label = ( - f"Ns_{last_Ns}", (last_Ns_change, i - 1), experiment_label + f"Ns_{last_Ns}", + (last_Ns_change, i - 1), + experiment_label, ) last_Ns_change = i @@ -88,7 +97,7 @@ def load_metadata(self): metadata = { "Machine Type": "BioLogic", "Dataset Name": os.path.splitext(ntpath.basename(file_path))[0], - "Date of Test": self.mpr_file.startdate + "Date of Test": self.mpr_file.startdate, } columns_with_data = { diff --git a/src/galv_harvester/parse/delimited_input_file.py b/src/galv_harvester/parse/delimited_input_file.py index 49c420e..090e64a 100644 --- a/src/galv_harvester/parse/delimited_input_file.py +++ b/src/galv_harvester/parse/delimited_input_file.py @@ -9,7 +9,7 @@ class DelimitedInputFile(InputFile): """ - A class for handling input files delimited by a character + A class for handling input files delimited by a character """ @staticmethod @@ -34,7 +34,7 @@ def __init__(self, file_path, **kwargs): :raises UnsupportedFileTypeError: if the file is not a supported type """ - with open(file_path, newline='', encoding='utf-8-sig') as csvfile: + with open(file_path, newline="", encoding="utf-8-sig") as csvfile: try: csvfile.readline() except UnicodeDecodeError as e: @@ -69,15 +69,19 @@ def __init__(self, file_path, **kwargs): # Move to the next line and try again last_line += 1 if last_line > max_header_lines: - raise UnsupportedFileTypeError(( - f"Could not determine delimiter and header status after {max_header_lines} lines." - )) + raise UnsupportedFileTypeError( + ( + f"Could not determine delimiter and header status after {max_header_lines} lines." + ) + ) continue try: csvfile.seek(0) if last_line > 0: - self.preamble = "".join([csvfile.readline() for _ in range(last_line)]) + self.preamble = "".join( + [csvfile.readline() for _ in range(last_line)] + ) self.data_start = last_line else: self.preamble = None @@ -89,10 +93,12 @@ def __init__(self, file_path, **kwargs): self.header = [f"column_{i}" for i in range(len(next(reader)))] break except Exception as e: - raise UnsupportedFileTypeError(( - f"Identified delimiter [{self.dialect.delimiter}] after {last_line} lines," - f" but could not use `next(reader)`." - )) from e + raise UnsupportedFileTypeError( + ( + f"Identified delimiter [{self.dialect.delimiter}] after {last_line} lines," + f" but could not use `next(reader)`." 
+ ) + ) from e super().__init__(file_path, **kwargs) self.logger.info(f"Type is Delimited [{self.dialect.delimiter}]") @@ -108,13 +114,16 @@ def load_data(self, file_path, columns): column_names = [self.header[i] for i in column_numbers] - with open(file_path, newline='', encoding='utf-8-sig') as csvfile: + with open(file_path, newline="", encoding="utf-8-sig") as csvfile: self.spin_to_line(csvfile, self.data_start) reader = csv.reader(csvfile, dialect=self.dialect) if self.has_header: next(reader) for row in reader: - yield { column_names[n]: row[column_numbers[n]] for n in range(len(column_names)) } + yield { + column_names[n]: row[column_numbers[n]] + for n in range(len(column_names)) + } def get_data_labels(self): yield None @@ -124,6 +133,6 @@ def load_metadata(self): metadata = { "preamble": self.preamble, } - columns_with_data = { name: { "has_data": True } for name in self.header } + columns_with_data = {name: {"has_data": True} for name in self.header} return metadata, columns_with_data diff --git a/src/galv_harvester/parse/exceptions.py b/src/galv_harvester/parse/exceptions.py index 7096fdc..2e9f215 100644 --- a/src/galv_harvester/parse/exceptions.py +++ b/src/galv_harvester/parse/exceptions.py @@ -2,9 +2,10 @@ # Copyright (c) 2020-2023, The Chancellor, Masters and Scholars of the University # of Oxford, and the 'Galv' Developers. All rights reserved. + class UnsupportedFileTypeError(Exception): """ - Exception indicating the file is unsupported + Exception indicating the file is unsupported """ pass @@ -12,7 +13,7 @@ class UnsupportedFileTypeError(Exception): class InvalidDataInFileError(Exception): """ - Exception indicating the file has invalid data + Exception indicating the file has invalid data """ pass @@ -20,7 +21,7 @@ class InvalidDataInFileError(Exception): class EmptyFileError(Exception): """ - Exception indicating the file has no data + Exception indicating the file has no data """ pass diff --git a/src/galv_harvester/parse/input_file.py b/src/galv_harvester/parse/input_file.py index a1df92b..f49addc 100644 --- a/src/galv_harvester/parse/input_file.py +++ b/src/galv_harvester/parse/input_file.py @@ -11,12 +11,12 @@ class InputFile: """ - A class for handling input files + A class for handling input files """ def __init__(self, file_path): self.file_path = file_path - if not hasattr(self, 'logger'): + if not hasattr(self, "logger"): self.logger = get_logger(f"InputFile({self.file_path})") self.metadata, self.column_info = self.load_metadata() @@ -33,10 +33,10 @@ def load_data(self, file_path, available_desired_columns): def load_metadata(self): """ - returns a tuple of (metadata, column_info) + returns a tuple of (metadata, column_info) - metadata is a dictionary of metadata keys to values + metadata is a dictionary of metadata keys to values - column_info is a dictionary of column names to dictionaries of column info + column_info is a dictionary of column names to dictionaries of column info """ raise UnsupportedFileTypeError() diff --git a/src/galv_harvester/parse/ivium_input_file.py b/src/galv_harvester/parse/ivium_input_file.py index e273ed8..9456196 100644 --- a/src/galv_harvester/parse/ivium_input_file.py +++ b/src/galv_harvester/parse/ivium_input_file.py @@ -12,12 +12,12 @@ InvalidDataInFileError, ) -IDF_HEADER = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfb\x00\x00\x00\r\x00Version=11' +IDF_HEADER = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfb\x00\x00\x00\r\x00Version=11" class IviumInputFile(InputFile): """ - 
A class for handling input files + A class for handling input files """ def __init__(self, file_path, **kwargs): @@ -27,7 +27,7 @@ def __init__(self, file_path, **kwargs): def load_data(self, file_path, columns): """ - Load data in a ivium text file" + Load data in a ivium text file" """ with open(file_path, "rb") as f: @@ -44,14 +44,14 @@ def load_data(self, file_path, columns): while current_line != sample_row: line = f.readline() current_line += 1 - line = line.decode('ascii') + line = line.decode("ascii") if len(line) != 40: self.logger.debug(line) raise InvalidDataInFileError( - ( - "Incorrect line length on line {} was {} expected {}" - ).format(current_line, len(line), 40) + ("Incorrect line length on line {} was {} expected {}").format( + current_line, len(line), 40 + ) ) row = [line[:12].strip(), line[13:25].strip(), line[26:].strip()] yield { @@ -61,38 +61,38 @@ def load_data(self, file_path, columns): def _get_end_task_function(self, task): def duration(row): - return row['test_time'] > task['Duration'] + return row["test_time"] > task["Duration"] def E_greater_than(row): - return row['volts'] > task['E>'] + return row["volts"] > task["E>"] def E_less_than(row): - return row['volts'] < task['E<'] + return row["volts"] < task["E<"] def I_greater_than(row): - return row['amps'] > task['I<'] + return row["amps"] > task["I<"] def I_less_than(row): - return row['amps'] < task['I<'] + return row["amps"] < task["I<"] end_funcs = [] - for end_key in ['End1', 'End2', 'End3', 'End4']: + for end_key in ["End1", "End2", "End3", "End4"]: end = task[end_key] - if end == 'Duration': + if end == "Duration": end_funcs.append(duration) - elif end == 'E>': + elif end == "E>": end_funcs.append(E_greater_than) - elif end == 'E<': + elif end == "E<": end_funcs.append(E_less_than) - elif end == 'I>': + elif end == "I>": end_funcs.append(I_greater_than) - elif end == 'I<': + elif end == "I<": end_funcs.append(I_less_than) - elif end == 'select': + elif end == "select": continue else: raise UnsupportedFileTypeError( - 'task end condition {} unknown'.format(end) + "task end condition {} unknown".format(end) ) def is_end_task(row): @@ -106,7 +106,7 @@ def is_end_task(row): def get_data_labels(self): column_names = ["test_time", "amps", "volts"] task_index = 0 - current_task = self._file_metadata['Tasks'][task_index] + current_task = self._file_metadata["Tasks"][task_index] is_end_task = self._get_end_task_function(current_task) start_task_row = 0 end_task_row = 0 @@ -139,13 +139,15 @@ def get_data_labels(self): experiment_label = "" yield ( - f"task_{task_index}_{mode}", (start_task_row, end_task_row), experiment_label + f"task_{task_index}_{mode}", + (start_task_row, end_task_row), + experiment_label, ) prev_time = time task_index += 1 - if task_index < len(self._file_metadata['Tasks']): - current_task = self._file_metadata['Tasks'][task_index] + if task_index < len(self._file_metadata["Tasks"]): + current_task = self._file_metadata["Tasks"][task_index] is_end_task = self._get_end_task_function(current_task) start_task_row = end_task_row else: @@ -153,9 +155,10 @@ def get_data_labels(self): def _load_ivium_metadata(self): file_path = self.file_path - regex_key_array = re.compile(r'([^,\[]+)\[([0-9]+)\]$') + regex_key_array = re.compile(r"([^,\[]+)\[([0-9]+)\]$") match_sci_notation = re.compile( - r'[+\-]?(?:0|[1-9]\d*)(?:\.\d*)?(?:[eE][+\-]?\d+)?') + r"[+\-]?(?:0|[1-9]\d*)(?:\.\d*)?(?:[eE][+\-]?\d+)?" 
+ ) with open(file_path, "rb") as f: # header line = f.readline() @@ -164,14 +167,14 @@ def _load_ivium_metadata(self): while True: samples_start += 1 - line = f.readline().decode('ascii', errors='replace') - line = line.replace('\n', '').replace('\r', '') - key_value = line.split('=') + line = f.readline().decode("ascii", errors="replace") + line = line.replace("\n", "").replace("\r", "") + key_value = line.split("=") if len(key_value) > 1: - keys = key_value[0].split('.') + keys = key_value[0].split(".") if len(keys) == 1: - if keys[0] == 'Tasks': - ivium_metadata['Tasks'] = [ + if keys[0] == "Tasks": + ivium_metadata["Tasks"] = [ {} for i in range(int(key_value[1])) ] elif key_value[1]: @@ -182,11 +185,11 @@ def _load_ivium_metadata(self): elif len(keys) == 2: base_metadata = ivium_metadata[keys[0]] - if keys[0] == 'Tasks': + if keys[0] == "Tasks": array_index_match = regex_key_array.search(keys[1]) if not array_index_match: raise UnsupportedFileTypeError( - 'Tasks entry should be a list:', keys[1] + "Tasks entry should be a list:", keys[1] ) key = array_index_match.group(1) index = int(array_index_match.group(2)) - 1 @@ -194,11 +197,11 @@ def _load_ivium_metadata(self): base_metadata[index][key] = key_value[1] else: raise UnsupportedFileTypeError( - 'unexpected array index {} for line {}'.format( + "unexpected array index {} for line {}".format( index, line - )) - elif keys[0] == 'Data Options' \ - and keys[1] == 'AnalogInputData': + ) + ) + elif keys[0] == "Data Options" and keys[1] == "AnalogInputData": base_metadata[keys[1]] = [ {} for i in range(int(key_value[1])) ] @@ -211,36 +214,32 @@ def _load_ivium_metadata(self): elif len(keys) == 3: base_metadata = ivium_metadata[keys[0]][keys[1]] - if keys[0] == 'Data Options' \ - and keys[1] == 'AnalogInputData': + if keys[0] == "Data Options" and keys[1] == "AnalogInputData": array_index_match = regex_key_array.search(keys[2]) if not array_index_match: raise UnsupportedFileTypeError( - 'Data Options.AnalogInputData entry should be a list:', keys[2] + "Data Options.AnalogInputData entry should be a list:", + keys[2], ) key = array_index_match.group(1) index = int(array_index_match.group(2)) - 1 if index < len(base_metadata): base_metadata[index][key] = key_value[1] else: - raise UnsupportedFileTypeError( - 'found unexpected # of keys' - ) + raise UnsupportedFileTypeError("found unexpected # of keys") else: # check if we've parsed the file ok - if not 'Mconfig' in ivium_metadata: - raise UnsupportedFileTypeError( - 'found unexpected line', line - ) + if not "Mconfig" in ivium_metadata: + raise UnsupportedFileTypeError("found unexpected line", line) # looks ok, check samples start where we expect them to for i in range(4): - line = f.readline().decode('ascii', errors='replace') - line = line.replace('\n', '').replace('\r', '') + line = f.readline().decode("ascii", errors="replace") + line = line.replace("\n", "").replace("\r", "") samples_start += 1 if len(match_sci_notation.findall(line)) != 3: raise UnsupportedFileTypeError( - 'cannot find samples start', line + "cannot find samples start", line ) # everything looks good, so we can terminate the loop break @@ -252,7 +251,7 @@ def _load_ivium_metadata(self): line = f.readline() line_no += 1 if line: - line = line.decode('ascii', errors='replace') + line = line.decode("ascii", errors="replace") if len(match_sci_notation.findall(line)) == 3: sample_rows.append(line_no) else: @@ -262,7 +261,7 @@ def _load_ivium_metadata(self): def load_metadata(self): """ - Load metadata in a ivium_text file" 
+ Load metadata in a ivium_text file" """ self._sample_rows, self._file_metadata = self._load_ivium_metadata() @@ -272,24 +271,23 @@ def load_metadata(self): metadata["Machine Type"] = "Ivium" metadata["Dataset Name"] = os.path.splitext(ntpath.basename(file_path))[0] metadata["Date of Test"] = datetime.strptime( - self._file_metadata['starttime'], - r'%d/%m/%Y %H:%M:%S' + self._file_metadata["starttime"], r"%d/%m/%Y %H:%M:%S" ) columns_with_data = { - 'amps': { - 'has_data': True, - 'is_numeric': True, - 'unit': 'A', + "amps": { + "has_data": True, + "is_numeric": True, + "unit": "A", }, - 'volts': { - 'has_data': True, - 'is_numeric': True, - 'unit': 'V', + "volts": { + "has_data": True, + "is_numeric": True, + "unit": "V", }, - 'test_time': { - 'has_data': True, - 'is_numeric': True, - 'unit': 's', + "test_time": { + "has_data": True, + "is_numeric": True, + "unit": "s", }, } @@ -309,6 +307,4 @@ def validate_file(self, file_path): with open(file_path, "rb") as f: line = f.readline() if not line.startswith(IDF_HEADER): - raise UnsupportedFileTypeError( - 'incorrect header - {}'.format(line) - ) + raise UnsupportedFileTypeError("incorrect header - {}".format(line)) diff --git a/src/galv_harvester/parse/maccor_input_file.py b/src/galv_harvester/parse/maccor_input_file.py index 5c08e0c..d017c75 100644 --- a/src/galv_harvester/parse/maccor_input_file.py +++ b/src/galv_harvester/parse/maccor_input_file.py @@ -10,17 +10,13 @@ import xlrd import maya from .input_file import InputFile -from .exceptions import ( - UnsupportedFileTypeError, - EmptyFileError, - InvalidDataInFileError -) +from .exceptions import UnsupportedFileTypeError, EmptyFileError, InvalidDataInFileError from ..settings import get_logger class MaccorInputFile(InputFile): """ - A class for handling input files + A class for handling input files """ def __init__(self, file_path, **kwargs): @@ -30,7 +26,7 @@ def __init__(self, file_path, **kwargs): def identify_columns(self, reader): """ - Identifies columns in a maccor csv or tsv file" + Identifies columns in a maccor csv or tsv file" """ headers = [header for header in next(reader) if header != ""] correct_number_of_columns = len(headers) @@ -90,20 +86,20 @@ def identify_columns(self, reader): # add unit info for known columns known_units = { - "Amp-hr": 'Amp-hr', - "Amps": 'Amps', - "Watt-hr": 'Watt-hr', - "StepTime": 's', - "Step (Sec)": 's', - "Volts": 'Volts', - "TestTime": 's', - "Test (Sec)": 's', - "Rec#": '', - "Temp 1": 'celsius', + "Amp-hr": "Amp-hr", + "Amps": "Amps", + "Watt-hr": "Watt-hr", + "StepTime": "s", + "Step (Sec)": "s", + "Volts": "Volts", + "TestTime": "s", + "Test (Sec)": "s", + "Rec#": "", + "Temp 1": "celsius", } for name, info in column_info.items(): if name in known_units: - column_info[name]['unit'] = known_units[name] + column_info[name]["unit"] = known_units[name] # account for 0 based indexing total_rows = row_idx + 1 @@ -120,7 +116,7 @@ def identify_columns(self, reader): def load_metadata(self): """ - Load metadata in a maccor csv or tsv file" + Load metadata in a maccor csv or tsv file" """ metadata = {} with open(self.file_path, "r") as csvfile: @@ -136,8 +132,7 @@ def load_metadata(self): ntpath.basename(self.file_path) )[0] metadata["Machine Type"] = "Maccor" - column_info, total_rows, first_rec, last_rec = \ - self.identify_columns(reader) + column_info, total_rows, first_rec, last_rec = self.identify_columns(reader) metadata["num_rows"] = total_rows metadata["first_sample_no"] = first_rec metadata["last_sample_no"] = last_rec @@ -146,10 
+141,10 @@ def load_metadata(self): def load_data(self, file_path, columns): """ - Load data in a maccor csv or tsv file" + Load data in a maccor csv or tsv file" """ - with open(file_path, "r", encoding='utf-8-sig') as csvfile: + with open(file_path, "r", encoding="utf-8-sig") as csvfile: # get rid of metadata rows try: csvfile.readline() @@ -171,9 +166,7 @@ def load_data(self, file_path, columns): if column_name in columns: columns_of_interest.append(col_idx) for row_idx, row in enumerate(reader): - row = handle_recno( - row, correct_number_of_columns, recno_col, row_idx - ) + row = handle_recno(row, correct_number_of_columns, recno_col, row_idx) yield { column_names[col_idx]: row[col_idx] for col_idx in columns_of_interest @@ -308,7 +301,7 @@ def get_data_labels(self): ) def is_maccor_text_file(self, file_path, delimiter): - with open(file_path, "r", encoding='utf-8-sig') as f: + with open(file_path, "r", encoding="utf-8-sig") as f: try: line = f.readline() except UnicodeDecodeError as e: @@ -336,14 +329,11 @@ def is_maccor_text_file(self, file_path, delimiter): return True def validate_file(self, file_path): - if not ( - file_path.endswith(".csv") or - file_path.endswith(".txt") - ): + if not (file_path.endswith(".csv") or file_path.endswith(".txt")): raise UnsupportedFileTypeError self.delimiter = None - for delim in [',', '\t']: + for delim in [",", "\t"]: if self.is_maccor_text_file(file_path, delim): self.delimiter = delim if self.delimiter is None: @@ -352,7 +342,7 @@ def validate_file(self, file_path): class MaccorExcelInputFile(MaccorInputFile): """ - A class for handling input files + A class for handling input files """ def __init__(self, file_path): @@ -361,7 +351,7 @@ def __init__(self, file_path): def identify_columns(self, wbook): """ - Identifies columns in a maccor excel file" + Identifies columns in a maccor excel file" """ sheet = wbook.sheet_by_index(0) column_has_data = [False for col in range(0, sheet.ncols)] @@ -374,9 +364,7 @@ def identify_columns(self, wbook): headers_row = 0 for col in range(0, sheet.ncols): headers.append(sheet.cell_value(headers_row, col)) - is_numeric = isfloat( - sheet.cell_value(headers_row+1, col) - ) + is_numeric = isfloat(sheet.cell_value(headers_row + 1, col)) column_is_numeric.append(is_numeric) if is_numeric: numeric_columns.append(col) @@ -386,7 +374,7 @@ def identify_columns(self, wbook): self.logger.debug("numeric_columns: {}".format(numeric_columns)) try: recno_col = headers.index("Rec#") - first_rec = sheet.cell_value(headers_row+1, recno_col) + first_rec = sheet.cell_value(headers_row + 1, recno_col) except ValueError: # Don't have record numbers, make them up first_rec = 1 @@ -395,7 +383,7 @@ def identify_columns(self, wbook): self.logger.debug("Loading sheet... 
" + str(sheet_id)) sheet = wbook.sheet_by_index(sheet_id) total_rows += sheet.nrows - 1 - int(self._has_metadata_row) - for row in range(headers_row+1, sheet.nrows): + for row in range(headers_row + 1, sheet.nrows): for column in numeric_columns[:]: if float(sheet.cell_value(row, column)) != 0.0: column_has_data[column] = True @@ -412,7 +400,7 @@ def identify_columns(self, wbook): # sure if the last sheet actually will have data try: recno_col = headers.index("Rec#") - last_rec = sheet.cell_value(headers_row+1, row) + last_rec = sheet.cell_value(headers_row + 1, row) except ValueError: # Don't have record numbers, make them up last_rec = total_rows @@ -430,10 +418,9 @@ def identify_columns(self, wbook): self.logger.debug("Num rows {}".format(total_rows)) return column_info, total_rows, first_rec, last_rec - def load_data(self, file_path, - columns, column_renames=None): + def load_data(self, file_path, columns, column_renames=None): """ - Load metadata in a maccor excel file" + Load metadata in a maccor excel file" """ if self._has_metadata_row: headers_row = 1 @@ -459,7 +446,7 @@ def load_data(self, file_path, for sheet_id in range(0, wbook.nsheets): self.logger.debug("Loading sheet..." + str(sheet_id)) sheet = wbook.sheet_by_index(sheet_id) - for row in range(headers_row+1, sheet.nrows): + for row in range(headers_row + 1, sheet.nrows): yield { column_names[col_idx]: ( sheet.cell_value(row, col_idx) @@ -473,10 +460,10 @@ def load_data(self, file_path, def load_metadata(self): """ - Load metadata in a maccor excel file" + Load metadata in a maccor excel file" """ metadata = {} - metadata['Filename'] = self.file_path + metadata["Filename"] = self.file_path with xlrd.open_workbook( self.file_path, on_demand=True, logfile=LogFilter(self.logger) ) as wbook: @@ -527,8 +514,7 @@ def load_metadata(self): os.path.getctime(self.file_path) ) metadata["Machine Type"] = "Maccor" - column_info, total_rows, first_rec, last_rec = \ - self.identify_columns(wbook) + column_info, total_rows, first_rec, last_rec = self.identify_columns(wbook) metadata["num_rows"] = total_rows metadata["first_sample_no"] = first_rec metadata["last_sample_no"] = last_rec @@ -544,18 +530,18 @@ def validate_file(self, file_path): class MaccorRawInputFile(MaccorInputFile): """ - A class for handling input files + A class for handling input files """ def __init__(self, file_path): self.logger = get_logger(f"InputFile({file_path})") self.validate_file(file_path) super().__init__(file_path) - self.delimiter = '\t' + self.delimiter = "\t" def load_metadata(self): """ - Load metadata in a maccor raw file" + Load metadata in a maccor raw file" """ metadata = {} column_info = {} @@ -566,9 +552,7 @@ def load_metadata(self): metadata["Today's Date"] = maya.parse( first[0].split(" ")[2], year_first=False ).datetime() - metadata["Date of Test"] = maya.parse( - first[1], year_first=False - ).datetime() + metadata["Date of Test"] = maya.parse(first[1], year_first=False).datetime() metadata["Filename"] = first[3].split(" Procedure:")[0] metadata["Dataset Name"] = ntpath.basename(metadata["Filename"]) # Just shove everything in the misc_file_data for now rather than @@ -580,8 +564,7 @@ def load_metadata(self): # parse what we have and leave handling anything different to some # future person metadata["Machine Type"] = "Maccor" - column_info, total_rows, first_rec, last_rec = \ - self.identify_columns(reader) + column_info, total_rows, first_rec, last_rec = self.identify_columns(reader) metadata["num_rows"] = total_rows 
metadata["first_sample_no"] = first_rec metadata["last_sample_no"] = last_rec @@ -590,14 +573,14 @@ def load_metadata(self): return metadata, column_info def validate_file(self, file_path): - self.logger.debug('is_maccor_raw_file') - with open(file_path, "r", encoding='utf-8-sig') as f: + self.logger.debug("is_maccor_raw_file") + with open(file_path, "r", encoding="utf-8-sig") as f: try: - self.logger.debug('got line') + self.logger.debug("got line") line = f.readline() except UnicodeDecodeError as e: raise UnsupportedFileTypeError from e - self.logger.debug('got line', line) + self.logger.debug("got line", line) line_start = "Today's Date" if not line.startswith(line_start): raise UnsupportedFileTypeError @@ -656,7 +639,7 @@ def handle_recno(row, correct_number_of_columns, recno_col, row_idx): row = ( row[0:recno_col] + [(row[recno_col] + row[recno_col + 1]).replace(",", "")] - + row[recno_col + 2:] + + row[recno_col + 2 :] ) else: raise InvalidDataInFileError( @@ -672,13 +655,13 @@ def handle_recno(row, correct_number_of_columns, recno_col, row_idx): def clean_key(key): """ - Unescapes and removes trailing characters on strings + Unescapes and removes trailing characters on strings """ return key.replace("''", "'").strip().rstrip(":") def clean_value(value): """ - Trims values + Trims values """ return value.replace("''", "'").strip().rstrip("\0").strip() diff --git a/src/galv_harvester/plugins.py b/src/galv_harvester/plugins.py new file mode 100644 index 0000000..d0ffed9 --- /dev/null +++ b/src/galv_harvester/plugins.py @@ -0,0 +1,54 @@ +import importlib +import pkgutil + +from src.galv_harvester.parse.input_file import InputFile +from .settings import get_logger + + +logger = get_logger(__file__) +_cached_parsers = None +_cached_plugins = {} + + +def _get_parsers(plugin, check=True): + parsers = getattr(plugin, "parsers", None) + ok_parsers = [] + if not isinstance(parsers, list): + if check: + logger.error(f"Plugin {plugin.__name__} does not have a 'parsers' list") + else: + for p in parsers: + if not isinstance(p, InputFile): + if check: + logger.error( + f"Plugin {plugin.__name__} has a non-InputFile parser: {p}" + ) + else: + ok_parsers.append(p) + return ok_parsers + + +def get_parsers(from_cache=True): + global _cached_parsers + global _cached_plugins + + if not from_cache or not _cached_parsers: + logger.info("Searching for new plugins") + parsers = [] + discovered_plugins = { + name: importlib.import_module(name) + for finder, name, ispkg in pkgutil.iter_modules() + if name.startswith("galv_harvester_") + } + for name, plugin in discovered_plugins.items(): + if name not in _cached_plugins: + _cached_plugins[name] = plugin + plugin_parsers = _get_parsers(plugin) + logger.info( + f"Discovered plugin: {name} with {len(plugin_parsers)} parsers" + ) + parsers.extend(plugin_parsers) + + _cached_parsers = parsers + + return _cached_parsers diff --git a/src/galv_harvester/run.py b/src/galv_harvester/run.py index d5eab17..10c57c1 100644 --- a/src/galv_harvester/run.py +++ b/src/galv_harvester/run.py @@ -7,6 +7,7 @@ import time import traceback +from src.galv_harvester.plugins import get_parsers from .parse.exceptions import UnsupportedFileTypeError from .settings import ( get_logger, @@ -15,7 +16,7 @@ HARVESTER_TASK_IMPORT, HARVEST_STAGE_COMPLETE, HARVESTER_TASK_IMPORT, - HARVEST_STAGE_FAILED + HARVEST_STAGE_FAILED, ) from .api import report_harvest_result, update_config from .harvest import HarvestProcessor @@ -34,7 +35,7 @@ def split_path(core_path, path) -> (os.PathLike, 
os.PathLike): def harvest(): logger.info("Beginning harvest cycle") - paths = get_setting('monitored_paths') + paths = get_setting("monitored_paths") if not paths: logger.info("No paths are being monitored.") return @@ -42,19 +43,21 @@ def harvest(): logger.debug(paths) for path in paths: - if path.get('active'): + if path.get("active"): harvest_path(path) else: - logger.info(f"Skipping inactive path {path.get('path')} {path.get('regex')}") + logger.info( + f"Skipping inactive path {path.get('path')} {path.get('regex')}" + ) def harvest_file(file_path, monitored_path: dict, compiled_regex: re.Pattern = None): - if os.path.basename(file_path).startswith('.'): + if os.path.basename(file_path).startswith("."): logger.debug(f"Skipping hidden file {file_path}") return if compiled_regex is None: - regex_str = monitored_path.get('regex') + regex_str = monitored_path.get("regex") if regex_str is not None: regex = re.compile(regex_str) else: @@ -62,7 +65,7 @@ def harvest_file(file_path, monitored_path: dict, compiled_regex: re.Pattern = N else: regex = compiled_regex - path = monitored_path.get('path') + path = monitored_path.get("path") _, rel_path = split_path(path, file_path) if regex is not None and not regex.search(rel_path): @@ -77,76 +80,78 @@ def harvest_file(file_path, monitored_path: dict, compiled_regex: re.Pattern = N logger.info(f"Reporting stats for {rel_path}") result = report_harvest_result( path=file_path, - monitored_path_id=monitored_path.get('id'), + monitored_path_id=monitored_path.get("id"), content={ - 'task': HARVESTER_TASK_FILE_SIZE, - 'size': os.stat(file_path).st_size - } + "task": HARVESTER_TASK_FILE_SIZE, + "size": os.stat(file_path).st_size, + }, ) if result is not None: result = result.json() - status = result['state'] + status = result["state"] logger.info(f"Server assigned status '{status}'") - if status in ['STABLE', 'RETRY IMPORT', 'MAP ASSIGNED', 'AWAITING STORAGE']: + if status in ["STABLE", "RETRY IMPORT", "MAP ASSIGNED", "AWAITING STORAGE"]: logger.info(f"Parsing file {rel_path}") try: file.harvest() report_harvest_result( path=file_path, - monitored_path_id=monitored_path.get('id'), + monitored_path_id=monitored_path.get("id"), content={ - 'task': HARVESTER_TASK_IMPORT, - 'stage': HARVEST_STAGE_COMPLETE - } + "task": HARVESTER_TASK_IMPORT, + "stage": HARVEST_STAGE_COMPLETE, + }, ) logger.info(f"Successfully parsed file {rel_path}") except BaseException as e: - logger.warning(f"FAILED parsing file {rel_path}: {e.__class__.__name__}: {e}") + logger.warning( + f"FAILED parsing file {rel_path}: {e.__class__.__name__}: {e}" + ) if e.__traceback__ is not None: logger.warning(traceback.format_exc()) report_harvest_result( path=file_path, - monitored_path_id=monitored_path.get('id'), + monitored_path_id=monitored_path.get("id"), content={ - 'task': HARVESTER_TASK_IMPORT, - 'stage': HARVEST_STAGE_FAILED, - 'error': f"Error in Harvester. {e.__class__.__name__}: {e}. [See harvester logs for more details]" - } + "task": HARVESTER_TASK_IMPORT, + "stage": HARVEST_STAGE_FAILED, + "error": f"Error in Harvester. {e.__class__.__name__}: {e}. 
[See harvester logs for more details]", + }, ) except BaseException as e: logger.error(f"{e.__class__.__name__}: {e}") report_harvest_result( - path=file_path, - monitored_path_id=monitored_path.get('id'), - error=e + path=file_path, monitored_path_id=monitored_path.get("id"), error=e ) - def harvest_path(monitored_path: dict): - path = monitored_path.get('path') - regex_str = monitored_path.get('regex') + path = monitored_path.get("path") + regex_str = monitored_path.get("regex") if regex_str is not None: logger.info(f"Harvesting from {path} with regex {regex_str}") else: logger.info(f"Harvesting from {path}") try: regex = re.compile(regex_str) if regex_str is not None else None - for (dir_path, dir_names, filenames) in os.walk(path): + for dir_path, dir_names, filenames in os.walk(path): for filename in filenames: - harvest_file(os.path.join(dir_path, filename), monitored_path, compiled_regex=regex) + harvest_file( + os.path.join(dir_path, filename), + monitored_path, + compiled_regex=regex, + ) logger.info(f"Completed directory walking of {path}") except BaseException as e: logger.error(f"{e.__class__.__name__}: {e}") report_harvest_result( - monitored_path_id=monitored_path.get('id'), - error=e, - path=path + monitored_path_id=monitored_path.get("id"), error=e, path=path ) def run(): update_config() + get_parsers(from_cache=False) harvest() @@ -158,7 +163,7 @@ def run_cycle(): except BaseException as e: logger.error(f"{e.__class__.__name__}: {e}") try: - sleep_time = get_setting('sleep_time') + sleep_time = get_setting("sleep_time") except BaseException as e: logger.error(f"{e.__class__.__name__}: {e}") time.sleep(sleep_time) diff --git a/src/galv_harvester/settings.py b/src/galv_harvester/settings.py index 3f77fbe..850bedb 100644 --- a/src/galv_harvester/settings.py +++ b/src/galv_harvester/settings.py @@ -8,19 +8,37 @@ import logging import logging.handlers +from click import get_current_context + logging.basicConfig( - format='%(asctime)s %(levelname)s %(message)s [%(name)s:%(lineno)d]', + format="%(asctime)s %(levelname)s %(message)s [%(name)s:%(lineno)d]", level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S' + datefmt="%Y-%m-%d %H:%M:%S", ) def get_logfile() -> pathlib.Path: - return pathlib.Path(os.getenv('GALV_HARVESTER_LOG_FILE', "./.harvester/harvester.log")) + return pathlib.Path( + os.getenv("GALV_HARVESTER_LOG_FILE", "./.harvester/harvester.log") + ) + + +LOGGER = None def get_logger(name): - logger = logging.getLogger(name) + global LOGGER + if LOGGER: + return LOGGER + + debug = False + try: + if get_current_context(True).obj.get("verbose"): + debug = True + except AttributeError: + pass + + LOGGER = logging.getLogger(name) # stream_handler = logging.StreamHandler(sys.stdout) # stream_handler.setLevel(logging.INFO) # logger.addHandler(stream_handler) @@ -31,24 +49,24 @@ def get_logger(name): file_handler = logging.handlers.RotatingFileHandler( get_logfile(), maxBytes=5_000_000, backupCount=5 ) - file_handler.setLevel( - logging.INFO if not get_current_context().obj.get("verbose") else logging.DEBUG - ) + file_handler.setLevel(logging.DEBUG if debug else logging.INFO) file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - return logger + LOGGER.addHandler(file_handler) + return LOGGER logger = get_logger(__file__) def get_settings_file() -> pathlib.Path: - return pathlib.Path(os.getenv('GALV_HARVESTER_SETTINGS_FILE', "./.harvester/settings.json")) + return pathlib.Path( + os.getenv("GALV_HARVESTER_SETTINGS_FILE", "./.harvester/settings.json") + ) def 
get_settings():
     try:
-        with open(get_settings_file(), 'r') as f:
+        with open(get_settings_file(), "r") as f:
             try:
                 return json.load(f)
             except json.JSONDecodeError as e:
@@ -56,7 +74,7 @@ def get_settings():
                 f.seek(0)
                 logger.error(f.readlines())
     except FileNotFoundError:
-        logger.error(f'No config file at {get_settings_file()}')
+        logger.error(f"No config file at {get_settings_file()}")
     return None
 
 
@@ -72,28 +90,29 @@ def get_setting(*args):
 
 
 def update_envvars():
-    envvars = get_setting('environment_variables') or {}
+    envvars = get_setting("environment_variables") or {}
     for k, v in envvars.items():
         old = os.getenv(k)
         os.environ[k] = v
         if old != v:
             logger.info(f"Update envvar {k} from '{old}' to '{v}'")
-    delvars = get_setting('deleted_environment_variables') or {}
+    delvars = get_setting("deleted_environment_variables") or {}
     for k in delvars:
         old = os.getenv(k)
         if old is not None:
             logger.info(f"Unsetting envvar {k} (previous value: {old})")
             os.unsetenv(k)
 
+
 # These definitions should be kept in sync with the definitions in the backend
-HARVESTER_TASK_FILE_SIZE = 'file_size'
-HARVESTER_TASK_IMPORT = 'import'
-HARVESTER_STATUS_SUCCESS = 'success'
-HARVESTER_STATUS_ERROR = 'error'
-HARVEST_STAGE_FILE_METADATA = 'file metadata'
-HARVEST_STAGE_DATA_SUMMARY = 'data summary'
-HARVEST_STAGE_UPLOAD_PARQUET = 'upload parquet partitions'
-HARVEST_STAGE_UPLOAD_COMPLETE = 'upload complete'
-HARVEST_STAGE_UPLOAD_PNG = 'upload png'
-HARVEST_STAGE_COMPLETE = 'harvest complete'
-HARVEST_STAGE_FAILED = 'harvest failed'
\ No newline at end of file
+HARVESTER_TASK_FILE_SIZE = "file_size"
+HARVESTER_TASK_IMPORT = "import"
+HARVESTER_STATUS_SUCCESS = "success"
+HARVESTER_STATUS_ERROR = "error"
+HARVEST_STAGE_FILE_METADATA = "file metadata"
+HARVEST_STAGE_DATA_SUMMARY = "data summary"
+HARVEST_STAGE_UPLOAD_PARQUET = "upload parquet partitions"
+HARVEST_STAGE_UPLOAD_COMPLETE = "upload complete"
+HARVEST_STAGE_UPLOAD_PNG = "upload png"
+HARVEST_STAGE_COMPLETE = "harvest complete"
+HARVEST_STAGE_FAILED = "harvest failed"
diff --git a/src/galv_harvester/start.py b/src/galv_harvester/start.py
index 9400b26..57839ad 100644
--- a/src/galv_harvester/start.py
+++ b/src/galv_harvester/start.py
@@ -2,6 +2,7 @@
 # Copyright (c) 2020-2023, The Chancellor, Masters and Scholars of the University
 # of Oxford, and the 'Galv' Developers. All rights reserved.
 import json
+import logging
 import os.path
 import re
 import subprocess
@@ -10,10 +11,18 @@
 import click
 import requests
 
+from .plugins import get_parsers
+from .settings import logger
 from . 
import run, settings, api -def query(url: str, data: object = None, retries: int = 5, sleep_seconds: float = 3.0, **kwargs): +def query( + url: str, + data: object = None, + retries: int = 5, + sleep_seconds: float = 3.0, + **kwargs, +): while retries > 0: try: if data is None: @@ -34,7 +43,9 @@ def query(url: str, data: object = None, retries: int = 5, sleep_seconds: float if retries == 0: raise e else: - click.echo(f"Retrying in {sleep_seconds}s ({retries} remaining)", err=True) + click.echo( + f"Retrying in {sleep_seconds}s ({retries} remaining)", err=True + ) time.sleep(sleep_seconds) @@ -55,41 +66,56 @@ def get_url() -> str: def create_monitored_path( - api_url, api_token, harvester_id, specified, - team_id, monitor_path, monitor_path_regex + api_url, + api_token, + harvester_id, + specified, + team_id, + monitor_path, + monitor_path_regex, ) -> None: # TODO: Ensure that the team is a member of the harvester's lab - click.echo("The harvester will monitor a path on the server for changes and upload files.") - click.echo("You must be a Team administrator to create a monitored path. " - "Note that Lab administrators are not necessarily Team administrators.") + click.echo( + "The harvester will monitor a path on the server for changes and upload files." + ) + click.echo( + "You must be a Team administrator to create a monitored path. " + "Note that Lab administrators are not necessarily Team administrators." + ) def monitored_path_exit(error: str): - click.echo('Harvester successfully created, but the monitored path could not be set.') + click.echo( + "Harvester successfully created, but the monitored path could not be set." + ) click.echo(f"Error: {error}", err=True) - click.echo('Please go to the frontend to set a monitored path.') - click.echo('') + click.echo("Please go to the frontend to set a monitored path.") + click.echo("") # To create monitored paths, we must be a team administrator teams_administered = [] try: - teams = query(f"{api_url}teams/", headers={'Authorization': f"Bearer {api_token}"}) - teams_administered = [t for t in teams['results'] if t['permissions']['write']] + teams = query( + f"{api_url}teams/", headers={"Authorization": f"Bearer {api_token}"} + ) + teams_administered = [t for t in teams["results"] if t["permissions"]["write"]] except BaseException as e: return monitored_path_exit(f"Unable to retrieve team list using API key -- {e}") # Check team okay - if team_id is not None and team_id not in [t['id'] for t in teams_administered]: + if team_id is not None and team_id not in [t["id"] for t in teams_administered]: return monitored_path_exit(f"Team {team_id} is not administered by this user.") page = 0 page_size = 10 while team_id is None: if len(teams_administered) == 1: - team_id = teams_administered[0]['id'] + team_id = teams_administered[0]["id"] break elif specified: - return monitored_path_exit('You administrate multiple teams and no team is specified with --team_id.') - teams = teams_administered[page:page + page_size] + return monitored_path_exit( + "You administrate multiple teams and no team is specified with --team_id." 
+ ) + teams = teams_administered[page : page + page_size] has_prev = page != 0 has_next = len(teams_administered) > ((page + 1) * page_size) click.echo("Press a number for the Team that will own this Monitored Path.") @@ -124,13 +150,15 @@ def monitored_path_exit(error: str): except AssertionError: click.echo(f"{input_char} is not an available option") - team_id = teams[input_char]['id'] + team_id = teams[input_char]["id"] - team = [t for t in teams_administered if t['id'] == team_id][0] + team = [t for t in teams_administered if t["id"] == team_id][0] # Check path okay if monitor_path is None: - click.echo("Enter a directory on the server to monitor, or leave blank to skip this step.") + click.echo( + "Enter a directory on the server to monitor, or leave blank to skip this step." + ) while True: monitor_path = input("Path: ") if monitor_path == "": @@ -162,31 +190,37 @@ def monitored_path_exit(error: str): break if monitor_path is not None: - regex_str = f" with regex {monitor_path_regex}" if monitor_path_regex is not None else "" + regex_str = ( + f" with regex {monitor_path_regex}" + if monitor_path_regex is not None + else "" + ) click.echo(f"Setting monitor path to {monitor_path}{regex_str}") try: query( f"{api_url}monitored_paths/", { - 'path': monitor_path, - 'regex': monitor_path_regex, - 'harvester': harvester_id, - 'team': team['id'], - 'active': True - }, - headers={ - 'Authorization': f"Bearer {api_token}" + "path": monitor_path, + "regex": monitor_path_regex, + "harvester": harvester_id, + "team": team["id"], + "active": True, }, + headers={"Authorization": f"Bearer {api_token}"}, ) except BaseException as e: return monitored_path_exit(f"Unable to set monitored path -- {e}") def register( - url: str = None, name: str = None, api_token: str = None, - lab_id: int = None, team_id: int = None, - monitor_path: str = None, monitor_path_regex: str = ".*", - foreground: bool = None + url: str = None, + name: str = None, + api_token: str = None, + lab_id: int = None, + team_id: int = None, + monitor_path: str = None, + monitor_path_regex: str = ".*", + foreground: bool = None, ): """ Guide a user through the setup process. @@ -194,12 +228,12 @@ def register( Specifying any of the config args (url, name, api_token, lab_id, monitor_path, monitor_path_regex) function avoid all calls to input() making it non-interactive. 
""" specified = ( - url is not None or - name is not None or - api_token is not None or - lab_id is not None or - monitor_path is not None or - os.getenv("GALV_HARVESTER_SKIP_WIZARD", False) + url is not None + or name is not None + or api_token is not None + or lab_id is not None + or monitor_path is not None + or os.getenv("GALV_HARVESTER_SKIP_WIZARD", False) ) # Load from environment variables if not specified url = url or os.getenv("GALV_HARVESTER_SERVER_URL") @@ -208,7 +242,9 @@ def register( lab_id = lab_id or os.getenv("GALV_HARVESTER_LAB_ID") team_id = team_id or os.getenv("GALV_HARVESTER_TEAM_ID") monitor_path = monitor_path or os.getenv("GALV_HARVESTER_MONITOR_PATH") - monitor_path_regex = monitor_path_regex or os.getenv("GALV_HARVESTER_MONITOR_PATH_REGEX") + monitor_path_regex = monitor_path_regex or os.getenv( + "GALV_HARVESTER_MONITOR_PATH_REGEX" + ) foreground = foreground or os.getenv("GALV_HARVESTER_FOREGROUND", False) # Check we can connect to the API @@ -231,21 +267,28 @@ def register( if not specified: api_token = input("Enter your API token: ") try: - labs = query(f"{url}labs/", headers={'Authorization': f"Bearer {api_token}"}) - labs_administered = [l for l in labs['results'] if l['permissions']['write']] + labs = query( + f"{url}labs/", headers={"Authorization": f"Bearer {api_token}"} + ) + labs_administered = [ + l for l in labs["results"] if l["permissions"]["write"] + ] except BaseException as e: click.echo(f"Unable to retrieve lab list using API key -- {e}", err=True) if specified: exit(1) if len(labs_administered) == 0: - click.echo("This user does not administer any labs. Please try an API key from a different user.", err=True) + click.echo( + "This user does not administer any labs. Please try an API key from a different user.", + err=True, + ) if specified: exit(1) continue break # Check lab okay - if lab_id is not None and lab_id not in [l['id'] for l in labs_administered]: + if lab_id is not None and lab_id not in [l["id"] for l in labs_administered]: click.echo(f"Lab {lab_id} is not administered by this user.", err=True) exit(1) @@ -253,12 +296,15 @@ def register( page_size = 10 while lab_id is None: if len(labs_administered) == 1: - lab_id = labs_administered[0]['id'] + lab_id = labs_administered[0]["id"] break elif specified: - click.echo('You administrate multiple labs. Please specify a lab using --lab_id.', err=True) + click.echo( + "You administrate multiple labs. 
Please specify a lab using --lab_id.", + err=True, + ) exit(1) - labs = labs_administered[page:page + page_size] + labs = labs_administered[page : page + page_size] has_prev = page != 0 has_next = len(labs_administered) > ((page + 1) * page_size) click.echo("Press a number for the Lab that will own this Harvester.") @@ -293,9 +339,9 @@ def register( except AssertionError: click.echo(f"{input_char} is not an available option") - lab_id = labs[input_char]['id'] + lab_id = labs[input_char]["id"] - lab = [l for l in labs_administered if l['id'] == lab_id][0] + lab = [l for l in labs_administered if l["id"] == lab_id][0] # Check name okay if name is None: @@ -307,7 +353,7 @@ def register( continue result = query(f"{url}harvesters/?name={name}&lab__id={lab_id}") - if result['count'] > 0: + if result["count"] > 0: click.echo(f"This Lab already has a harvester called {name}.", err=True) if specified: exit(1) @@ -320,13 +366,13 @@ def register( click.echo(f"Registering new harvester {name} to Lab {lab['name']}") result = query( f"{url}harvesters/", - {'lab': lab['url'], 'name': name}, - headers={'Authorization': f"Bearer {api_token}"} + {"lab": lab["url"], "name": name}, + headers={"Authorization": f"Bearer {api_token}"}, ) # Save credentials file_name = settings.get_settings_file() - with open(file_name, 'w+') as f: + with open(file_name, "w+") as f: json.dump(result, f) click.echo("Details:") click.echo(json.dumps(result)) @@ -337,8 +383,13 @@ def register( if monitor_path is not None or not specified: create_monitored_path( - api_url=url, api_token=api_token, harvester_id=result['id'], specified=specified, - team_id=team_id, monitor_path=monitor_path, monitor_path_regex=monitor_path_regex + api_url=url, + api_token=api_token, + harvester_id=result["id"], + specified=specified, + team_id=team_id, + monitor_path=monitor_path, + monitor_path_regex=monitor_path_regex, ) click.echo( @@ -371,10 +422,11 @@ def click_wrapper(): ) @click.option("-v", "--verbose", is_flag=True, help="Enables verbose mode") @click.argument("paths", type=str, nargs=-1, required=False) -def harvest(paths): +def harvest(verbose: bool, paths): # Check we can access settings try: settings.get_settings() + logger.setLevel(logging.DEBUG if verbose else logging.INFO) sync() current_settings = settings.get_settings() except FileNotFoundError: @@ -390,15 +442,18 @@ def harvest(paths): click.echo(f"Path {path} does not exist on the filesystem.") exit(1) # Check the path is on a monitored path - monitored_paths = current_settings.get('monitored_paths', []) + monitored_paths = current_settings.get("monitored_paths", []) monitored_path = None for mp in monitored_paths: # A match is where the path is a substring of the monitored path - if not path.lower().startswith(mp['path'].lower()): + if not path.lower().startswith(mp["path"].lower()): continue # and the regex matches (for files) if os.path.isfile(path): - if re.search(mp['regex'], os.path.relpath(path, mp['path'])) is not None: + if ( + re.search(mp["regex"], os.path.relpath(path, mp["path"])) + is not None + ): monitored_path = mp break elif os.path.isdir(path): @@ -434,11 +489,12 @@ def harvest(paths): "(will not close the thread, useful for Dockerized application)." 
), ) -def start(foreground: bool): +def start(verbose: bool, foreground: bool): foreground = foreground or os.getenv("GALV_HARVESTER_FOREGROUND", True) click.echo("Attempting to start harvester.") # Check whether a config file already exists, if so, use it current_settings = settings.get_settings() + logger.setLevel(logging.DEBUG if verbose else logging.INFO) if current_settings: sync() click.echo( @@ -487,6 +543,7 @@ def start(foreground: bool): help="Regex to match files to harvest. Other options can be specified using the frontend.", ) def setup( + verbose: bool, url: str, name: str, api_token: str, @@ -496,6 +553,7 @@ def setup( monitor_path_regex: str, ): click.echo("Welcome to Harvester setup.") + logger.setLevel(logging.DEBUG if verbose else logging.INFO) register( url=url, name=name,
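For reference, a minimal sketch of the non-interactive setup path exercised by register() above. All values are placeholders, and it assumes the installed package is importable as galv_harvester; the same settings can instead be supplied through the GALV_HARVESTER_* environment variables that register() reads when its arguments are left unset.

    from galv_harvester.start import register

    # Passing any of the config arguments marks the run as "specified", so
    # register() avoids the interactive input() prompts and exits with an
    # error instead of asking for missing or invalid values.
    register(
        url="https://galv.example.org/",    # placeholder server URL
        api_token="xxxxxxxx",               # placeholder API token
        name="example-harvester",           # placeholder harvester name
        lab_id=1,                           # placeholder Lab id administered by the token's user
        team_id=1,                          # placeholder Team id that will own the monitored path
        monitor_path="/data/instruments",   # placeholder directory to watch
        monitor_path_regex=r".*\.csv$",     # placeholder filename filter
        foreground=True,                    # keep the process attached (see the --foreground help above)
    )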