Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): disable event source #6541

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ behavior:
* __receiveMessageWaitTime__: Will determine [long
poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
duration. The default value is 20 seconds.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.

```ts
import sqs = require('@aws-cdk/aws-sqs');
Expand Down Expand Up @@ -137,6 +138,7 @@ and add it to your Lambda function. The following parameters will impact Amazon

* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.
Copy link
Contributor

@nija-at nija-at Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not applicable to other event sources, such as S3 and SNS?

The documentation here seems to suggest that it's applicable to all event source types, no?


```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
Expand Down Expand Up @@ -167,6 +169,7 @@ behavior:

* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: Disables or enables the event source mapping to pause/start polling and invocation.

```ts
import lambda = require('@aws-cdk/aws-lambda');
Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ export interface SqsEventSourceProps {
* @default 10
*/
readonly batchSize?: number;

/**
* Disables the event source mapping to pause polling and invocation.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -30,6 +37,7 @@ export class SqsEventSource implements lambda.IEventSource {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
batchSize: this.props.batchSize,
eventSourceArn: this.queue.queueArn,
enabled: this.props.enabled
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ export interface StreamEventSourceProps {
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;

/**
* Disables the event source mapping to pause polling and invocation.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -50,6 +57,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
"Ref": "FC4345940"
},
"BatchSize": 5,
"Enabled": false,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand All @@ -138,8 +139,8 @@
"StreamViewType": "NEW_IMAGE"
}
},
"DeletionPolicy": "Delete",
"UpdateReplacePolicy": "Delete"
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class DynamoEventSourceTest extends cdk.Stack {

fn.addEventSource(new DynamoEventSource(queue, {
batchSize: 5,
startingPosition: lambda.StartingPosition.TRIM_HORIZON
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
enabled: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid changing an existing integration test.

Actually, an integration test is not needed for this feature; unit tests are sufficient.

}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"Ref": "FC4345940"
},
"BatchSize": 100,
"Enabled": false,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand All @@ -105,4 +106,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class KinesisEventSourceTest extends cdk.Stack {
const stream = new kinesis.Stream(this, 'Q');

fn.addEventSource(new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
enabled: false
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 5
"BatchSize": 5,
"Enabled": false
}
},
"Q63C6E3AB": {
"Type": "AWS::SQS::Queue"
}
}
}
}
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/test/integ.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class SqsEventSourceTest extends cdk.Stack {
const queue = new sqs.Queue(this, 'Q');

fn.addEventSource(new SqsEventSource(queue, {
batchSize: 5
batchSize: 5,
enabled: false
}));
}
}
Expand Down
35 changes: 35 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,39 @@ export = {
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false,
"StartingPosition": "LATEST"
}));

test.done();
},
};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,34 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /KinesisEventSource is not yet bound to an event source mapping/);
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false,
"StartingPosition": "LATEST"
}));

test.done();
},
};
26 changes: 26 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,30 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /SqsEventSource is not yet bound to an event source mapping/);
test.done();
},

'initialize event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {enabled: false}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"Enabled": false
}));

test.done();
}
};