From a28116e2b7fc5c9e0824e9daa84f96df597a4bc3 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 6 Dec 2019 16:55:12 -0500 Subject: [PATCH] Add ModifyAttribute support to the server This change adds ModifyAttribute operation support to the PyKMIP server, including additional attribute policy functionality to check for certain attribute characteristics that preclude ModifyAttribute operation functionality. New unit tests have been added to cover these changes. Partially implements #547 --- kmip/services/server/engine.py | 341 +++++ .../tests/unit/services/server/test_engine.py | 1171 +++++++++++++++++ 2 files changed, 1512 insertions(+) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 06976b21..ed784b0e 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -751,6 +751,96 @@ def _get_attribute_from_managed_object(self, managed_object, attr_name): # for unrecognized attributes. This satisfies the spec. return None + def _get_attribute_index_from_managed_object( + self, + managed_object, + attribute_name, + attribute_value + ): + """ + Find the attribute index for the specified attribute value. + + Args: + managed_object (pie.ManagedObject): A managed object kept by the + server. Usually obtained from _get_object_with_access_controls. + Required. + attribute_name (string): The name of the attribute to look up. + Required. + attribute_value (primitive.Base): A primitive object representing + the attribute value. If a simple object (e.g., Integer) just + do a direct comparison on its value. If a complex object (e.g., + Struct) do a comparison on all of the object fields. Required. + + Returns: + int - the attribute index of the attribute value on the managed + object, if it exists, 0 for single-valued attributes + None - if the attribute value could not be found on the managed + object + """ + if attribute_name == "Application Specific Information": + a = attribute_value + for count, v in enumerate(managed_object.app_specific_info): + if ((a.application_namespace == v.application_namespace) and + (a.application_data == v.application_data)): + return count + return None + elif attribute_name == "Certificate Type": + if attribute_value.value == managed_object.certificate_type: + return 0 + return None + elif attribute_name == "Cryptographic Algorithm": + if attribute_value.value == managed_object.cryptographic_algorithm: + return 0 + return None + elif attribute_name == "Cryptographic Length": + if attribute_value.value == managed_object.cryptographic_length: + return 0 + return None + elif attribute_name == "Cryptographic Usage Mask": + v = attribute_value.value + combined_mask = 0 + for mask in managed_object.cryptographic_usage_masks: + combined_mask |= mask.value + if v == combined_mask: + return 0 + return None + elif attribute_name == "Initial Date": + if attribute_value.value == managed_object.initial_date: + return 0 + return None + elif attribute_name == "Name": + for count, v in enumerate(managed_object.names): + if attribute_value.name_value.value == v: + return count + return None + elif attribute_name == "Object Group": + for count, v in enumerate(managed_object.object_groups): + if attribute_value.value == v.object_group: + return count + return None + elif attribute_name == "Object Type": + if attribute_value.value == managed_object.object_type: + return 0 + return None + elif attribute_name == "Operation Policy Name": + if attribute_value.value == managed_object.operation_policy_name: + return 0 + return None + elif attribute_name == "Sensitive": + if attribute_value.value == managed_object.sensitive: + return 0 + return None + elif attribute_name == "State": + if attribute_value.value == managed_object.state: + return 0 + return None + elif attribute_name == "Unique Identifier": + if attribute_value.value == str(managed_object.unique_identifier): + return 0 + return None + else: + return None + def _set_attributes_on_managed_object(self, managed_object, attributes): """ Given a kmip.pie object and a dictionary of attributes, attempt to set @@ -847,6 +937,38 @@ def _set_attribute_on_managed_object(self, managed_object, attribute): "The {0} attribute is unsupported.".format(attribute_name) ) + def _set_attribute_on_managed_object_by_index( + self, + managed_object, + attribute_name, + attribute_value, + attribute_index + ): + """ + Set the attribute value for the specified attribute index. + + Args: + managed_object (pie.ManagedObject): A managed object kept by the + server. Usually obtained from _get_object_with_access_controls. + Required. + attribute_name (string): The name of the attribute to modify. + Required. + attribute_value (primitive.Base): A primitive object representing + the new attribute value to set on the managd object. Required. + attribute_index (int): The index of the existing attribute to + modify. Required. + """ + if attribute_name == "Application Specific Information": + a = managed_object.app_specific_info[attribute_index] + a.application_namespace = attribute_value.application_namespace + a.application_data = attribute_value.application_data + elif attribute_name == "Name": + name_value = attribute_value.name_value + managed_object.names[attribute_index] = name_value.value + elif attribute_name == "Object Group": + a = managed_object.object_groups[attribute_index] + a.object_group = attribute_value.value + def _delete_attribute_from_managed_object(self, managed_object, attribute): attribute_name, attribute_index, attribute_value = attribute object_type = managed_object._object_type @@ -1192,6 +1314,8 @@ def _process_operation(self, operation, payload): return self._process_signature_verify(payload) elif operation == enums.Operation.SET_ATTRIBUTE: return self._process_set_attribute(payload) + elif operation == enums.Operation.MODIFY_ATTRIBUTE: + return self._process_modify_attribute(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1600,6 +1724,223 @@ def _process_set_attribute(self, payload): unique_identifier=unique_identifier ) + @_kmip_version_supported('1.0') + def _process_modify_attribute(self, payload): + self._logger.info("Processing operation: ModifyAttribute") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + + if self._protocol_version >= contents.ProtocolVersion(2, 0): + current_attribute = payload.current_attribute + if current_attribute: + current_attribute = current_attribute.attribute + new_attribute = payload.new_attribute.attribute + + attribute_name = enums.convert_attribute_tag_to_name( + new_attribute.tag + ) + + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.PERMISSION_DENIED, + message=( + "The '{}' attribute is read-only and cannot be " + "modified.".format(attribute_name) + ) + ) + + is_multivalued = self._attribute_policy.is_attribute_multivalued( + attribute_name + ) + + if is_multivalued: + if current_attribute is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_INSTANCE_NOT_FOUND, + message=( + "The '{}' attribute is multivalued so the current " + "attribute must be specified.".format( + attribute_name + ) + ) + ) + else: + index = self._get_attribute_index_from_managed_object( + managed_object, + attribute_name, + current_attribute + ) + if index is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The specified current attribute could not be " + "found on the managed object." + ) + ) + else: + self._set_attribute_on_managed_object_by_index( + managed_object, + attribute_name, + new_attribute, + index + ) + self._data_session.commit() + else: + if current_attribute is None: + # Verify the attribute is set. + existing_attr = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + if existing_attr is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The '{}' attribute is not set on the managed " + "object. It must be set before it can be " + "modified.".format(attribute_name) + ) + ) + else: + # Verify the attribute matches the current attribute. + index = self._get_attribute_index_from_managed_object( + managed_object, + attribute_name, + current_attribute + ) + if index is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The specified current attribute could not be " + "found on the managed object." + ) + ) + + # Set the attribute value. + self._set_attribute_on_managed_object( + managed_object, + (attribute_name, new_attribute) + ) + self._data_session.commit() + + return payloads.ModifyAttributeResponsePayload( + unique_identifier=unique_identifier + ) + + else: + attribute_name = payload.attribute.attribute_name.value + attribute_index = payload.attribute.attribute_index + if attribute_index: + attribute_index = attribute_index.value + attribute_value = payload.attribute.attribute_value + + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.PERMISSION_DENIED, + message=( + "The '{}' attribute is read-only and cannot be " + "modified.".format(attribute_name) + ) + ) + + is_multivalued = self._attribute_policy.is_attribute_multivalued( + attribute_name + ) + + modified_attribute = None + + if is_multivalued: + if attribute_index is None: + attribute_index = 0 + + existing_attributes = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + if 0 <= attribute_index <= len(existing_attributes): + self._set_attribute_on_managed_object_by_index( + managed_object, + attribute_name, + attribute_value, + attribute_index + ) + self._data_session.commit() + else: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ITEM_NOT_FOUND, + message=( + "No matching attribute instance could be found " + "for the specified attribute index." + ) + ) + + existing_attributes = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + modified_attribute = existing_attributes[attribute_index] + else: + if attribute_index is not None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.INVALID_FIELD, + message=( + "The attribute index cannot be specified for a " + "single-valued attribute." + ) + ) + existing_attribute = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + if len(existing_attribute) == 0: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.INVALID_FIELD, + message=( + "The '{}' attribute is not set on the managed " + "object. It must be set before it can be " + "modified.".format(attribute_name) + ) + ) + else: + self._set_attribute_on_managed_object( + managed_object, + (attribute_name, attribute_value) + ) + self._data_session.commit() + + existing_attributes = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + modified_attribute = existing_attributes[0] + + return payloads.ModifyAttributeResponsePayload( + unique_identifier=unique_identifier, + attribute=modified_attribute + ) + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 965e0f54..004d7dc0 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -723,6 +723,7 @@ def test_process_operation(self): e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() e._process_set_attribute = mock.MagicMock() + e._process_modify_attribute = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) @@ -744,6 +745,7 @@ def test_process_operation(self): e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) e._process_operation(enums.Operation.SET_ATTRIBUTE, None) + e._process_operation(enums.Operation.MODIFY_ATTRIBUTE, None) e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) @@ -764,6 +766,7 @@ def test_process_operation(self): e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) e._process_set_attribute.assert_called_with(None) + e._process_modify_attribute.assert_called_with(None) def test_unsupported_operation(self): """ @@ -1632,6 +1635,344 @@ def test_get_attribute_from_managed_object(self): ) self.assertEqual(None, result) + def test_get_attribute_index_from_managed_object(self): + """ + Test that an attribute's index can be retrieved from a given managed + object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + certificate = pie_objects.X509Certificate( + b'' + ) + + e._data_session.add(symmetric_key) + e._data_session.add(certificate) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Application Specific Information", + [ + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace", + application_data="Example Data" + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Name", + [ + attributes.Name( + name_value=attributes.Name.NameValue("Name 1") + ), + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Object Group", + [ + primitives.TextString( + "Example Group", + tag=enums.Tags.OBJECT_GROUP + ) + ] + ) + ) + + # Test getting the index for an ApplicationSpecificInfo attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace", + application_data="Example Data" + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Wrong Namespace", + application_data="Wrong Data" + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CertificateType attribute + index = e._get_attribute_index_from_managed_object( + certificate, + "Certificate Type", + primitives.Enumeration( + enums.CertificateType, + enums.CertificateType.X_509, + tag=enums.Tags.CERTIFICATE_TYPE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + certificate, + "Certificate Type", + primitives.Enumeration( + enums.CertificateType, + enums.CertificateType.PGP, + tag=enums.Tags.CERTIFICATE_TYPE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicAlgorithm attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Algorithm", + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Algorithm", + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.RSA, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicLength attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Length", + primitives.Integer( + 0, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Length", + primitives.Integer( + 128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicUsageMasks attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Usage Mask", + primitives.Integer( + 12, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Usage Mask", + primitives.Integer( + 0, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + self.assertIsNone(index) + + # Test getting the index for a InitialDate attribute + date = e._get_attribute_from_managed_object( + symmetric_key, + "Initial Date" + ) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Initial Date", + primitives.DateTime( + date, + tag=enums.Tags.INITIAL_DATE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Initial Date", + primitives.DateTime( + 9999, + tag=enums.Tags.INITIAL_DATE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a Name attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ) + self.assertEqual(2, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 3") + ) + ) + self.assertIsNone(index) + + # Test getting the index for a ObjectGroup attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Group", + primitives.TextString( + "Example Group", + tag=enums.Tags.OBJECT_GROUP + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Group", + primitives.TextString( + "Invalid Group", + tag=enums.Tags.OBJECT_GROUP + ) + ) + self.assertIsNone(index) + + # Test getting the index for a ObjectType attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Type", + primitives.Enumeration( + enums.ObjectType, + enums.ObjectType.SYMMETRIC_KEY, + tag=enums.Tags.OBJECT_TYPE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Type", + primitives.Enumeration( + enums.ObjectType, + enums.ObjectType.CERTIFICATE, + tag=enums.Tags.OBJECT_TYPE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a OperationPolicyName attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Operation Policy Name", + primitives.TextString( + "default", + tag=enums.Tags.OPERATION_POLICY_NAME + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Operation Policy Name", + primitives.TextString( + "invalid", + tag=enums.Tags.OPERATION_POLICY_NAME + ) + ) + self.assertIsNone(index) + + # Test getting the index for a Sensitive attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Sensitive", + primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Sensitive", + primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a State attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "State", + primitives.Enumeration( + enums.State, + enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "State", + primitives.Enumeration( + enums.State, + enums.State.ACTIVE, + tag=enums.Tags.STATE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a UniqueIdentifier attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Unique Identifier", + primitives.TextString(value="1", tag=enums.Tags.UNIQUE_IDENTIFIER) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Unique Identifier", + primitives.TextString(value="9", tag=enums.Tags.UNIQUE_IDENTIFIER) + ) + self.assertIsNone(index) + + # Test getting the index for an unsupported attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Archive Date", + None + ) + self.assertIsNone(index) + def test_set_attributes_on_managed_object(self): """ Test that multiple attributes can be set on a given managed object. @@ -1940,6 +2281,156 @@ def test_set_attribute_on_managed_object_unsupported_features(self): *args ) + def test_set_attribute_on_managed_object_by_index(self): + """ + Test that an attribute can be modified on a managed object given its + name, index, and new value. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + + e._data_session.add(symmetric_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Application Specific Information", + [ + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 1", + application_data="Example Data 1" + ), + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 2", + application_data="Example Data 2" + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Name", + [ + attributes.Name( + name_value=attributes.Name.NameValue("Name 1") + ), + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Object Group", + [ + primitives.TextString( + "Example Group 1", + tag=enums.Tags.OBJECT_GROUP + ), + primitives.TextString( + "Example Group 2", + tag=enums.Tags.OBJECT_GROUP + ) + ] + ) + ) + + # Test setting an ApplicationSpecificInformation attribute by index + a = e._get_attribute_from_managed_object( + symmetric_key, + "Application Specific Information" + ) + self.assertEqual(2, len(a)) + self.assertEqual( + "Example Namespace 1", + a[0].get("application_namespace") + ) + self.assertEqual("Example Data 1", a[0].get("application_data")) + self.assertEqual( + "Example Namespace 2", + a[1].get("application_namespace") + ) + self.assertEqual("Example Data 2", a[1].get("application_data")) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 3", + application_data="Example Data 3" + ), + 1 + ) + a = e._get_attribute_from_managed_object( + symmetric_key, + "Application Specific Information" + ) + self.assertEqual(2, len(a)) + self.assertEqual( + "Example Namespace 1", + a[0].get("application_namespace") + ) + self.assertEqual("Example Data 1", a[0].get("application_data")) + self.assertEqual( + "Example Namespace 3", + a[1].get("application_namespace") + ) + self.assertEqual("Example Data 3", a[1].get("application_data")) + + # Test setting a Name attribute by index + a = e._get_attribute_from_managed_object(symmetric_key, "Name") + self.assertEqual(3, len(a)) + self.assertEqual("Symmetric Key", a[0].name_value.value) + self.assertEqual("Name 1", a[1].name_value.value) + self.assertEqual("Name 2", a[2].name_value.value) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 3") + ), + 1 + ) + a = e._get_attribute_from_managed_object(symmetric_key, "Name") + self.assertEqual(3, len(a)) + self.assertEqual("Symmetric Key", a[0].name_value.value) + self.assertEqual("Name 3", a[1].name_value.value) + self.assertEqual("Name 2", a[2].name_value.value) + + # Test setting an ObjectGroup attribute by index + a = e._get_attribute_from_managed_object(symmetric_key, "Object Group") + self.assertEqual(2, len(a)) + self.assertEqual("Example Group 1", a[0]) + self.assertEqual("Example Group 2", a[1]) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Object Group", + primitives.TextString( + "Example Group 3", + tag=enums.Tags.OBJECT_GROUP + ), + 1 + ) + a = e._get_attribute_from_managed_object(symmetric_key, "Object Group") + self.assertEqual(2, len(a)) + self.assertEqual("Example Group 1", a[0]) + self.assertEqual("Example Group 3", a[1]) + def test_delete_attribute_from_managed_object(self): """ Test that various attributes can be deleted correctly from a given @@ -4273,6 +4764,686 @@ def test_set_attribute_with_non_client_modifiable_attribute(self): *args ) + def test_modify_attribute(self): + """ + Test that a ModifyAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertEqual( + "Sensitive", + response_payload.attribute.attribute_name.value + ) + self.assertIsNone(response_payload.attribute.attribute_index) + self.assertEqual( + True, + response_payload.attribute.attribute_value.value + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_modify_attribute_with_unmodifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to modify an + unmodifiable attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + "2" + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Unique Identifier' attribute is read-only and cannot be " + "modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_multivalued(self): + """ + Test that a ModifyAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Symmetric Key", managed_object.names[0]) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.NAME, + "Modified Name" + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertEqual( + "Name", + response_payload.attribute.attribute_name.value + ) + self.assertEqual(0, response_payload.attribute.attribute_index.value) + self.assertEqual( + "Modified Name", + response_payload.attribute.attribute_value.name_value.value + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Modified Name", managed_object.names[0]) + + def test_modify_attribute_with_multivalued_no_index_match(self): + """ + Test that a KmipError is raised when attempting to modify an attribute + based on an index that doesn't exist. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.NAME, + "Modified Name", + index=3 + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "No matching attribute instance could be found for the specified " + "attribute index.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_singlevalued_index_specified(self): + """ + Test that a KmipError is raised when attempting to modify a + single-valued attribute while also specifying the attribute index. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True, + index=0 + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The attribute index cannot be specified for a single-valued " + "attribute.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_singlevalued_unset_attr(self): + """ + Test that a KmipError is raised when attempting to modify an + unset attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + e._get_attributes_from_managed_object = mock.Mock(return_value=[]) + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + e._set_attribute_on_managed_object( + secret, + ("Sensitive", primitives.Boolean(None)) + ) + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Sensitive' attribute is not set on the managed " + "object. It must be set before it can be modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0(self): + """ + Test that a ModifyAttribute request can be processed correctly with + KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_modify_attribute_kmip_2_0_with_unmodifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to modify an + unmodifiable attribute with KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.TextString( + "1", + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.TextString( + "2", + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Unique Identifier' attribute is read-only and cannot be " + "modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_multivalued(self): + """ + Test that a ModifyAttribute request can be processed correctly with + KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Symmetric Key", managed_object.names[0]) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Symmetric Key") + ) + ), + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Modified Name", managed_object.names[0]) + + def test_modify_attribute_kmip_2_0_with_multivalued_no_current(self): + """ + Test that a KmipError is raised when attempting to modifyg a + multivalued attribute with no current attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Name' attribute is multivalued so the current attribute " + "must be specified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_multivalued_no_attr_match(self): + """ + Test that a KmipError is raised when attempting to modify an + non-existent attribute value. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Invalid Key") + ) + ), + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The specified current attribute could not be found on the " + "managed object.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_singlevalued_unset_attr(self): + """ + Test that a KmipError is raised when attempting to modify an + unset attribute with KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + e._get_attribute_from_managed_object = mock.Mock(return_value=None) + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + secret, + ("Sensitive", primitives.Boolean(None)) + ) + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Sensitive' attribute is not set on the managed " + "object. It must be set before it can be modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_singlevalued_no_attr_match(self): + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The specified current attribute could not be found on the " + "managed object.", + e._process_modify_attribute, + *args + ) + def test_register(self): """ Test that a Register request can be processed correctly.