Skip to content

Commit

Permalink
Fixes for using activate_session to change users.
Browse files Browse the repository at this point in the history
- The updated ServerNonce was not saved after activate_session, which
  means that subsequent activate_sessions would fail with
  BadIdentityTokenInvalid.
- The _username and _password attributes of Client were never updated but
  checked in the code (_add_user_auth function).
  • Loading branch information
mver-al committed Jan 8, 2025
1 parent 3c6317b commit 2242486
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions asyncua/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ async def activate_session(
self._add_certificate_auth(params, user_certificate, challenge)
else:
self._add_user_auth(params, username, password)
return await self.uaclient.activate_session(params)
res = await self.uaclient.activate_session(params)
self._server_nonce = res.ServerNonce
return res

def _add_anonymous_auth(self, params):
params.UserIdentityToken = ua.AnonymousIdentityToken()
Expand All @@ -676,8 +678,11 @@ def _add_certificate_auth(self, params, certificate, challenge):
params.UserTokenSignature.Signature = sig

def _add_user_auth(self, params, username: str, password: str):
self.set_user(username)
self.set_password(password)

params.UserIdentityToken = ua.UserNameIdentityToken()
params.UserIdentityToken.UserName = username
params.UserIdentityToken.UserName = self._username
policy = self.server_policy(ua.UserTokenType.UserName)
if not policy.SecurityPolicyUri or policy.SecurityPolicyUri == security_policies.SecurityPolicyNone.URI:
# see specs part 4, 7.36.3: if the token is NOT encrypted,
Expand All @@ -686,10 +691,10 @@ def _add_user_auth(self, params, username: str, password: str):
if self._password:
if self.security_policy.Mode != ua.MessageSecurityMode.SignAndEncrypt:
_logger.warning("Sending plain-text password")
params.UserIdentityToken.Password = password.encode("utf8")
params.UserIdentityToken.Password = self._password.encode("utf8")
params.UserIdentityToken.EncryptionAlgorithm = None
elif self._password:
data, uri = self._encrypt_password(password, policy.SecurityPolicyUri)
data, uri = self._encrypt_password(self._password, policy.SecurityPolicyUri)
params.UserIdentityToken.Password = data
params.UserIdentityToken.EncryptionAlgorithm = uri
params.UserIdentityToken.PolicyId = policy.PolicyId
Expand Down

0 comments on commit 2242486

Please sign in to comment.