diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..dcc8dcee --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max_line_length = 120 diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 00000000..ed904dee --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,16 @@ +name: pre-commit + +on: [pull_request] + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + # required to grab the history of the PR + fetch-depth: 0 + - uses: actions/setup-python@v3 + - uses: pre-commit/action@v3.0.0 + with: + extra_args: --color=always --from-ref ${{ github.event.pull_request.base.sha }} --to-ref ${{ github.event.pull_request.head.sha }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..aacdcea1 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,17 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.2.0 + hooks: + - id: trailing-whitespace + exclude_types: ["dbc"] + - id: end-of-file-fixer + exclude_types: ["dbc"] + - id: check-yaml + - id: check-added-large-files + +- repo: https://github.com/pycqa/flake8 + rev: '6.0.0' + hooks: + - id: flake8 diff --git a/README.md b/README.md index 12375f38..53e8a40b 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,10 @@ Name | Description [SOME/IP feeder](./someip2val) | SOME/IP feeder for KUKSA.val Databroker [DDS Provider](./dds2val) | DDS provider for KUKSA.val Databroker [Replay](./replay) | KUKSA.val Server replay script for previously recorded files, created by providing KUKSA.val Server with `--record` argument + +## Pre-commit set up +This repository is set up to use [pre-commit](https://pre-commit.com/) hooks. +Use `pip install pre-commit` to install pre-commit. +After you clone the project, run `pre-commit install` to install pre-commit into your git hooks. +Pre-commit will now run on every commit. +Every time you clone a project using pre-commit running pre-commit install should always be the first thing you do. diff --git a/dbc2val/CHANGELOG.md b/dbc2val/CHANGELOG.md index 8bd847df..a70a951a 100644 --- a/dbc2val/CHANGELOG.md +++ b/dbc2val/CHANGELOG.md @@ -5,4 +5,4 @@ This file lists important changes to dbc2val ## Refactoring and changed configuration format (2023-02) Feeder refactored and new mapping format based on VSS introduced, see [documentation](mapping.md). -Old mapping format no longer supported. \ No newline at end of file +Old mapping format no longer supported. diff --git a/dbc2val/Dockerfile b/dbc2val/Dockerfile index b28575da..a95a4f4c 100644 --- a/dbc2val/Dockerfile +++ b/dbc2val/Dockerfile @@ -29,9 +29,11 @@ RUN python3 -m venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" RUN /opt/venv/bin/python3 -m pip install --upgrade pip \ - && pip3 install --no-cache-dir -r requirements.txt + && pip3 install --pre --no-cache-dir -r requirements.txt -RUN pip3 install wheel scons && pip3 install pyinstaller patchelf==0.17.0.0 staticx +# staticx v0.13.8 cannot use pyinstaller 5.10.0 +# see https://github.com/JonathonReinhart/staticx/issues/235 +RUN pip3 install wheel scons && pip3 install pyinstaller==5.9.0 patchelf==0.17.0.0 staticx # By default we use certificates and tokens from kuksa_certificates, so they must be included RUN pyinstaller --collect-data kuksa_certificates --hidden-import can.interfaces.socketcan --clean -F -s dbcfeeder.py diff --git a/dbc2val/Readme.md b/dbc2val/Readme.md index 37e10e5b..dbd9d5b2 100644 --- a/dbc2val/Readme.md +++ b/dbc2val/Readme.md @@ -46,9 +46,11 @@ $ python -V 3. Install the needed python packages ```console -$ pip install -r requirements.txt +$ pip install --pre -r requirements.txt ``` +*Note - `--pre` currently needed as dbcfeeder relies on a pre-release of kuksa-client* + 4. If you want to run tests and linters, you will also need to install development dependencies ```console @@ -104,19 +106,32 @@ A smaller excerpt from the above sample, with fewer signals. ## Configuration -| Command Line Argument | Environment Variable | Config File Property | Default Value | Description | +<<<<<<< HEAD +| Command Line Argument | Environment Variable | Config File Property | Default Value | Description | |:----------------------|:--------------------------------|:------------------------|:---------------------------------|-----------------------| -| *--config* | - | - | - | Configuration file | +| *--config* | - | - | *See below* | Configuration file | | *--dbcfile* | *DBC_FILE* | *[can].dbc* | | DBC file used for parsing CAN traffic | | *--dumpfile* | *CANDUMP_FILE* | *[can].candumpfile* | | Replay recorded CAN traffic from dumpfile | | *--canport* | *CAN_PORT* | *[can].port* | | Read from this CAN interface | | *--use-j1939* | *USE_J1939* | *[can].j1939* | `False` | Use J1939 when decoding CAN frames. Setting the environment value to any value is equivalent to activating the switch on the command line. | | *--use-socketcan* | - | - | `False` | Use SocketCAN (overriding any use of --dumpfile) | -| *--mapping* | *MAPPING_FILE* | *[general].mapping* | `mapping/vss_3.1.1/vss_dbc.json` |Mapping file used to map CAN signals to databroker datapoints. Take a look on usage of the mapping file | +| *--mapping* | *MAPPING_FILE* | *[general].mapping* | `mapping/vss_3.1.1/vss_dbc.json` | Mapping file used to map CAN signals to databroker datapoints. | | *--server-type* | *SERVER_TYPE* | *[general].server_type* | `kuksa_databroker` | Which type of server the feeder should connect to (`kuksa_val_server` or `kuksa_databroker`) | -| - | *VDB_ADDRESS* | - | `127.0.0.1:55555` | The IP address/host name and port number of the databroker (only applicable for server type `kuksa_databroker`) | -| - | *DAPR_GRPC_PORT* | - | - | Override broker address & connect to DAPR sidecar @ 127.0.0.1:DAPR_GRPC_PORT | -| - | *VEHICLEDATABROKER_DAPR_APP_ID* | - | - | Add dapr-app-id metadata | +| - | *KUKSA_ADDRESS* | *[general].ip* | `127.0.0.1` | IP address for Server/Databroker | +| - | *KUKSA_PORT* | *[general].port* | `55555` | Port for Server/Databroker | +| *--tls* | - | *[general].tls* | `False` | Shall tls be used for Server/Databroker connection? | +| - | - | *[general].token* | *Undefined* | Token path. Only needed if Databroker/Server requires authentication | +| - | *VEHICLEDATABROKER_DAPR_APP_ID* | - | - | Add dapr-app-id metadata. Only relevant for KUKSA.val Databroker | + +*Note that the [default config file](config/dbc_feeder.ini) include default Databroker settings and must be modified if you intend to use it for KUKSA.val Server* + +If `--config` is not given, the dbcfeeder will look for configuration files in the following locations: + +* `/config/dbc_feeder.in` +* `/etc/dbc_feeder.ini` +* `config/dbc_feeder.ini` + +The first one found will be used. Configuration options have the following priority (highest at top). @@ -218,6 +233,26 @@ docker run --net=host -e LOG_LEVEL=INFO dbcfeeder:latest --server-type kuksa_da docker run --net=host -e LOG_LEVEL=INFO dbcfeeder:latest --server-type kuksa_val_server ``` +### KUKSA.val Server/Databroker Authentication when using Docker + +The docker container contains default certificates for KUKSA.val server, and if the configuration file does not +specify token file the [default token file](https://github.com/eclipse/kuksa.val/blob/master/kuksa_certificates/jwt/all-read-write.json.token) +provided by [kuksa-client](https://github.com/eclipse/kuksa.val/tree/master/kuksa-client) will be used. + +No default token is included for KUKSA.val Databroker. Instead the user must specify the token file in the config file. +The token must also be available for the running docker container, for example by mounting the directory container +when starting the container. Below is an example based on that the token file +[provide-all.token](https://github.com/eclipse/kuksa.val/blob/master/jwt/provide-all.token) is used and that `kuksa.val` +is cloned to `/home/user/kuksa.val`. Then the token can be accessed by mounting the `jwt`folder using the `-v` +and specify `token=/jwt/provide-all.token` in the [default configuration file](config/dbc_feeder.ini). + + +```console +docker run --net=host -e LOG_LEVEL=INFO -v /home/user/kuksa.val/jwt:/jwt dbcfeeder:latest +``` + +*Note that authentication in KUKSA.val Databroker by default is deactivated, and then no token needs to be given!* + ## Mapping file The mapping file describes mapping between VSS signals and DBC signals. @@ -234,26 +269,22 @@ To set the log level to DEBUG $ LOG_LEVEL=debug ./dbcfeeder.py ``` -Set log level to INFO, but for dbcfeeder.broker set it to DEBUG +Set log level to INFO, but for dbcfeederlib.databrokerclientwrapper set it to DEBUG ```console -$ LOG_LEVEL=info,dbcfeeder.broker_client=debug ./dbcfeeder.py +$ LOG_LEVEL=info,dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py ``` or, since INFO is the default log level, this is equivalent to: ```console -$ LOG_LEVEL=dbcfeeder.broker_client=debug ./dbcfeeder.py +$ LOG_LEVEL=dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py ``` Available loggers: - dbcfeeder -- dbcfeeder.broker_client -- databroker -- dbcreader -- dbcmapper -- can -- j1939 +- dbcfeederlib.* (one for every file in the dbcfeeder directory) +- kuksa-client (to control loggings provided by [kuksa-client](https://github.com/eclipse/kuksa.val/tree/master/kuksa-client)) ## ELM/OBDLink support @@ -284,4 +315,3 @@ large-sized messages that are delivered with more than one CAN frame because the than a CAN frame's maximum payload of 8 bytes. To enable the J1939 mode, simply put `--use-j1939` in the command when running `dbcfeeder.py`. Support for J1939 is provided by means of the [can-j1939 package](https://pypi.org/project/can-j1939/). - diff --git a/dbc2val/config/dbc_feeder.ini b/dbc2val/config/dbc_feeder.ini index 7d3e8c8a..18cbcf68 100644 --- a/dbc2val/config/dbc_feeder.ini +++ b/dbc2val/config/dbc_feeder.ini @@ -17,22 +17,32 @@ server_type = kuksa_databroker # VSS mapping file mapping = mapping/vss_3.1.1/vss_dbc.json -[kuksa_val_server] -# kuksa_val_server IP address or host name +# Same configs used for KUKSA.val Server and Databroker +# Note that default values below corresponds to Databroker +# Default values for KUKSA.val Server is commented below + +# IP address for server (KUKSA.val Server or Databroker) +ip = 127.0.0.1 # ip = localhost + +# Port for server (KUKSA.val Server or Databroker) +port = 55555 # port = 8090 -# protocol = ws -# insecure = False -# JWT security token file -# token=../../kuksa_certificates/jwt/super-admin.json.token - -[kuksa_databroker] -# kuksa_databroker IP address or host name -# ip = 127.0.0.1 -# port = 55555 -# protocol = grpc -# kuksa_databroker does not yet support security features -# insecure = True + +# Shall TLS be used (default False for Databroker, True for KUKSA.val Server) +tls = False +# tls = True + +# Token file for authorization. +# Default behavior differ between servers +# For KUKSA.val Databroker the KUKSA.val default token not included in packages and containers +# If you run your Databroker so it require authentication you must specify token +# The example below works if you have cloned kuksa.val in parallel to kuksa.val.feeders +#token=../../kuksa.val/jwt/provide-all.token +# For KUKSA.val Server the default behavior is to use the token provided as part of kuksa-client +# So you only need to specify a different token if you want to use a different token +# Possibly like below +# token=../../kuksa.val/kuksa_certificates/jwt/super-admin.json.token [can] # CAN port, use elmcan to start the elmcan bridge diff --git a/dbc2val/createvcan.sh b/dbc2val/createvcan.sh index 7dda35e5..b86e887f 100755 --- a/dbc2val/createvcan.sh +++ b/dbc2val/createvcan.sh @@ -68,5 +68,3 @@ fi virtualCanConfigure echo "createvcan: Done." - - diff --git a/dbc2val/dbcfeeder.py b/dbc2val/dbcfeeder.py index 3429c4ec..d32bb048 100755 --- a/dbc2val/dbcfeeder.py +++ b/dbc2val/dbcfeeder.py @@ -24,40 +24,38 @@ import argparse import configparser -import contextlib import enum import logging import os import queue -import json import sys import time from signal import SIGINT, SIGTERM, signal from typing import Any from typing import Dict -import grpc - -from kuksa_client import KuksaClientThread -import kuksa_client.grpc - from dbcfeederlib import canplayer from dbcfeederlib import dbc2vssmapper from dbcfeederlib import dbcreader from dbcfeederlib import j1939reader -from dbcfeederlib import databroker +from dbcfeederlib import databrokerclientwrapper +from dbcfeederlib import serverclientwrapper +from dbcfeederlib import clientwrapper from dbcfeederlib import elm2canbridge log = logging.getLogger("dbcfeeder") class ServerType(str, enum.Enum): + """Enum class to indicate type of server dbcfeeder is connecting to""" KUKSA_VAL_SERVER = 'kuksa_val_server' KUKSA_DATABROKER = 'kuksa_databroker' def init_logging(loglevel): - # create console handler and set level to debug + """Set up console logger""" + # create console handler and set level to debug. This just means that it can show DEBUG messages. + # What actually is shown is controlled by logging configuration console_logger = logging.StreamHandler() console_logger.setLevel(logging.DEBUG) @@ -79,6 +77,7 @@ def init_logging(loglevel): class ColorFormatter(logging.Formatter): + """Color formatter that can be used for terminals""" FORMAT = "{time} {{loglevel}} {logger} {msg}".format( time="\x1b[2m%(asctime)s\x1b[0m", # grey logger="\x1b[2m%(name)s:\x1b[0m", # grey @@ -99,22 +98,25 @@ def format(self, record): class Feeder: - def __init__(self, server_type: ServerType, kuksa_client_config: Dict[str, Any], + """ + The feeder is responsible for setting up a queue. + It will get a mapping config as input (in start) and will then: + Start a DBCReader that extracts interesting CAN messages and adds to the queue. + Start a CANplayer if you run with a CAN dump file as input. + Start listening to the queue and transform CAN messages to VSS data and if conditions + are fulfilled send them to the client wrapper which in turn send it to the bckend supported by the wrapper. + """ + def __init__(self, client_wrapper: clientwrapper.ClientWrapper, elmcan_config: Dict[str, Any]): self._shutdown = False self._reader = None self._player = None self._mapper = None - self._provider = None - self._connected = False self._registered = False - self._can_queue : queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue() - self._server_type = server_type - self._kuksa_client_config = kuksa_client_config + self._can_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue() + self._client_wrapper = client_wrapper self._elmcan_config = elmcan_config - self._exit_stack = contextlib.ExitStack() self._disconnect_time = 0.0 - self._kuksa = None def start( self, @@ -123,8 +125,7 @@ def start( mappingfile, candumpfile=None, use_j1939=False, - use_strict_parsing=False, - grpc_metadata=None, + use_strict_parsing=False ): log.info("Using mapping: {}".format(mappingfile)) self._mapper = dbc2vssmapper.Mapper(mappingfile) @@ -149,8 +150,9 @@ def start( if candumpfile: # use dumpfile log.info( - "Using virtual bus to replay CAN messages (channel: %s)", + "Using virtual bus to replay CAN messages (channel: %s) (dumpfile: %s)", canport, + candumpfile ) self._reader.start_listening( bustype="virtual", @@ -170,20 +172,6 @@ def start( log.info("Using socket CAN device '%s'", canport) self._reader.start_listening(bustype="socketcan", channel=canport) - if self._server_type is ServerType.KUKSA_DATABROKER: - databroker_address = f"{self._kuksa_client_config['ip']}:{self._kuksa_client_config['port']}" - log.info("Connecting to Data Broker using %s", databroker_address) - vss_client = self._exit_stack.enter_context(kuksa_client.grpc.VSSClient( - host=self._kuksa_client_config['ip'], - port=self._kuksa_client_config['port'], - )) - vss_client.channel.subscribe( - lambda connectivity: self.on_broker_connectivity_change(connectivity), - try_to_connect=False, - ) - self._provider = databroker.Provider(vss_client, grpc_metadata) - else: - log.info("Will use kuksa-val-server") self._run() def stop(self): @@ -194,35 +182,12 @@ def stop(self): self._reader.stop() if self._player is not None: self._player.stop() - if self._kuksa is not None: - self._kuksa.stop() - - self._exit_stack.close() + self._client_wrapper.stop() + self._mapper = None def is_stopping(self): return self._shutdown - def on_broker_connectivity_change(self, connectivity): - log.info("Connectivity to data broker changed to: %s", connectivity) - if ( - connectivity == grpc.ChannelConnectivity.READY or - connectivity == grpc.ChannelConnectivity.IDLE - ): - # Can change between READY and IDLE. Only act if coming from - # unconnected state - if not self._connected: - log.info("Connected to data broker") - self._connected = True - self._disconnect_time = 0.0 - else: - if self._connected: - log.info("Disconnected from data broker") - else: - if connectivity == grpc.ChannelConnectivity.CONNECTING: - log.info("Trying to connect to data broker") - self._connected = False - self._registered = False - def _register_datapoints(self) -> bool: """ Check that data points are registered. @@ -230,46 +195,45 @@ def _register_datapoints(self) -> bool: Returns True on success. """ log.info("Check that datapoints are registered") + if self._mapper is None: + log.error("_register_datapoints called before feeder has been started") + return False all_registered = True for entry in self._mapper.mapping.values(): for vss_mapping in entry: - #for target_name, target_attr in self._mapper.mapping[entry]["targets"].items(): - registered = self._provider.check_registered( - vss_mapping.vss_name, - vss_mapping.datatype.upper(), - vss_mapping.description, - ) - if not registered: + log.debug("Checking if signal %s is registered", vss_mapping.vss_name) + resp = self._client_wrapper.is_signal_defined(vss_mapping.vss_name) + if not resp: all_registered = False return all_registered def _run(self): - if self._server_type is ServerType.KUKSA_VAL_SERVER: - self._kuksa = KuksaClientThread(self._kuksa_client_config) - self._kuksa.start() - self._kuksa.authorize() + self._client_wrapper.start() + log.info("Authorized") processing_started = False messages_sent = 0 last_sent_log_entry = 0 queue_max_size = 0 while self._shutdown is False: - if self._server_type is ServerType.KUKSA_DATABROKER: - if not self._connected: - sleep_time = 0.2 - time.sleep(sleep_time) - self._disconnect_time += sleep_time - if self._disconnect_time > 5: - log.info("Databroker still not connected!") - self._disconnect_time = 0.0 + if self._client_wrapper.is_connected(): + self._disconnect_time = 0.0 + else: + # As we actually cannot register + self._registered = False + sleep_time = 0.2 + time.sleep(sleep_time) + self._disconnect_time += sleep_time + if self._disconnect_time > 5: + log.info("Server/Databroker still not connected!") + self._disconnect_time = 0.0 + continue + if not self._registered: + if not self._register_datapoints(): + log.error("Not all datapoints registered, exiting!") + self.stop() continue - if not self._registered: - time.sleep(1) - if not self._register_datapoints(): - log.error("Not all datapoints registered, exiting!") - self.stop() - continue - self._registered = True + self._registered = True try: if not processing_started: processing_started = True @@ -288,26 +252,10 @@ def _run(self): else: # get values out of the canreplay and map to desired signals target = vss_observation.vss_name - success = True - if self._server_type is ServerType.KUKSA_DATABROKER: - try: - self._provider.update_datapoint(target, value) - except kuksa_client.grpc.VSSClientError: - log.error(f"Error sending {target} to databroker", exc_info=True) - success = False - else: - # KUKSA server expects a string value without quotes - if isinstance(value,bool): - # For bool KUKSA server expects lower case (true/false) rather than Python case (True/False) - send_value = json.dumps(value) - else: - send_value = str(value) - resp = json.loads(self._kuksa.setValue(target, send_value)) - if "error" in resp: - log.error(f"Error sending {target} to kuksa-val-server: {resp['error']}") - success = False + + success = self._client_wrapper.update_datapoint(target, value) if success: - log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value,vss_observation.time) + log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value, vss_observation.time) # Give status message after 1, 2, 4, 8, 16, 32, 64, .... messages have been sent messages_sent += 1 if messages_sent >= (2 * last_sent_log_entry): @@ -355,8 +303,10 @@ def parse_config(filename): return config + def main(argv): - # argument support + """Main entrypoint for dbcfeeder""" + log.info(f"Argv is {argv}") parser = argparse.ArgumentParser(description="dbcfeeder") parser.add_argument("--config", metavar="FILE", help="Configuration file") parser.add_argument( @@ -376,7 +326,7 @@ def main(argv): parser.add_argument( "--mapping", metavar="FILE", - help="Mapping file used to map CAN signals to databroker datapoints", + help="Mapping file used to map CAN signals to VSS datapoints", ) parser.add_argument( "--server-type", @@ -404,35 +354,39 @@ def main(argv): server_type = args.server_type elif os.environ.get("SERVER_TYPE"): server_type = ServerType(os.environ.get("SERVER_TYPE")) - elif "general" in config and "server_type" in config["general"]: + elif "server_type" in config["general"]: server_type = ServerType(config["general"]["server_type"]) else: server_type = ServerType.KUKSA_VAL_SERVER + if server_type not in [ServerType.KUKSA_VAL_SERVER, ServerType.KUKSA_DATABROKER]: + raise ValueError(f"Unsupported server type: {server_type}") + + # The wrappers contain default settings, so we only need to change settings + # if given by dbcfeeder configs/arguments/env-variables if server_type is ServerType.KUKSA_VAL_SERVER: - config.setdefault("kuksa_val_server", {}) - config["kuksa_val_server"].setdefault("ip", "localhost") - config["kuksa_val_server"].setdefault("port", "8090") - config["kuksa_val_server"].setdefault("protocol", "ws") - config["kuksa_val_server"].setdefault("insecure", "False") - kuksa_client_config = config["kuksa_val_server"] + client_wrapper = serverclientwrapper.ServerClientWrapper() elif server_type is ServerType.KUKSA_DATABROKER: - config.setdefault("kuksa_databroker", {}) - config["kuksa_databroker"].setdefault("ip", "127.0.0.1") - config["kuksa_databroker"].setdefault("port", "55555") - config["kuksa_databroker"].setdefault("protocol", "grpc") - config["kuksa_databroker"].setdefault("insecure", "True") - kuksa_client_config = config["kuksa_databroker"] - - if os.environ.get("DAPR_GRPC_PORT"): - kuksa_client_config["ip"] = "127.0.0.1" - kuksa_client_config["port"] = os.environ.get("DAPR_GRPC_PORT") - elif os.environ.get("VDB_ADDRESS"): - vdb_address, vdb_port = os.environ.get("VDB_ADDRESS").split(':', maxsplit=1) - kuksa_client_config["ip"] = vdb_address - kuksa_client_config["port"] = vdb_port + client_wrapper = databrokerclientwrapper.DatabrokerClientWrapper() + + if os.environ.get("KUKSA_ADDRESS"): + client_wrapper.set_ip(os.environ.get("KUKSA_ADDRESS")) + elif "ip" in config["general"]: + client_wrapper.set_ip(config["general"]["ip"]) + + if os.environ.get("KUKSA_PORT"): + client_wrapper.set_port(os.environ.get("KUKSA_PORT")) + elif "port" in config["general"]: + client_wrapper.set_port(config["general"]["port"]) + + if "tls" in config["general"]: + client_wrapper.set_tls(config["general"].getboolean("tls")) + + if "token" in config["general"]: + log.info(f"Given token information: {config['general']['token']}") + client_wrapper.set_token_path(config["general"]["token"]) else: - raise ValueError(f"Unsupported server type: {server_type}") + log.info("Token information not given") if args.mapping: mappingfile = args.mapping @@ -486,26 +440,21 @@ def main(argv): elif "can" in config and "candumpfile" in config["can"]: candumpfile = config["can"]["candumpfile"] - if os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID"): - grpc_metadata = ( - ("dapr-app-id", os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID")), - ) - else: - grpc_metadata = None + client_wrapper.get_client_specific_configs() elmcan_config = [] if canport == "elmcan": - if candumpfile != None: + if candumpfile is not None: log.error("It is a contradiction specifying both elmcan and candumpfile!") sys.exit(-1) - if not "elmcan" in config: + if "elmcan" not in config: log.error("Cannot use elmcan without elmcan config!") sys.exit(-1) elmcan_config = config["elmcan"] - feeder = Feeder(server_type, kuksa_client_config, elmcan_config) + feeder = Feeder(client_wrapper, elmcan_config) - def signal_handler(signal_received, frame): + def signal_handler(signal_received, *_): log.info(f"Received signal {signal_received}, stopping...") # If we get told to shutdown a second time. Just do it. @@ -525,17 +474,16 @@ def signal_handler(signal_received, frame): mappingfile=mappingfile, candumpfile=candumpfile, use_j1939=use_j1939, - use_strict_parsing=args.strict, - grpc_metadata=grpc_metadata, + use_strict_parsing=args.strict ) return 0 def parse_env_log(env_log, default=logging.INFO): - def parse_level(level, default=default): - if type(level) is str: - if level.lower() in [ + def parse_level(specified_level, default=default): + if isinstance(specified_level, str): + if specified_level.lower() in [ "debug", "info", "warn", @@ -543,12 +491,11 @@ def parse_level(level, default=default): "error", "critical", ]: - return level.upper() - else: - raise Exception(f"could not parse '{level}' as a log level") + return specified_level.upper() + raise Exception(f"could not parse '{specified_level}' as a log level") return default - loglevels = {} + parsed_loglevels = {} if env_log is not None: log_specs = env_log.split(",") @@ -556,19 +503,18 @@ def parse_level(level, default=default): spec_parts = log_spec.split("=") if len(spec_parts) == 1: # This is a root level spec - if "root" in loglevels: + if "root" in parsed_loglevels: raise Exception("multiple root loglevels specified") - else: - loglevels["root"] = parse_level(spec_parts[0]) + parsed_loglevels["root"] = parse_level(spec_parts[0]) if len(spec_parts) == 2: - logger = spec_parts[0] - level = spec_parts[1] - loglevels[logger] = parse_level(level) + logger_name = spec_parts[0] + logger_level = spec_parts[1] + parsed_loglevels[logger_name] = parse_level(logger_level) - if "root" not in loglevels: - loglevels["root"] = default + if "root" not in parsed_loglevels: + parsed_loglevels["root"] = default - return loglevels + return parsed_loglevels if __name__ == "__main__": @@ -577,17 +523,13 @@ def parse_level(level, default=default): # Set log level to debug # LOG_LEVEL=debug ./dbcfeeder.py # - # Set log level to INFO, but for dbcfeeder.broker set it to DEBUG - # LOG_LEVEL=info,dbcfeeder.broker_client=debug ./dbcfeeder.py + # Set log level to INFO, but for dbcfeederlib.databrokerclientwrapper set it to DEBUG + # LOG_LEVEL=info,dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py # # Other available loggers: - # dbcfeeder - # dbcfeeder.broker_client - # databroker (useful for feeding values debug) - # dbcreader - # dbcmapper - # can - # j1939 + # dbcfeeder (main dbcfeeder file) + # dbcfeederlib.* (Every file have their own logger, like dbcfeederlib.databrokerclientwrapper) + # kuksa_client (If you want to get additional information from kuksa-client python library) # loglevels = parse_env_log(os.environ.get("LOG_LEVEL")) diff --git a/dbc2val/dbcfeederlib/clientwrapper.py b/dbc2val/dbcfeederlib/clientwrapper.py new file mode 100644 index 00000000..0f5ad127 --- /dev/null +++ b/dbc2val/dbcfeederlib/clientwrapper.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 + +################################################################################# +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License 2.0 which is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +import logging +from typing import Any + +from abc import ABC, abstractmethod + +log = logging.getLogger(__name__) + + +class ClientWrapper(ABC): + """ + Wraps client-specific functionality so that that main dbcfeeder does not need to care about it. + This acts as a base class, each client (type and/or technology) shall be in a separate file + This file shall be feeder/provider independent, and can possibly be moved to kuksa.val/kuksa-client + """ + def __init__(self, ip: str, port: int, token_path: str, tls: bool = True): + """ + This init method is only supposed to be called by subclassed __init__ functions + """ + self._ip = ip + self._port = port + self._token_path = token_path + self._tls = tls + self._registered = False + + def set_ip(self, ip: str): + """ Set IP address to use """ + self._ip = ip + + def set_port(self, port: int): + """ Set port to use """ + self._port = port + + def set_tls(self, tls: bool): + """ + Set if TLS shall be used (including server auth). + Currently we rely on default location for root cert as defined by kuksa-client + """ + self._tls = tls + + def set_token_path(self, token_path: str): + self._token_path = token_path + + # Abstract methods to implement + @abstractmethod + def get_client_specific_configs(self): + pass + + @abstractmethod + def start(self): + pass + + @abstractmethod + def is_connected(self) -> bool: + pass + + @abstractmethod + def is_signal_defined(self, vss_name: str) -> bool: + pass + + @abstractmethod + def update_datapoint(self, name: str, value: Any) -> bool: + pass + + @abstractmethod + def stop(self): + pass diff --git a/dbc2val/dbcfeederlib/databroker.py b/dbc2val/dbcfeederlib/databroker.py deleted file mode 100644 index d0d6bcc9..00000000 --- a/dbc2val/dbcfeederlib/databroker.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python3 - -################################################################################# -# Copyright (c) 2022 Contributors to the Eclipse Foundation -# -# See the NOTICE file(s) distributed with this work for additional -# information regarding copyright ownership. -# -# This program and the accompanying materials are made available under the -# terms of the Apache License 2.0 which is available at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# SPDX-License-Identifier: Apache-2.0 -################################################################################# - -import logging -from typing import Any -from typing import Optional -from typing import Union - -import grpc.aio - -import kuksa_client.grpc -from kuksa_client.grpc import Datapoint -from kuksa_client.grpc import DataEntry -from kuksa_client.grpc import DataType -from kuksa_client.grpc import EntryUpdate -from kuksa_client.grpc import Field -from kuksa_client.grpc import Metadata -from kuksa_client.grpc import VSSClient - -log = logging.getLogger(__name__) - - -class Provider: - def __init__(self, vss_client: VSSClient, grpc_metadata: Optional[grpc.aio.Metadata] = None): - self._name_to_type : dict[str, DataType]= {} - self._rpc_kwargs = {'metadata': grpc_metadata} - log.info("Using %s", self._rpc_kwargs) - self._vss_client = vss_client - - def check_registered(self, name: str, data_type: Union[str, DataType], description: str) -> bool: - """ - Check if the signal is registered. If not raise an exception. - In the future this method may try register signals that are not yet registered. - The arguments data_type and description are kept for that purpose. - Returns True if check succeeds. - """ - if isinstance(data_type, str): - data_type = getattr(DataType, data_type) - try: - log.debug("Checking if signal %s is registered",name) - metadata = self._vss_client.get_metadata((name,), **self._rpc_kwargs) - if len(metadata) == 1: - self._name_to_type[name] = metadata[name].data_type - log.info( - "%s is already registered with type %s", - name, - metadata[name].data_type.name, - ) - return True - log.error("Unexpected metadata response when checking for %s: %s", name, metadata) - except kuksa_client.grpc.VSSClientError as client_error: - code = client_error.error.get('code') - if code == 404: - log.error("Signal %s is not registered", name) - else: - log.error("Error checking registration of %s", name, exc_info=True) - return False - - def update_datapoint(self, name: str, value: Any): - updates = (EntryUpdate(DataEntry( - name, - value=Datapoint(value=value), - # Specifying data_type removes the need for the client to query data_type from the server before - # issuing every set() call. - metadata=Metadata(data_type=self._name_to_type[name]), - ), (Field.VALUE,)),) - - self._vss_client.set(updates=updates, **self._rpc_kwargs) - log.debug("%s => %s", name, value) diff --git a/dbc2val/dbcfeederlib/databrokerclientwrapper.py b/dbc2val/dbcfeederlib/databrokerclientwrapper.py new file mode 100644 index 00000000..3bf2db2f --- /dev/null +++ b/dbc2val/dbcfeederlib/databrokerclientwrapper.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 + +################################################################################# +# Copyright (c) 2022 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License 2.0 which is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +import logging +from typing import Any +from typing import Dict + +import os +import contextlib + +import grpc.aio + +import kuksa_client.grpc +from kuksa_client.grpc import Datapoint +from kuksa_client.grpc import DataEntry +from kuksa_client.grpc import DataType +from kuksa_client.grpc import EntryUpdate +from kuksa_client.grpc import Field +from kuksa_client.grpc import Metadata +from dbcfeederlib import clientwrapper + +log = logging.getLogger(__name__) + + +class DatabrokerClientWrapper(clientwrapper.ClientWrapper): + """ + Client Wrapper using the interface in + https://github.com/eclipse/kuksa.val/blob/master/kuksa-client/kuksa_client/grpc/__init__.py + """ + # No default token path given as no dewfault token included in packages/containers + def __init__(self, ip: str = "127.0.0.1", port: int = 55555, + token_path: str = "", + tls: bool = False): + """ + Init Databroker client wrapper, by default (for now) without TLS + """ + self._grpc_client = None + self._name_to_type: dict[str, DataType] = {} + self._rpc_kwargs: Dict[str, str] = {} + self._connected = False + self._exit_stack = contextlib.ExitStack() + super().__init__(ip, port, token_path, tls) + + def get_client_specific_configs(self): + """ + Get client specific configs and env variables + """ + + if os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID"): + grpc_metadata = ( + ("dapr-app-id", os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID")), + ) + self._rpc_kwargs = {'metadata': grpc_metadata} + + def start(self): + """ + Start connection to databroker and authorize + """ + + log.info(f"Connecting to Data Broker using {self._ip}:{self._port}") + + # For now will just throw a FileNotFoundError if file cannot be found + token = "" + if self._token_path != "": + log.info(f"Token path specified is {self._token_path}") + file = open(self._token_path, 'r') + token = file.read() + log.debug(f"Token is: {token}") + else: + log.info("No token path specified. KUKSA.val Databroker must run without authentication!") + + # We do not connect directly when we create VSSClient + # Instead we provide token first when we do authorize + # The alternative approach would be to provide token in constructor + # with/without ensure_startup_connection and not actively call "authorize" + # The only practical difference is how progress and errors (if any) are reported! + self._grpc_client = self._exit_stack.enter_context(kuksa_client.grpc.VSSClient( + host=self._ip, + port=self._port, + ensure_startup_connection=False + )) + self._grpc_client.authorize(token=token, **self._rpc_kwargs) + self._grpc_client.channel.subscribe( + lambda connectivity: self.on_broker_connectivity_change(connectivity), + try_to_connect=False, + ) + + def on_broker_connectivity_change(self, connectivity): + log.info("Connectivity to data broker changed to: %s", connectivity) + if ( + connectivity == grpc.ChannelConnectivity.READY or + connectivity == grpc.ChannelConnectivity.IDLE + ): + # Can change between READY and IDLE. Only act if coming from + # unconnected state + if not self._connected: + log.info("Connected to data broker") + self._connected = True + else: + if self._connected: + log.info("Disconnected from data broker") + else: + if connectivity == grpc.ChannelConnectivity.CONNECTING: + log.info("Trying to connect to data broker") + self._connected = False + + def is_connected(self) -> bool: + return self._connected + + def is_signal_defined(self, vss_name: str) -> bool: + """ + Check if the signal is registered. If not log an error. + In the future this method may try register signals that are not yet registered. + The arguments data_type and description are kept for that purpose. + Returns True if check succeeds. + """ + if self._grpc_client is None: + log.warning("is_signal_defined called before client has been started") + return False + try: + log.debug("Checking if signal %s is registered", vss_name) + metadata = self._grpc_client.get_metadata((vss_name,), **self._rpc_kwargs) + if len(metadata) == 1: + self._name_to_type[vss_name] = metadata[vss_name].data_type + log.info( + "%s is already registered with type %s", + vss_name, + metadata[vss_name].data_type.name, + ) + return True + log.error("Unexpected metadata response when checking for %s: %s", vss_name, metadata) + except kuksa_client.grpc.VSSClientError as client_error: + code = client_error.error.get('code') + if code == 404: + log.error("Signal %s is not registered", vss_name) + else: + log.error("Error checking registration of %s", vss_name, exc_info=True) + return False + + def update_datapoint(self, name: str, value: Any) -> bool: + """ + Update datapoint. + Supported format for value is still a bit unclear/undefined. + Like an a bool VSS signal both be fed as a Python bool and a string representing json true/false value + (possibly with correct case) + """ + if self._grpc_client is None: + log.warning("update_datapoint called before client has been started") + return False + try: + + updates = (EntryUpdate(DataEntry( + name, + value=Datapoint(value=value), + # Specifying data_type removes the need for the client to query data_type from the server before + # issuing every set() call. + metadata=Metadata(data_type=self._name_to_type[name]), + ), (Field.VALUE,)),) + + self._grpc_client.set(updates=updates, **self._rpc_kwargs) + log.debug("%s => %s", name, value) + + except kuksa_client.grpc.VSSClientError: + log.error(f"Error sending {value} to databroker", exc_info=True) + return False + + return True + + def stop(self): + log.info("Stopping databroker client") + if self._grpc_client is None: + log.warning("stop called before client has been started") + else: + self._exit_stack.close() + self._grpc_client = None diff --git a/dbc2val/dbcfeederlib/serverclientwrapper.py b/dbc2val/dbcfeederlib/serverclientwrapper.py new file mode 100644 index 00000000..851516b5 --- /dev/null +++ b/dbc2val/dbcfeederlib/serverclientwrapper.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +################################################################################# +# Copyright (c) 2022 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License 2.0 which is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +import logging +from typing import Any +import json + +from dbcfeederlib import clientwrapper + + +from kuksa_client import KuksaClientThread + +log = logging.getLogger(__name__) + + +class ServerClientWrapper(clientwrapper.ClientWrapper): + def __init__(self, ip: str = "localhost", port: int = 8090, + token_path: str = "", + tls: bool = True): + # For WebSocket (KUKSA.val Server) we do not send in token information by default + # That means in practice that the default defined in kuksa-client will be used + # (as of today 'jwt/all-read-write.json.token') + log.debug("Initializing ServerClientWrapper") + super().__init__(ip, port, token_path, tls) + + # Holder for configs used by kuksa-client + # for default values + # see https://github.com/eclipse/kuksa.val/blob/master/kuksa-client/kuksa_client/cli_backend/__init__.py + self._client_config = {} + # Set read-only configs, others to be set just before we use config as they may change + self._client_config["protocol"] = "ws" + self._kuksa = None + + def get_client_specific_configs(self): + """ + Get client specific configs and env variables + """ + log.debug("No additional configs for KUKSA.val server") + + def start(self): + """ + Start connection to server and authorize + """ + + # Finalize config + self._client_config["ip"] = self._ip + self._client_config["port"] = self._port + self._client_config["insecure"] = not self._tls + # Do not set token if it is empty to allow default client lib info to be used + if self._token_path != "": + self._client_config["token"] = self._token_path + + # TODO add data for root cert if using TLS and if given + + self._kuksa = KuksaClientThread(self._client_config) + self._kuksa.start() + self._kuksa.authorize() + + def is_connected(self) -> bool: + # This one is quite unreliable, see https://github.com/eclipse/kuksa.val/issues/523 + if self._kuksa is None: + log.warning("is_connected called before client has been started") + return False + return self._kuksa.checkConnection() + + def is_signal_defined(self, vss_name: str) -> bool: + if self._kuksa is None: + log.warning("is_signal_defined called before client has been started") + return False + """Check if signal is defined in server """ + resp = json.loads(self._kuksa.getMetaData(vss_name)) + if "error" in resp: + log.error(f"Signal {vss_name} appears not to be registered: {resp['error']}") + return False + log.info(f"Signal {vss_name} is registered: {resp}") + return True + + def update_datapoint(self, name: str, value: Any) -> bool: + """ + Update datapoint. + Supported format for value is still a bit unclear/undefined. + Like an a bool VSS signal both be fed as a Python bool and a string representing json true/false value + (possibly with correct case) + """ + if self._kuksa is None: + log.warning("update_datapoint called before client has been started") + return False + success = True + if isinstance(value, bool): + # For bool KUKSA server expects lower case (true/false) rather than Python case (True/False) + send_value = json.dumps(value) + else: + send_value = str(value) + tmp_text = self._kuksa.setValue(name, send_value) + log.debug(f"Got setValue response for {name}:{send_value}:{tmp_text}") + resp = json.loads(tmp_text) + if "error" in resp: + log.error(f"Error sending {name} to kuksa-val-server: {resp['error']}") + success = False + + return success + + def stop(self): + log.info("Stopping server client") + if self._kuksa is not None: + self._kuksa.stop() + self._kuksa = None diff --git a/dbc2val/requirements-dev.txt b/dbc2val/requirements-dev.txt index 5df9d719..5e60c657 100644 --- a/dbc2val/requirements-dev.txt +++ b/dbc2val/requirements-dev.txt @@ -2,7 +2,7 @@ # # For build and test install both dependencies in this file as well as dependencies in requirements.txt # -# Example: +# Example: # pip3 install --no-cache-dir -r requirements.txt -r requirements-dev.txt # pytest diff --git a/dbc2val/requirements.in b/dbc2val/requirements.in index 1d19e2ee..0502548f 100644 --- a/dbc2val/requirements.in +++ b/dbc2val/requirements.in @@ -6,7 +6,7 @@ # For creating/distributing a binary, all dependencies should # be pinned to specific versions in order to provide for a reproducible # build. -# +# # The 'pip-tools' package's 'pip-compile' command can be used for that # purpose. The following command will take the inputs from this file # and create a 'requirements.txt' file with pinned versions of all @@ -21,6 +21,7 @@ cantools ~= 38.0 pyyaml ~= 6.0 can-j1939 ~= 2.0 py_expression_eval ~= 0.3 -kuksa-client ~= 0.3.0 +# Larger than 0.3, includes pre-released if pip installed with --pre +kuksa-client > 0.3 types-PyYAML ~= 6.0 types-protobuf ~= 4.21 diff --git a/dbc2val/requirements.txt b/dbc2val/requirements.txt index c0b50d10..fb2204ce 100644 --- a/dbc2val/requirements.txt +++ b/dbc2val/requirements.txt @@ -1,8 +1,8 @@ # -# This file is autogenerated by pip-compile with Python 3.8 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile requirements.in +# pip-compile --pre requirements.in # argparse-addons==0.12.0 # via cantools @@ -38,7 +38,7 @@ iniconfig==2.0.0 # via pytest jsonpath-ng==1.5.3 # via kuksa-client -kuksa-client==0.3.0 +kuksa-client==0.4.0a1 # via -r requirements.in msgpack==1.0.4 # via python-can @@ -66,6 +66,7 @@ pytest==7.2.1 # via can-j1939 python-can==4.1.0 # via + # -r requirements.in # can-j1939 # cantools pyyaml==6.0