Parallelize sensor search #962

Draft
wants to merge 12 commits into base: main from
10 changes: 10 additions & 0 deletions documentation/configuration.rst
@@ -50,6 +50,16 @@ Whether to allow overwriting existing data when saving data to the database.
Default: ``False``


.. _parallel-processes:

FLEXMEASURES_PARALLEL_PROCESSES
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Number of processes used to parallelize the search for sensor data in the database.

Default: ``4``


.. _solver-config:

FLEXMEASURES_LP_SOLVER
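
A minimal sketch of overriding the new FLEXMEASURES_PARALLEL_PROCESSES setting, assuming your deployment reads FlexMeasures settings from a Python-syntax config file (file name and location depend on your setup):

```python
# Illustrative override in a FlexMeasures config file (Python syntax).
# The setting name comes from this PR; the value 8 is just an example.
FLEXMEASURES_PARALLEL_PROCESSES = 8  # number of worker processes for the sensor data search
```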
5 changes: 4 additions & 1 deletion flexmeasures/conftest.py
@@ -928,7 +928,8 @@ def create_test_battery_assets(
)
db.session.add(test_battery_sensor_small)

db.session.flush()
# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return {
"Test building": test_building,
"Test battery": test_battery,
@@ -1376,6 +1377,8 @@ def soc_sensors(db, add_battery_assets, setup_sources) -> tuple:
source=setup_sources["Seita"],
)

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
yield soc_maxima, soc_minima, soc_targets, values


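
The switch from flush() to commit() in these fixtures matters because the parallel search workers open their own database sessions: rows that are only flushed stay invisible outside the fixture's open transaction. A minimal sketch of that visibility rule, using illustrative names (engine, a mapped Sensor model) that stand in for the real test setup:

```python
# Sketch only: flushed rows are visible inside the writer's transaction,
# but other sessions (such as parallel search workers) only see committed rows.
from sqlalchemy.orm import sessionmaker

def visibility_demo(engine, Sensor):
    Session = sessionmaker(bind=engine)
    writer, reader = Session(), Session()

    writer.add(Sensor(name="demo"))
    writer.flush()  # row exists only within the writer's open transaction
    assert reader.query(Sensor).filter_by(name="demo").one_or_none() is None

    writer.commit()  # now any other session can find the row
    assert reader.query(Sensor).filter_by(name="demo").one_or_none() is not None
```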
32 changes: 16 additions & 16 deletions flexmeasures/data/models/generic_assets.py
@@ -645,24 +645,24 @@ def search_beliefs(
:param resolution: optionally set the resolution of data being displayed
:returns: dictionary of BeliefsDataFrames or JSON string (if as_json is True)
"""
bdf_dict = {}
from flexmeasures.data.models.time_series import TimedBelief

if sensors is None:
sensors = self.sensors

for sensor in sensors:
bdf_dict[sensor] = sensor.search_beliefs(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
beliefs_after=beliefs_after,
beliefs_before=beliefs_before,
horizons_at_least=horizons_at_least,
horizons_at_most=horizons_at_most,
source=source,
most_recent_beliefs_only=most_recent_beliefs_only,
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
resolution=resolution,
)
bdf_dict = TimedBelief.search(
sensors=sensors,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
beliefs_after=beliefs_after,
beliefs_before=beliefs_before,
horizons_at_least=horizons_at_least,
horizons_at_most=horizons_at_most,
source=source,
most_recent_beliefs_only=most_recent_beliefs_only,
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
sum_multiple=False,
)
if as_json:
from flexmeasures.data.services.time_series import simplify_index

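
With this change, GenericAsset.search_beliefs no longer loops over sensors itself; it hands the whole sensor list to TimedBelief.search, which (with sum_multiple=False) returns one BeliefsDataFrame per sensor. A hedged usage sketch, where `asset` stands for any GenericAsset instance you already have:

```python
# Illustrative call; the keyword arguments shown are a subset of those accepted.
from datetime import datetime, timezone

bdf_dict = asset.search_beliefs(
    event_starts_after=datetime(2024, 1, 1, tzinfo=timezone.utc),
    event_ends_before=datetime(2024, 1, 2, tzinfo=timezone.utc),
)
for sensor, bdf in bdf_dict.items():  # dict of BeliefsDataFrames, keyed by sensor
    print(sensor.name, len(bdf))
```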
13 changes: 13 additions & 0 deletions flexmeasures/data/models/planning/tests/conftest.py
@@ -136,6 +136,9 @@ def flexible_devices(db, building) -> dict[str, Sensor]:
unit="MW",
)
db.session.add(battery_sensor)

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return {
battery_sensor.name: battery_sensor,
}
@@ -204,6 +207,8 @@ def add_inflexible_device_forecasts(
setup_sources["Seita"],
)

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return {
pv_sensor: pv_values,
residual_demand_sensor: residual_demand_values,
@@ -261,6 +266,8 @@ def efficiency_sensors(db, add_battery_assets, setup_sources) -> dict[str, Senso
)
sensors[name] = efficiency_sensor

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return sensors


@@ -314,6 +321,8 @@ def add_stock_delta(db, add_battery_assets, setup_sources) -> dict[str, Sensor]:
)
sensors[name] = stock_delta_sensor

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return sensors


@@ -372,6 +381,8 @@ def add_storage_efficiency(db, add_battery_assets, setup_sources) -> dict[str, S
)
sensors[name] = storage_efficiency_sensor

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return sensors


@@ -417,6 +428,8 @@ def add_soc_targets(db, add_battery_assets, setup_sources) -> dict[str, Sensor]:
db.session.add(belief)
sensors[name] = storage_constraint_sensor

# Commit session so other (parallel search) sessions can find the data set up by this fixture
db.session.commit()
return sensors


154 changes: 98 additions & 56 deletions flexmeasures/data/models/time_series.py
@@ -4,13 +4,16 @@
from datetime import datetime as datetime_type, timedelta
import json
from flask import current_app
import multiprocessing

import pandas as pd
from sqlalchemy import select
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy import inspect
from sqlalchemy import inspect, create_engine
import timely_beliefs as tb
from timely_beliefs.beliefs.probabilistic_utils import get_median_belief
import timely_beliefs.utils as tb_utils
@@ -483,11 +486,14 @@ def __str__(self) -> str:
return self.name

def to_dict(self) -> dict:
return dict(
id=self.id,
name=self.name,
description=f"{self.name} ({self.generic_asset.name})",
)
try:
return dict(
id=self.id,
name=self.name,
description=f"{self.name} ({self.generic_asset.name})",
)
except DetachedInstanceError:
return Sensor.query.get(self.id).to_dict()

@classmethod
def find_closest(
@@ -704,57 +710,35 @@ def search(
)
custom_join_targets = [] if parsed_sources else [DataSource]

bdf_dict = {}
for sensor in sensors:
bdf = cls.search_session(
session=db.session,
sensor=sensor,
# Workaround (1st half) for https://github.com/FlexMeasures/flexmeasures/issues/484
event_ends_after=event_starts_after,
event_starts_before=event_ends_before,
beliefs_after=beliefs_after,
beliefs_before=beliefs_before,
horizons_at_least=horizons_at_least,
horizons_at_most=horizons_at_most,
source=parsed_sources,
most_recent_beliefs_only=most_recent_beliefs_only,
most_recent_events_only=most_recent_events_only,
most_recent_only=most_recent_only,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
search_kwargs = dict(
# Workaround (1st half) for https://github.com/FlexMeasures/flexmeasures/issues/484
event_ends_after=event_starts_after,
event_starts_before=event_ends_before,
beliefs_after=beliefs_after,
beliefs_before=beliefs_before,
horizons_at_least=horizons_at_least,
horizons_at_most=horizons_at_most,
source=parsed_sources,
most_recent_beliefs_only=most_recent_beliefs_only,
most_recent_events_only=most_recent_events_only,
most_recent_only=most_recent_only,
one_deterministic_belief_per_event=one_deterministic_belief_per_event,
one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
resolution=resolution,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
)
with multiprocessing.Pool(
processes=current_app.config.get("FLEXMEASURES_PARALLEL_PROCESSES", 4)
) as pool:
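# Dispatch by sensor id (not ORM instance), so nothing bound to the parent session crosses the process boundary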
results = pool.starmap(
search_wrapper,
[
(s.id if isinstance(s, Sensor) else s, search_kwargs)
for s in sensors
],
)
if one_deterministic_belief_per_event:
# todo: compute median of collective belief instead of median of first belief (update expected test results accordingly)
# todo: move to timely-beliefs: select mean/median belief
if (
bdf.lineage.number_of_sources <= 1
and bdf.lineage.probabilistic_depth == 1
):
# Fast track, no need to loop over beliefs
pass
else:
bdf = (
bdf.for_each_belief(get_median_belief)
.groupby(level=["event_start"], group_keys=False)
.apply(lambda x: x.head(1))
)
elif one_deterministic_belief_per_event_per_source:
if len(bdf) == 0 or bdf.lineage.probabilistic_depth == 1:
# Fast track, no need to loop over beliefs
pass
else:
bdf = bdf.for_each_belief(get_median_belief)

# NB resampling will be triggered if resolutions are not an exact match (also in case of str vs timedelta)
if resolution is not None and resolution != bdf.event_resolution:
bdf = bdf.resample_events(
resolution, keep_only_most_recent_belief=most_recent_beliefs_only
)
# Workaround (2nd half) for https://github.com/FlexMeasures/flexmeasures/issues/484
bdf = bdf[bdf.event_starts >= event_starts_after]
bdf = bdf[bdf.event_ends <= event_ends_before]
bdf_dict[bdf.sensor] = bdf

bdf_dict = dict(results)
if sum_multiple:
return aggregate_values(bdf_dict)
else:
@@ -798,3 +782,61 @@ def add(
def __repr__(self) -> str:
"""timely-beliefs representation of timed beliefs."""
return tb.TimedBelief.__repr__(self)


def search_wrapper(
s,
search_kwargs,
):
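# Each worker process opens its own engine and session; SQLAlchemy sessions cannot be shared safely across processes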
db_uri = current_app.config.get("SQLALCHEMY_DATABASE_URI")
engine = create_engine(db_uri)
Session = sessionmaker(bind=engine)
session = Session()
search_kwargs["session"] = session

one_deterministic_belief_per_event = search_kwargs.pop(
"one_deterministic_belief_per_event", None
)
one_deterministic_belief_per_event_per_source = search_kwargs.pop(
"one_deterministic_belief_per_event_per_source", None
)
resolution = search_kwargs.pop("resolution", None)
most_recent_beliefs_only = search_kwargs.get("most_recent_beliefs_only")
event_starts_after = search_kwargs.get("event_starts_after")
event_ends_before = search_kwargs.get("event_ends_before")

bdf = TimedBelief.search_session(sensor=s, **search_kwargs)
if one_deterministic_belief_per_event:
# todo: compute median of collective belief instead of median of first belief (update expected test results accordingly)
# todo: move to timely-beliefs: select mean/median belief
if bdf.lineage.number_of_sources <= 1 and bdf.lineage.probabilistic_depth == 1:
# Fast track, no need to loop over beliefs
pass
else:
bdf = (
bdf.for_each_belief(get_median_belief)
.groupby(level=["event_start"], group_keys=False)
.apply(lambda x: x.head(1))
)
elif one_deterministic_belief_per_event_per_source:
if len(bdf) == 0 or bdf.lineage.probabilistic_depth == 1:
# Fast track, no need to loop over beliefs
pass
else:
bdf = bdf.for_each_belief(get_median_belief)

# NB resampling will be triggered if resolutions are not an exact match (also in case of str vs timedelta)
if resolution is not None and resolution != bdf.event_resolution:
bdf = bdf.resample_events(
resolution, keep_only_most_recent_belief=most_recent_beliefs_only
)
# Workaround (2nd half) for https://github.com/FlexMeasures/flexmeasures/issues/484
if event_starts_after is not None:
bdf = bdf[bdf.event_starts >= event_starts_after]
if event_ends_before is not None:
bdf = bdf[bdf.event_ends <= event_ends_before]

# Re-fetch the sensor so the returned key is a FlexMeasures Sensor rather than the worker session's detached instance (Sensor instead of DBSensor)
bdf.sensor = Sensor.query.get(bdf.sensor.id)

return bdf.sensor, bdf
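
The overall shape of the change is a fan-out/fan-in: the parent process builds one shared kwargs dict, a multiprocessing.Pool maps search_wrapper over the sensor ids, and the resulting (sensor, BeliefsDataFrame) pairs are collected back into a dict. A stripped-down sketch of that pattern, with a stand-in worker instead of a real database query:

```python
# Pattern sketch only; _search_one stands in for search_wrapper.
import multiprocessing

def _search_one(sensor_id: int, shared_kwargs: dict) -> tuple[int, str]:
    # a real worker would open its own DB session and run the search here
    return sensor_id, f"beliefs for sensor {sensor_id} ({shared_kwargs})"

if __name__ == "__main__":
    shared_kwargs = {"resolution": "PT15M"}
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(_search_one, [(sid, shared_kwargs) for sid in (1, 2, 3)])
    bdf_dict = dict(results)  # fan-in: one entry per sensor
    print(bdf_dict)
```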
21 changes: 9 additions & 12 deletions flexmeasures/utils/config_defaults.py
@@ -107,19 +107,18 @@ class Config(object):
"renewables": ["solar", "wind"],
"EVSE": ["one-way_evse", "two-way_evse"],
} # how to group assets by asset types
FLEXMEASURES_PARALLEL_PROCESSES: int = 4
FLEXMEASURES_LP_SOLVER: str = "appsi_highs"
FLEXMEASURES_JOB_TTL: timedelta = timedelta(days=1)
FLEXMEASURES_PLANNING_HORIZON: timedelta = timedelta(days=2)
FLEXMEASURES_MAX_PLANNING_HORIZON: timedelta | int | None = (
2520 # smallest number divisible by 1-10, which yields pleasant-looking durations for common sensor resolutions
)
# smallest number divisible by 1-10, which yields pleasant-looking durations for common sensor resolutions
FLEXMEASURES_MAX_PLANNING_HORIZON: timedelta | int | None = 2520
FLEXMEASURES_PLANNING_TTL: timedelta = timedelta(
days=7
) # Time to live for UDI event ids of successful scheduling jobs. Set a negative timedelta to persist forever.
FLEXMEASURES_DEFAULT_DATASOURCE: str = "FlexMeasures"
FLEXMEASURES_JOB_CACHE_TTL: int = (
3600 # Time to live for the job caching keys in seconds. Set a negative timedelta to persist forever.
)
# Time to live for the job caching keys in seconds. Set a negative timedelta to persist forever.
FLEXMEASURES_JOB_CACHE_TTL: int = 3600
FLEXMEASURES_TASK_CHECK_AUTH_TOKEN: str | None = None
FLEXMEASURES_REDIS_URL: str = "localhost"
FLEXMEASURES_REDIS_PORT: int = 6379
@@ -138,13 +137,11 @@ class Config(object):
FLEXMEASURES_FALLBACK_REDIRECT: bool = False

# Custom sunset switches
FLEXMEASURES_API_SUNSET_ACTIVE: bool = (
False # if True, sunset endpoints return 410 (Gone) responses; if False, they return 404 (Not Found) responses or will work as before, depending on whether the current FlexMeasures version still contains the endpoint logic
)
# if True, sunset endpoints return 410 (Gone) responses; if False, they return 404 (Not Found) responses or will work as before, depending on whether the current FlexMeasures version still contains the endpoint logic
FLEXMEASURES_API_SUNSET_ACTIVE: bool = False
FLEXMEASURES_API_SUNSET_DATE: str | None = None # e.g. 2023-05-01
FLEXMEASURES_API_SUNSET_LINK: str | None = (
None # e.g. https://flexmeasures.readthedocs.io/en/latest/api/introduction.html#deprecation-and-sunset
)
# e.g. https://flexmeasures.readthedocs.io/en/latest/api/introduction.html#deprecation-and-sunset
FLEXMEASURES_API_SUNSET_LINK: str | None = None

# if True, all requests are forced to be via HTTPS.
FLEXMEASURES_FORCE_HTTPS: bool = False