Skip to content

Commit

Permalink
Big change to other endpoints using a single session
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsteinbach committed Jan 2, 2025
1 parent aa0cd44 commit e825a43
Show file tree
Hide file tree
Showing 8 changed files with 1,011 additions and 770 deletions.
139 changes: 138 additions & 1 deletion src/carconnectivity_connectors/skoda/auth/auth_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,141 @@
def add_bearer_auth_header(token, headers=None):

"""
This module provides utility functions and classes for handling authentication and parsing HTML forms
and scripts for the Volkswagen car connectivity connector.
"""
from __future__ import annotations
from typing import TYPE_CHECKING

import json
import re
from html.parser import HTMLParser

if TYPE_CHECKING:
from typing import Optional, Dict


def add_bearer_auth_header(token, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""
Adds a Bearer token to the Authorization header.
Args:
token (str): The Bearer token to be added to the headers.
headers (Optional[Dict[str, str]]): An optional dictionary of headers to which the Authorization header will be added.
If not provided, a new dictionary will be created.
Returns:
Dict[str, str]: The headers dictionary with the added Authorization header.
"""
headers = headers or {}
headers['Authorization'] = f'Bearer {token}'
return headers


class HTMLFormParser(HTMLParser):
"""
A custom HTML parser to extract form data from HTML content.
"""
def __init__(self, form_id) -> None:
super().__init__()
self._form_id = form_id
self._inside_form: bool = False
self.target = None
self.data = {}

def _get_attr(self, attrs, name):
for attr in attrs:
if attr[0] == name:
return attr[1]
return None

def handle_starttag(self, tag, attrs) -> None:
if self._inside_form and tag == 'input':
self.handle_input(attrs)
return

if tag == 'form' and self._get_attr(attrs, 'id') == self._form_id:
self._inside_form = True
self.target = self._get_attr(attrs, 'action')

def handle_endtag(self, tag) -> None:
if tag == 'form' and self._inside_form:
self._inside_form = False

def handle_input(self, attrs) -> None:
if not self._inside_form:
return

name = self._get_attr(attrs, 'name')
value = self._get_attr(attrs, 'value')

if name:
self.data[name] = value


class ScriptFormParser(HTMLParser):
fields: list[str] = []
targetField: str = ''

def __init__(self):
super().__init__()
self._inside_script = False
self.data = {}
self.target = None

def handle_starttag(self, tag, attrs) -> None:
if not self._inside_script and tag == 'script':
self._inside_script = True

def handle_endtag(self, tag) -> None:
if self._inside_script and tag == 'script':
self._inside_script = False

def handle_data(self, data) -> None:
if not self._inside_script:
return

match: re.Match[str] | None = re.search(r'templateModel: (.*?),\n', data)
if not match:
return

result = json.loads(match.group(1))
self.target = result.get(self.targetField, None)
self.data = {k: v for k, v in result.items() if k in self.fields}

match2 = re.search(r'csrf_token: \'(.*?)\'', data)
if match2:
self.data['_csrf'] = match2.group(1)


class CredentialsFormParser(ScriptFormParser):
fields: list[str] = ['relayState', 'hmac', 'registerCredentialsPath', 'error', 'errorCode']
targetField: str = 'postAction'


class TermsAndConditionsFormParser(ScriptFormParser):
fields: list[str] = ['relayState', 'hmac', 'countryOfResidence', 'legalDocuments']
targetField: str = 'loginUrl'

def handle_data(self, data) -> None:
if not self._inside_script:
return

super().handle_data(data)

if 'countryOfResidence' in self.data:
self.data['countryOfResidence'] = self.data['countryOfResidence'].upper()

if 'legalDocuments' not in self.data:
return

for key in self.data['legalDocuments'][0]:
# Skip unnecessary keys
if key in ('skipLink', 'declineLink', 'majorVersion', 'minorVersion', 'changeSummary'):
continue

# Move values under a new key while converting boolean values to 'yes' or 'no'
v = self.data['legalDocuments'][0][key]
self.data[f'legalDocuments[0].{key}'] = ('yes' if v else 'no') if isinstance(v, bool) else v

# Remove the original object
del self.data['legalDocuments']
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
"""Implements a custom Retry class that allows for blacklisting certain status codes that will not retry."""
from urllib3.util.retry import Retry


class BlacklistRetry(Retry):
def __init__(self, status_blacklist=None, **kwargs):
"""
BlacklistRetry class extends the Retry class to include a blacklist of status codes that should not be retried.
"""
def __init__(self, status_blacklist=None, **kwargs) -> None:
self.status_blacklist = status_blacklist
super().__init__(**kwargs)

def is_retry(self, method, status_code, has_retry_after=False):
def is_retry(self, method, status_code, has_retry_after=False) -> bool:
"""
Determines if a request should be retried based on the HTTP method, status code,
and the presence of a 'Retry-After' header.
Args:
method (str): The HTTP method of the request (e.g., 'GET', 'POST').
status_code (int): The HTTP status code of the response.
has_retry_after (bool): Indicates if the response contains a 'Retry-After' header.
Returns:
bool: True if the request should be retried, False otherwise.
"""
if self.status_blacklist is not None and status_code in self.status_blacklist:
return False
else:
Expand Down
Loading

0 comments on commit e825a43

Please sign in to comment.