Skip to content

Commit

Permalink
Merge pull request #11 from virta-ltd/fix-ocpp-j-handler-for-cpms-mes…
Browse files Browse the repository at this point in the history
…sages

Fix OCPP-J handling of incoming messages and fill missing options automatically when needed
  • Loading branch information
aghajani authored Feb 4, 2025
2 parents 5dfcb78 + 4e6ea7a commit 30a7120
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 34 deletions.
56 changes: 30 additions & 26 deletions device/ocpp_j/abstract_device_ocpp_j.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import readline
readline.get_completion_type()


class AbstractDeviceOcppJ(DeviceAbstract):
server_address = ""
__logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -113,21 +114,34 @@ async def action_data_transfer(self, **options) -> bool:
self.logger.info(f"Action {action} End")
return True

def fill_missing_options_charge_start(self, options):
if "chargeStartTime" not in options:
options["chargeStartTime"] = self.utcnow_iso()
if "meterStart" not in options:
options["meterStart"] = 1000

def fill_missing_options_charge_stop(self, options):
if "chargeStopTime" not in options:
options["chargeStopTime"] = self.utcnow_iso()
if "meterStop" not in options:
options["meterStop"] = self.charge_meter_value_current(**options)

def charge_meter_value_current(self, **options):
self.fill_missing_options_charge_start(options)
return math.floor(options["meterStart"] + (
(self.utcnow() - datetime.datetime.fromisoformat(options["chargeStartTime"])).total_seconds() / 60
* options.pop("chargedKwhPerMinute", 1)
* 1000
))

async def flow_heartbeat(self) -> bool:
log_title = self.flow_heartbeat.__name__
self.logger.info(f"Flow {log_title} Start")
if not await self.action_heart_beat():
return False
self.logger.info(f"Flow {log_title} End")
return True

async def flow_authorize(self, **options) -> bool:
log_title = self.flow_authorize.__name__
self.logger.info(f"Flow {log_title} Start")
Expand All @@ -142,10 +156,6 @@ async def flow_charge(self, auto_stop: bool, **options) -> bool:
if not await self.action_authorize(**options):
self.charge_in_progress = False
return False
if "chargeStartTime" not in options:
options["chargeStartTime"] = self.utcnow_iso()
if "meterStart" not in options:
options["meterStart"] = 1000
if not await self.action_charge_start(**options):
self.charge_in_progress = False
return False
Expand All @@ -161,10 +171,6 @@ async def flow_charge(self, auto_stop: bool, **options) -> bool:
if not await self.action_status_update("Finishing", **options):
self.charge_in_progress = False
return False
if "chargeStopTime" not in options:
options["chargeStopTime"] = self.utcnow_iso()
if "meterStop" not in options:
options["meterStop"] = self.charge_meter_value_current(**options)
if not await self.action_charge_stop(**options):
self.charge_in_progress = False
return False
Expand Down Expand Up @@ -237,6 +243,10 @@ async def __loop_internal(self):
pass

