From dab6aca5005c8d6d180aada699a4cebc2ef5aefa Mon Sep 17 00:00:00 2001 From: Mitch Lloyd Date: Thu, 31 Mar 2022 13:54:06 -0400 Subject: [PATCH] feat(kinesisanalytics-flink): Add metrics to Flink applications (#19599) I PR'd the [original version of the aws-kinesisanalytics-flink constructs](https://github.com/aws/aws-cdk/pull/12464) to CDK. I'm following up to add the missing `metric*` methods according to the [design guidelines](https://github.com/aws/aws-cdk/blob/master/docs/DESIGN_GUIDELINES.md#metrics). [Reference for Flink Application metrics](https://docs.aws.amazon.com/kinesisanalytics/latest/java/metrics-dimensions.html). I have a few running Flink apps and I was able to see that KPUs are also reported for the Flink apps. ---- ### All Submissions: * [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/master/CONTRIBUTING.md) * [x] I don't think conventional metric changes require an update to the README. ### New Features * [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/master/INTEGRATION_TESTS.md)? * [x] Did you use `cdk-integ` to deploy the infrastructure and generate the snapshot (i.e. `cdk-integ` without `--dry-run`)? *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- .../aws-kinesisanalytics-flink/README.md | 2 +- .../lib/application.ts | 658 ++++++++++++++++++ .../aws-kinesisanalytics-flink/package.json | 2 + .../test/application.test.ts | 63 ++ .../test/integ.application.lit.expected.json | 20 + .../test/integ.application.lit.ts | 9 +- 6 files changed, 752 insertions(+), 2 deletions(-) diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/README.md b/packages/@aws-cdk/aws-kinesisanalytics-flink/README.md index 2882cc60afc54..8e91fcd78b6ac 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/README.md +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/README.md @@ -18,7 +18,7 @@ This package provides constructs for creating Kinesis Analytics Flink applications. To learn more about using using managed Flink applications, see the [AWS developer -guide](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html). +guide](https://docs.aws.amazon.com/kinesisanalytics/latest/java/). ## Creating Flink Applications diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/lib/application.ts b/packages/@aws-cdk/aws-kinesisanalytics-flink/lib/application.ts index 4a0f3bfc36138..f0fe1f659c035 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/lib/application.ts +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/lib/application.ts @@ -1,3 +1,4 @@ +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; import * as iam from '@aws-cdk/aws-iam'; import { CfnApplicationCloudWatchLoggingOptionV2, CfnApplicationV2 } from '@aws-cdk/aws-kinesisanalytics'; import * as logs from '@aws-cdk/aws-logs'; @@ -37,6 +38,305 @@ export interface IApplication extends core.IResource, iam.IGrantable { * Convenience method for adding a policy statement to the application role. */ addToRolePolicy(policyStatement: iam.PolicyStatement): boolean; + + /** + * Return a CloudWatch metric associated with this Flink application. + * + * @param metricName The name of the metric + * @param props Customization properties + */ + metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The number of Kinesis Processing Units that are used to run your stream + * processing application. The average number of KPUs used each hour + * determines the billing for your application. + * + * Units: Count + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricKpus(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time elapsed during an outage for failing/recovering jobs. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricDowntime(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time that the job has been running without interruption. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default sample count over 5 minutes + */ + metricUptime(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of times this job has fully restarted since it was + * submitted. This metric does not measure fine-grained restarts. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricFullRestarts(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The number of times checkpointing has failed. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricNumberOfFailedCheckpoints(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time it took to complete the last checkpoint. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default maximum over 5 minutes + */ + metricLastCheckpointDuration(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total size of the last checkpoint. + * + * Units: Bytes + * + * Reporting Level: Application + * + * @default maximum over 5 minutes + */ + metricLastCheckpointSize(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The overall percentage of CPU utilization across task managers. For + * example, if there are five task managers, Kinesis Data Analytics publishes + * five samples of this metric per reporting interval. + * + * Units: Percentage + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricCpuUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Overall heap memory utilization across task managers. For example, if there + * are five task managers, Kinesis Data Analytics publishes five samples of + * this metric per reporting interval. + * + * Units: Percentage + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricHeapMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total time spent performing old garbage collection operations. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricOldGenerationGCTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of old garbage collection operations that have occurred + * across all task managers. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricOldGenerationGCCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of live threads used by the application. + * + * Units: Count + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricThreadsCount(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of records this application, operator, or task has + * received. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsIn(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of records this application, operator or task has + * received per second. + * + * Units: Count/Second + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsInPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of records this application, operator or task has emitted. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total number of records this application, operator or task has emitted + * per second. + * + * Units: Count/Second + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsOutPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The number of records this operator or task has dropped due to arriving late. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default sum over 5 minutes + */ + metricNumLateRecordsDropped(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The last watermark this application/operator/task/thread has received. + * + * Units: Milliseconds + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default maximum over 5 minutes + */ + metricCurrentInputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The last watermark this application/operator/task/thread has received. + * + * Units: Milliseconds + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default maximum over 5 minutes + */ + metricCurrentOutputWatermark(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The amount of managed memory currently used. + * + * Units: Bytes + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryUsed(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The total amount of managed memory. + * + * Units: Bytes + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryTotal(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Derived from managedMemoryUsed/managedMemoryTotal. + * + * Units: Percentage + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryUtilization(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time (in milliseconds) this task or operator is idle (has no data to + * process) per second. Idle time excludes back pressured time, so if the task + * is back pressured it is not idle. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricIdleTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time (in milliseconds) this task or operator is back pressured per + * second. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricBackPressuredTimeMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * The time (in milliseconds) this task or operator is busy (neither idle nor + * back pressured) per second. Can be NaN, if the value could not be + * calculated. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricBusyTimePerMsPerSecond(props?: cloudwatch.MetricOptions): cloudwatch.Metric; } /** @@ -60,6 +360,364 @@ abstract class ApplicationBase extends core.Resource implements IApplication { return false; } + + /** + * Return a CloudWatch metric associated with this Flink application. + * + * @param metricName The name of the metric + * @param props Customization properties + */ + metric(metricName: string, props?: cloudwatch.MetricOptions) { + return new cloudwatch.Metric({ + namespace: 'AWS/KinesisAnalytics', + metricName, + dimensionsMap: { Application: this.applicationName }, + ...props, + }).attachTo(this); + } + + /** + * The number of Kinesis Processing Units that are used to run your stream + * processing application. The average number of KPUs used each hour + * determines the billing for your application. + * + * Units: Count + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricKpus(props?: cloudwatch.MetricOptions) { + return this.metric('KPUs', { statistic: 'Average', ...props }); + } + + + /** + * The time elapsed during an outage for failing/recovering jobs. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricDowntime(props?: cloudwatch.MetricOptions) { + return this.metric('downtime', { statistic: 'Average', ...props }); + } + + /** + * The time that the job has been running without interruption. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricUptime(props?: cloudwatch.MetricOptions) { + return this.metric('uptime', { statistic: 'Average', ...props }); + } + + /** + * The total number of times this job has fully restarted since it was + * submitted. This metric does not measure fine-grained restarts. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricFullRestarts(props?: cloudwatch.MetricOptions) { + return this.metric('fullRestarts', { statistic: 'Sum', ...props }); + } + + /** + * The number of times checkpointing has failed. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricNumberOfFailedCheckpoints(props?: cloudwatch.MetricOptions) { + return this.metric('numberOfFailedCheckpoints', { statistic: 'Sum', ...props }); + } + + /** + * The time it took to complete the last checkpoint. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default maximum over 5 minutes + */ + metricLastCheckpointDuration(props?: cloudwatch.MetricOptions) { + return this.metric('lastCheckpointDuration', { statistic: 'Maximum', ...props }); + } + + /** + * The total size of the last checkpoint. + * + * Units: Bytes + * + * Reporting Level: Application + * + * @default maximum over 5 minutes + */ + metricLastCheckpointSize(props?: cloudwatch.MetricOptions) { + return this.metric('lastCheckpointSize', { statistic: 'Maximum', ...props }); + } + + /** + * The overall percentage of CPU utilization across task managers. For + * example, if there are five task managers, Kinesis Data Analytics publishes + * five samples of this metric per reporting interval. + * + * Units: Percentage + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricCpuUtilization(props?: cloudwatch.MetricOptions) { + return this.metric('cpuUtilization', { statistic: 'Average', ...props }); + } + + /** + * Overall heap memory utilization across task managers. For example, if there + * are five task managers, Kinesis Data Analytics publishes five samples of + * this metric per reporting interval. + * + * Units: Percentage + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricHeapMemoryUtilization(props?: cloudwatch.MetricOptions) { + return this.metric('heapMemoryUtilization', { statistic: 'Average', ...props }); + } + + /** + * The total time spent performing old garbage collection operations. + * + * Units: Milliseconds + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricOldGenerationGCTime(props?: cloudwatch.MetricOptions) { + return this.metric('oldGenerationGCTime', { statistic: 'Sum', ...props }); + } + + /** + * The total number of old garbage collection operations that have occurred + * across all task managers. + * + * Units: Count + * + * Reporting Level: Application + * + * @default sum over 5 minutes + */ + metricOldGenerationGCCount(props?: cloudwatch.MetricOptions) { + return this.metric('oldGenerationGCCount', { statistic: 'Sum', ...props }); + } + + /** + * The total number of live threads used by the application. + * + * Units: Count + * + * Reporting Level: Application + * + * @default average over 5 minutes + */ + metricThreadsCount(props?: cloudwatch.MetricOptions) { + return this.metric('threadsCount', { statistic: 'Average', ...props }); + } + + /** + * The total number of records this application, operator, or task has + * received. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsIn(props?: cloudwatch.MetricOptions) { + return this.metric('numRecordsIn', { statistic: 'Average', ...props }); + } + + /** + * The total number of records this application, operator or task has received + * per second. + * + * Units: Count/Second + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsInPerSecond(props?: cloudwatch.MetricOptions) { + return this.metric('numRecordsInPerSecond', { statistic: 'Average', ...props }); + } + + /** + * The total number of records this application, operator or task has emitted. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsOut(props?: cloudwatch.MetricOptions) { + return this.metric('numRecordsOut', { statistic: 'Average', ...props }); + } + + /** + * The total number of records this application, operator or task has emitted + * per second. + * + * Units: Count/Second + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricNumRecordsOutPerSecond(props?: cloudwatch.MetricOptions) { + return this.metric('numRecordsOutPerSecond', { statistic: 'Average', ...props }); + } + + /** + * The number of records this operator or task has dropped due to arriving + * late. + * + * Units: Count + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default sum over 5 minutes + */ + metricNumLateRecordsDropped(props?: cloudwatch.MetricOptions) { + return this.metric('numLateRecordsDropped', { statistic: 'Sum', ...props }); + } + + /** + * The last watermark this application/operator/task/thread has received. + * + * Units: Milliseconds + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default maximum over 5 minutes + */ + metricCurrentInputWatermark(props?: cloudwatch.MetricOptions) { + return this.metric('currentInputWatermark', { statistic: 'Maximum', ...props }); + } + + /** + * The last watermark this application/operator/task/thread has received. + * + * Units: Milliseconds + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default maximum over 5 minutes + */ + metricCurrentOutputWatermark(props?: cloudwatch.MetricOptions) { + return this.metric('currentOutputWatermark', { statistic: 'Maximum', ...props }); + } + + /** + * The amount of managed memory currently used. + * + * Units: Bytes + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryUsed(props?: cloudwatch.MetricOptions) { + return this.metric('managedMemoryUsed', { statistic: 'Average', ...props }); + } + + /** + * The total amount of managed memory. + * + * Units: Bytes + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryTotal(props?: cloudwatch.MetricOptions) { + return this.metric('managedMemoryTotal', { statistic: 'Average', ...props }); + } + + /** + * Derived from managedMemoryUsed/managedMemoryTotal. + * + * Units: Percentage + * + * Reporting Level: Application, Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricManagedMemoryUtilization(props?: cloudwatch.MetricOptions) { + return this.metric('managedMemoryUtilization', { statistic: 'Average', ...props }); + } + + /** + * The time (in milliseconds) this task or operator is idle (has no data to + * process) per second. Idle time excludes back pressured time, so if the task + * is back pressured it is not idle. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricIdleTimeMsPerSecond(props?: cloudwatch.MetricOptions) { + return this.metric('idleTimeMsPerSecond', { statistic: 'Average', ...props }); + } + + /** + * The time (in milliseconds) this task or operator is back pressured per + * second. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricBackPressuredTimeMsPerSecond(props?: cloudwatch.MetricOptions) { + return this.metric('backPressuredTimeMsPerSecond', { statistic: 'Average', ...props }); + } + + /** + * The time (in milliseconds) this task or operator is busy (neither idle nor + * back pressured) per second. Can be NaN, if the value could not be + * calculated. + * + * Units: Milliseconds + * + * Reporting Level: Operator, Task, Parallelism + * + * @default average over 5 minutes + */ + metricBusyTimePerMsPerSecond(props?: cloudwatch.MetricOptions) { + return this.metric('busyTimePerMsPerSecond', { statistic: 'Average', ...props }); + } } /** diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/package.json b/packages/@aws-cdk/aws-kinesisanalytics-flink/package.json index fb639112e35c2..fff56a1f9b93c 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/package.json +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/package.json @@ -82,6 +82,7 @@ }, "dependencies": { "@aws-cdk/assets": "0.0.0", + "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-kinesisanalytics": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", @@ -94,6 +95,7 @@ "homepage": "https://github.com/aws/aws-cdk", "peerDependencies": { "@aws-cdk/assets": "0.0.0", + "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-kinesisanalytics": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/application.test.ts b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/application.test.ts index a4546a5fa9423..8c951730e65c2 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/application.test.ts +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/application.test.ts @@ -1,5 +1,6 @@ import * as path from 'path'; import { Match, Template } from '@aws-cdk/assertions'; +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; import * as iam from '@aws-cdk/aws-iam'; import * as logs from '@aws-cdk/aws-logs'; import * as s3 from '@aws-cdk/aws-s3'; @@ -602,4 +603,66 @@ describe('Application', () => { expect(flinkApp.applicationArn).toEqual(arn); expect(flinkApp.addToRolePolicy(new iam.PolicyStatement())).toBe(false); }); + + test('get metric', () => { + const flinkApp = new flink.Application(stack, 'Application', { ...requiredProps }); + expect(flinkApp.metric('KPUs', { statistic: 'Sum' })) + .toMatchObject({ + namespace: 'AWS/KinesisAnalytics', + metricName: 'KPUs', + dimensions: { Application: flinkApp.applicationName }, + statistic: 'Sum', + }); + }); + + test('canned metrics', () => { + const flinkApp = new flink.Application(stack, 'Application', { ...requiredProps }); + + // Table driven test with: [method, metricName, default statistic] + const assertions: Array<[(options?: cloudwatch.MetricOptions) => cloudwatch.Metric, string, string]> = [ + [flinkApp.metricKpus, 'KPUs', 'Average'], + [flinkApp.metricDowntime, 'downtime', 'Average'], + [flinkApp.metricUptime, 'uptime', 'Average'], + [flinkApp.metricFullRestarts, 'fullRestarts', 'Sum'], + [flinkApp.metricNumberOfFailedCheckpoints, 'numberOfFailedCheckpoints', 'Sum'], + [flinkApp.metricLastCheckpointDuration, 'lastCheckpointDuration', 'Maximum'], + [flinkApp.metricLastCheckpointSize, 'lastCheckpointSize', 'Maximum'], + [flinkApp.metricCpuUtilization, 'cpuUtilization', 'Average'], + [flinkApp.metricHeapMemoryUtilization, 'heapMemoryUtilization', 'Average'], + [flinkApp.metricOldGenerationGCTime, 'oldGenerationGCTime', 'Sum'], + [flinkApp.metricOldGenerationGCCount, 'oldGenerationGCCount', 'Sum'], + [flinkApp.metricThreadsCount, 'threadsCount', 'Average'], + [flinkApp.metricNumRecordsIn, 'numRecordsIn', 'Average'], + [flinkApp.metricNumRecordsInPerSecond, 'numRecordsInPerSecond', 'Average'], + [flinkApp.metricNumRecordsOut, 'numRecordsOut', 'Average'], + [flinkApp.metricNumRecordsOutPerSecond, 'numRecordsOutPerSecond', 'Average'], + [flinkApp.metricNumLateRecordsDropped, 'numLateRecordsDropped', 'Sum'], + [flinkApp.metricCurrentInputWatermark, 'currentInputWatermark', 'Maximum'], + [flinkApp.metricCurrentOutputWatermark, 'currentOutputWatermark', 'Maximum'], + [flinkApp.metricManagedMemoryUsed, 'managedMemoryUsed', 'Average'], + [flinkApp.metricManagedMemoryTotal, 'managedMemoryTotal', 'Average'], + [flinkApp.metricManagedMemoryUtilization, 'managedMemoryUtilization', 'Average'], + [flinkApp.metricIdleTimeMsPerSecond, 'idleTimeMsPerSecond', 'Average'], + [flinkApp.metricBackPressuredTimeMsPerSecond, 'backPressuredTimeMsPerSecond', 'Average'], + [flinkApp.metricBusyTimePerMsPerSecond, 'busyTimePerMsPerSecond', 'Average'], + ]; + + assertions.forEach(([method, metricName, defaultStatistic]) => { + // Test metrics with no options provided + expect(method.call(flinkApp)).toMatchObject({ + metricName, + statistic: defaultStatistic, + namespace: 'AWS/KinesisAnalytics', + dimensions: { + Application: flinkApp.applicationName, + }, + }); + + // Make sure we can override the default statistic and add other options + expect(method.call(flinkApp, { statistic: 'special', color: '#00ff00' })).toMatchObject({ + statistic: 'special', + color: '#00ff00', + }); + }); + }); }); diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.expected.json b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.expected.json index 5ab3c94353f04..60f2f92a97420 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.expected.json +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.expected.json @@ -276,6 +276,26 @@ } } } + }, + "Alarm7103F465": { + "Type": "AWS::CloudWatch::Alarm", + "Properties": { + "ComparisonOperator": "GreaterThanOrEqualToThreshold", + "EvaluationPeriods": 1, + "Dimensions": [ + { + "Name": "Application", + "Value": { + "Ref": "AppF1B96344" + } + } + ], + "MetricName": "fullRestarts", + "Namespace": "AWS/KinesisAnalytics", + "Period": 300, + "Statistic": "Sum", + "Threshold": 3 + } } }, "Parameters": { diff --git a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.ts b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.ts index 02a6a9949dcfa..11c8b5bda36f4 100644 --- a/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.ts +++ b/packages/@aws-cdk/aws-kinesisanalytics-flink/test/integ.application.lit.ts @@ -2,14 +2,21 @@ import * as path from 'path'; import * as core from '@aws-cdk/core'; import * as flink from '../lib'; +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; const app = new core.App(); const stack = new core.Stack(app, 'FlinkAppTest'); -new flink.Application(stack, 'App', { +const flinkApp = new flink.Application(stack, 'App', { code: flink.ApplicationCode.fromAsset(path.join(__dirname, 'code-asset')), runtime: flink.Runtime.FLINK_1_11, }); + +new cloudwatch.Alarm(stack, 'Alarm', { + metric: flinkApp.metricFullRestarts(), + evaluationPeriods: 1, + threshold: 3, +}); ///! hide app.synth();