-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathvehicle_app.py
128 lines (100 loc) · 4.05 KB
/
vehicle_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright (c) 2022-2024 Contributors to the Eclipse Foundation
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
""" This module contains the Vehicle App base class. """
import asyncio
import inspect
import logging
from warnings import warn
from velocitas_sdk import config
from velocitas_sdk.vdb.client import VehicleDataBrokerClient
from velocitas_sdk.vdb.subscriptions import SubscriptionManager, VdbSubscription
logger = logging.getLogger(__name__)
def subscribe_topic(topic: str):
"""Annotation to subscribe to a MQTT topic.
Args:
topic ([str]): name of the MQTT topic to subscribe to.
"""
def wrap(func):
func.subscribeTopic = topic
return func
return wrap
def subscribe_data_points(data_point_names: str, condition: str = ""):
"""Annotation to subscribe to one or more data points provided by
the vehicle data broker.
Args:
data_point_names (str): comma-separate list of data point names to
subscribe to.
condition (str, optional): condition to apply to the data points.
Defaults to None.
"""
query = "SELECT " + data_point_names
if condition:
query += " WHERE " + condition
def wrap(func):
func.subscribeDataPoints = query
return func
return wrap
class VehicleApp:
"""Vehicle App base class. All Vehicle Apps must inherit from this class"""
def __init__(self):
self.middleware = config.middleware
self._vdb_client = VehicleDataBrokerClient()
self.pubsub_client = self.middleware.pubsub_client
logger.debug("VehicleApp instantiation successfully done")
async def on_start(self):
"""Override to add additional initialization code on startup, like
- adding subscriptions to vehicle data broker
"""
async def stop(self) -> None:
"""Stop the Vehicle App"""
await SubscriptionManager.remove_all_subscriptions()
await self._vdb_client.close()
async def run(self):
"""Run the Vehicle App"""
# start middleware lifecycle
await config.middleware.start()
methods = inspect.getmembers(self)
for method in methods:
if hasattr(method[1], "subscribeTopic"):
callback = method[1]
topic = method[1].subscribeTopic
await self.pubsub_client.subscribe_topic(topic, callback)
await config.middleware.wait_until_ready()
# register vehicle data broker subscriptions after middleware is initialized
for method in methods:
if hasattr(method[1], "subscribeDataPoints"):
sub = VdbSubscription(
self._vdb_client, method[1].subscribeDataPoints, method[1]
)
try:
SubscriptionManager._add_subscription(sub)
except Exception as ex:
logger.exception(ex)
try:
asyncio.create_task(self.pubsub_client.run())
await self.on_start()
while True:
await asyncio.sleep(1)
except Exception as ex:
logger.error(ex)
await self.stop()
async def publish_mqtt_event(self, topic: str, data: str):
warn(
"publish_mqtt_event is deprecated. Use publish_event instead.",
DeprecationWarning,
stacklevel=2,
)
await self.publish_event(topic, data)
async def publish_event(self, topic: str, data: str):
await self.pubsub_client.publish_event(topic, data)