Skip to content

Commit

Permalink
feat(pipes-enrichments): support API destination enrichment (#31312)
Browse files Browse the repository at this point in the history
### Issue # (if applicable)

Closes #29383 .

### Reason for this change
To support API destination enrichment for EventBridge pipes.


### Description of changes
Add `ApiDestinationEnrichment` class.


### Description of how you validated changes
Add unit tests and an integ test.



### Checklist
- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
mazyu36 authored Oct 16, 2024
1 parent 7815144 commit 1557793
Show file tree
Hide file tree
Showing 16 changed files with 34,265 additions and 0 deletions.
19 changes: 19 additions & 0 deletions packages/@aws-cdk/aws-pipes-enrichments-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,22 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: new SomeTarget(targetQueue),
});
```

### API destination

API destination can be used to enrich events of a pipe.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

declare const apiDestination: events.ApiDestination;

const enrichment = new enrichments.ApiDestinationEnrichment(apiDestination);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
enrichment,
target: new SomeTarget(targetQueue),
});
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { EnrichmentParametersConfig, IEnrichment, IPipe, InputTransformation } from '@aws-cdk/aws-pipes-alpha';
import { IApiDestination } from 'aws-cdk-lib/aws-events';
import { IRole, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';

/**
* Properties for a ApiDestinationEnrichment
*/
export interface ApiDestinationEnrichmentProps {
/**
* The input transformation for the enrichment
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html
* @default - None
*/
readonly inputTransformation?: InputTransformation;

/**
* The headers that need to be sent as part of request invoking the EventBridge ApiDestination.
*
* @default - none
*/
readonly headerParameters?: Record<string, string>;

/**
* The path parameter values used to populate the EventBridge API destination path wildcards ("*").
*
* @default - none
*/
readonly pathParameterValues?: string[];

/**
* The query string keys/values that need to be sent as part of request invoking the EventBridge API destination.
*
* @default - none
*/
readonly queryStringParameters?: Record<string, string>;
}

/**
* An API Destination enrichment for a pipe
*/
export class ApiDestinationEnrichment implements IEnrichment {
public readonly enrichmentArn: string;

private readonly inputTransformation?: InputTransformation;
private readonly headerParameters?: Record<string, string>;
private readonly pathParameterValues?: string[];
private readonly queryStringParameters?: Record<string, string>;

constructor(private readonly destination: IApiDestination, props?: ApiDestinationEnrichmentProps) {
this.enrichmentArn = destination.apiDestinationArn;
this.inputTransformation = props?.inputTransformation;
this.headerParameters = props?.headerParameters;
this.queryStringParameters = props?.queryStringParameters;
this.pathParameterValues = props?.pathParameterValues;
}

bind(pipe: IPipe): EnrichmentParametersConfig {

const httpParameters: CfnPipe.PipeEnrichmentHttpParametersProperty | undefined =
this.headerParameters ??
this.pathParameterValues ??
this.queryStringParameters
? {
headerParameters: this.headerParameters,
pathParameterValues: this.pathParameterValues,
queryStringParameters: this.queryStringParameters,
}
: undefined;

return {
enrichmentParameters: {
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
httpParameters,
},
};
}

grantInvoke(pipeRole: IRole): void {
pipeRole.addToPrincipalPolicy(new PolicyStatement({
resources: [this.destination.apiDestinationArn],
actions: ['events:InvokeApiDestination'],
}));
}
}

1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-enrichments-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './api-destination';
export * from './lambda';
export * from './stepfunctions';
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Fixture with packages imported, but nothing else
import * as cdk from 'aws-cdk-lib';
import * as events from 'aws-cdk-lib/aws-events';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`api-destination should grant pipe role invoke access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`api-destination should grant pipe role invoke access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "events:InvokeApiDestination",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"ApiDestination3AB57A39",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { DynamicInput, InputTransformation, Pipe } from '@aws-cdk/aws-pipes-alpha';
import { App, Stack, SecretValue } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import * as events from 'aws-cdk-lib/aws-events';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
import { TestSource, TestTarget } from './test-classes';
import { ApiDestinationEnrichment } from '../lib';

describe('api-destination', () => {
let app: App;
let stack: Stack;
let secret: Secret;
let connection: events.Connection;
let apiDestination: events.ApiDestination;

beforeEach(() => {
app = new App();
stack = new Stack(app, 'TestStack');
secret = new Secret(stack, 'MySecret', {
secretStringValue: SecretValue.unsafePlainText('abc123'),
});
connection = new events.Connection(stack, 'MyConnection', {
authorization: events.Authorization.apiKey('x-api-key', secret.secretValue),
description: 'Connection with API Key x-api-key',
connectionName: 'MyConnection',
});

apiDestination = new events.ApiDestination(stack, 'ApiDestination', {
apiDestinationName: 'ApiDestination',
connection,
description: 'ApiDestination',
httpMethod: events.HttpMethod.GET,
endpoint: 'someendpoint',
rateLimitPerSecond: 60,
});
});

it('should have only enrichment arn', () => {
// ARRANGE
const enrichment = new ApiDestinationEnrichment(apiDestination);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
enrichment,
target: new TestTarget(),
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
Enrichment: {
'Fn::GetAtt': [
'ApiDestination3AB57A39',
'Arn',
],
},
EnrichmentParameters: {},
});
});

it('should have enrichment parameters', () => {
// ARRANGE
const enrichment = new ApiDestinationEnrichment(apiDestination, {
inputTransformation: InputTransformation.fromObject({
body: DynamicInput.fromEventPath('$.body'),
}),
headerParameters: {
headerParam: 'headerParam',
},
pathParameterValues: ['pathParam'],
queryStringParameters: {
param: 'queryParam',
},
});

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
enrichment,
target: new TestTarget(),
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
EnrichmentParameters: {
InputTemplate: '{"body":<$.body>}',
HttpParameters: {
HeaderParameters: {
headerParam: 'headerParam',
},
PathParameterValues: [
'pathParam',
],
QueryStringParameters: {
param: 'queryParam',
},
},
},
});
});

it('should grant pipe role invoke access', () => {
// ARRANGE
const enrichment = new ApiDestinationEnrichment(apiDestination);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
enrichment,
target: new TestTarget(),
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
expect(template.findResources('AWS::IAM::Role')).toMatchSnapshot();
expect(template.findResources('AWS::IAM::Policy')).toMatchSnapshot();
});
});

Loading

0 comments on commit 1557793

Please sign in to comment.