Skip to content

Commit

Permalink
feat(specUpdate): Updated to Kafka Headers of specification version 2…
Browse files Browse the repository at this point in the history
…. Use new "ce_encryption.ref.*" headers.

The new kafka headers will be published for every new kafka event. When receiving a kafka message you can use the old and the new headers or both. The new headers will be preferred.
We will remove the old kafka headers in some time in the year 2024.
  • Loading branch information
andrekaplick5678 committed Oct 4, 2023
1 parent 49c14b2 commit 11a9bff
Show file tree
Hide file tree
Showing 15 changed files with 550 additions and 30 deletions.
26 changes: 18 additions & 8 deletions docs/USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

```groovy
dependencies {
implementation "de.otto:kafka-messaging-e2ee:2.0.0"
implementation "de.otto:kafka-messaging-e2ee:2.1.0"
}
```

Expand Down Expand Up @@ -113,6 +113,7 @@ class Example {
Map<String, byte[]> kafkaHeaders = null;
if (aesEncryptedPayload.isEncrypted()) {
kafkaHeaders = KafkaEncryptionHelper.mapToKafkaHeadersForValue(aesEncryptedPayload);
System.out.println("Kafka Headers: " + kafkaHeaders);
}
}
}
Expand All @@ -126,11 +127,15 @@ class Example {
class Example {

void example() {
// meta data attributes for Kafka Headers (values can be String or byte[])
// metadata attributes for Kafka Headers (values can be String or byte[])
String kafkaTopicName = "some-topic";
Map<String, Object> kafkaHeaders = new HashMap<>();
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE, "[{\"encryption_key\":{\"cipherVersionString\":null,\"cipherVersion\":3,\"cipherName\":\"encryption_key\"}}]");
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE, "2rW2tDnRdwRg87Ta");
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE, "[{\"encryption_key\":{\"cipherVersionString\":null,\"cipherVersion\":3,\"cipherName\":\"encryption_key\"}}]");
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE, "2rW2tDnRdwRg87Ta");
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE, "3");
kafkaHeaders.put(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE, "encryption_key");

// the encrypted payload
byte[] encryptedPayloadByteArray = Base64.getDecoder().decode("6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

Expand All @@ -153,15 +158,20 @@ class Example {
void example() {
// meta data attributes for Kafka Headers
String kafkaTopicName = "some-topic";
byte[] cipherConfigRawKafkaHeaderValue = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE);
byte[] ivRawKafkaHeaderValue = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE);
byte[] kafkaHeaderInitializationVector = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE);
byte[] kafkaHeaderCiphersText = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE);
byte[] kafkaCeHeaderInitializationVector = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE);
byte[] kafkaCeHeaderCipherVersion = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE);
byte[] kafkaCeHeaderCipherName = kafkaHeaders.get(KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE);

// the encrypted payload
byte[] encryptedPayloadByteArray = Base64.getDecoder().decode("6ttHpHYw7eYQ1OnvrhZAFi0PPsUGl9NR18hXFQ==");

// perform decryption
byte[] iv = KafkaEncryptionHelper.extractIv(ivRawKafkaHeaderValue);
EncryptionCipherSpec cipherSpec = KafkaEncryptionHelper.extractCipherSpec(cipherConfigRawKafkaHeaderValue);
AesEncryptedPayload encryptedPayload = AesEncryptedPayload.ofEncryptedPayload(payload, iv, cipherSpec);
AesEncryptedPayload encryptedPayload = KafkaEncryptionHelper.aesEncryptedPayloadOfKafka(
encryptedPayloadByteArray, kafkaHeaderInitializationVector, kafkaHeaderCiphersText,
kafkaCeHeaderInitializationVector, kafkaCeHeaderCipherVersion, kafkaCeHeaderCipherName
);
String plainText = decryptionService.decryptToString(kafkaTopicName, encryptedPayload);

// print result
Expand Down
2 changes: 1 addition & 1 deletion examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

ext {
kafkaE2eeLibraryVersion = "2.0.0"
kafkaE2eeLibraryVersion = "2.1.0"
}

dockerCompose {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package de.otto.springboot.example.multiple;

import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE;

Expand Down Expand Up @@ -29,11 +32,15 @@ public void onMessage(
@Payload(required = false) byte[] payload,
@Header(name = "kafka_receivedTopic") String kafkaTopicName,
@Header(required = false, name = KAFKA_HEADER_IV_VALUE) byte[] ivRaw,
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw) {
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw,
@Header(required = false, name = KAFKA_CE_HEADER_IV_VALUE) byte[] ceIvRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_VERSION_VALUE) byte[] cipherVersionRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_NAME_VALUE) byte[] cipherNameRaw
) {

// decrypt incoming event
AesEncryptedPayload encryptedPayload = KafkaEncryptionHelper.aesEncryptedPayloadOfKafka(payload,
ivRaw, cipherConfigRaw);
ivRaw, cipherConfigRaw, ceIvRaw, cipherVersionRaw, cipherNameRaw);
String plainEvent = decryptionService.decryptToString(kafkaTopicName, encryptedPayload);

String messageWasEncryptedTxt;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package de.otto.springboot.example.single;