async def by_middleware_req(self, req_id: str, req_action: str, req_payload: typing.Any):
# We must not block the event loop here for anything beside sending a response
# since this is the main event loop for the device checking for websocket messages
# If we need to do something that blocks or takes time, we must create a new task for it
next_async_task = None
resp_payload = None
if req_action in map(lambda x: str(x).lower(), [
"ClearCache",
Expand Down Expand Up @@ -279,36 +289,28 @@ async def by_middleware_req(self, req_id: str, req_action: str, req_payload: typ
resp_payload = {
"status": "Accepted"
}
await self.by_middleware_req_response_ready(req_id, resp_payload);
await self.action_meter_value(**options)
return
next_async_task = self.action_meter_value(**options)
if req_payload["requestedMessage"] == "BootNotification":
resp_payload = {
"status": "Accepted"
}
await self.by_middleware_req_response_ready(req_id, resp_payload);
await self.action_register()
return
next_async_task = self.action_register()
if req_payload["requestedMessage"] == "Heartbeat":
resp_payload = {
"status": "Accepted"
}
await self.by_middleware_req_response_ready(req_id, resp_payload);
await self.action_heart_beat()
return
next_async_task = await self.action_heart_beat()
if req_payload["requestedMessage"] == "StatusNotification":
options = {
"connectorId": req_payload["connectorId"] if "connectorId" in req_payload else 0,
}
resp_payload = {
"status": "Accepted"
}
await self.by_middleware_req_response_ready(req_id, resp_payload);
if self.charge_in_progress:
await self.action_status_update("Charging",**options)
next_async_task = await self.action_status_update("Charging", **options)
else:
await self.action_status_update("Available",**options)
return
next_async_task = await self.action_status_update("Available", **options)
if req_action == "RemoteStartTransaction".lower():
if not self.charge_can_start():
resp_payload["status"] = "Rejected"
Expand All @@ -318,21 +320,23 @@ async def by_middleware_req(self, req_id: str, req_action: str, req_payload: typ
"idTag": req_payload["idTag"] if "idTag" in req_payload else "-",
}
self.logger.info(f"Device, Read, Request, RemoteStart, Options: {json.dumps(options)}")
asyncio.create_task(utility.run_with_delay(self.flow_charge(False, **options), 2))
next_async_task = utility.run_with_delay(self.flow_charge(False, **options), 2)

if req_action == "RemoteStopTransaction".lower():
if not self.charge_can_stop(req_payload["transactionId"] if "transactionId" in req_payload else 0):
resp_payload["status"] = "Rejected"
else:
asyncio.create_task(utility.run_with_delay(self.flow_charge_stop(), 2))
next_async_task = utility.run_with_delay(self.flow_charge_stop(), 2)

if req_action == "Reset".lower():
asyncio.create_task(utility.run_with_delay(self.re_initialize(), 2))
next_async_task = utility.run_with_delay(self.re_initialize(), 2)

if resp_payload is None:
self.logger.warning(f"Device Read, Request, Unknown or not supported: {req_action}")
return
await self.by_middleware_req_response_ready(req_id, resp_payload)
if next_async_task is not None:
asyncio.create_task(next_async_task)

async def by_middleware_req_response_ready(self, req_id, resp_payload):
if resp_payload is None:
Expand Down
2 changes: 2 additions & 0 deletions device/ocpp_j/device_ocpp_j16.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async def action_authorize(self, **options) -> bool:
return True

async def action_charge_start(self, **options) -> bool:
self.fill_missing_options_charge_start(options)
action = "StartTransaction"
self.logger.info(f"Action {action} Start")
key_name = "idTagInfo"
Expand Down Expand Up @@ -128,6 +129,7 @@ async def action_meter_value(self, meter_value: int = None, time_stamp: datetime
return True

async def action_charge_stop(self, **options) -> bool:
self.fill_missing_options_charge_stop(options)
action = "StopTransaction"
self.logger.info(f"Action {action} Start")
key_name = "idTagInfo"
Expand Down
10 changes: 2 additions & 8 deletions device/ocpp_j/device_ocpp_j201.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ async def action_authorize(self, **options) -> bool:
return True

async def action_charge_start(self, **options) -> bool:
self.fill_missing_options_charge_start(options)
action = "StartTransaction"
self.logger.info(f"Action {action} Start")
id_tag = options.pop("idTag", "-")
Expand Down Expand Up @@ -178,6 +179,7 @@ async def action_meter_value(self, meter_value: int = None, time_stamp: datetime
return True

async def action_charge_stop(self, **options) -> bool:
self.fill_missing_options_charge_stop(options)
action = "StopTransaction"
self.logger.info(f"Action {action} Start")
id_tag = options.pop("idTag", "-")
Expand Down Expand Up @@ -236,20 +238,12 @@ async def flow_charge(self, auto_stop: bool, **options) -> bool:
if not await self.action_status_update("Occupied", **options):
self.charge_in_progress = False
return False
if "chargeStartTime" not in options:
options["chargeStartTime"] = self.utcnow_iso()
if "meterStart" not in options:
options["meterStart"] = 1000
if not await self.action_charge_start(**options):
self.charge_in_progress = False
return False
if not await self.flow_charge_ongoing_loop(auto_stop, **options):
self.charge_in_progress = False
return False
if "chargeStopTime" not in options:
options["chargeStopTime"] = self.utcnow_iso()
if "meterStop" not in options:
options["meterStop"] = self.charge_meter_value_current(**options)
if not await self.action_charge_stop(**options):
self.charge_in_progress = False
return False
Expand Down

0 comments on commit 30a7120

Please sign in to comment.