Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipes-alpha): support for customer-managed KMS keys to encrypt pipe data #33546

Merged
merged 12 commits into from
Feb 27, 2025
46 changes: 34 additions & 12 deletions packages/@aws-cdk/aws-pipes-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ can be filtered, transformed and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed several unnecessary spaces.

For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

Expand Down Expand Up @@ -65,7 +65,7 @@ A source is a AWS Service that is polled. The following sources are possible:
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)
- [Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)

Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
kindly let us know by opening a GitHub issue or raising a PR.

### Example source
Expand Down Expand Up @@ -107,7 +107,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

This example shows a filter that only forwards events with the `customerType` B2B or B2C from the source messages. Messages that are not matching the filter are not forwarded to the enrichment or target step.

You can define multiple filter pattern which are combined with a logical `OR`.
You can define multiple filter pattern which are combined with a logical `OR`.

Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

Expand All @@ -117,7 +117,7 @@ For enrichments and targets the input event can be transformed. The transformati
A transformation has access to the input event as well to some context information of the pipe itself like the name of the pipe.
See [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html) for details.

### Example - input transformation from object
### Example - input transformation from object

The input transformation can be created from an object. The object can contain static values, dynamic values or pipe variables.

Expand Down Expand Up @@ -185,7 +185,7 @@ If the transformation is applied to a target it might be converted to a string r

In cases where you want to forward only a part of the event to the target you can use the transformation event path.

> This only works for targets because the enrichment needs to have a valid json as input.
> This only works for targets because the enrichment needs to have a valid json as input.


```ts
Expand All @@ -204,7 +204,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

This transformation extracts the body of the event.

So when the following batch of input events is processed by the pipe
So when the following batch of input events is processed by the pipe

```json
[
Expand Down Expand Up @@ -357,7 +357,7 @@ For example a lambda function that returns a concatenation of the static field,
export async function handler (event: any) {
return event.staticField + "-" + event.dynamicField + "-" + event.pipeVariable;
};
```
```

will produce the following target message in the target SQS queue.

Expand Down Expand Up @@ -407,9 +407,9 @@ const pipeTarget = new SqsTarget(targetQueue);

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
A pipe can produce log events that are forwarded to different log destinations.
You can configure multiple destinations, but all the destination share the same log level and log data.
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).

The log level and data that is included in the log events is configured on the pipe class itself.
The actual destination is defined independently, and there are three options:
Expand All @@ -436,6 +436,28 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally all execution data is logged as well.
This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally all execution data is logged as well.

## Encrypt pipe data with KMS

You can specify that EventBridge use a customer managed key to encrypt pipe data stored at rest,
rather than use an AWS owned key as is the default.
Details can be found in the [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-pipes-cmkey.html).

