Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SetAttribute support to the server #606

Merged
merged 1 commit into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions kmip/services/server/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,7 @@ def _list_objects_with_access_controls(
return managed_objects_allowed

def _process_operation(self, operation, payload):
# TODO (peterhamilton) Alphabetize this.
if operation == enums.Operation.CREATE:
return self._process_create(payload)
elif operation == enums.Operation.CREATE_KEY_PAIR:
Expand Down Expand Up @@ -1189,6 +1190,8 @@ def _process_operation(self, operation, payload):
return self._process_decrypt(payload)
elif operation == enums.Operation.SIGNATURE_VERIFY:
return self._process_signature_verify(payload)
elif operation == enums.Operation.SET_ATTRIBUTE:
return self._process_set_attribute(payload)
elif operation == enums.Operation.MAC:
return self._process_mac(payload)
elif operation == enums.Operation.SIGN:
Expand Down Expand Up @@ -1549,6 +1552,54 @@ def _process_delete_attribute(self, payload):

return response_payload

@_kmip_version_supported('2.0')
def _process_set_attribute(self, payload):
self._logger.info("Processing operation: SetAttribute")

unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier

managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.SET_ATTRIBUTE
)

attribute_name = enums.convert_attribute_tag_to_name(
payload.new_attribute.attribute.tag
)
if self._attribute_policy.is_attribute_multivalued(attribute_name):
raise exceptions.KmipError(
status=enums.ResultStatus.OPERATION_FAILED,
reason=enums.ResultReason.MULTI_VALUED_ATTRIBUTE,
message=(
"The '{}' attribute is multi-valued. Multi-valued "
"attributes cannot be set with the SetAttribute "
"operation.".format(attribute_name)
)
)
if not self._attribute_policy.is_attribute_modifiable_by_client(
attribute_name
):
raise exceptions.KmipError(
status=enums.ResultStatus.OPERATION_FAILED,
reason=enums.ResultReason.READ_ONLY_ATTRIBUTE,
message=(
"The '{}' attribute is read-only and cannot be modified "
"by the client.".format(attribute_name)
)
)

self._set_attributes_on_managed_object(
managed_object,
{attribute_name: payload.new_attribute.attribute}
)
self._data_session.commit()

return payloads.SetAttributeResponsePayload(
unique_identifier=unique_identifier
)

@_kmip_version_supported('1.0')
def _process_register(self, payload):
self._logger.info("Processing operation: Register")
Expand Down
14 changes: 14 additions & 0 deletions kmip/services/server/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,20 @@ def is_attribute_deletable_by_client(self, attribute):
rule_set = self._attribute_rule_sets.get(attribute)
return rule_set.deletable_by_client

def is_attribute_modifiable_by_client(self, attribute):
"""
Check if the attribute can be modified by the client.

Args:
attribute (string): The name of the attribute (e.g., "Name").

Returns:
bool: True if the attribute can be modified by the client. False
otherwise.
"""
rule_set = self._attribute_rule_sets.get(attribute)
return rule_set.modifiable_by_client

def is_attribute_applicable_to_object_type(self, attribute, object_type):
"""
Check if the attribute is supported by the given object type.
Expand Down
151 changes: 151 additions & 0 deletions kmip/tests/unit/services/server/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ def test_process_operation(self):
e._process_signature_verify = mock.MagicMock()
e._process_mac = mock.MagicMock()
e._process_sign = mock.MagicMock()
e._process_set_attribute = mock.MagicMock()

e._process_operation(enums.Operation.CREATE, None)
e._process_operation(enums.Operation.CREATE_KEY_PAIR, None)
Expand All @@ -742,6 +743,7 @@ def test_process_operation(self):
e._process_operation(enums.Operation.SIGN, None)
e._process_operation(enums.Operation.SIGNATURE_VERIFY, None)
e._process_operation(enums.Operation.MAC, None)
e._process_operation(enums.Operation.SET_ATTRIBUTE, None)

e._process_create.assert_called_with(None)
e._process_create_key_pair.assert_called_with(None)
Expand All @@ -761,6 +763,7 @@ def test_process_operation(self):
e._process_decrypt.assert_called_with(None)
e._process_signature_verify.assert_called_with(None)
e._process_mac.assert_called_with(None)
e._process_set_attribute.assert_called_with(None)

def test_unsupported_operation(self):
"""
Expand Down Expand Up @@ -4122,6 +4125,154 @@ def test_delete_attribute_with_invalid_attribute_index(self):
*args
)

def test_set_attribute(self):
"""
Test that a SetAttribute request can be processed correctly.
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()

secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)

e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

# Confirm that the attribute is set to its default value by
# fetching the managed object fresh from the database and
# checking it.
managed_object = e._get_object_with_access_controls(
"1",
enums.Operation.SET_ATTRIBUTE
)
self.assertFalse(managed_object.sensitive)

payload = payloads.SetAttributeRequestPayload(
unique_identifier="1",
new_attribute=objects.NewAttribute(
attribute=primitives.Boolean(
value=True,
tag=enums.Tags.SENSITIVE
)
)
)

response_payload = e._process_set_attribute(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

e._logger.info.assert_any_call(
"Processing operation: SetAttribute"
)
self.assertEqual(
"1",
response_payload.unique_identifier
)

# Confirm that the attribute was actually set by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
response_payload.unique_identifier,
enums.Operation.SET_ATTRIBUTE
)
self.assertTrue(managed_object.sensitive)

def test_set_attribute_with_multivalued_attribute(self):
"""
Test that a KmipError is raised when attempting to set the value of
a multivalued attribute.
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()

secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)

e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

args = (
payloads.SetAttributeRequestPayload(
unique_identifier="1",
new_attribute=objects.NewAttribute(
attribute=primitives.TextString(
value="New Name",
tag=enums.Tags.NAME
)
)
),
)

self.assertRaisesRegex(
exceptions.KmipError,
"The 'Name' attribute is multi-valued. Multi-valued attributes "
"cannot be set with the SetAttribute operation.",
e._process_set_attribute,
*args
)

def test_set_attribute_with_non_client_modifiable_attribute(self):
"""
Test that a KmipError is raised when attempting to set the value of
a attribute not modifiable by the client.
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()

secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)

e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

args = (
payloads.SetAttributeRequestPayload(
unique_identifier="1",
new_attribute=objects.NewAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.RSA,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
),
)

self.assertRaisesRegex(
exceptions.KmipError,
"The 'Cryptographic Algorithm' attribute is read-only and cannot "
"be modified by the client.",
e._process_set_attribute,
*args
)

def test_register(self):
"""
Test that a Register request can be processed correctly.
Expand Down
14 changes: 14 additions & 0 deletions kmip/tests/unit/services/server/test_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ def test_is_attribute_deletable_by_client(self):
rules.is_attribute_deletable_by_client("Contact Information")
)

def test_is_attribute_modifiable_by_client(self):
"""
Test that is_attribute_modifiable_by_client returns the expected
results in all cases.
"""
rules = policy.AttributePolicy(contents.ProtocolVersion(1, 0))

self.assertFalse(
rules.is_attribute_modifiable_by_client("Unique Identifier")
)
self.assertTrue(
rules.is_attribute_modifiable_by_client("Name")
)

def test_is_attribute_applicable_to_object_type(self):
"""
Test that is_attribute_applicable_to_object_type returns the
Expand Down