Skip to content

Commit

Permalink
WIP: try to catch race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
mpvanderschelling committed Feb 6, 2025
1 parent d644bde commit 8aceeea
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/f3dasm/_src/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
JOBS_FILENAME = "jobs"

RESOLUTION_MATPLOTLIB_FIGURE = 300
MAX_TRIES = 10
MAX_TRIES = 20

# Storing methods
# =============================================================================
Expand Down
10 changes: 7 additions & 3 deletions src/f3dasm/_src/design/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from omegaconf import DictConfig, OmegaConf

# Local
from ..errors import EmptyFileError
from .parameter import (CategoricalParameter, CategoricalType,
ConstantParameter, ContinuousParameter,
DiscreteParameter, LoadFunction, Parameter,
Expand Down Expand Up @@ -193,13 +194,16 @@ def from_file(cls: Type[Domain], filename: Path | str) -> Domain:
>>> domain = Domain.from_json('domain.json')
"""
# convert filename to Path object
filename = Path(filename)
filename = Path(filename).with_suffix('.json')

# Check if filename exists
if not filename.with_suffix('.json').exists():
if not filename.exists():
raise FileNotFoundError(f"Domain file {filename} does not exist.")

with open(filename.with_suffix('.json'), 'r') as f:
if filename.stat().st_size == 0:
raise EmptyFileError(filename)

with open(filename, 'r') as f:
domain_dict = json.load(f)

input_space = {k: Parameter.from_dict(
Expand Down
19 changes: 19 additions & 0 deletions src/f3dasm/_src/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from pathlib import Path


class EmptyFileError(Exception):
    """Exception raised when a file exists but is empty.

    The exception keeps the offending path in ``file_path`` and the fully
    formatted text (``"<message>: <file_path>"``) in ``message``. It can be
    pickled and copied without losing or mangling either attribute.
    """

    def __init__(self, file_path: str | Path,
                 message: str = "File is empty"):
        """
        Initializes the EmptyFileError.

        Args:
            file_path (str | Path): The path to the empty file.
            message (str): A custom error message.
        """
        self.file_path = Path(file_path)  # Ensure it's a Path object
        # Keep the unformatted message so that __reduce__ can rebuild the
        # exception from the very same constructor arguments.
        self._message_prefix = message
        self.message = f"{message}: {self.file_path}"
        super().__init__(self.message)

    def __reduce__(self):
        """
        Rebuild the exception from its original constructor arguments.

        The default ``BaseException.__reduce__`` re-invokes the class with
        ``self.args``, which here holds only the formatted message. That
        would pass the formatted message as ``file_path`` when unpickling
        or copying, corrupting both ``file_path`` and ``message``.

        Returns:
            tuple: ``(callable, args)`` as expected by the pickle protocol.
        """
        return (type(self), (self.file_path, self._message_prefix))
44 changes: 27 additions & 17 deletions src/f3dasm/_src/experimentdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Standard
import functools
import random
from collections import defaultdict
from copy import copy
from functools import partial
Expand All @@ -35,6 +36,7 @@
from .core import Block, DataGenerator
from .datageneration import _datagenerator_factory
from .design import Domain, _domain_factory, _sampler_factory
from .errors import EmptyFileError
from .experimentsample import ExperimentSample
from .logger import logger
from .optimization import _optimizer_factory
Expand Down Expand Up @@ -299,35 +301,27 @@ def wrapper_func(project_dir: Path, *args, **kwargs) -> None:
with lock:
tries = 0
while tries < MAX_TRIES:
# try:
# print(f"{args=}, {kwargs=}")
# self = ExperimentData.from_file(project_dir)
# value = operation(*args, **kwargs)
# self.store()
# break
try:
# Load a fresh instance of ExperimentData from file
loaded_self = ExperimentData.from_file(
self.project_dir)

# Call the operation with the loaded instance
# Replace the self in args with the loaded instance
# Modify the first argument
args = (loaded_self,) + args[1:]
value = operation(*args, **kwargs)
loaded_self.store()
break

# Race conditions can occur when the file is empty
# while it is being read at the same time
except pd.errors.EmptyDataError:
except EmptyFileError:
tries += 1
logger.debug((
f"EmptyDataError occurred, retrying"
f" {tries+1}/{MAX_TRIES}"))
sleep(1)
sleep(random.uniform(0.5, 2.5))

raise pd.errors.EmptyDataError()
raise EmptyFileError(self.project_dir)

return value

Expand Down Expand Up @@ -1662,9 +1656,17 @@ def _dict_factory(data: pd.DataFrame | List[Dict[str, Any]] | None | Path | str
return []

elif isinstance(data, (Path, str)):
return _dict_factory(pd.read_csv(
Path(data).with_suffix('.csv'),
header=0, index_col=0))
filepath = Path(data).with_suffix('.csv')

if not filepath.exists():
raise FileNotFoundError(f"File {filepath} not found")

if filepath.stat().st_size == 0:
raise EmptyFileError(filepath)

df = pd.read_csv(filepath, header=0, index_col=0)

return _dict_factory(df)

# check if data is already a list of dicts
elif isinstance(data, list) and all(isinstance(d, dict) for d in data):
Expand Down Expand Up @@ -1741,9 +1743,17 @@ def jobs_factory(jobs: pd.Series | str | Path | None) -> pd.Series:
return pd.Series()

elif isinstance(jobs, (Path, str)):
df = pd.read_csv(
Path(jobs).with_suffix('.csv'),
header=0, index_col=0).squeeze()

filepath = Path(jobs).with_suffix('.csv')

if not filepath.exists():
raise FileNotFoundError(f"File {filepath} not found")

if filepath.stat().st_size == 0:
raise EmptyFileError(filepath)

df = pd.read_csv(filepath,
header=0, index_col=0).squeeze()
# If jobs is just one value, it is parsed as a string
# So, make sure that we return a pd.Series either way!
if not isinstance(df, pd.Series):
Expand Down
23 changes: 16 additions & 7 deletions tests/experimentdata/test_experimentdata2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from unittest.mock import MagicMock, patch

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -245,10 +246,14 @@ def mock_domain_from_file(path, *args, **kwargs):

monkeypatch.setattr(pd, 'read_csv', mock_read_csv)
monkeypatch.setattr(Domain, 'from_file', mock_domain_from_file)
mock_stat = MagicMock()
mock_stat.st_size = 10 # Non Empty file

experiment_data = ExperimentData(domain=domain, input_data=input_data,
output_data=output_data, jobs=jobs,
project_dir=project_dir)
with patch.object(Path, "exists", return_value=True), \
patch.object(Path, "stat", return_value=mock_stat):
experiment_data = ExperimentData(domain=domain, input_data=input_data,
output_data=output_data, jobs=jobs,
project_dir=project_dir)

if domain is None:
experiment_data.domain = edata_domain_with_output()
Expand Down Expand Up @@ -280,10 +285,14 @@ def mock_domain_from_file(path, *args, **kwargs):

monkeypatch.setattr(pd, 'read_csv', mock_read_csv)
monkeypatch.setattr(Domain, 'from_file', mock_domain_from_file)

experiment_data = ExperimentData(domain=domain, input_data=input_data,
jobs=jobs,
project_dir=project_dir)
mock_stat = MagicMock()
mock_stat.st_size = 10 # Non Empty file

with patch.object(Path, "exists", return_value=True), \
patch.object(Path, "stat", return_value=mock_stat):
experiment_data = ExperimentData(domain=domain, input_data=input_data,
jobs=jobs,
project_dir=project_dir)

if domain is None:
experiment_data.domain = edata_domain_without_output()
Expand Down

0 comments on commit 8aceeea

Please sign in to comment.