To do this, you need to specify the key in the `kmsKey` property of the pipe.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const kmsKey: kms.Key;

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: new SqsTarget(targetQueue),
kmsKey,
// pipeName is required when using a KMS key
pipeName: 'MyPipe',
});
```
43 changes: 40 additions & 3 deletions packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IResource, Resource, Stack } from 'aws-cdk-lib';
import { IRole, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { IResource, Resource, Stack, ValidationError } from 'aws-cdk-lib';
import { ArnPrincipal, IRole, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import * as kms from 'aws-cdk-lib/aws-kms';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { addConstructMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';
import { Construct } from 'constructs';
Expand Down Expand Up @@ -165,6 +166,13 @@ export interface PipeProps {
readonly tags?: {
[key: string]: string;
};

/**
* The AWS KMS customer managed key to encrypt pipe data.
*
* @default undefined - AWS managed key is used
*/
readonly kmsKey?: kms.IKey;
}

abstract class PipeBase extends Resource implements IPipe {
Expand Down Expand Up @@ -279,10 +287,38 @@ export class Pipe extends PipeBase {
return { ...currentLogConfiguration, ...additionalLogConfiguration };
}, initialLogConfiguration);

if (props.kmsKey) {
if (!props.pipeName) {
throw new ValidationError('`pipeName` is required when specifying a `kmsKey` prop.', this);
}
// Add permissions to the KMS key
// see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-pipes-cmkey.html#eb-encryption-key-policy-pipe
props.kmsKey.addToResourcePolicy(
new PolicyStatement({
actions: ['kms:Decrypt', 'kms:DescribeKey', 'kms:GenerateDataKey'],
resources: ['*'],
principals: [new ArnPrincipal(this.pipeRole.roleArn)],
conditions: {
'ArnLike': {
'kms:EncryptionContext:aws:pipe:arn': Stack.of(this).formatArn({
service: 'pipes',
resource: 'pipe',
resourceName: props.pipeName,
}),
},
'ForAnyValue:StringEquals': {
'kms:EncryptionContextKeys': [
'aws:pipe:arn',
],
},
},
}),
);
}

/**
* Pipe resource
*/

const resource = new CfnPipe(this, 'Resource', {
name: props.pipeName,
description: props.description,
Expand All @@ -295,6 +331,7 @@ export class Pipe extends PipeBase {
targetParameters: target.targetParameters,
desiredState: props.desiredState,
logConfiguration: logConfiguration,
kmsKeyIdentifier: props.kmsKey?.keyArn,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question—did it work correctly even without setting a key policy?

When I previously attempted to contribute to this, I encountered an error without a key policy, but I couldn't come up with a proper way to configure it and had to give up at the time.

It might have been my misunderstanding back then, or something might have changed with an update. If it worked fine, please just ignore this question.

Ref: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-encryption-key-policy.html#eb-encryption-key-policy-pipe

Copy link
Contributor Author

@badmintoncryer badmintoncryer Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are absolutely right.. I misunderstood that this implementation would be work fine. Thank you for your great suggestion.

How about adding the restriction that makes pipeName mandatory when kmsKey is provided?
This idea is not ideal, but it would provide the minimum feature that enables us to configure CMK for Pipes.

Copy link
Contributor

@mazyu36 mazyu36 Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for confirming. So that's how it is...
I think your suggested solution is good.​​​​​​​​​​​​​​​​

Personally, I've been considering the following three options:

  1. Make name mandatory when kmsKey is specified. This is the same as the suggestion you provided.
  2. Instead of setting name to undefined when it's not specified, automatically generate it using Names.uniquid. However, I understand this would be a breaking change.
  3. Configure kmsKey using a custom resource. I think this option is not ideal.

Based on the above, I believe option 1 or option 2 would be good, and I think option 1 would be better to avoid a breaking change.

Copy link
Contributor Author

@badmintoncryer badmintoncryer Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't thought of option 2. I think implementing option 2 with a feature flag is also a good idea.
For now, I'll proceed with option 1. Of course, I'll also add integration testing to verify the functionality.

Copy link
Contributor Author

@badmintoncryer badmintoncryer Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just noticed that the feature flag is not essential because this is alpha module..
I think the option 2 might be preferable for alpha module. I'll leave the decision to the maintainer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think Options2 is smarter, but since the impact is significant, I'd like to wait for the maintainer's opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the deep dive. I think Option 1 makes sense to me. Can you help me understand Option 2 + feature flag solution?
In my understanding, we can't create the policy without knowing the pipe name. If we add a feature flag, for new stacks, we will always create a default pipe name if not specified which would solve the issue. However, for existing stack (who did not enable this feature flag), we will still have the same issue right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GavinZZ

Thank you for the clarification.

However, for existing stack (who did not enable this feature flag), we will still have the same issue right?

You're absolutely right! When setting a CMK for stacks under an App where the function flag is not enabled, the pipeName cannot be obtained, resulting in the inability to set the correct key policy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick response. Given the above discussion, I'll remove the do-not-merge label and let's go with the solution you had here. Thank you!

tags: props.tags,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as kms from 'aws-cdk-lib/aws-kms';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading