diff --git a/.flake8 b/.flake8
new file mode 100644
index 00000000..dcc8dcee
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max_line_length = 120
diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
new file mode 100644
index 00000000..ed904dee
--- /dev/null
+++ b/.github/workflows/pre-commit.yml
@@ -0,0 +1,16 @@
+name: pre-commit
+
+on: [pull_request]
+
+jobs:
+  pre-commit:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+      with:
+        # required to grab the history of the PR
+        fetch-depth: 0
+    - uses: actions/setup-python@v3
+    - uses: pre-commit/action@v3.0.0
+      with:
+        extra_args: --color=always --from-ref ${{ github.event.pull_request.base.sha }} --to-ref ${{ github.event.pull_request.head.sha }}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 00000000..aacdcea1
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,17 @@
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v3.2.0
+  hooks:
+  - id: trailing-whitespace
+    exclude_types: ["dbc"]
+  - id: end-of-file-fixer
+    exclude_types: ["dbc"]
+  - id: check-yaml
+  - id: check-added-large-files
+
+- repo: https://github.com/pycqa/flake8
+  rev: '6.0.0'
+  hooks:
+  - id: flake8
diff --git a/README.md b/README.md
index 12434459..c69f12a0 100644
--- a/README.md
+++ b/README.md
@@ -9,3 +9,10 @@ Name | Description
 [DBC feeder](./dbc2val) | DBC feeder for `kuksa.val` server/databroker
 [SOME/IP feeder](./someip2val) | SOME/IP feeder for `kuksa.val` databroker
 [Replay](./replay) | Replay script for previously recorded files when providing `kuksa.val` with --record argument
+
+
+## Pre-commit setup
+This repository is set up to use pre-commit hooks.
+After you clone the project, run `pre-commit install` to install pre-commit into your git hooks.
+Pre-commit will then run on every commit.
+Running `pre-commit install` should always be the first thing you do after cloning a project that uses pre-commit.
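+
+A minimal example (illustrative; pre-commit itself is installed via pip):
+
+```console
+pip install pre-commit
+pre-commit install
+pre-commit run --all-files
+```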
diff --git a/dbc2val/CHANGELOG.md b/dbc2val/CHANGELOG.md
index 8bd847df..a70a951a 100644
--- a/dbc2val/CHANGELOG.md
+++ b/dbc2val/CHANGELOG.md
@@ -5,4 +5,4 @@ This file lists important changes to dbc2val
 ## Refactoring and changed configuration format (2023-02)
 
 Feeder refactored and new mapping format based on VSS introduced, see [documentation](mapping.md).
-Old mapping format no longer supported.
\ No newline at end of file
+Old mapping format no longer supported.
diff --git a/dbc2val/Readme.md b/dbc2val/Readme.md
index 7efd5f5a..0fb2debc 100644
--- a/dbc2val/Readme.md
+++ b/dbc2val/Readme.md
@@ -114,8 +114,13 @@ A smaller excerpt from the above sample, with fewer signals.
 | use-socketcan | False | - | - | `--use-socketcan` | Use SocketCAN (overriding any use of --dumpfile) |
 | mapping | mapping/vss_3.1.1/vss_dbc.json | [general].mapping | MAPPING_FILE | `--mapping` | Mapping file used to map CAN signals to databroker datapoints. Take a look at the usage of the mapping file |
 | server-type | kuksa_databroker | [general].server_type | SERVER_TYPE | `--server-type` | Which type of server the feeder should connect to (kuksa_val_server or kuksa_databroker) |
-| DAPR_GRPC_PORT | - | - | DAPR_GRPC_PORT | - | Override broker address & connect to DAPR sidecar @ 127.0.0.1:DAPR_GRPC_PORT |
-| VEHICLEDATABROKER_DAPR_APP_ID | - | - | VEHICLEDATABROKER_DAPR_APP_ID | - | Add dapr-app-id metadata |
+| ip | 127.0.0.1 (Databroker), localhost (Server) | [general].ip | KUKSA_ADDRESS | - | IP address for Server/Databroker |
+| port | 55555 (Databroker), 8090 (Server) | [general].port | KUKSA_PORT | - | Port for Server/Databroker |
+| tls | False (Databroker), True (Server) | [general].tls | - | `--tls [True or False]` | Shall TLS be used for the Server/Databroker connection |
+| token | Undefined (Databroker), Kuksa-client default (Server) | [general].token | - | - | Token path to use. Only needed if Databroker/Server requires authentication |
+| VEHICLEDATABROKER_DAPR_APP_ID | - | - | VEHICLEDATABROKER_DAPR_APP_ID | - | Add dapr-app-id metadata. Only relevant for KUKSA.val Databroker |
+
+*Note that the [default config file](config/dbc_feeder.ini) includes default Databroker settings and must be modified if you intend to use it for KUKSA.val Server.*
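+
+For example, to run against a Databroker on a non-default address (illustrative values):
+
+```console
+$ KUKSA_ADDRESS=192.168.1.33 KUKSA_PORT=55555 ./dbcfeeder.py --server-type kuksa_databroker
+```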
 
 Configuration options have the following priority (highest at top).
@@ -233,26 +238,22 @@ To set the log level to DEBUG
 
 ```console
 $ LOG_LEVEL=debug ./dbcfeeder.py
 ```
 
-Set log level to INFO, but for dbcfeeder.broker set it to DEBUG
+Set log level to INFO, but for dbcfeederlib.databrokerclientwrapper set it to DEBUG
 
 ```console
-$ LOG_LEVEL=info,dbcfeeder.broker_client=debug ./dbcfeeder.py
+$ LOG_LEVEL=info,dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py
 ```
 
 or, since INFO is the default log level, this is equivalent to:
 
 ```console
-$ LOG_LEVEL=dbcfeeder.broker_client=debug ./dbcfeeder.py
+$ LOG_LEVEL=dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py
 ```
 
 Available loggers:
 
 - dbcfeeder
-- dbcfeeder.broker_client
-- databroker
-- dbcreader
-- dbcmapper
-- can
-- j1939
+- dbcfeederlib.* (one for every file in the dbcfeederlib directory)
+- kuksa_client (to control logging provided by [kuksa-client](https://github.com/eclipse/kuksa.val/tree/master/kuksa-client))
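+
+For example, to keep the default INFO level but get debug output from the kuksa-client library (assuming its loggers follow the `kuksa_client` package name):
+
+```console
+$ LOG_LEVEL=info,kuksa_client=debug ./dbcfeeder.py
+```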
 
 ## ELM/OBDLink support
 
@@ -291,4 +292,3 @@ $ pip install .
 ```
 
 The detailed documentation for this feature can be found [here](https://dias-kuksa-doc.readthedocs.io/en/latest/contents/j1939.html).
-
diff --git a/dbc2val/config/dbc_feeder.ini b/dbc2val/config/dbc_feeder.ini
index 7d3e8c8a..18cbcf68 100644
--- a/dbc2val/config/dbc_feeder.ini
+++ b/dbc2val/config/dbc_feeder.ini
@@ -17,22 +17,32 @@ server_type = kuksa_databroker
 
 # VSS mapping file
 mapping = mapping/vss_3.1.1/vss_dbc.json
 
-[kuksa_val_server]
-# kuksa_val_server IP address or host name
+# Same configs used for KUKSA.val Server and Databroker
+# Note that the default values below correspond to Databroker
+# Default values for KUKSA.val Server are commented below
+
+# IP address for server (KUKSA.val Server or Databroker)
+ip = 127.0.0.1
 # ip = localhost
+
+# Port for server (KUKSA.val Server or Databroker)
+port = 55555
 # port = 8090
-# protocol = ws
-# insecure = False
-# JWT security token file
-# token=../../kuksa_certificates/jwt/super-admin.json.token
-
-[kuksa_databroker]
-# kuksa_databroker IP address or host name
-# ip = 127.0.0.1
-# port = 55555
-# protocol = grpc
-# kuksa_databroker does not yet support security features
-# insecure = True
+
+# Shall TLS be used (default False for Databroker, True for KUKSA.val Server)
+tls = False
+# tls = True
+
+# Token file for authorization.
+# Default behavior differs between servers.
+# For KUKSA.val Databroker the KUKSA.val default token is not included in packages and containers;
+# if you run your Databroker so that it requires authentication you must specify a token.
+# The example below works if you have cloned kuksa.val in parallel to kuksa.val.feeders
+#token=../../kuksa.val/jwt/provide-all.token
+# For KUKSA.val Server the default behavior is to use the token provided as part of kuksa-client,
+# so you only need to specify a token here if you want to use a different one.
+# Possibly like below
+# token=../../kuksa.val/kuksa_certificates/jwt/super-admin.json.token
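+
+# For reference, a complete KUKSA.val Server configuration would combine the
+# commented alternatives above (illustrative summary, not a shipped default):
+# ip = localhost
+# port = 8090
+# tls = True
+# token=../../kuksa.val/kuksa_certificates/jwt/super-admin.json.token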
 
 [can]
 # CAN port, use elmcan to start the elmcan bridge
diff --git a/dbc2val/createvcan.sh b/dbc2val/createvcan.sh
index 7dda35e5..b86e887f 100755
--- a/dbc2val/createvcan.sh
+++ b/dbc2val/createvcan.sh
@@ -68,5 +68,3 @@ fi
 virtualCanConfigure
 
 echo "createvcan: Done."
-
-
diff --git a/dbc2val/dbcfeeder.py b/dbc2val/dbcfeeder.py
index d1d2fe43..74aa9580 100755
--- a/dbc2val/dbcfeeder.py
+++ b/dbc2val/dbcfeeder.py
@@ -24,40 +24,38 @@
 import argparse
 import configparser
-import contextlib
 import enum
 import logging
 import os
 import queue
-import json
 import sys
 import time
 from signal import SIGINT, SIGTERM, signal
 from typing import Any
 from typing import Dict
 
-import grpc
-
-from kuksa_client import KuksaClientThread
-import kuksa_client.grpc
-
 from dbcfeederlib import canplayer
 from dbcfeederlib import dbc2vssmapper
 from dbcfeederlib import dbcreader
 from dbcfeederlib import j1939reader
-from dbcfeederlib import databroker
+from dbcfeederlib import databrokerclientwrapper
+from dbcfeederlib import serverclientwrapper
+from dbcfeederlib import clientwrapper
 from dbcfeederlib import elm2canbridge
 
 log = logging.getLogger("dbcfeeder")
 
 
 class ServerType(str, enum.Enum):
+    """Enum class to indicate the type of server dbcfeeder is connecting to"""
     KUKSA_VAL_SERVER = 'kuksa_val_server'
     KUKSA_DATABROKER = 'kuksa_databroker'
 
 
 def init_logging(loglevel):
-    # create console handler and set level to debug
+    """Set up console logger"""
+    # Create console handler and set level to DEBUG. This just means that it can show DEBUG messages;
+    # what actually is shown is controlled by the logging configuration.
     console_logger = logging.StreamHandler()
     console_logger.setLevel(logging.DEBUG)
@@ -79,6 +77,7 @@
 
 
 class ColorFormatter(logging.Formatter):
+    """Color formatter that can be used for terminals"""
     FORMAT = "{time} {{loglevel}} {logger} {msg}".format(
         time="\x1b[2m%(asctime)s\x1b[0m",  # grey
         logger="\x1b[2m%(name)s:\x1b[0m",  # grey
@@ -99,22 +98,25 @@
 
 
 class Feeder:
-    def __init__(self, server_type: ServerType, kuksa_client_config: Dict[str, Any],
+    """
+    The feeder is responsible for setting up a queue.
+    It will get a mapping config as input (in start) and will then:
+    Start a DBCReader that extracts interesting CAN messages and adds them to the queue.
+    Start a CANplayer if you run with a CAN dump file as input.
+    Start listening to the queue, transform CAN messages to VSS data and, if conditions
+    are fulfilled, send them to the client wrapper which in turn sends them to the backend supported by the wrapper.
+    """
+    def __init__(self, client_wrapper: clientwrapper.ClientWrapper,
                  elmcan_config: Dict[str, Any]):
         self._shutdown = False
         self._reader = None
         self._player = None
         self._mapper = None
-        self._provider = None
-        self._connected = False
         self._registered = False
-        self._can_queue : queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue()
-        self._server_type = server_type
-        self._kuksa_client_config = kuksa_client_config
+        self._can_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue()
+        self._client_wrapper = client_wrapper
         self._elmcan_config = elmcan_config
-        self._exit_stack = contextlib.ExitStack()
         self._disconnect_time = 0.0
-        self._kuksa = None
 
     def start(
         self,
@@ -123,8 +125,7 @@
         mappingfile,
         candumpfile=None,
         use_j1939=False,
-        use_strict_parsing=False,
-        grpc_metadata=None,
+        use_strict_parsing=False
     ):
         log.info("Using mapping: {}".format(mappingfile))
         self._mapper = dbc2vssmapper.Mapper(mappingfile)
@@ -149,8 +150,9 @@
         if candumpfile:  # use dumpfile
             log.info(
-                "Using virtual bus to replay CAN messages (channel: %s)",
+                "Using virtual bus to replay CAN messages (channel: %s) (dumpfile: %s)",
                 canport,
+                candumpfile
             )
             self._reader.start_listening(
                 bustype="virtual",
@@ -170,20 +172,6 @@
             log.info("Using socket CAN device '%s'", canport)
             self._reader.start_listening(bustype="socketcan", channel=canport)
 
-        if self._server_type is ServerType.KUKSA_DATABROKER:
-            databroker_address = f"{self._kuksa_client_config['ip']}:{self._kuksa_client_config['port']}"
-            log.info("Connecting to Data Broker using %s", databroker_address)
-            vss_client = self._exit_stack.enter_context(kuksa_client.grpc.VSSClient(
-                host=self._kuksa_client_config['ip'],
-                port=self._kuksa_client_config['port'],
-            ))
-            vss_client.channel.subscribe(
-                lambda connectivity: self.on_broker_connectivity_change(connectivity),
-                try_to_connect=False,
-            )
-            self._provider = databroker.Provider(vss_client, grpc_metadata)
-        else:
-            log.info("Will use kuksa-val-server")
         self._run()
 
     def stop(self):
@@ -194,35 +182,12 @@
             self._reader.stop()
         if self._player is not None:
             self._player.stop()
-        if self._kuksa is not None:
-            self._kuksa.stop()
-
-        self._exit_stack.close()
+        self._client_wrapper.stop()
+        self._mapper = None
 
     def is_stopping(self):
         return self._shutdown
 
-    def on_broker_connectivity_change(self, connectivity):
-        log.info("Connectivity to data broker changed to: %s", connectivity)
-        if (
-            connectivity == grpc.ChannelConnectivity.READY or
-            connectivity == grpc.ChannelConnectivity.IDLE
-        ):
-            # Can change between READY and IDLE. Only act if coming from
-            # unconnected state
-            if not self._connected:
-                log.info("Connected to data broker")
-                self._connected = True
-                self._disconnect_time = 0.0
-        else:
-            if self._connected:
-                log.info("Disconnected from data broker")
-            else:
-                if connectivity == grpc.ChannelConnectivity.CONNECTING:
-                    log.info("Trying to connect to data broker")
-            self._connected = False
-        self._registered = False
-
     def _register_datapoints(self) -> bool:
         """
         Check that data points are registered.
@@ -230,46 +195,45 @@
         Returns True on success.
         """
         log.info("Check that datapoints are registered")
+        if self._mapper is None:
+            log.error("_register_datapoints called before feeder has been started")
+            return False
         all_registered = True
         for entry in self._mapper.mapping.values():
             for vss_mapping in entry:
-                #for target_name, target_attr in self._mapper.mapping[entry]["targets"].items():
-                registered = self._provider.check_registered(
-                    vss_mapping.vss_name,
-                    vss_mapping.datatype.upper(),
-                    vss_mapping.description,
-                )
-                if not registered:
+                log.debug("Checking if signal %s is registered", vss_mapping.vss_name)
+                resp = self._client_wrapper.is_signal_defined(vss_mapping.vss_name)
+                if not resp:
                     all_registered = False
         return all_registered
 
     def _run(self):
-        if self._server_type is ServerType.KUKSA_VAL_SERVER:
-            self._kuksa = KuksaClientThread(self._kuksa_client_config)
-            self._kuksa.start()
-            self._kuksa.authorize()
+        self._client_wrapper.start()
+        log.info("Authorized")
         processing_started = False
         messages_sent = 0
         last_sent_log_entry = 0
         queue_max_size = 0
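+        # Main loop: wait until the client wrapper reports a connection, make sure all
+        # datapoints are registered (re-checked after every reconnect), then drain the
+        # queue and forward the mapped VSS values to the server/databroker.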
         while self._shutdown is False:
-            if self._server_type is ServerType.KUKSA_DATABROKER:
-                if not self._connected:
-                    sleep_time = 0.2
-                    time.sleep(sleep_time)
-                    self._disconnect_time += sleep_time
-                    if self._disconnect_time > 5:
-                        log.info("Databroker still not connected!")
-                        self._disconnect_time = 0.0
+            if self._client_wrapper.is_connected():
+                self._disconnect_time = 0.0
+            else:
+                # While disconnected we cannot be registered
+                self._registered = False
+                sleep_time = 0.2
+                time.sleep(sleep_time)
+                self._disconnect_time += sleep_time
+                if self._disconnect_time > 5:
+                    log.info("Server/Databroker still not connected!")
+                    self._disconnect_time = 0.0
+                continue
+            if not self._registered:
+                if not self._register_datapoints():
+                    log.error("Not all datapoints registered, exiting!")
+                    self.stop()
                     continue
-                if not self._registered:
-                    time.sleep(1)
-                    if not self._register_datapoints():
-                        log.error("Not all datapoints registered, exiting!")
-                        self.stop()
-                        continue
-                    self._registered = True
+                self._registered = True
             try:
                 if not processing_started:
                     processing_started = True
@@ -288,26 +252,10 @@
                 else:
                     # get values out of the canreplay and map to desired signals
                     target = vss_observation.vss_name
-                    success = True
-                    if self._server_type is ServerType.KUKSA_DATABROKER:
-                        try:
-                            self._provider.update_datapoint(target, value)
-                        except kuksa_client.grpc.VSSClientError:
-                            log.error(f"Error sending {target} to databroker", exc_info=True)
-                            success = False
-                    else:
-                        # KUKSA server expects a string value without quotes
-                        if isinstance(value, bool):
-                            # For bool KUKSA server expects lower case (true/false) rather than Python case (True/False)
-                            send_value = json.dumps(value)
-                        else:
-                            send_value = str(value)
-                        resp = json.loads(self._kuksa.setValue(target, send_value))
-                        if "error" in resp:
-                            log.error(f"Error sending {target} to kuksa-val-server: {resp['error']}")
-                            success = False
+
+                    success = self._client_wrapper.update_datapoint(target, value)
                     if success:
-                        log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value,vss_observation.time)
+                        log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value, vss_observation.time)
                         # Give status message after 1, 2, 4, 8, 16, 32, 64, .... messages have been sent
                         messages_sent += 1
                         if messages_sent >= (2 * last_sent_log_entry):
@@ -355,8 +303,10 @@
     return config
 
 
+
 def main(argv):
-    # argument support
+    """Main entrypoint for dbcfeeder"""
+    log.info(f"Argv is {argv}")
     parser = argparse.ArgumentParser(description="dbcfeeder")
     parser.add_argument("--config", metavar="FILE", help="Configuration file")
     parser.add_argument(
@@ -376,7 +326,7 @@
     parser.add_argument(
         "--mapping",
         metavar="FILE",
-        help="Mapping file used to map CAN signals to databroker datapoints",
+        help="Mapping file used to map CAN signals to VSS datapoints",
     )
     parser.add_argument(
         "--server-type",
@@ -404,35 +354,39 @@
         server_type = args.server_type
     elif os.environ.get("SERVER_TYPE"):
         server_type = ServerType(os.environ.get("SERVER_TYPE"))
-    elif "general" in config and "server_type" in config["general"]:
+    elif "server_type" in config["general"]:
         server_type = ServerType(config["general"]["server_type"])
     else:
        server_type = ServerType.KUKSA_VAL_SERVER
 
+    if server_type not in [ServerType.KUKSA_VAL_SERVER, ServerType.KUKSA_DATABROKER]:
+        raise ValueError(f"Unsupported server type: {server_type}")
+
+    # The wrappers contain default settings, so we only need to change settings
+    # if given by dbcfeeder configs/arguments/env-variables
     if server_type is ServerType.KUKSA_VAL_SERVER:
-        config.setdefault("kuksa_val_server", {})
-        config["kuksa_val_server"].setdefault("ip", "localhost")
-        config["kuksa_val_server"].setdefault("port", "8090")
-        config["kuksa_val_server"].setdefault("protocol", "ws")
-        config["kuksa_val_server"].setdefault("insecure", "False")
-        kuksa_client_config = config["kuksa_val_server"]
+        client_wrapper = serverclientwrapper.ServerClientWrapper()
     elif server_type is ServerType.KUKSA_DATABROKER:
-        config.setdefault("kuksa_databroker", {})
-        config["kuksa_databroker"].setdefault("ip", "127.0.0.1")
-        config["kuksa_databroker"].setdefault("port", "55555")
-        config["kuksa_databroker"].setdefault("protocol", "grpc")
-        config["kuksa_databroker"].setdefault("insecure", "True")
-        kuksa_client_config = config["kuksa_databroker"]
-
-        if os.environ.get("DAPR_GRPC_PORT"):
-            kuksa_client_config["ip"] = "127.0.0.1"
-            kuksa_client_config["port"] = os.environ.get("DAPR_GRPC_PORT")
-        elif os.environ.get("VDB_ADDRESS"):
-            vdb_address, vdb_port = os.environ.get("VDB_ADDRESS").split(':', maxsplit=1)
-            kuksa_client_config["ip"] = vdb_address
-            kuksa_client_config["port"] = vdb_port
+        client_wrapper = databrokerclientwrapper.DatabrokerClientWrapper()
+
+    if os.environ.get("KUKSA_ADDRESS"):
+        client_wrapper.set_ip(os.environ.get("KUKSA_ADDRESS"))
+    elif "ip" in config["general"]:
+        client_wrapper.set_ip(config["general"]["ip"])
+
+    if os.environ.get("KUKSA_PORT"):
+        client_wrapper.set_port(os.environ.get("KUKSA_PORT"))
+    elif "port" in config["general"]:
+        client_wrapper.set_port(config["general"]["port"])
+
+    if "tls" in config["general"]:
+        client_wrapper.set_tls(config["general"].getboolean("tls"))
+
+    if "token" in config["general"]:
+        log.info(f"Given token information: {config['general']['token']}")
+        client_wrapper.set_token_path(config["general"]["token"])
     else:
-        raise ValueError(f"Unsupported server type: {server_type}")
ValueError(f"Unsupported server type: {server_type}") + log.info("Token information not given") if args.mapping: mappingfile = args.mapping @@ -486,26 +440,21 @@ def main(argv): elif "can" in config and "candumpfile" in config["can"]: candumpfile = config["can"]["candumpfile"] - if os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID"): - grpc_metadata = ( - ("dapr-app-id", os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID")), - ) - else: - grpc_metadata = None + client_wrapper.get_client_specific_configs() elmcan_config = [] if canport == "elmcan": - if candumpfile != None: + if candumpfile is not None: log.error("It is a contradiction specifying both elmcan and candumpfile!") sys.exit(-1) - if not "elmcan" in config: + if "elmcan" not in config: log.error("Cannot use elmcan without elmcan config!") sys.exit(-1) elmcan_config = config["elmcan"] - feeder = Feeder(server_type, kuksa_client_config, elmcan_config) + feeder = Feeder(client_wrapper, elmcan_config) - def signal_handler(signal_received, frame): + def signal_handler(signal_received, *_): log.info(f"Received signal {signal_received}, stopping...") # If we get told to shutdown a second time. Just do it. @@ -525,17 +474,16 @@ def signal_handler(signal_received, frame): mappingfile=mappingfile, candumpfile=candumpfile, use_j1939=use_j1939, - use_strict_parsing=args.strict, - grpc_metadata=grpc_metadata, + use_strict_parsing=args.strict ) return 0 def parse_env_log(env_log, default=logging.INFO): - def parse_level(level, default=default): - if type(level) is str: - if level.lower() in [ + def parse_level(specified_level, default=default): + if isinstance(specified_level, str): + if specified_level.lower() in [ "debug", "info", "warn", @@ -543,12 +491,11 @@ def parse_level(level, default=default): "error", "critical", ]: - return level.upper() - else: - raise Exception(f"could not parse '{level}' as a log level") + return specified_level.upper() + raise Exception(f"could not parse '{specified_level}' as a log level") return default - loglevels = {} + parsed_loglevels = {} if env_log is not None: log_specs = env_log.split(",") @@ -556,19 +503,18 @@ def parse_level(level, default=default): spec_parts = log_spec.split("=") if len(spec_parts) == 1: # This is a root level spec - if "root" in loglevels: + if "root" in parsed_loglevels: raise Exception("multiple root loglevels specified") - else: - loglevels["root"] = parse_level(spec_parts[0]) + parsed_loglevels["root"] = parse_level(spec_parts[0]) if len(spec_parts) == 2: - logger = spec_parts[0] - level = spec_parts[1] - loglevels[logger] = parse_level(level) + logger_name = spec_parts[0] + logger_level = spec_parts[1] + parsed_loglevels[logger_name] = parse_level(logger_level) - if "root" not in loglevels: - loglevels["root"] = default + if "root" not in parsed_loglevels: + parsed_loglevels["root"] = default - return loglevels + return parsed_loglevels if __name__ == "__main__": @@ -577,17 +523,13 @@ def parse_level(level, default=default): # Set log level to debug # LOG_LEVEL=debug ./dbcfeeder.py # - # Set log level to INFO, but for dbcfeeder.broker set it to DEBUG - # LOG_LEVEL=info,dbcfeeder.broker_client=debug ./dbcfeeder.py + # Set log level to INFO, but for dbcfeederlib.databrokerclientwrapper set it to DEBUG + # LOG_LEVEL=info,dbcfeederlib.databrokerclientwrapper=debug ./dbcfeeder.py # # Other available loggers: - # dbcfeeder - # dbcfeeder.broker_client - # databroker (useful for feeding values debug) - # dbcreader - # dbcmapper - # can - # j1939 + # dbcfeeder (main dbcfeeder file) + # 
+    # dbcfeederlib.* (Every file has its own logger, like dbcfeederlib.databrokerclientwrapper)
+    # kuksa_client (If you want to get additional information from the kuksa-client Python library)
     #
 
     loglevels = parse_env_log(os.environ.get("LOG_LEVEL"))
diff --git a/dbc2val/dbcfeederlib/clientwrapper.py b/dbc2val/dbcfeederlib/clientwrapper.py
new file mode 100644
index 00000000..0f5ad127
--- /dev/null
+++ b/dbc2val/dbcfeederlib/clientwrapper.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+
+#################################################################################
+# Copyright (c) 2023 Contributors to the Eclipse Foundation
+#
+# See the NOTICE file(s) distributed with this work for additional
+# information regarding copyright ownership.
+#
+# This program and the accompanying materials are made available under the
+# terms of the Apache License 2.0 which is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# SPDX-License-Identifier: Apache-2.0
+#################################################################################
+
+import logging
+from typing import Any
+
+from abc import ABC, abstractmethod
+
+log = logging.getLogger(__name__)
+
+
+class ClientWrapper(ABC):
+    """
+    Wraps client-specific functionality so that the main dbcfeeder does not need to care about it.
+    This acts as a base class; each client (type and/or technology) shall be in a separate file.
+    This file shall be feeder/provider independent, and can possibly be moved to kuksa.val/kuksa-client.
+    """
+    def __init__(self, ip: str, port: int, token_path: str, tls: bool = True):
+        """
+        This init method is only supposed to be called by subclassed __init__ functions
+        """
+        self._ip = ip
+        self._port = port
+        self._token_path = token_path
+        self._tls = tls
+        self._registered = False
+
+    def set_ip(self, ip: str):
+        """ Set IP address to use """
+        self._ip = ip
+
+    def set_port(self, port: int):
+        """ Set port to use """
+        self._port = port
+
+    def set_tls(self, tls: bool):
+        """
+        Set if TLS shall be used (including server auth).
+        Currently we rely on the default location for the root cert as defined by kuksa-client.
+        """
+        self._tls = tls
+
+    def set_token_path(self, token_path: str):
+        self._token_path = token_path
+
+    # Abstract methods to implement
+    @abstractmethod
+    def get_client_specific_configs(self):
+        pass
+
+    @abstractmethod
+    def start(self):
+        pass
+
+    @abstractmethod
+    def is_connected(self) -> bool:
+        pass
+
+    @abstractmethod
+    def is_signal_defined(self, vss_name: str) -> bool:
+        pass
+
+    @abstractmethod
+    def update_datapoint(self, name: str, value: Any) -> bool:
+        pass
+
+    @abstractmethod
+    def stop(self):
+        pass
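+
+# Illustrative sketch (not part of this module): a concrete wrapper such as
+# DatabrokerClientWrapper or ServerClientWrapper subclasses ClientWrapper and
+# implements all abstract methods, e.g.:
+#
+#     class MyClientWrapper(ClientWrapper):
+#         def __init__(self):
+#             super().__init__(ip="localhost", port=12345, token_path="", tls=False)
+#         def get_client_specific_configs(self): ...
+#         def start(self): ...
+#         def is_connected(self) -> bool: ...
+#         def is_signal_defined(self, vss_name: str) -> bool: ...
+#         def update_datapoint(self, name: str, value: Any) -> bool: ...
+#         def stop(self): ...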
diff --git a/dbc2val/dbcfeederlib/databroker.py b/dbc2val/dbcfeederlib/databroker.py
deleted file mode 100644
index d0d6bcc9..00000000
--- a/dbc2val/dbcfeederlib/databroker.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/env python3
-
-#################################################################################
-# Copyright (c) 2022 Contributors to the Eclipse Foundation
-#
-# See the NOTICE file(s) distributed with this work for additional
-# information regarding copyright ownership.
-#
-# This program and the accompanying materials are made available under the
-# terms of the Apache License 2.0 which is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# SPDX-License-Identifier: Apache-2.0
-#################################################################################
-
-import logging
-from typing import Any
-from typing import Optional
-from typing import Union
-
-import grpc.aio
-
-import kuksa_client.grpc
-from kuksa_client.grpc import Datapoint
-from kuksa_client.grpc import DataEntry
-from kuksa_client.grpc import DataType
-from kuksa_client.grpc import EntryUpdate
-from kuksa_client.grpc import Field
-from kuksa_client.grpc import Metadata
-from kuksa_client.grpc import VSSClient
-
-log = logging.getLogger(__name__)
-
-
-class Provider:
-    def __init__(self, vss_client: VSSClient, grpc_metadata: Optional[grpc.aio.Metadata] = None):
-        self._name_to_type : dict[str, DataType]= {}
-        self._rpc_kwargs = {'metadata': grpc_metadata}
-        log.info("Using %s", self._rpc_kwargs)
-        self._vss_client = vss_client
-
-    def check_registered(self, name: str, data_type: Union[str, DataType], description: str) -> bool:
-        """
-        Check if the signal is registered. If not raise an exception.
-        In the future this method may try register signals that are not yet registered.
-        The arguments data_type and description are kept for that purpose.
-        Returns True if check succeeds.
-        """
-        if isinstance(data_type, str):
-            data_type = getattr(DataType, data_type)
-        try:
-            log.debug("Checking if signal %s is registered",name)
-            metadata = self._vss_client.get_metadata((name,), **self._rpc_kwargs)
-            if len(metadata) == 1:
-                self._name_to_type[name] = metadata[name].data_type
-                log.info(
-                    "%s is already registered with type %s",
-                    name,
-                    metadata[name].data_type.name,
-                )
-                return True
-            log.error("Unexpected metadata response when checking for %s: %s", name, metadata)
-        except kuksa_client.grpc.VSSClientError as client_error:
-            code = client_error.error.get('code')
-            if code == 404:
-                log.error("Signal %s is not registered", name)
-            else:
-                log.error("Error checking registration of %s", name, exc_info=True)
-        return False
-
-    def update_datapoint(self, name: str, value: Any):
-        updates = (EntryUpdate(DataEntry(
-            name,
-            value=Datapoint(value=value),
-            # Specifying data_type removes the need for the client to query data_type from the server before
-            # issuing every set() call.
-            metadata=Metadata(data_type=self._name_to_type[name]),
-        ), (Field.VALUE,)),)
-
-        self._vss_client.set(updates=updates, **self._rpc_kwargs)
-        log.debug("%s => %s", name, value)
diff --git a/dbc2val/dbcfeederlib/databrokerclientwrapper.py b/dbc2val/dbcfeederlib/databrokerclientwrapper.py
new file mode 100644
index 00000000..3bf2db2f
--- /dev/null
+++ b/dbc2val/dbcfeederlib/databrokerclientwrapper.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python3
+
+#################################################################################
+# Copyright (c) 2022 Contributors to the Eclipse Foundation
+#
+# See the NOTICE file(s) distributed with this work for additional
+# information regarding copyright ownership.
+# This program and the accompanying materials are made available under the
+# terms of the Apache License 2.0 which is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# SPDX-License-Identifier: Apache-2.0
+#################################################################################
+
+import logging
+from typing import Any
+from typing import Dict
+
+import os
+import contextlib
+
+import grpc.aio
+
+import kuksa_client.grpc
+from kuksa_client.grpc import Datapoint
+from kuksa_client.grpc import DataEntry
+from kuksa_client.grpc import DataType
+from kuksa_client.grpc import EntryUpdate
+from kuksa_client.grpc import Field
+from kuksa_client.grpc import Metadata
+from dbcfeederlib import clientwrapper
+
+log = logging.getLogger(__name__)
+
+
+class DatabrokerClientWrapper(clientwrapper.ClientWrapper):
+    """
+    Client Wrapper using the interface in
+    https://github.com/eclipse/kuksa.val/blob/master/kuksa-client/kuksa_client/grpc/__init__.py
+    """
+    # No default token path given, as no default token is included in packages/containers
+    def __init__(self, ip: str = "127.0.0.1", port: int = 55555,
+                 token_path: str = "",
+                 tls: bool = False):
+        """
+        Init Databroker client wrapper, by default (for now) without TLS
+        """
+        self._grpc_client = None
+        self._name_to_type: dict[str, DataType] = {}
+        self._rpc_kwargs: Dict[str, str] = {}
+        self._connected = False
+        self._exit_stack = contextlib.ExitStack()
+        super().__init__(ip, port, token_path, tls)
+
+    def get_client_specific_configs(self):
+        """
+        Get client specific configs and env variables
+        """
+
+        if os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID"):
+            grpc_metadata = (
+                ("dapr-app-id", os.environ.get("VEHICLEDATABROKER_DAPR_APP_ID")),
+            )
+            self._rpc_kwargs = {'metadata': grpc_metadata}
+
+    def start(self):
+        """
+        Start connection to databroker and authorize
+        """
+
+        log.info(f"Connecting to Data Broker using {self._ip}:{self._port}")
+
+        # For now this will just throw a FileNotFoundError if the file cannot be found
+        token = ""
+        if self._token_path != "":
+            log.info(f"Token path specified is {self._token_path}")
+            with open(self._token_path, 'r') as file:
+                token = file.read()
+            log.debug(f"Token is: {token}")
+        else:
+            log.info("No token path specified. KUKSA.val Databroker must run without authentication!")
+
+        # We do not connect directly when we create VSSClient.
+        # Instead we provide the token first when we call authorize.
+        # The alternative approach would be to provide the token in the constructor
+        # with/without ensure_startup_connection and not actively call "authorize".
+        # The only practical difference is how progress and errors (if any) are reported!
+        self._grpc_client = self._exit_stack.enter_context(kuksa_client.grpc.VSSClient(
+            host=self._ip,
+            port=self._port,
+            ensure_startup_connection=False
+        ))
+        self._grpc_client.authorize(token=token, **self._rpc_kwargs)
+        self._grpc_client.channel.subscribe(
+            lambda connectivity: self.on_broker_connectivity_change(connectivity),
+            try_to_connect=False,
+        )
+
+    def on_broker_connectivity_change(self, connectivity):
+        log.info("Connectivity to data broker changed to: %s", connectivity)
+        if (
+            connectivity == grpc.ChannelConnectivity.READY or
+            connectivity == grpc.ChannelConnectivity.IDLE
+        ):
+            # Can change between READY and IDLE. Only act if coming from
+            # unconnected state
+            if not self._connected:
+                log.info("Connected to data broker")
+                self._connected = True
+        else:
+            if self._connected:
+                log.info("Disconnected from data broker")
+            else:
+                if connectivity == grpc.ChannelConnectivity.CONNECTING:
+                    log.info("Trying to connect to data broker")
+            self._connected = False
+
+    def is_connected(self) -> bool:
+        return self._connected
+
+    def is_signal_defined(self, vss_name: str) -> bool:
+        """
+        Check if the signal is registered. If not, log an error.
+        In the future this method may try to register signals that are not yet registered.
+        Returns True if the check succeeds.
+        """
+        if self._grpc_client is None:
+            log.warning("is_signal_defined called before client has been started")
+            return False
+        try:
+            log.debug("Checking if signal %s is registered", vss_name)
+            metadata = self._grpc_client.get_metadata((vss_name,), **self._rpc_kwargs)
+            if len(metadata) == 1:
+                self._name_to_type[vss_name] = metadata[vss_name].data_type
+                log.info(
+                    "%s is already registered with type %s",
+                    vss_name,
+                    metadata[vss_name].data_type.name,
+                )
+                return True
+            log.error("Unexpected metadata response when checking for %s: %s", vss_name, metadata)
+        except kuksa_client.grpc.VSSClientError as client_error:
+            code = client_error.error.get('code')
+            if code == 404:
+                log.error("Signal %s is not registered", vss_name)
+            else:
+                log.error("Error checking registration of %s", vss_name, exc_info=True)
+        return False
+
+    def update_datapoint(self, name: str, value: Any) -> bool:
+        """
+        Update datapoint.
+        The supported format for value is still a bit unclear/undefined.
+        For example, a bool VSS signal can be fed both as a Python bool and as a string
+        representing a JSON true/false value (possibly requiring correct case).
+        """
+        if self._grpc_client is None:
+            log.warning("update_datapoint called before client has been started")
+            return False
+        try:
+            updates = (EntryUpdate(DataEntry(
+                name,
+                value=Datapoint(value=value),
+                # Specifying data_type removes the need for the client to query data_type from the server before
+                # issuing every set() call.
+                metadata=Metadata(data_type=self._name_to_type[name]),
+            ), (Field.VALUE,)),)
+
+            self._grpc_client.set(updates=updates, **self._rpc_kwargs)
+            log.debug("%s => %s", name, value)
+
+        except kuksa_client.grpc.VSSClientError:
+            log.error(f"Error sending {value} to databroker", exc_info=True)
+            return False
+
+        return True
+
+    def stop(self):
+        log.info("Stopping databroker client")
+        if self._grpc_client is None:
+            log.warning("stop called before client has been started")
+        else:
+            self._exit_stack.close()
+            self._grpc_client = None
diff --git a/dbc2val/dbcfeederlib/serverclientwrapper.py b/dbc2val/dbcfeederlib/serverclientwrapper.py
new file mode 100644
index 00000000..851516b5
--- /dev/null
+++ b/dbc2val/dbcfeederlib/serverclientwrapper.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+
+#################################################################################
+# Copyright (c) 2022 Contributors to the Eclipse Foundation
+#
+# See the NOTICE file(s) distributed with this work for additional
+# information regarding copyright ownership.
+# This program and the accompanying materials are made available under the
+# terms of the Apache License 2.0 which is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# SPDX-License-Identifier: Apache-2.0
+#################################################################################
+
+import logging
+from typing import Any
+import json
+
+from dbcfeederlib import clientwrapper
+
+
+from kuksa_client import KuksaClientThread
+
+log = logging.getLogger(__name__)
+
+
+class ServerClientWrapper(clientwrapper.ClientWrapper):
+    def __init__(self, ip: str = "localhost", port: int = 8090,
+                 token_path: str = "",
+                 tls: bool = True):
+        # For WebSocket (KUKSA.val Server) we do not pass any token information by default.
+        # That means in practice that the default defined in kuksa-client will be used
+        # (as of today 'jwt/all-read-write.json.token')
+        log.debug("Initializing ServerClientWrapper")
+        super().__init__(ip, port, token_path, tls)
+
+        # Holder for configs used by kuksa-client; for default values
+        # see https://github.com/eclipse/kuksa.val/blob/master/kuksa-client/kuksa_client/cli_backend/__init__.py
+        self._client_config = {}
+        # Set read-only configs here; others are set just before we use the config as they may change
+        self._client_config["protocol"] = "ws"
+        self._kuksa = None
+
+    def get_client_specific_configs(self):
+        """
+        Get client specific configs and env variables
+        """
+        log.debug("No additional configs for KUKSA.val server")
+
+    def start(self):
+        """
+        Start connection to server and authorize
+        """
+
+        # Finalize config
+        self._client_config["ip"] = self._ip
+        self._client_config["port"] = self._port
+        self._client_config["insecure"] = not self._tls
+        # Do not set token if it is empty to allow default client lib info to be used
+        if self._token_path != "":
+            self._client_config["token"] = self._token_path
+
+        # TODO add data for root cert if using TLS and if given
+
+        self._kuksa = KuksaClientThread(self._client_config)
+        self._kuksa.start()
+        self._kuksa.authorize()
+
+    def is_connected(self) -> bool:
+        # This one is quite unreliable, see https://github.com/eclipse/kuksa.val/issues/523
+        if self._kuksa is None:
+            log.warning("is_connected called before client has been started")
+            return False
+        return self._kuksa.checkConnection()
+
+    def is_signal_defined(self, vss_name: str) -> bool:
+        """Check if signal is defined in server"""
+        if self._kuksa is None:
+            log.warning("is_signal_defined called before client has been started")
+            return False
+        resp = json.loads(self._kuksa.getMetaData(vss_name))
+        if "error" in resp:
+            log.error(f"Signal {vss_name} appears not to be registered: {resp['error']}")
+            return False
+        log.info(f"Signal {vss_name} is registered: {resp}")
+        return True
+
+    def update_datapoint(self, name: str, value: Any) -> bool:
+        """
+        Update datapoint.
+        The supported format for value is still a bit unclear/undefined.
+        For example, a bool VSS signal can be fed both as a Python bool and as a string
+        representing a JSON true/false value (possibly requiring correct case).
+        """
+        if self._kuksa is None:
+            log.warning("update_datapoint called before client has been started")
+            return False
+        success = True
+        if isinstance(value, bool):
+            # For bool KUKSA server expects lower case (true/false) rather than Python case (True/False)
+            send_value = json.dumps(value)
+        else:
+            send_value = str(value)
+        tmp_text = self._kuksa.setValue(name, send_value)
+        log.debug(f"Got setValue response for {name}:{send_value}:{tmp_text}")
+        resp = json.loads(tmp_text)
+        if "error" in resp:
+            log.error(f"Error sending {name} to kuksa-val-server: {resp['error']}")
+            success = False
+
+        return success
+
+    def stop(self):
+        log.info("Stopping server client")
+        if self._kuksa is not None:
+            self._kuksa.stop()
+            self._kuksa = None
diff --git a/dbc2val/requirements-dev.txt b/dbc2val/requirements-dev.txt
index 5df9d719..5e60c657 100644
--- a/dbc2val/requirements-dev.txt
+++ b/dbc2val/requirements-dev.txt
@@ -2,7 +2,7 @@
 #
 # For build and test install both dependencies in this file as well as dependencies in requirements.txt
 #
-# Example: 
+# Example:
 # pip3 install --no-cache-dir -r requirements.txt -r requirements-dev.txt
 #
 pytest
diff --git a/dbc2val/requirements.in b/dbc2val/requirements.in
index 1d19e2ee..19e98b71 100644
--- a/dbc2val/requirements.in
+++ b/dbc2val/requirements.in
@@ -6,7 +6,7 @@
 # For creating/distributing a binary, all dependencies should
 # be pinned to specific versions in order to provide for a reproducible
 # build.
-# 
+#
 # The 'pip-tools' package's 'pip-compile' command can be used for that
 # purpose. The following command will take the inputs from this file
 # and create a 'requirements.txt' file with pinned versions of all