Skip to content

Commit

Permalink
Adding Capabilities and Doors
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsteinbach committed Dec 30, 2024
1 parent 437a46b commit 04c6f7c
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 8 deletions.
58 changes: 58 additions & 0 deletions src/carconnectivity_connectors/skoda/capability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Module for Skoda vehicle capability class."""
from __future__ import annotations
from typing import TYPE_CHECKING

from enum import IntEnum

from carconnectivity.objects import GenericObject
from carconnectivity.attributes import StringAttribute

if TYPE_CHECKING:
from carconnectivity_connectors.skoda.vehicle import SkodaVehicle


class Capability(GenericObject):
"""
Represents a capability of a Skoda vehicle.
"""

def __init__(self, capability_id: str, vehicle: SkodaVehicle) -> None:
if vehicle is None:
raise ValueError('Cannot create capability without vehicle')
if id is None:
raise ValueError('Capability ID cannot be None')
super().__init__(object_id=capability_id, parent=vehicle)
self.capability_id = StringAttribute("id", self, capability_id)
self.statuses = list[Capability.Status]
self.enabled = True

def __str__(self) -> str:
return_string = f'{self.capability_id}'
return return_string

class Status(IntEnum):
"""
Enum for capability status.
"""
UNKNOWN = 0
DEACTIVATED = 1001
INITIALLY_DISABLED = 1003
DISABLED_BY_USER = 1004
OFFLINE_MODE = 1005
WORKSHOP_MODE = 1006
MISSING_OPERATION = 1007
MISSING_SERVICE = 1008
PLAY_PROTECTION = 1009
POWER_BUDGET_REACHED = 1010
DEEP_SLEEP = 1011
LOCATION_DATA_DISABLED = 1013
LICENSE_INACTIVE = 2001
LICENSE_EXPIRED = 2002
MISSING_LICENSE = 2003
USER_NOT_VERIFIED = 3001
TERMS_AND_CONDITIONS_NOT_ACCEPTED = 3002
INSUFFICIENT_RIGHTS = 3003
CONSENT_MISSING = 3004
LIMITED_FEATURE = 3005
AUTH_APP_CERT_ERROR = 3006
STATUS_UNSUPPORTED = 4001
94 changes: 86 additions & 8 deletions src/carconnectivity_connectors/skoda/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from datetime import datetime, timedelta
import requests

from carconnectivity.vehicle import GenericVehicle, ElectricVehicle
from carconnectivity.vehicle import GenericVehicle
from carconnectivity.garage import Garage
from carconnectivity.errors import AuthenticationError, APIError, TooManyRequestsError, RetrievalError
from carconnectivity.util import robust_time_parse
from carconnectivity.util import robust_time_parse, log_extra_keys
from carconnectivity.units import Length
from carconnectivity.doors import Doors
from carconnectivity_connectors.base.connector import BaseConnector
from carconnectivity_connectors.skoda.auth.session_manager import SessionManager, SessionUser, Service
from carconnectivity_connectors.skoda.vehicle import SkodaElectricVehicle
from carconnectivity_connectors.skoda.capability import Capability


if TYPE_CHECKING:
Expand All @@ -25,6 +28,7 @@
from carconnectivity.carconnectivity import CarConnectivity

LOG: logging.Logger = logging.getLogger("carconnectivity-connector-skoda")
LOG_API_DEBUG: logging.Logger = logging.getLogger("carconnectivity-connector-skoda-api-debug")


class Connector(BaseConnector):
Expand Down Expand Up @@ -126,20 +130,47 @@ def fetch_vehicles(self) -> None:
garage: Garage = self.car_connectivity.garage
url = 'https://api.connect.skoda-auto.cz/api/v4/garage'
data: Dict[str, Any] | None = self._fetch_data(url, session=self._session)

print(data)
seen_vehicle_vins: set[str] = set()
if data is not None:
if 'vehicles' in data and data['vehicles'] is not None:
for vehicle_dict in data['vehicles']:
if 'vin' in vehicle_dict and vehicle_dict['vin'] is not None:
vehicle: Optional[GenericVehicle] = garage.get_vehicle(vehicle_dict['vin'])
seen_vehicle_vins.add(vehicle_dict['vin'])
vehicle: Optional[SkodaElectricVehicle] = garage.get_vehicle(vehicle_dict['vin']) # pyright: ignore[reportAssignmentType]
if not vehicle:
vehicle = ElectricVehicle(vin=vehicle_dict['vin'], garage=garage)
vehicle = SkodaElectricVehicle(vin=vehicle_dict['vin'], garage=garage)
garage.add_vehicle(vehicle_dict['vin'], vehicle)

if 'licensePlate' in vehicle_dict and vehicle_dict['licensePlate'] is not None:
vehicle.license_plate._set_value(vehicle_dict['licensePlate']) # pylint: disable=protected-access

if 'capabilities' in vehicle_dict and vehicle_dict['capabilities'] is not None:
if 'capabilities' in vehicle_dict['capabilities'] and vehicle_dict['capabilities']['capabilities'] is not None:
found_capabilities = set()
for capability_dict in vehicle_dict['capabilities']['capabilities']:
if 'id' in capability_dict and capability_dict['id'] is not None:
capability_id = capability_dict['id']
found_capabilities.add(capability_id)
if capability_id in vehicle.capabilities:
capability: Capability = vehicle.capabilities[capability_id]
else:
capability = Capability(capability_id=capability_id, vehicle=vehicle)
vehicle.capabilities[capability_id] = capability
for capability_id in vehicle.capabilities.keys() - found_capabilities:
vehicle.capabilities[capability_id].enabled = False
vehicle.capabilities.pop(capability_id)

if 'specification' in vehicle_dict and vehicle_dict['specification'] is not None:
if 'model' in vehicle_dict['specification'] and vehicle_dict['specification']['model'] is not None:
vehicle.model._set_value(vehicle_dict['specification']['model']) # pylint: disable=protected-access
log_extra_keys(LOG_API_DEBUG, 'specification', vehicle_dict['specification'], {'model'})
log_extra_keys(LOG_API_DEBUG, 'vehicles', vehicle_dict, {'vin', 'licensePlate', 'capabilities', 'specification'})
self.fetch_vehicle_status(vehicle)
for vin in set(garage.list_vehicle_vins()) - seen_vehicle_vins:
vehicle_to_remove = garage.get_vehicle(vin)
if vehicle_to_remove is not None and vehicle_to_remove.is_managed_by_connector(self):
garage.remove_vehicle(vin)

def fetch_vehicle_status(self, vehicle: GenericVehicle) -> None:
"""
Expand All @@ -158,11 +189,58 @@ def fetch_vehicle_status(self, vehicle: GenericVehicle) -> None:
remote = data['remote']
if 'capturedAt' in remote and remote['capturedAt'] is not None:
captured_at: datetime = robust_time_parse(remote['capturedAt'])