import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE;

Expand Down Expand Up @@ -29,10 +32,14 @@ public void onMessage(
@Payload(required = false) byte[] payload,
@Header(name = "kafka_receivedTopic") String kafkaTopicName,
@Header(required = false, name = KAFKA_HEADER_IV_VALUE) byte[] ivRaw,
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw) {
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw,
@Header(required = false, name = KAFKA_CE_HEADER_IV_VALUE) byte[] ceIvRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_VERSION_VALUE) byte[] cipherVersionRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_NAME_VALUE) byte[] cipherNameRaw
) {

// decrypt incoming event
String plainEvent = decryptPayload(payload, kafkaTopicName, ivRaw, cipherConfigRaw);
String plainEvent = decryptPayload(payload, kafkaTopicName, ivRaw, cipherConfigRaw, ceIvRaw, cipherVersionRaw, cipherNameRaw);

String messageWasEncryptedTxt;
if (ivRaw != null && cipherConfigRaw != null) {
Expand All @@ -44,9 +51,9 @@ public void onMessage(
}

private String decryptPayload(byte[] payload, String kafkaTopicName, byte[] ivRaw,
byte[] cipherConfigRaw) {
byte[] cipherConfigRaw, byte[] ceIvRaw, byte[] cipherVersionRaw, byte[] cipherNameRaw) {
AesEncryptedPayload encryptedPayload = KafkaEncryptionHelper.aesEncryptedPayloadOfKafka(payload,
ivRaw, cipherConfigRaw);
ivRaw, cipherConfigRaw, ceIvRaw, cipherVersionRaw, cipherNameRaw);
return decryptionService.decryptToString(kafkaTopicName, encryptedPayload);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package de.otto.springboot.example.single;

import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_NAME_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_CIPHER_VERSION_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_CE_HEADER_IV_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_CIPHER_VALUE;
import static de.otto.kafka.messaging.e2ee.KafkaEncryptionHelper.KAFKA_HEADER_IV_VALUE;

Expand Down Expand Up @@ -29,10 +32,13 @@ public void onMessage(
@Payload(required = false) byte[] payload,
@Header(name = "kafka_receivedTopic") String kafkaTopicName,
@Header(required = false, name = KAFKA_HEADER_IV_VALUE) byte[] ivRaw,
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw) {
@Header(required = false, name = KAFKA_HEADER_CIPHER_VALUE) byte[] cipherConfigRaw,
@Header(required = false, name = KAFKA_CE_HEADER_IV_VALUE) byte[] ceIvRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_VERSION_VALUE) byte[] cipherVersionRaw,
@Header(required = false, name = KAFKA_CE_HEADER_CIPHER_NAME_VALUE) byte[] cipherNameRaw) {

// decrypt incoming event
String plainEvent = decryptPayload(payload, kafkaTopicName, ivRaw, cipherConfigRaw);
String plainEvent = decryptPayload(payload, kafkaTopicName, ivRaw, cipherConfigRaw, ceIvRaw, cipherVersionRaw, cipherNameRaw);

String messageWasEncryptedTxt;
if (ivRaw != null && cipherConfigRaw != null) {
Expand All @@ -44,9 +50,9 @@ public void onMessage(
}

private String decryptPayload(byte[] payload, String kafkaTopicName, byte[] ivRaw,
byte[] cipherConfigRaw) {
byte[] cipherConfigRaw, byte[] ceIvRaw, byte[] cipherVersionRaw, byte[] cipherNameRaw) {
AesEncryptedPayload encryptedPayload = KafkaEncryptionHelper.aesEncryptedPayloadOfKafka(payload,
ivRaw, cipherConfigRaw);
ivRaw, cipherConfigRaw, ceIvRaw, cipherVersionRaw, cipherNameRaw);
return decryptionService.decryptToString(kafkaTopicName, encryptedPayload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,45 @@
import java.util.Base64;
import java.util.Objects;

/**
* record to hold all the data needed for an encrypted payload. But it can also hold an unencrypted
* payload.
*
* @see #isEncrypted()
*/
public final class AesEncryptedPayload {

private final byte[] encryptedPayload;
private final byte[] initializationVector;
private final int keyVersion;
private final String encryptionKeyAttributeName;

/**
* @param plainPayload the plain text as byte array.
*/
public AesEncryptedPayload(byte[] plainPayload) {
this.encryptedPayload = plainPayload;
this.initializationVector = null;
this.keyVersion = 0;
this.encryptionKeyAttributeName = null;
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVector the raw initialization vector
* @param keyVersion the vault version of the encryption key entry
*/
public AesEncryptedPayload(byte[] encryptedPayload, byte[] initializationVector, int keyVersion) {
this(encryptedPayload, initializationVector, keyVersion, null);
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVector the raw initialization vector
* @param keyVersion the vault version of the encryption key entry
* @param encryptionKeyAttributeName JSON property name of the key within Vault. Can be
* <code>null</code> for Field-Level-Encryption.
*/
public AesEncryptedPayload(byte[] encryptedPayload, byte[] initializationVector, int keyVersion,
String encryptionKeyAttributeName) {
Objects.requireNonNull(encryptedPayload, "encryptedPayload must not be null");
Expand All @@ -32,11 +53,23 @@ public AesEncryptedPayload(byte[] encryptedPayload, byte[] initializationVector,
this.encryptionKeyAttributeName = encryptionKeyAttributeName;
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVectorBase64 the initialization vector base64 encoded
* @param keyVersion the vault version of the encryption key entry
*/
public AesEncryptedPayload(byte[] encryptedPayload, String initializationVectorBase64,
int keyVersion) {
this(encryptedPayload, initializationVectorBase64, keyVersion, null);
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVectorBase64 the initialization vector base64 encoded
* @param keyVersion the vault version of the encryption key entry
* @param encryptionKeyAttributeName JSON property name of the key within Vault. Can be
* <code>null</code> for Field-Level-Encryption.
*/
public AesEncryptedPayload(byte[] encryptedPayload, String initializationVectorBase64,
int keyVersion, String encryptionKeyAttributeName) {
Objects.requireNonNull(encryptedPayload, "encryptedPayload must not be null");
Expand All @@ -48,10 +81,20 @@ public AesEncryptedPayload(byte[] encryptedPayload, String initializationVectorB
this.encryptionKeyAttributeName = encryptionKeyAttributeName;
}

/**
* @param plainPayload the plain payload as byte array
* @return an AesEncryptedPayload of an unencrypted payload
*/
public static AesEncryptedPayload ofUnencryptedPayload(byte[] plainPayload) {
return new AesEncryptedPayload(plainPayload);
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVector the raw initialization vector
* @param keyVersion the vault data for the encryption key
* @return an AesEncryptedPayload of an encrypted payload
*/
public static AesEncryptedPayload ofEncryptedPayload(
byte[] encryptedPayload,
byte[] initializationVector,
Expand All @@ -61,13 +104,27 @@ public static AesEncryptedPayload ofEncryptedPayload(
keyVersion.encryptionKeyAttributeName());
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVectorBase64 the initialization vector base64 encoded
* @param keyVersion the vault metadata for the encryption key
* @return an AesEncryptedPayload of an encrypted payload
*/
public static AesEncryptedPayload ofEncryptedPayload(
byte[] encryptedPayload,
String initializationVectorBase64,
int keyVersion) {
return new AesEncryptedPayload(encryptedPayload, initializationVectorBase64, keyVersion);
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVectorBase64 the initialization vector base64 encoded
* @param keyVersion the vault version of the encryption key entry
* @param encryptionKeyAttributeName JSON property name of the key within Vault. Can be
* <code>null</code> for Field-Level-Encryption.
* @return an AesEncryptedPayload of an encrypted payload
*/
public static AesEncryptedPayload ofEncryptedPayload(
byte[] encryptedPayload,
String initializationVectorBase64,
Expand All @@ -77,6 +134,12 @@ public static AesEncryptedPayload ofEncryptedPayload(
encryptionKeyAttributeName);
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVector the raw initialization vector
* @param cipherSpec the vault metadata for the encryption key
* @return an AesEncryptedPayload of an encrypted payload
*/
public static AesEncryptedPayload ofEncryptedPayload(
byte[] encryptedPayload,
byte[] initializationVector,
Expand All @@ -88,6 +151,12 @@ public static AesEncryptedPayload ofEncryptedPayload(
cipherSpec.keyVersion(), cipherSpec.cipherName());
}

/**
* @param encryptedPayload an encrypted payload as byte array
* @param initializationVectorBase64 the initialization vector base64 encoded
* @param cipherSpec the vault metadata for the encryption key
* @return an AesEncryptedPayload of an encrypted payload
*/
public static AesEncryptedPayload ofEncryptedPayload(byte[] encryptedPayload,
String initializationVectorBase64, EncryptionCipherSpec cipherSpec) {
if (cipherSpec == null) {
Expand All @@ -97,27 +166,47 @@ public static AesEncryptedPayload ofEncryptedPayload(byte[] encryptedPayload,
cipherSpec.keyVersion(), cipherSpec.cipherName());
}

/**
* @return <code>true</code> when this object holds an encrypted value. <code>false</code> when
* this object hold an unencrypted value.
*/
public boolean isEncrypted() {
return initializationVector != null
&& initializationVector.length > 0
&& keyVersion > 0;
}

/**
* @return the value - which might is encrypted
* @see #isEncrypted()
*/
public byte[] encryptedPayload() {
return encryptedPayload;
}

/**
* @return the raw initialization vector or <code>null</code> when the value is encrypted
* @see #isEncrypted()
*/
public byte[] initializationVector() {
return initializationVector;
}

/**
* @return the initialization vector base64 encoded or <code>null</code> when the value is
* encrypted
* @see #isEncrypted()
*/
public String initializationVectorBase64() {
if (initializationVector == null) {
return null;
}
return Base64.getEncoder().encodeToString(initializationVector);
}

/**
* @return the vault version of the encryption key entry
*/
public int keyVersion() {
return keyVersion;
}
Expand Down
Loading

0 comments on commit 11a9bff

Please sign in to comment.