Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create FillRateBasedControlTUNES #83

Merged
merged 21 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
id-token: write
steps:
- uses: actions/checkout@v3
with: {fetch-depth: 0} # deep clone for setuptools-scm
- uses: actions/setup-python@v4
with: {python-version: "3.11"}
- name: Retrieve pre-built distribution files
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ repos:
- id: isort

- repo: https://github.com/psf/black
rev: 23.3.0
rev: 24.8.0
hooks:
- id: black
language_version: python3
Expand Down
6 changes: 3 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ python_requires >= 3.9
# For more information, check out https://semver.org/.
install_requires =
importlib-metadata; python_version<"3.8"
aiohttp
pandas
aiohttp<=3.9.1
pandas>=2.1.4
pydantic>=1.10.8,<2.0
s2-python==0.2.0.dev2
s2-python==0.1.3
async_timeout

[options.packages.find]
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
PyScaffold helps you to put up the scaffold of your new Python project.
Learn more under: https://pyscaffold.org/
"""

from setuptools import setup

if __name__ == "__main__":
Expand Down
9 changes: 6 additions & 3 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import socket
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from typing import Any, cast

import async_timeout
import pandas as pd
Expand Down Expand Up @@ -59,6 +59,9 @@ class FlexMeasuresClient:
session: ClientSession | None = None

def __post_init__(self):
if self.session is None:
self.session = ClientSession()

if not re.match(r".+\@.+\..+", self.email):
raise EmailValidationError(
f"{self.email} is not an email address format string"
Expand Down Expand Up @@ -102,7 +105,7 @@ def determine_port(self):

async def close(self):
"""Function to close FlexMeasuresClient session when all requests are done"""
await self.session.close()
await cast(ClientSession, self.session).close()

async def request(
self,
Expand Down Expand Up @@ -198,7 +201,7 @@ async def request_once(

"""Sends a single request to FlexMeasures and checks the response"""
self.ensure_session()
response = await self.session.request( # type: ignore
response = await cast(ClientSession, self.session).request(
method=method,
url=url,
params=params,
Expand Down
14 changes: 8 additions & 6 deletions src/flexmeasures_client/s2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def wrap(*args, **kwargs):
# TODO: implement function __hash__ in ID that returns
# the value of __root__, this way we would be able to use
# the ID as key directly
self.incoming_messages[
get_message_id(incoming_message)
] = incoming_message
self.incoming_messages[get_message_id(incoming_message)] = (
incoming_message
)

outgoing_message = func(self, incoming_message)

self.outgoing_messages[
get_message_id(outgoing_message)
] = outgoing_message
self.outgoing_messages[get_message_id(outgoing_message)] = (
outgoing_message
)

return outgoing_message

Expand Down Expand Up @@ -80,6 +80,8 @@ class Handler:

outgoing_messages_status: SizeLimitOrderedDict

background_tasks: set

def __init__(self, max_size: int = 100) -> None:
"""
Handler
Expand Down
15 changes: 11 additions & 4 deletions src/flexmeasures_client/s2/cem.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ def __init__(
def supports_control_type(self, control_type: ControlType):
return control_type in self._resource_manager_details.available_control_types

def close(self):
async def close(self):
self._is_closed = True

for control_type, handler in self._control_types_handlers.items():
print(control_type, handler)
await handler.close()

def is_closed(self):
return self._is_closed

Expand All @@ -92,9 +96,9 @@ def register_control_type(self, control_type_handler: ControlTypeHandler):
control_type_handler._sending_queue = self._sending_queue

# store control_type_handler
self._control_types_handlers[
control_type_handler._control_type
] = control_type_handler
self._control_types_handlers[control_type_handler._control_type] = (
control_type_handler
)

async def handle_message(self, message: Dict | pydantic.BaseModel | str):
"""
Expand Down Expand Up @@ -273,3 +277,6 @@ def handle_revoke_object(self, message: RevokeObject):
)

return get_reception_status(message, ReceptionStatusValues.OK)

async def send_message(self, message):
await self._sending_queue.put(message)
86 changes: 68 additions & 18 deletions src/flexmeasures_client/s2/control_types/FRBC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class FRBC(ControlTypeHandler):
_timer_status_history: SizeLimitOrderedDict[str, FRBCTimerStatus]
_actuator_status_history: SizeLimitOrderedDict[str, FRBCActuatorStatus]
_storage_status_history: SizeLimitOrderedDict[str, FRBCStorageStatus]
background_tasks: set

def __init__(self, max_size: int = 100) -> None:
super().__init__(max_size)
Expand All @@ -51,6 +52,7 @@ def __init__(self, max_size: int = 100) -> None:
self._system_description_history = SizeLimitOrderedDict(max_size=max_size)
self._leakage_behaviour_history = SizeLimitOrderedDict(max_size=max_size)
self._usage_forecast_history = SizeLimitOrderedDict(max_size=max_size)
self.background_tasks = set()

@register(FRBCSystemDescription)
def handle_system_description(
Expand All @@ -62,24 +64,37 @@ def handle_system_description(
self._system_description_history[system_description_id] = message

# schedule trigger_schedule to run soon concurrently
asyncio.create_task(self.trigger_schedule(system_description_id))

task = asyncio.create_task(self.trigger_schedule(system_description_id))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
return get_reception_status(message, status=ReceptionStatusValues.OK)

async def send_storage_status(self, status: FRBCStorageStatus):
raise NotImplementedError()
@register(FRBCUsageForecast)
def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel:
message_id = str(message.message_id)

async def send_actuator_status(self, status: FRBCActuatorStatus):
raise NotImplementedError()
self._usage_forecast_history[message_id] = message

task = asyncio.create_task(self.send_usage_forecast(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCStorageStatus)
def handle_storage_status(self, message: FRBCStorageStatus) -> pydantic.BaseModel:
message_id = str(message.message_id)

self._storage_status_history[message_id] = message

asyncio.create_task(self.send_storage_status(message))

task = asyncio.create_task(self.send_storage_status(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
Comment on lines +93 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: candidate code block for refactoring.

return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCActuatorStatus)
Expand All @@ -88,29 +103,64 @@ def handle_actuator_status(self, message: FRBCActuatorStatus) -> pydantic.BaseMo

self._actuator_status_history[message_id] = message

asyncio.create_task(self.send_actuator_status(message))

task = asyncio.create_task(self.send_actuator_status(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCLeakageBehaviour)
def handle_leakage_behaviour(
self, message: FRBCLeakageBehaviour
) -> pydantic.BaseModel:
# return get_reception_status(message, status=ReceptionStatusValues.OK)
raise NotImplementedError()
message_id = str(message.message_id)

@register(FRBCUsageForecast)
def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel:
# return get_reception_status(message, status=ReceptionStatusValues.OK)
raise NotImplementedError()
self._leakage_behaviour_history[message_id] = message

async def trigger_schedule(self, system_description_id: str):
raise NotImplementedError()
task = asyncio.create_task(self.send_leakage_behaviour(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCFillLevelTargetProfile)
def handle_fill_level_target_profile(
self, message: FRBCFillLevelTargetProfile
) -> pydantic.BaseModel:
message_id = str(message.message_id)

self._fill_level_target_profile_history[message_id] = message

task = asyncio.create_task(self.send_fill_level_target_profile(message))
self.background_tasks.add(
task
) # important to avoid a task disappearing mid-execution.
task.add_done_callback(self.background_tasks.discard)
return get_reception_status(message, status=ReceptionStatusValues.OK)

@register(FRBCTimerStatus)
def handle_frbc_timer_status(self, message: FRBCTimerStatus) -> pydantic.BaseModel:
return get_reception_status(message, status=ReceptionStatusValues.OK)

async def send_storage_status(self, status: FRBCStorageStatus):
raise NotImplementedError()

async def send_actuator_status(self, status: FRBCActuatorStatus):
raise NotImplementedError()

async def send_leakage_behaviour(self, leakage_behaviour: FRBCLeakageBehaviour):
raise NotImplementedError()

async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast):
raise NotImplementedError()

async def send_fill_level_target_profile(
self, fill_level_target_profile: FRBCFillLevelTargetProfile
):
raise NotImplementedError()
Comment on lines +147 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct to say these are meant to be implemented in subclasses? (If so, note to self: add inline comments.)



class FRBCTest(FRBC):
"""Dummy class to simulate the triggering of a schedule."""
Expand Down
40 changes: 10 additions & 30 deletions src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,6 @@ async def send_actuator_status(self, status: FRBCActuatorStatus):
duration=timedelta(minutes=15),
)

# await self._fm_client.post_measurements(
# self._soc_sensor_id
# )

# system_description = self.find_system_description_from_actuator()

# if system_description is None:
# return

# #for a
# if system_description is not None:

# self._system_description_history[]
# status.active_operation_mode_id
# status.actuator_id
# status.operation_mode_factor

async def trigger_schedule(self, system_description_id: str):
"""Translates S2 System Description into FM API calls"""

Expand All @@ -108,24 +91,21 @@ async def trigger_schedule(self, system_description_id: str):
return

# call schedule
schedule_id = await self._fm_client.trigger_storage_schedule(
schedule = await self._fm_client.trigger_and_get_schedule(
start=system_description.valid_from
+ self._valid_from_shift, # TODO: localize datetime
sensor_id=self._power_sensor_id,
production_price_sensor=self._price_sensor_id,
consumption_price_sensor=self._price_sensor_id,
soc_unit="MWh",
soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead
flex_context=dict(
production_price_sensor=self._price_sensor_id,
consumption_price_sensor=self._price_sensor_id,
),
flex_model=dict(
soc_unit="MWh",
soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead
),
duration=self._schedule_duration, # next 12 hours
# TODO: add SOC MAX AND SOC MIN FROM fill_level_range,
# this needs chages on the client
)

# wait for the schedule to finish
schedule = await self._fm_client.get_schedule(
sensor_id=self._power_sensor_id,
schedule_id=schedule_id,
duration=self._schedule_duration,
# this needs changes on the client
)

# translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor
Expand Down
Loading
Loading