diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 06976b21..ed784b0e 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -751,6 +751,96 @@ def _get_attribute_from_managed_object(self, managed_object, attr_name): # for unrecognized attributes. This satisfies the spec. return None + def _get_attribute_index_from_managed_object( + self, + managed_object, + attribute_name, + attribute_value + ): + """ + Find the attribute index for the specified attribute value. + + Args: + managed_object (pie.ManagedObject): A managed object kept by the + server. Usually obtained from _get_object_with_access_controls. + Required. + attribute_name (string): The name of the attribute to look up. + Required. + attribute_value (primitive.Base): A primitive object representing + the attribute value. If a simple object (e.g., Integer) just + do a direct comparison on its value. If a complex object (e.g., + Struct) do a comparison on all of the object fields. Required. + + Returns: + int - the attribute index of the attribute value on the managed + object, if it exists, 0 for single-valued attributes + None - if the attribute value could not be found on the managed + object + """ + if attribute_name == "Application Specific Information": + a = attribute_value + for count, v in enumerate(managed_object.app_specific_info): + if ((a.application_namespace == v.application_namespace) and + (a.application_data == v.application_data)): + return count + return None + elif attribute_name == "Certificate Type": + if attribute_value.value == managed_object.certificate_type: + return 0 + return None + elif attribute_name == "Cryptographic Algorithm": + if attribute_value.value == managed_object.cryptographic_algorithm: + return 0 + return None + elif attribute_name == "Cryptographic Length": + if attribute_value.value == managed_object.cryptographic_length: + return 0 + return None + elif attribute_name == "Cryptographic Usage Mask": + v = attribute_value.value + combined_mask = 0 + for mask in managed_object.cryptographic_usage_masks: + combined_mask |= mask.value + if v == combined_mask: + return 0 + return None + elif attribute_name == "Initial Date": + if attribute_value.value == managed_object.initial_date: + return 0 + return None + elif attribute_name == "Name": + for count, v in enumerate(managed_object.names): + if attribute_value.name_value.value == v: + return count + return None + elif attribute_name == "Object Group": + for count, v in enumerate(managed_object.object_groups): + if attribute_value.value == v.object_group: + return count + return None + elif attribute_name == "Object Type": + if attribute_value.value == managed_object.object_type: + return 0 + return None + elif attribute_name == "Operation Policy Name": + if attribute_value.value == managed_object.operation_policy_name: + return 0 + return None + elif attribute_name == "Sensitive": + if attribute_value.value == managed_object.sensitive: + return 0 + return None + elif attribute_name == "State": + if attribute_value.value == managed_object.state: + return 0 + return None + elif attribute_name == "Unique Identifier": + if attribute_value.value == str(managed_object.unique_identifier): + return 0 + return None + else: + return None + def _set_attributes_on_managed_object(self, managed_object, attributes): """ Given a kmip.pie object and a dictionary of attributes, attempt to set @@ -847,6 +937,38 @@ def _set_attribute_on_managed_object(self, managed_object, attribute): "The {0} attribute is unsupported.".format(attribute_name) ) + def _set_attribute_on_managed_object_by_index( + self, + managed_object, + attribute_name, + attribute_value, + attribute_index + ): + """ + Set the attribute value for the specified attribute index. + + Args: + managed_object (pie.ManagedObject): A managed object kept by the + server. Usually obtained from _get_object_with_access_controls. + Required. + attribute_name (string): The name of the attribute to modify. + Required. + attribute_value (primitive.Base): A primitive object representing + the new attribute value to set on the managd object. Required. + attribute_index (int): The index of the existing attribute to + modify. Required. + """ + if attribute_name == "Application Specific Information": + a = managed_object.app_specific_info[attribute_index] + a.application_namespace = attribute_value.application_namespace + a.application_data = attribute_value.application_data + elif attribute_name == "Name": + name_value = attribute_value.name_value + managed_object.names[attribute_index] = name_value.value + elif attribute_name == "Object Group": + a = managed_object.object_groups[attribute_index] + a.object_group = attribute_value.value + def _delete_attribute_from_managed_object(self, managed_object, attribute): attribute_name, attribute_index, attribute_value = attribute object_type = managed_object._object_type @@ -1192,6 +1314,8 @@ def _process_operation(self, operation, payload): return self._process_signature_verify(payload) elif operation == enums.Operation.SET_ATTRIBUTE: return self._process_set_attribute(payload) + elif operation == enums.Operation.MODIFY_ATTRIBUTE: + return self._process_modify_attribute(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1600,6 +1724,223 @@ def _process_set_attribute(self, payload): unique_identifier=unique_identifier ) + @_kmip_version_supported('1.0') + def _process_modify_attribute(self, payload): + self._logger.info("Processing operation: ModifyAttribute") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + + if self._protocol_version >= contents.ProtocolVersion(2, 0): + current_attribute = payload.current_attribute + if current_attribute: + current_attribute = current_attribute.attribute + new_attribute = payload.new_attribute.attribute + + attribute_name = enums.convert_attribute_tag_to_name( + new_attribute.tag + ) + + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.PERMISSION_DENIED, + message=( + "The '{}' attribute is read-only and cannot be " + "modified.".format(attribute_name) + ) + ) + + is_multivalued = self._attribute_policy.is_attribute_multivalued( + attribute_name + ) + + if is_multivalued: + if current_attribute is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_INSTANCE_NOT_FOUND, + message=( + "The '{}' attribute is multivalued so the current " + "attribute must be specified.".format( + attribute_name + ) + ) + ) + else: + index = self._get_attribute_index_from_managed_object( + managed_object, + attribute_name, + current_attribute + ) + if index is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The specified current attribute could not be " + "found on the managed object." + ) + ) + else: + self._set_attribute_on_managed_object_by_index( + managed_object, + attribute_name, + new_attribute, + index + ) + self._data_session.commit() + else: + if current_attribute is None: + # Verify the attribute is set. + existing_attr = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + if existing_attr is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The '{}' attribute is not set on the managed " + "object. It must be set before it can be " + "modified.".format(attribute_name) + ) + ) + else: + # Verify the attribute matches the current attribute. + index = self._get_attribute_index_from_managed_object( + managed_object, + attribute_name, + current_attribute + ) + if index is None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ATTRIBUTE_NOT_FOUND, + message=( + "The specified current attribute could not be " + "found on the managed object." + ) + ) + + # Set the attribute value. + self._set_attribute_on_managed_object( + managed_object, + (attribute_name, new_attribute) + ) + self._data_session.commit() + + return payloads.ModifyAttributeResponsePayload( + unique_identifier=unique_identifier + ) + + else: + attribute_name = payload.attribute.attribute_name.value + attribute_index = payload.attribute.attribute_index + if attribute_index: + attribute_index = attribute_index.value + attribute_value = payload.attribute.attribute_value + + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.PERMISSION_DENIED, + message=( + "The '{}' attribute is read-only and cannot be " + "modified.".format(attribute_name) + ) + ) + + is_multivalued = self._attribute_policy.is_attribute_multivalued( + attribute_name + ) + + modified_attribute = None + + if is_multivalued: + if attribute_index is None: + attribute_index = 0 + + existing_attributes = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + if 0 <= attribute_index <= len(existing_attributes): + self._set_attribute_on_managed_object_by_index( + managed_object, + attribute_name, + attribute_value, + attribute_index + ) + self._data_session.commit() + else: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.ITEM_NOT_FOUND, + message=( + "No matching attribute instance could be found " + "for the specified attribute index." + ) + ) + + existing_attributes = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + modified_attribute = existing_attributes[attribute_index] + else: + if attribute_index is not None: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.INVALID_FIELD, + message=( + "The attribute index cannot be specified for a " + "single-valued attribute." + ) + ) + existing_attribute = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + if len(existing_attribute) == 0: + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.INVALID_FIELD, + message=( + "The '{}' attribute is not set on the managed " + "object. It must be set before it can be " + "modified.".format(attribute_name) + ) + ) + else: + self._set_attribute_on_managed_object( + managed_object, + (attribute_name, attribute_value) + ) + self._data_session.commit() + + existing_attributes = self._get_attributes_from_managed_object( + managed_object, + [attribute_name] + ) + modified_attribute = existing_attributes[0] + + return payloads.ModifyAttributeResponsePayload( + unique_identifier=unique_identifier, + attribute=modified_attribute + ) + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 965e0f54..004d7dc0 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -723,6 +723,7 @@ def test_process_operation(self): e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() e._process_set_attribute = mock.MagicMock() + e._process_modify_attribute = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) @@ -744,6 +745,7 @@ def test_process_operation(self): e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) e._process_operation(enums.Operation.SET_ATTRIBUTE, None) + e._process_operation(enums.Operation.MODIFY_ATTRIBUTE, None) e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) @@ -764,6 +766,7 @@ def test_process_operation(self): e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) e._process_set_attribute.assert_called_with(None) + e._process_modify_attribute.assert_called_with(None) def test_unsupported_operation(self): """ @@ -1632,6 +1635,344 @@ def test_get_attribute_from_managed_object(self): ) self.assertEqual(None, result) + def test_get_attribute_index_from_managed_object(self): + """ + Test that an attribute's index can be retrieved from a given managed + object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + certificate = pie_objects.X509Certificate( + b'' + ) + + e._data_session.add(symmetric_key) + e._data_session.add(certificate) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Application Specific Information", + [ + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace", + application_data="Example Data" + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Name", + [ + attributes.Name( + name_value=attributes.Name.NameValue("Name 1") + ), + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Object Group", + [ + primitives.TextString( + "Example Group", + tag=enums.Tags.OBJECT_GROUP + ) + ] + ) + ) + + # Test getting the index for an ApplicationSpecificInfo attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace", + application_data="Example Data" + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Wrong Namespace", + application_data="Wrong Data" + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CertificateType attribute + index = e._get_attribute_index_from_managed_object( + certificate, + "Certificate Type", + primitives.Enumeration( + enums.CertificateType, + enums.CertificateType.X_509, + tag=enums.Tags.CERTIFICATE_TYPE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + certificate, + "Certificate Type", + primitives.Enumeration( + enums.CertificateType, + enums.CertificateType.PGP, + tag=enums.Tags.CERTIFICATE_TYPE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicAlgorithm attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Algorithm", + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Algorithm", + primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.RSA, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicLength attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Length", + primitives.Integer( + 0, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Length", + primitives.Integer( + 128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + self.assertIsNone(index) + + # Test getting the index for a CryptographicUsageMasks attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Usage Mask", + primitives.Integer( + 12, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Cryptographic Usage Mask", + primitives.Integer( + 0, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + ) + self.assertIsNone(index) + + # Test getting the index for a InitialDate attribute + date = e._get_attribute_from_managed_object( + symmetric_key, + "Initial Date" + ) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Initial Date", + primitives.DateTime( + date, + tag=enums.Tags.INITIAL_DATE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Initial Date", + primitives.DateTime( + 9999, + tag=enums.Tags.INITIAL_DATE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a Name attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ) + self.assertEqual(2, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 3") + ) + ) + self.assertIsNone(index) + + # Test getting the index for a ObjectGroup attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Group", + primitives.TextString( + "Example Group", + tag=enums.Tags.OBJECT_GROUP + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Group", + primitives.TextString( + "Invalid Group", + tag=enums.Tags.OBJECT_GROUP + ) + ) + self.assertIsNone(index) + + # Test getting the index for a ObjectType attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Type", + primitives.Enumeration( + enums.ObjectType, + enums.ObjectType.SYMMETRIC_KEY, + tag=enums.Tags.OBJECT_TYPE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Object Type", + primitives.Enumeration( + enums.ObjectType, + enums.ObjectType.CERTIFICATE, + tag=enums.Tags.OBJECT_TYPE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a OperationPolicyName attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Operation Policy Name", + primitives.TextString( + "default", + tag=enums.Tags.OPERATION_POLICY_NAME + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Operation Policy Name", + primitives.TextString( + "invalid", + tag=enums.Tags.OPERATION_POLICY_NAME + ) + ) + self.assertIsNone(index) + + # Test getting the index for a Sensitive attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Sensitive", + primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Sensitive", + primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a State attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "State", + primitives.Enumeration( + enums.State, + enums.State.PRE_ACTIVE, + tag=enums.Tags.STATE + ) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "State", + primitives.Enumeration( + enums.State, + enums.State.ACTIVE, + tag=enums.Tags.STATE + ) + ) + self.assertIsNone(index) + + # Test getting the index for a UniqueIdentifier attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Unique Identifier", + primitives.TextString(value="1", tag=enums.Tags.UNIQUE_IDENTIFIER) + ) + self.assertEqual(0, index) + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Unique Identifier", + primitives.TextString(value="9", tag=enums.Tags.UNIQUE_IDENTIFIER) + ) + self.assertIsNone(index) + + # Test getting the index for an unsupported attribute + index = e._get_attribute_index_from_managed_object( + symmetric_key, + "Archive Date", + None + ) + self.assertIsNone(index) + def test_set_attributes_on_managed_object(self): """ Test that multiple attributes can be set on a given managed object. @@ -1940,6 +2281,156 @@ def test_set_attribute_on_managed_object_unsupported_features(self): *args ) + def test_set_attribute_on_managed_object_by_index(self): + """ + Test that an attribute can be modified on a managed object given its + name, index, and new value. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + + e._data_session.add(symmetric_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Application Specific Information", + [ + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 1", + application_data="Example Data 1" + ), + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 2", + application_data="Example Data 2" + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Name", + [ + attributes.Name( + name_value=attributes.Name.NameValue("Name 1") + ), + attributes.Name( + name_value=attributes.Name.NameValue("Name 2") + ) + ] + ) + ) + e._set_attribute_on_managed_object( + symmetric_key, + ( + "Object Group", + [ + primitives.TextString( + "Example Group 1", + tag=enums.Tags.OBJECT_GROUP + ), + primitives.TextString( + "Example Group 2", + tag=enums.Tags.OBJECT_GROUP + ) + ] + ) + ) + + # Test setting an ApplicationSpecificInformation attribute by index + a = e._get_attribute_from_managed_object( + symmetric_key, + "Application Specific Information" + ) + self.assertEqual(2, len(a)) + self.assertEqual( + "Example Namespace 1", + a[0].get("application_namespace") + ) + self.assertEqual("Example Data 1", a[0].get("application_data")) + self.assertEqual( + "Example Namespace 2", + a[1].get("application_namespace") + ) + self.assertEqual("Example Data 2", a[1].get("application_data")) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Application Specific Information", + attributes.ApplicationSpecificInformation( + application_namespace="Example Namespace 3", + application_data="Example Data 3" + ), + 1 + ) + a = e._get_attribute_from_managed_object( + symmetric_key, + "Application Specific Information" + ) + self.assertEqual(2, len(a)) + self.assertEqual( + "Example Namespace 1", + a[0].get("application_namespace") + ) + self.assertEqual("Example Data 1", a[0].get("application_data")) + self.assertEqual( + "Example Namespace 3", + a[1].get("application_namespace") + ) + self.assertEqual("Example Data 3", a[1].get("application_data")) + + # Test setting a Name attribute by index + a = e._get_attribute_from_managed_object(symmetric_key, "Name") + self.assertEqual(3, len(a)) + self.assertEqual("Symmetric Key", a[0].name_value.value) + self.assertEqual("Name 1", a[1].name_value.value) + self.assertEqual("Name 2", a[2].name_value.value) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Name", + attributes.Name( + name_value=attributes.Name.NameValue("Name 3") + ), + 1 + ) + a = e._get_attribute_from_managed_object(symmetric_key, "Name") + self.assertEqual(3, len(a)) + self.assertEqual("Symmetric Key", a[0].name_value.value) + self.assertEqual("Name 3", a[1].name_value.value) + self.assertEqual("Name 2", a[2].name_value.value) + + # Test setting an ObjectGroup attribute by index + a = e._get_attribute_from_managed_object(symmetric_key, "Object Group") + self.assertEqual(2, len(a)) + self.assertEqual("Example Group 1", a[0]) + self.assertEqual("Example Group 2", a[1]) + e._set_attribute_on_managed_object_by_index( + symmetric_key, + "Object Group", + primitives.TextString( + "Example Group 3", + tag=enums.Tags.OBJECT_GROUP + ), + 1 + ) + a = e._get_attribute_from_managed_object(symmetric_key, "Object Group") + self.assertEqual(2, len(a)) + self.assertEqual("Example Group 1", a[0]) + self.assertEqual("Example Group 3", a[1]) + def test_delete_attribute_from_managed_object(self): """ Test that various attributes can be deleted correctly from a given @@ -4273,6 +4764,686 @@ def test_set_attribute_with_non_client_modifiable_attribute(self): *args ) + def test_modify_attribute(self): + """ + Test that a ModifyAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertEqual( + "Sensitive", + response_payload.attribute.attribute_name.value + ) + self.assertIsNone(response_payload.attribute.attribute_index) + self.assertEqual( + True, + response_payload.attribute.attribute_value.value + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_modify_attribute_with_unmodifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to modify an + unmodifiable attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + "2" + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Unique Identifier' attribute is read-only and cannot be " + "modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_multivalued(self): + """ + Test that a ModifyAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Symmetric Key", managed_object.names[0]) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.NAME, + "Modified Name" + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertEqual( + "Name", + response_payload.attribute.attribute_name.value + ) + self.assertEqual(0, response_payload.attribute.attribute_index.value) + self.assertEqual( + "Modified Name", + response_payload.attribute.attribute_value.name_value.value + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Modified Name", managed_object.names[0]) + + def test_modify_attribute_with_multivalued_no_index_match(self): + """ + Test that a KmipError is raised when attempting to modify an attribute + based on an index that doesn't exist. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.NAME, + "Modified Name", + index=3 + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "No matching attribute instance could be found for the specified " + "attribute index.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_singlevalued_index_specified(self): + """ + Test that a KmipError is raised when attempting to modify a + single-valued attribute while also specifying the attribute index. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True, + index=0 + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The attribute index cannot be specified for a single-valued " + "attribute.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_with_singlevalued_unset_attr(self): + """ + Test that a KmipError is raised when attempting to modify an + unset attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(1, 4) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + e._get_attributes_from_managed_object = mock.Mock(return_value=[]) + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + e._set_attribute_on_managed_object( + secret, + ("Sensitive", primitives.Boolean(None)) + ) + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + attribute=attribute_factory.create_attribute( + enums.AttributeType.SENSITIVE, + True + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Sensitive' attribute is not set on the managed " + "object. It must be set before it can be modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0(self): + """ + Test that a ModifyAttribute request can be processed correctly with + KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_modify_attribute_kmip_2_0_with_unmodifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to modify an + unmodifiable attribute with KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.TextString( + "1", + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.TextString( + "2", + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Unique Identifier' attribute is read-only and cannot be " + "modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_multivalued(self): + """ + Test that a ModifyAttribute request can be processed correctly with + KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Symmetric Key", managed_object.names[0]) + + payload = payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Symmetric Key") + ) + ), + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ) + + response_payload = e._process_modify_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: ModifyAttribute" + ) + self.assertEqual("1", response_payload.unique_identifier) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.MODIFY_ATTRIBUTE + ) + self.assertEqual("Modified Name", managed_object.names[0]) + + def test_modify_attribute_kmip_2_0_with_multivalued_no_current(self): + """ + Test that a KmipError is raised when attempting to modifyg a + multivalued attribute with no current attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Name' attribute is multivalued so the current attribute " + "must be specified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_multivalued_no_attr_match(self): + """ + Test that a KmipError is raised when attempting to modify an + non-existent attribute value. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Invalid Key") + ) + ), + new_attribute=objects.NewAttribute( + attribute=attributes.Name( + name_value=attributes.Name.NameValue("Modified Name") + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The specified current attribute could not be found on the " + "managed object.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_singlevalued_unset_attr(self): + """ + Test that a KmipError is raised when attempting to modify an + unset attribute with KMIP 2.0 parameters. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + e._get_attribute_from_managed_object = mock.Mock(return_value=None) + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._set_attribute_on_managed_object( + secret, + ("Sensitive", primitives.Boolean(None)) + ) + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Sensitive' attribute is not set on the managed " + "object. It must be set before it can be modified.", + e._process_modify_attribute, + *args + ) + + def test_modify_attribute_kmip_2_0_with_singlevalued_no_attr_match(self): + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._attribute_policy._version = e._protocol_version + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.ModifyAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.Boolean( + True, + tag=enums.Tags.SENSITIVE + ) + ), + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + False, + tag=enums.Tags.SENSITIVE + ) + ) + ), + ) + self.assertRaisesRegex( + exceptions.KmipError, + "The specified current attribute could not be found on the " + "managed object.", + e._process_modify_attribute, + *args + ) + def test_register(self): """ Test that a Register request can be processed correctly.