Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve NetHSM API exception handling #445

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 55 additions & 12 deletions pynitrokey/nethsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,49 @@ def __init__(


def _handle_api_exception(e, messages={}, roles=[], state=None):
# give priority to custom messages
if e.status in messages:
message = messages[e.status]
raise NetHSMError(message)

if e.status == 401 and roles:
message = "Unauthorized -- invalid username or password"
elif e.status == 403 and roles:
roles = [role.value for role in roles]
message = "Access denied -- this operation requires the role " + " or ".join(
roles
)
elif e.status == 401 and roles:
message = "Unauthorized -- invalid username or password"
elif e.status == 405:
# 405 "Method Not Allowed" mostly happens when the UserID or KeyID contains a character
# - that ends the path of the URL like a question mark '?' :
# /api/v1/keys/?/cert will hit the keys listing endpoint instead of the key/{KeyID}/cert endpoint
# - that doesn't count as a path parameter like a slash '/' :
# /api/v1/keys///cert will be interpreted as /api/v1/keys/cert with cert as the KeyID
message = "The ID you provided contains invalid characters"
elif e.status == 406:
message = "Invalid content type requested"
elif e.status == 412 and state:
message = f"Precondition failed -- this operation can only be used on a NetHSM in the state {state.value}"
elif e.status == 429:
message = (
"Too many requests -- you may have tried the wrong credentials too often"
)
else:
message = f"Unexpected API error {e.status}: {e.reason}"

if e.api_response:
try:
body = json.loads(e.api_response.response.data)
if "message" in body:
body = None
# "custom" requests
if hasattr(e.api_response, "text") and e.api_response.text != "":
body = json.loads(e.api_response.text)
# generated code
elif (
hasattr(e.api_response, "response")
and e.api_response.response.data != ""
):
body = json.loads(e.api_response.response.data)
if body is not None and "message" in body:
message += "\n" + body["message"]
except json.JSONDecodeError:
pass
Expand Down Expand Up @@ -254,10 +279,11 @@ def request(
method, url, params=params, data=data, headers=headers, json=json
)
if not response.ok:
e = ApiException(status=response.status_code, reason=response.reason)
e.body = response.text
e.headers = response.headers
raise e
raise ApiException(
status=response.status_code,
reason=response.reason,
api_response=response,
)
return response

def get_api(self):
Expand Down Expand Up @@ -295,6 +321,8 @@ def unlock(self, passphrase):
e,
state=State.LOCKED,
messages={
# Doc says 400 could happen when the passphrase is invalid?
400: "Access denied -- wrong unlock passphrase",
403: "Access denied -- wrong unlock passphrase",
},
)
Expand Down Expand Up @@ -326,7 +354,7 @@ def provision(self, unlock_passphrase, admin_passphrase, system_time):
e,
state=State.UNPROVISIONED,
messages={
400: "Malformed request data -- e. g. weak passphrase",
400: "Malformed request data -- e. g. weak passphrase or invalid time",
},
)

Expand Down Expand Up @@ -466,8 +494,9 @@ def add_operator_tag(self, user_id, tag):
state=State.OPERATIONAL,
roles=[Role.ADMINISTRATOR],
messages={
404: f"User {user_id} not found",
304: f"Tag is already present for {user_id}",
400: "Invalid tag format or user is not an operator",
404: f"User {user_id} not found",
},
)

Expand Down Expand Up @@ -553,7 +582,14 @@ def get_random_data(self, n):
response = self.get_api().random_post(body=body)
return response.body["random"]
except ApiException as e:
_handle_api_exception(e, state=State.OPERATIONAL, roles=[Role.OPERATOR])
_handle_api_exception(
e,
state=State.OPERATIONAL,
roles=[Role.OPERATOR],
messages={
400: "Invalid length. Must be between 1 and 1024",
},
)

def get_metrics(self):
try:
Expand Down Expand Up @@ -742,6 +778,7 @@ def generate_key(self, type, mechanisms, length, key_id):
roles=[Role.ADMINISTRATOR],
messages={
400: "Bad request -- invalid input data",
409: f"Conflict -- a key with the ID {key_id} already exists",
},
)

Expand Down Expand Up @@ -819,6 +856,8 @@ def get_key_certificate(self, key_id):
roles=[Role.ADMINISTRATOR, Role.OPERATOR],
messages={
404: f"Certificate for key {key_id} not found",
# The API returns a 406 if there is no certificate or if the key does not exist
406: f"Certificate for key {key_id} not found",
},
)

Expand Down Expand Up @@ -852,6 +891,7 @@ def set_key_certificate(self, key_id, cert, mime_type):
400: "Bad Request -- invalid certificate",
404: f"Key {key_id} not found",
409: f"Conflict -- key {key_id} already has a certificate",
415: "Invalid mime type",
},
)

Expand Down Expand Up @@ -1049,7 +1089,7 @@ def set_time(self, time):
state=State.OPERATIONAL,
roles=[Role.ADMINISTRATOR],
messages={
400: "Bad request -- invalid input data",
400: "Bad request -- invalid time format",
},
)

Expand Down Expand Up @@ -1096,6 +1136,9 @@ def backup(self):
e,
state=State.OPERATIONAL,
roles=[Role.BACKUP],
messages={
412: "NetHSM is not Operational or the backup passphrase is not set",
},
)

def restore(self, backup, passphrase, time):
Expand Down