Skip to content

Commit

Permalink
feat(aws-lambda-event-sources): add optional disabled event source ma…
Browse files Browse the repository at this point in the history
…pping options parameter to supported event sources in Lambda

closes #5750
  • Loading branch information
Zeynep Karasozen committed Mar 2, 2020
1 parent cfea360 commit 45e4d4c
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 10 deletions.
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ behavior:
* __receiveMessageWaitTime__: Will determine [long
poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
duration. The default value is 20 seconds.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.
```ts
import sqs = require('@aws-cdk/aws-sqs');
Expand Down Expand Up @@ -137,6 +138,7 @@ and add it to your Lambda function. The following parameters will impact Amazon
* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.
```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
Expand Down Expand Up @@ -167,6 +169,7 @@ behavior:
* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.
```ts
import lambda = require('@aws-cdk/aws-lambda');
Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ export interface SqsEventSourceProps {
* @default 10
*/
readonly batchSize?: number;

/**
* Disables the event source mapping to pause polling and invocation.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -30,6 +37,7 @@ export class SqsEventSource implements lambda.IEventSource {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
batchSize: this.props.batchSize,
eventSourceArn: this.queue.queueArn,
enabled: this.props.enabled
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ export interface StreamEventSourceProps {
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;

/**
* Disables the event source mapping to pause polling and invocation.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -50,6 +57,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
"Ref": "FC4345940"
},
"BatchSize": 5,
"Enabled": false,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand All @@ -138,8 +139,8 @@
"StreamViewType": "NEW_IMAGE"
}
},
"DeletionPolicy": "Delete",
"UpdateReplacePolicy": "Delete"
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class DynamoEventSourceTest extends cdk.Stack {

fn.addEventSource(new DynamoEventSource(queue, {
batchSize: 5,
startingPosition: lambda.StartingPosition.TRIM_HORIZON
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
enabled: false
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"Ref": "FC4345940"
},
"BatchSize": 100,
"Enabled": false,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand All @@ -105,4 +106,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class KinesisEventSourceTest extends cdk.Stack {
const stream = new kinesis.Stream(this, 'Q');

fn.addEventSource(new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
enabled: false
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 5
"BatchSize": 5,
"Enabled": false
}
},
"Q63C6E3AB": {
"Type": "AWS::SQS::Queue"
}
}
}
}
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/test/integ.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class SqsEventSourceTest extends cdk.Stack {
const queue = new sqs.Queue(this, 'Q');

fn.addEventSource(new SqsEventSource(queue, {
batchSize: 5
batchSize: 5,
enabled: false
}));
}
}
Expand Down
35 changes: 35 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,39 @@ export = {
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false,
"StartingPosition": "LATEST"
}));

test.done();
},
};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,34 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /KinesisEventSource is not yet bound to an event source mapping/);
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false,
"StartingPosition": "LATEST"
}));

test.done();
},
};
26 changes: 26 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,30 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /SqsEventSource is not yet bound to an event source mapping/);
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {enabled: false}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false
}));

test.done();
}
};
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IFunction } from './function-base';

/**
* An abstract class which represents an AWS Lambda event source.
*/
export interface IEventSource {

/**
* Called by `lambda.addEventSource` to allow the event source to bind to this
* function.
Expand Down

0 comments on commit 45e4d4c

Please sign in to comment.