Skip to content

Commit

Permalink
feat(pipes-alpha): support for customer-managed KMS keys to encrypt p…
Browse files Browse the repository at this point in the history
…ipe data (aws#33546)

### Issue # (if applicable)

Closes aws#31453

### Reason for this change

AWS Pipes supports for encrypting data by customer managed KMS key instead of Amazon managed key.

https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-pipes-cmkey.html

The L2 Pipe construct does not support this feature now.

### Description of changes

- Add `kmsKey` prop to `PipeProps`
- 
### Describe any new or updated permissions being added

- Add KMS key policy which enables pipes to access to the key.

https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-key-policy.html#eb-encryption-key-policy-pipe

### Description of how you validated changes

Add both unit and integ tests.

### Checklist
- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
badmintoncryer authored Feb 27, 2025
1 parent 7ebb92c commit dd0d62f
Show file tree
Hide file tree
Showing 14 changed files with 36,766 additions and 15 deletions.
46 changes: 34 additions & 12 deletions packages/@aws-cdk/aws-pipes-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ can be filtered, transformed and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).
For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

Expand Down Expand Up @@ -65,7 +65,7 @@ A source is a AWS Service that is polled. The following sources are possible:
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)
- [Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)

Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
kindly let us know by opening a GitHub issue or raising a PR.

### Example source
Expand Down Expand Up @@ -107,7 +107,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

This example shows a filter that only forwards events with the `customerType` B2B or B2C from the source messages. Messages that are not matching the filter are not forwarded to the enrichment or target step.

You can define multiple filter pattern which are combined with a logical `OR`.
You can define multiple filter pattern which are combined with a logical `OR`.

Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

Expand All @@ -117,7 +117,7 @@ For enrichments and targets the input event can be transformed. The transformati
A transformation has access to the input event as well to some context information of the pipe itself like the name of the pipe.
See [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html) for details.

### Example - input transformation from object
### Example - input transformation from object

The input transformation can be created from an object. The object can contain static values, dynamic values or pipe variables.

Expand Down Expand Up @@ -185,7 +185,7 @@ If the transformation is applied to a target it might be converted to a string r

In cases where you want to forward only a part of the event to the target you can use the transformation event path.

> This only works for targets because the enrichment needs to have a valid json as input.
> This only works for targets because the enrichment needs to have a valid json as input.

```ts
Expand All @@ -204,7 +204,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

This transformation extracts the body of the event.

So when the following batch of input events is processed by the pipe
So when the following batch of input events is processed by the pipe

```json
[
Expand Down Expand Up @@ -357,7 +357,7 @@ For example a lambda function that returns a concatenation of the static field,
export async function handler (event: any) {
return event.staticField + "-" + event.dynamicField + "-" + event.pipeVariable;
};
```
```

will produce the following target message in the target SQS queue.

Expand Down Expand Up @@ -407,9 +407,9 @@ const pipeTarget = new SqsTarget(targetQueue);

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
A pipe can produce log events that are forwarded to different log destinations.
You can configure multiple destinations, but all the destination share the same log level and log data.
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).

The log level and data that is included in the log events is configured on the pipe class itself.
The actual destination is defined independently, and there are three options:
Expand All @@ -436,6 +436,28 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally all execution data is logged as well.
This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally all execution data is logged as well.

## Encrypt pipe data with KMS

You can specify that EventBridge use a customer managed key to encrypt pipe data stored at rest,
rather than use an AWS owned key as is the default.
Details can be found in the [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-pipes-cmkey.html).

To do this, you need to specify the key in the `kmsKey` property of the pipe.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const kmsKey: kms.Key;

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: new SqsTarget(targetQueue),
kmsKey,
// pipeName is required when using a KMS key
pipeName: 'MyPipe',
});
```
43 changes: 40 additions & 3 deletions packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IResource, Resource, Stack } from 'aws-cdk-lib';
import { IRole, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { IResource, Resource, Stack, ValidationError } from 'aws-cdk-lib';
import { ArnPrincipal, IRole, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import * as kms from 'aws-cdk-lib/aws-kms';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { addConstructMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';
import { Construct } from 'constructs';
Expand Down Expand Up @@ -165,6 +166,13 @@ export interface PipeProps {
readonly tags?: {
[key: string]: string;
};

/**
* The AWS KMS customer managed key to encrypt pipe data.
*
* @default undefined - AWS managed key is used
*/
readonly kmsKey?: kms.IKey;
}

abstract class PipeBase extends Resource implements IPipe {
Expand Down Expand Up @@ -279,10 +287,38 @@ export class Pipe extends PipeBase {
return { ...currentLogConfiguration, ...additionalLogConfiguration };
}, initialLogConfiguration);

if (props.kmsKey) {
if (!props.pipeName) {
throw new ValidationError('`pipeName` is required when specifying a `kmsKey` prop.', this);
}
// Add permissions to the KMS key
// see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-pipes-cmkey.html#eb-encryption-key-policy-pipe
props.kmsKey.addToResourcePolicy(
new PolicyStatement({
actions: ['kms:Decrypt', 'kms:DescribeKey', 'kms:GenerateDataKey'],
resources: ['*'],
principals: [new ArnPrincipal(this.pipeRole.roleArn)],
conditions: {
'ArnLike': {
'kms:EncryptionContext:aws:pipe:arn': Stack.of(this).formatArn({
service: 'pipes',
resource: 'pipe',
resourceName: props.pipeName,
}),
},
'ForAnyValue:StringEquals': {
'kms:EncryptionContextKeys': [
'aws:pipe:arn',
],
},
},
}),
);
}

/**
* Pipe resource
*/

const resource = new CfnPipe(this, 'Resource', {
name: props.pipeName,
description: props.description,
Expand All @@ -295,6 +331,7 @@ export class Pipe extends PipeBase {
targetParameters: target.targetParameters,
desiredState: props.desiredState,
logConfiguration: logConfiguration,
kmsKeyIdentifier: props.kmsKey?.keyArn,
tags: props.tags,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as kms from 'aws-cdk-lib/aws-kms';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dd0d62f

Please sign in to comment.