-
Notifications
You must be signed in to change notification settings - Fork 4k
/
Copy pathinteg.kinesis-data-firehose-put-record.ts
69 lines (59 loc) · 2.35 KB
/
integ.kinesis-data-firehose-put-record.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import * as firehose from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
import * as scheduler from '@aws-cdk/aws-scheduler-alpha';
import { AwsApiCall, ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha';
import * as cdk from 'aws-cdk-lib';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { KinesisDataFirehosePutRecord } from '../lib';
/*
* Stack verification steps:
* A record is put to the kinesis data firehose stream by the scheduler
* The firehose deliveries the record to S3 bucket
* The assertion checks there is an object in the S3 bucket
*/
const app = new cdk.App();
const stack = new cdk.Stack(app, 'aws-cdk-scheduler-targets-firehose-put-record');
const payload = {
Data: 'record',
};
const destinationBucket = new Bucket(stack, 'DestinationBucket', {
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
const deliveryStreamRole = new cdk.aws_iam.Role(stack, 'deliveryStreamRole', {
assumedBy: new cdk.aws_iam.ServicePrincipal('firehose.amazonaws.com'),
});
destinationBucket.grantReadWrite(deliveryStreamRole);
const firehoseStream = new firehose.DeliveryStream(stack, 'MyFirehoseStream', {
destination: new destinations.S3Bucket(destinationBucket, {
role: deliveryStreamRole,
bufferingInterval: cdk.Duration.minutes(1),
}),
});
new scheduler.Schedule(stack, 'Schedule', {
schedule: scheduler.ScheduleExpression.rate(cdk.Duration.minutes(1)),
target: new KinesisDataFirehosePutRecord(firehoseStream, {
input: scheduler.ScheduleTargetInput.fromObject(payload),
}),
});
const integrationTest = new IntegTest(app, 'integrationtest-firehose-put-record', {
testCases: [stack],
stackUpdateWorkflow: false, // this would cause the schedule to trigger with the old code
});
// Verifies that an object was delivered to the S3 bucket by the firehose
const objects = integrationTest.assertions.awsApiCall('S3', 'listObjectsV2', {
Bucket: destinationBucket.bucketName,
MaxKeys: 1,
}).expect(ExpectedResult.objectLike({
KeyCount: 1,
})).waitForAssertions({
interval: cdk.Duration.seconds(30),
totalTimeout: cdk.Duration.minutes(10),
});
if (objects instanceof AwsApiCall && objects.waiterProvider) {
objects.waiterProvider.addToRolePolicy({
Effect: 'Allow',
Action: ['s3:GetObject', 's3:ListBucket'],
Resource: ['*'],
});
}