if 'mileageInKm' in remote and remote['mileageInKm'] is not None:
vehicle.odometer._set_value(value=remote['mileageInKm'], measured=captured_at, unit=Length.KM) # pylint: disable=protected-access
else:
raise APIError('Could not fetch vehicle status, capturedAt missing')
if 'mileageInKm' in remote and remote['mileageInKm'] is not None:
vehicle.odometer._set_value(value=remote['mileageInKm'], measured=captured_at, unit=Length.KM) # pylint: disable=protected-access
if 'status' in remote and remote['status'] is not None:
if 'open' in remote['status'] and remote['status']['open'] is not None:
if remote['status']['open'] == 'YES':
vehicle.doors.open_state._set_value(Doors.OpenState.OPEN, measured=captured_at) # pylint: disable=protected-access
elif remote['status']['open'] == 'NO':
vehicle.doors.open_state._set_value(Doors.OpenState.CLOSED, measured=captured_at) # pylint: disable=protected-access
else:
vehicle.doors.open_state._set_value(Doors.OpenState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
LOG_API_DEBUG.info('Unknown door open state: %s', remote['status']['open'])
if 'locked' in remote['status'] and remote['status']['locked'] is not None:
if remote['status']['locked'] == 'YES':
vehicle.doors.lock_state._set_value(Doors.LockState.LOCKED, measured=captured_at) # pylint: disable=protected-access
elif remote['status']['locked'] == 'NO':
vehicle.doors.lock_state._set_value(Doors.LockState.UNLOCKED, measured=captured_at) # pylint: disable=protected-access
else:
vehicle.doors.lock_state._set_value(Doors.LockState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
LOG_API_DEBUG.info('Unknown door lock state: %s', remote['status']['locked'])
if 'doors' in remote and remote['doors'] is not None:
seen_door_ids: set[str] = set()
for door_status in remote['doors']:
door_id = door_status['name']
seen_door_ids.add(door_id)
if door_id in vehicle.doors.doors:
door: Doors.Door = vehicle.doors.doors[door_id]
else:
door = Doors.Door(door_id=door_id, doors=vehicle.doors)
vehicle.doors.doors[door_id] = door
if 'status' in door_status and door_status['status'] is not None:
if door_status['status'] == 'OPEN':
door.lock_state._set_value(Doors.LockState.UNLOCKED, measured=captured_at) # pylint: disable=protected-access
door.open_state._set_value(Doors.OpenState.OPEN, measured=captured_at) # pylint: disable=protected-access
elif door_status['status'] == 'CLOSED':
door.lock_state._set_value(Doors.LockState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
door.open_state._set_value(Doors.OpenState.CLOSED, measured=captured_at) # pylint: disable=protected-access
elif door_status['status'] == 'LOCKED':
door.lock_state._set_value(Doors.LockState.LOCKED, measured=captured_at) # pylint: disable=protected-access
door.open_state._set_value(Doors.OpenState.CLOSED, measured=captured_at) # pylint: disable=protected-access
elif door_status['status'] == 'UNSUPPORTED':
door.lock_state._set_value(Doors.LockState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
door.open_state._set_value(Doors.OpenState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
else:
door.lock_state._set_value(Doors.LockState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
door.open_state._set_value(Doors.OpenState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access

log_extra_keys(LOG_API_DEBUG, 'doors', door_status, {'name', 'status'})

log_extra_keys(LOG_API_DEBUG, 'status', remote['status'], {'open', 'locked'})
log_extra_keys(LOG_API_DEBUG, 'vehicles', remote, {'capturedAt', 'mileageInKm', 'status', 'doors'})
else:
raise APIError('Could not fetch vehicle status')
print(data)
Expand Down
76 changes: 76 additions & 0 deletions src/carconnectivity_connectors/skoda/vehicle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Module for vehicle classes."""
from __future__ import annotations
from typing import TYPE_CHECKING

from carconnectivity.vehicle import GenericVehicle, ElectricVehicle, CombustionVehicle, HybridVehicle

if TYPE_CHECKING:
from typing import Optional
from carconnectivity.garage import Garage
from carconnectivity_connectors.skoda.capability import Capability
from carconnectivity_connectors.base.connector import BaseConnector


class SkodaVehicle(GenericVehicle): # pylint: disable=too-many-instance-attributes
"""
A class to represent a generic Skoda vehicle.
Attributes:
-----------
vin : StringAttribute
The vehicle identification number (VIN) of the vehicle.
license_plate : StringAttribute
The license plate of the vehicle.
"""
def __init__(self, vin: Optional[str] = None, garage: Optional[Garage] = None, managing_connector: Optional[BaseConnector] = None,
origin: Optional[SkodaVehicle] = None) -> None:
super().__init__(vin=vin, garage=garage, origin=origin)
if origin is not None:
super().__init__(vin=vin, garage=garage, origin=origin, managing_connector=managing_connector)
self.capabilities = origin.capabilities
else:
self.capabilities: dict[str, Capability] = {}

def __str__(self) -> str:
return_string: str = super().__str__()
if self.capabilities is not None and len(self.capabilities) > 0:
return_string += 'Capabilities:\n'
for capability in self.capabilities.values():
return_string += f'\t{capability}\n'
return return_string


class SkodaElectricVehicle(ElectricVehicle, SkodaVehicle):
"""
Represents a Skoda electric vehicle.
"""
def __init__(self, vin: Optional[str] = None, garage: Optional[Garage] = None, managing_connector: Optional[BaseConnector] = None,
origin: Optional[GenericVehicle] = None) -> None:
if origin is not None:
super().__init__(origin=origin)
else:
super().__init__(vin=vin, garage=garage, managing_connector=managing_connector)


class SkodaCombustionVehicle(CombustionVehicle, SkodaVehicle):
"""
Represents a Skoda combustion vehicle.
"""
def __init__(self, vin: Optional[str] = None, garage: Optional[Garage] = None, managing_connector: Optional[BaseConnector] = None,
origin: Optional[GenericVehicle] = None) -> None:
if origin is not None:
super().__init__(origin=origin)
else:
super().__init__(vin=vin, garage=garage, managing_connector=managing_connector)


class SkodaHybridVehicle(HybridVehicle, SkodaVehicle):
"""
Represents a Skoda hybrid vehicle.
"""
def __init__(self, vin: Optional[str] = None, garage: Optional[Garage] = None, managing_connector: Optional[BaseConnector] = None,
origin: Optional[GenericVehicle] = None) -> None:
if origin is not None:
super().__init__(origin=origin)
else:
super().__init__(vin=vin, garage=garage, managing_connector=managing_connector)

0 comments on commit 04c6f7c

Please sign in to comment.