Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework token acquire/refresh logic #282

Merged
merged 4 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 81 additions & 52 deletions custom_components/polestar_api/pypolestar/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,70 +101,99 @@ def is_token_valid(self) -> bool:
and self.token_expiry > datetime.now(tz=timezone.utc)
)

async def get_token(self, refresh=False) -> None:
"""Get the token from Polestar."""
async def get_token(self, refresh: bool = True, force: bool = False) -> None:
"""Ensure we have a valid access token (still valid, refreshed or initial)."""

if (
not refresh
or self.token_expiry is None
or self.token_expiry < datetime.now(tz=timezone.utc)
not force
and self.access_token is not None
and self.token_expiry
and self.token_expiry > datetime.now(tz=timezone.utc)
):
if (code := await self._get_code()) is None:
return

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{
"code_verifier": self.oidc_code_verifier,
}
if self.oidc_code_verifier
else {}
),
}

elif self.refresh_token:
token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}
else:
self.logger.debug("Token still valid until %s", self.token_expiry)
return

if refresh and self.refresh_token:
try:
response = await self._token_refresh()
self._parse_token_response(response)
self.logger.debug("Token refreshed")
except PolestarAuthException:
self.logger.warning("Unable to refresh token, retry with code")

try:
response = await self._authorization_code()
self._parse_token_response(response)
self.logger.debug("Initial token acquired")
return
except PolestarAuthException as exc:
raise PolestarAuthException("Unable to acquire initial token") from exc

def _parse_token_response(self, response: httpx.Response) -> None:
"""Parse response from token endpoint."""

payload = response.json()
self.latest_call_code = response.status_code

if "error" in payload:
self.logger.error("Token error: %s", payload)
raise PolestarAuthException("Token error", response.status_code)

self.access_token = payload["access_token"]
self.refresh_token = payload["refresh_token"]
self.token_lifetime = payload["expires_in"]
self.token_expiry = datetime.now(tz=timezone.utc) + timedelta(
seconds=self.token_lifetime
)

self.logger.debug("Access token updated, valid until %s", self.token_expiry)

async def _authorization_code(self) -> httpx.Response:
"""Get initial token via authorization code."""

if (code := await self._get_code()) is None:
raise PolestarAuthException("Unable to get code")

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{"code_verifier": self.oidc_code_verifier}
if self.oidc_code_verifier
else {}
),
}

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)

try:
result = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
except Exception as exc:
self.latest_call_code = None
self.logger.error("Auth Token Error: %s", str(exc))
raise PolestarAuthException("Error getting token") from exc
return await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)

payload = result.json()
self.latest_call_code = result.status_code
async def _token_refresh(self) -> httpx.Response:
"""Refresh existing token."""

try:
self.access_token = payload["access_token"]
self.refresh_token = payload["refresh_token"]
self.token_lifetime = payload["expires_in"]
self.token_expiry = datetime.now(tz=timezone.utc) + timedelta(
seconds=self.token_lifetime
)
except KeyError as exc:
self.logger.error("Token response missing expected keys: %s", exc)
raise PolestarAuthException("Incomplete token response") from exc
token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}

self.logger.debug("Access token updated, valid until %s", self.token_expiry)
self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
jschlyter marked this conversation as resolved.
Show resolved Hide resolved
)

return await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)

async def _get_code(self) -> str | None:
query_params = await self._get_resume_path()
Expand Down
2 changes: 1 addition & 1 deletion custom_components/polestar_api/pypolestar/polestar.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def get_ev_data(self, vin: str) -> None:

try:
if self.auth.need_token_refresh():
await self.auth.get_token(refresh=True)
await self.auth.get_token()
except PolestarAuthException as e:
self.latest_call_code = 500
self.logger.warning("Auth Exception: %s", str(e))
Expand Down
Loading