Skip to content

Commit

Permalink
feat: create FillRateBasedControlTUNES (#83)
Browse files Browse the repository at this point in the history
* feat: create FillRateBasedControlTUNES which implements an ad-hoc FRBC control type to work only with TUNES RM (for now).

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* make session optional

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* docs: fix typo

* docs: fix typo

* fix: pin pandas

* docs: comment on tests

* feature: expose translation strategy further up the chain of methods

* style: black

* style: update black

* apply pre-commit

* per-comit on setup

* chore: use (dev) released s2-python version

* fix: update s2-python

Signed-off-by: F.N. Claessen <felix@seita.nl>

* fix: relax Pandas constraint

Signed-off-by: F.N. Claessen <felix@seita.nl>

* fix: fetch whole history, incl. previous dev tags

Signed-off-by: F.N. Claessen <felix@seita.nl>

* fix: call to FM scheduler in FRBCSimple.trigger_schedule

Signed-off-by: F.N. Claessen <felix@seita.nl>

* docs: fix typo

Signed-off-by: F.N. Claessen <felix@seita.nl>

---------

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
Signed-off-by: F.N. Claessen <felix@seita.nl>
Co-authored-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
victorgarcia98 and Flix6x authored Jan 23, 2025
1 parent 1977344 commit b88c8d6
Show file tree
Hide file tree
Showing 17 changed files with 1,311 additions and 163 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ jobs:
id-token: write
steps:
- uses: actions/checkout@v3
with: {fetch-depth: 0} # deep clone for setuptools-scm
- uses: actions/setup-python@v4
with: { python-version: "3.11" }
- name: Retrieve pre-built distribution files
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ repos:
- id: isort

- repo: https://github.com/psf/black
rev: 23.3.0
rev: 24.8.0
hooks:
- id: black
language_version: python3
Expand Down
6 changes: 3 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ python_requires >= 3.9
# For more information, check out https://semver.org/.
install_requires =
importlib-metadata; python_version<"3.8"
aiohttp
pandas
aiohttp<=3.9.1
pandas>=2.1.4
pydantic>=1.10.8,<2.0
s2-python==0.2.0.dev2
s2-python==0.1.3
async_timeout

[options.packages.find]
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
PyScaffold helps you to put up the scaffold of your new Python project.
Learn more under: https://pyscaffold.org/
"""

from setuptools import setup

if __name__ == "__main__":
Expand Down
9 changes: 6 additions & 3 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import socket
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from typing import Any, cast

import async_timeout
import pandas as pd
Expand Down Expand Up @@ -59,6 +59,9 @@ class FlexMeasuresClient:
session: ClientSession | None = None

def __post_init__(self):
if self.session is None:
self.session = ClientSession()

if not re.match(r".+\@.+\..+", self.email):
raise EmailValidationError(
f"{self.email} is not an email address format string"
Expand Down Expand Up @@ -102,7 +105,7 @@ def determine_port(self):

async def close(self):
"""Function to close FlexMeasuresClient session when all requests are done"""
await self.session.close()
await cast(ClientSession, self.session).close()

async def request(
self,
Expand Down Expand Up @@ -198,7 +201,7 @@ async def request_once(

"""Sends a single request to FlexMeasures and checks the response"""
self.ensure_session()
response = await self.session.request( # type: ignore
response = await cast(ClientSession, self.session).request(
method=method,
url=url,
params=params,
Expand Down
14 changes: 8 additions & 6 deletions src/flexmeasures_client/s2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def wrap(*args, **kwargs):
# TODO: implement function __hash__ in ID that returns
# the value of __root__, this way we would be able to use
# the ID as key directly
self.incoming_messages[
get_message_id(incoming_message)
] = incoming_message
self.incoming_messages[get_message_id(incoming_message)] = (
incoming_message
)

outgoing_message = func(self, incoming_message)

self.outgoing_messages[
get_message_id(outgoing_message)
] = outgoing_message
self.outgoing_messages[get_message_id(outgoing_message)] = (
outgoing_message
)

return outgoing_message

Expand Down Expand Up @@ -80,6 +80,8 @@ class Handler:

outgoing_messages_status: SizeLimitOrderedDict

background_tasks: set

def __init__(self, max_size: int = 100) -> None:
"""
Handler
Expand Down
15 changes: 11 additions & 4 deletions src/flexmeasures_client/s2/cem.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ def __init__(
def supports_control_type(self, control_type: ControlType):
return control_type in self._resource_manager_details.available_control_types

def close(self):
async def close(self):
self._is_closed = True

for control_type, handler in self._control_types_handlers.items():
print(control_type, handler)
await handler.close()

def is_closed(self):
return self._is_closed

Expand All @@ -92,9 +96,9 @@ def register_control_type(self, control_type_handler: ControlTypeHandler):
control_type_handler._sending_queue = self._sending_queue

# store control_type_handler
self._control_types_handlers[
control_type_handler._control_type
] = control_type_handler
self._control_types_handlers[control_type_handler._control_type] = (
control_type_handler
)

async def handle_message(self, message: Dict | pydantic.BaseModel | str):
"""
Expand Down Expand Up @@ -273,3 +277,6 @@ def handle_revoke_object(self, message: RevokeObject):
)

return get_reception_status(message, ReceptionStatusValues.OK)

async def send_message(self, message):
await self._sending_queue.put(message)
86 changes: 68 additions & 18 deletions src/flexmeasures_client/s2/control_types/FRBC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class FRBC(ControlTypeHandler):
_timer_status_history: SizeLimitOrderedDict[str, FRBCTimerStatus]
_actuator_status_history: SizeLimitOrderedDict[str, FRBCActuatorStatus]
_storage_status_history: SizeLimitOrderedDict[str, FRBCStorageStatus]
background_tasks: set

def __init__(self, max_size: int = 100) -> None:
super().__init__(max_size)
Expand All @@ -51,6 +52,7 @@ def __init__(self, max_size: int = 100) -> None:
self._system_description_history = SizeLimitOrderedDict(max_size=max_size)
self._leakage_behaviour_history = SizeLimitOrderedDict(max_size=max_size)
self._usage_forecast_history = SizeLimitOrderedDict(max_size=max_size)
self.background_tasks = set()

@register(FRBCSystemDescription)
def handle_system_description(
Expand All @@ -62,24 +64,37 @@ def handle_system_description(
self._system_description_history[system_description_id] = message

# schedule trigger_schedule to run soon concurrently
asyncio.create_task(self.trigger_schedule(system_description_id))

task = asyncio.create_task(self.trigger_schedule(system_description_id))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

async def send_storage_status(self, status: FRBCStorageStatus):
raise NotImplementedError()
@register(FRBCUsageForecast)
def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel:
message_id = str(message.message_id)

async def send_actuator_status(self, status: FRBCActuatorStatus):
raise NotImplementedError()
self._usage_forecast_history[message_id] = message

task = asyncio.create_task(self.send_usage_forecast(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCStorageStatus)
def handle_storage_status(self, message: FRBCStorageStatus) -> pydantic.BaseModel:
message_id = str(message.message_id)

self._storage_status_history[message_id] = message

asyncio.create_task(self.send_storage_status(message))

task = asyncio.create_task(self.send_storage_status(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCActuatorStatus)
Expand All @@ -88,29 +103,64 @@ def handle_actuator_status(self, message: FRBCActuatorStatus) -> pydantic.BaseMo

self._actuator_status_history[message_id] = message

asyncio.create_task(self.send_actuator_status(message))

task = asyncio.create_task(self.send_actuator_status(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCLeakageBehaviour)
def handle_leakage_behaviour(
self, message: FRBCLeakageBehaviour
) -> pydantic.BaseModel:
# return get_reception_status(message, status=ReceptionStatusValues.OK)
raise NotImplementedError()
message_id = str(message.message_id)

@register(FRBCUsageForecast)
def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel:
# return get_reception_status(message, status=ReceptionStatusValues.OK)
raise NotImplementedError()
self._leakage_behaviour_history[message_id] = message

async def trigger_schedule(self, system_description_id: str):
raise NotImplementedError()
task = asyncio.create_task(self.send_leakage_behaviour(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCFillLevelTargetProfile)
def handle_fill_level_target_profile(
self, message: FRBCFillLevelTargetProfile
) -> pydantic.BaseModel:
message_id = str(message.message_id)

self._fill_level_target_profile_history[message_id] = message

task = asyncio.create_task(self.send_fill_level_target_profile(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCTimerStatus)
def handle_frbc_timer_status(self, message: FRBCTimerStatus) -> pydantic.BaseModel:
return get_reception_status(message, status=ReceptionStatusValues.OK)

async def send_storage_status(self, status: FRBCStorageStatus):
raise NotImplementedError()

async def send_actuator_status(self, status: FRBCActuatorStatus):
raise NotImplementedError()

async def send_leakage_behaviour(self, leakage_behaviour: FRBCLeakageBehaviour):
raise NotImplementedError()

async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast):
raise NotImplementedError()

async def send_fill_level_target_profile(
self, fill_level_target_profile: FRBCFillLevelTargetProfile
):
raise NotImplementedError()


class FRBCTest(FRBC):
"""Dummy class to simulate the triggering of a schedule."""
Expand Down
40 changes: 10 additions & 30 deletions src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,6 @@ async def send_actuator_status(self, status: FRBCActuatorStatus):
duration=timedelta(minutes=15),
)

# await self._fm_client.post_measurements(
# self._soc_sensor_id
# )

# system_description = self.find_system_description_from_actuator()

# if system_description is None:
# return

# #for a
# if system_description is not None:

# self._system_description_history[]
# status.active_operation_mode_id
# status.actuator_id
# status.operation_mode_factor

async def trigger_schedule(self, system_description_id: str):
"""Translates S2 System Description into FM API calls"""

Expand All @@ -108,24 +91,21 @@ async def trigger_schedule(self, system_description_id: str):
return

# call schedule
schedule_id = await self._fm_client.trigger_storage_schedule(
schedule = await self._fm_client.trigger_and_get_schedule(
start=system_description.valid_from
+ self._valid_from_shift, # TODO: localize datetime
sensor_id=self._power_sensor_id,
production_price_sensor=self._price_sensor_id,
consumption_price_sensor=self._price_sensor_id,
soc_unit="MWh",
soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead
flex_context=dict(
production_price_sensor=self._price_sensor_id,
consumption_price_sensor=self._price_sensor_id,
),
flex_model=dict(
soc_unit="MWh",
soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead
),
duration=self._schedule_duration, # next 12 hours
# TODO: add SOC MAX AND SOC MIN FROM fill_level_range,
# this needs chages on the client
)

# wait for the schedule to finish
schedule = await self._fm_client.get_schedule(
sensor_id=self._power_sensor_id,
schedule_id=schedule_id,
duration=self._schedule_duration,
# this needs changes on the client
)

# translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor
Expand Down
Loading

0 comments on commit b88c8d6

Please sign in to comment.