From 9cb9fb879a95c77b4f0048ac6a1d714bb9889efb Mon Sep 17 00:00:00 2001 From: Luca Pizzini Date: Wed, 11 Oct 2023 18:55:30 +0200 Subject: [PATCH] fix(stepfunctions-tasks): SNS FIFO tasks does not support messageGroupId and messageDeduplicationId (#27369) `SnsPublish` is failing the execution with FIFO topics due to some missing parameters. This fixes the problem by adding: * [`messageGroupId`](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html#:~:text=Required%3A%20No-,MessageGroupId,-This%20parameter%20applies) - Required for FIFO topics * [`messageDeduplicationId`](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html#:~:text=Required%3A%20No-,MessageDeduplicationId,-This%20parameter%20applies) - Required for FIFO topics with [`contentBasedDeduplication`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-topic.html#cfn-sns-topic-contentbaseddeduplication) disabled Closes #27341. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- ...ctions-tasks-sns-publish-integ.assets.json | 6 +- ...ions-tasks-sns-publish-integ.template.json | 112 +++++++- .../sns/integ.publish.js.snapshot/cdk.out | 2 +- .../sns/integ.publish.js.snapshot/integ.json | 2 +- .../integ.publish.js.snapshot/manifest.json | 35 ++- .../sns/integ.publish.js.snapshot/tree.json | 242 +++++++++++++++--- .../test/sns/integ.publish.ts | 21 ++ .../aws-cdk-lib/aws-sns/lib/topic-base.ts | 9 + packages/aws-cdk-lib/aws-sns/lib/topic.ts | 3 + .../lib/sns/publish.ts | 43 ++++ .../test/sns/publish.test.ts | 153 +++++++++++ 11 files changed, 569 insertions(+), 59 deletions(-) diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.assets.json index 56302d33c2ca6..264c27e3340e7 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.assets.json @@ -1,7 +1,7 @@ { - "version": "30.0.0", + "version": "33.0.0", "files": { - "fc62bfcd1fd4e167dcd5627971dff21335ad8ca3b09bd7e3c472830c6eef7372": { + "bc3f4e4d1be0f4a6d23d302efceeae3fb32518ab50986268c5d2519e621ec085": { "source": { "path": "aws-stepfunctions-tasks-sns-publish-integ.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "fc62bfcd1fd4e167dcd5627971dff21335ad8ca3b09bd7e3c472830c6eef7372.json", + "objectKey": "bc3f4e4d1be0f4a6d23d302efceeae3fb32518ab50986268c5d2519e621ec085.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.template.json index 6275490d5b23f..170a335d585ba 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/aws-stepfunctions-tasks-sns-publish-integ.template.json @@ -46,19 +46,87 @@ "showmethemessagesawsstepfunctionstaskssnspublishintegcooltopic8388C976F1D63091": { "Type": "AWS::SNS::Subscription", "Properties": { + "Endpoint": { + "Fn::GetAtt": [ + "showmethemessages8D16BBDB", + "Arn" + ] + }, "Protocol": "sqs", "TopicArn": { "Ref": "cooltopic4736778A" + } + }, + "DependsOn": [ + "showmethemessagesPolicyB08B04B0" + ] + }, + "fifotopicA6114788": { + "Type": "AWS::SNS::Topic", + "Properties": { + "FifoTopic": true, + "TopicName": "awsstepfunctionstaskssnspublishinteg-fifotopic-6FE667F7.fifo" + } + }, + "fifoqueue3F2573B3": { + "Type": "AWS::SQS::Queue", + "Properties": { + "FifoQueue": true + }, + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, + "fifoqueuePolicyCA528C39": { + "Type": "AWS::SQS::QueuePolicy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "sqs:SendMessage", + "Condition": { + "ArnEquals": { + "aws:SourceArn": { + "Ref": "fifotopicA6114788" + } + } + }, + "Effect": "Allow", + "Principal": { + "Service": "sns.amazonaws.com" + }, + "Resource": { + "Fn::GetAtt": [ + "fifoqueue3F2573B3", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" }, + "Queues": [ + { + "Ref": "fifoqueue3F2573B3" + } + ] + } + }, + "fifoqueueawsstepfunctionstaskssnspublishintegfifotopic6FE667F78F3219A7": { + "Type": "AWS::SNS::Subscription", + "Properties": { "Endpoint": { "Fn::GetAtt": [ - "showmethemessages8D16BBDB", + "fifoqueue3F2573B3", "Arn" ] + }, + "Protocol": "sqs", + "TopicArn": { + "Ref": "fifotopicA6114788" } }, "DependsOn": [ - "showmethemessagesPolicyB08B04B0" + "fifoqueuePolicyCA528C39" ] }, "StateMachineRoleB840431D": { @@ -86,9 +154,14 @@ { "Action": "sns:Publish", "Effect": "Allow", - "Resource": { - "Ref": "cooltopic4736778A" - } + "Resource": [ + { + "Ref": "cooltopic4736778A" + }, + { + "Ref": "fifotopicA6114788" + } + ] } ], "Version": "2012-10-17" @@ -104,17 +177,11 @@ "StateMachine2E01A3A5": { "Type": "AWS::StepFunctions::StateMachine", "Properties": { - "RoleArn": { - "Fn::GetAtt": [ - "StateMachineRoleB840431D", - "Arn" - ] - }, "DefinitionString": { "Fn::Join": [ "", [ - "{\"StartAt\":\"publish to SNS\",\"States\":{\"publish to SNS\":{\"Next\":\"Final step\",\"Type\":\"Task\",\"Resource\":\"arn:", + "{\"StartAt\":\"publish to SNS\",\"States\":{\"publish to SNS\":{\"Next\":\"publish to FIFO SNS\",\"Type\":\"Task\",\"Resource\":\"arn:", { "Ref": "AWS::Partition" }, @@ -122,9 +189,23 @@ { "Ref": "cooltopic4736778A" }, - "\",\"Message\":\"sending message over\"}},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + "\",\"Message\":\"sending message over\"}},\"publish to FIFO SNS\":{\"Next\":\"Final step\",\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::sns:publish\",\"Parameters\":{\"TopicArn\":\"", + { + "Ref": "fifotopicA6114788" + }, + "\",\"Message\":\"sending message over\",\"MessageDeduplicationId\":\"message-deduplication-id\",\"MessageGroupId\":\"message-group-id\"}},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" ] ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] } }, "DependsOn": [ @@ -145,6 +226,11 @@ "Value": { "Ref": "showmethemessages8D16BBDB" } + }, + "fifoQueueUrl": { + "Value": { + "Ref": "fifoqueue3F2573B3" + } } }, "Parameters": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/cdk.out index ae4b03c54e770..560dae10d018f 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/cdk.out +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/cdk.out @@ -1 +1 @@ -{"version":"30.0.0"} \ No newline at end of file +{"version":"33.0.0"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/integ.json index 63f8753289cac..ac3edad3e05f0 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/integ.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/integ.json @@ -1,5 +1,5 @@ { - "version": "30.0.0", + "version": "33.0.0", "testCases": { "integ.publish": { "stacks": [ diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/manifest.json index ba3d70b7224f8..90e148c675c5c 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/manifest.json @@ -1,5 +1,5 @@ { - "version": "30.0.0", + "version": "33.0.0", "artifacts": { "aws-stepfunctions-tasks-sns-publish-integ.assets": { "type": "cdk:asset-manifest", @@ -14,10 +14,11 @@ "environment": "aws://unknown-account/unknown-region", "properties": { "templateFile": "aws-stepfunctions-tasks-sns-publish-integ.template.json", + "terminationProtection": false, "validateOnSynth": false, "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/fc62bfcd1fd4e167dcd5627971dff21335ad8ca3b09bd7e3c472830c6eef7372.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/bc3f4e4d1be0f4a6d23d302efceeae3fb32518ab50986268c5d2519e621ec085.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -57,6 +58,30 @@ "data": "showmethemessagesawsstepfunctionstaskssnspublishintegcooltopic8388C976F1D63091" } ], + "/aws-stepfunctions-tasks-sns-publish-integ/fifo-topic/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "fifotopicA6114788" + } + ], + "/aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "fifoqueue3F2573B3" + } + ], + "/aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/Policy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "fifoqueuePolicyCA528C39" + } + ], + "/aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/awsstepfunctionstaskssnspublishintegfifotopic6FE667F7/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "fifoqueueawsstepfunctionstaskssnspublishintegfifotopic6FE667F78F3219A7" + } + ], "/aws-stepfunctions-tasks-sns-publish-integ/StateMachine/Role/Resource": [ { "type": "aws:cdk:logicalId", @@ -87,6 +112,12 @@ "data": "queueUrl" } ], + "/aws-stepfunctions-tasks-sns-publish-integ/fifoQueueUrl": [ + { + "type": "aws:cdk:logicalId", + "data": "fifoQueueUrl" + } + ], "/aws-stepfunctions-tasks-sns-publish-integ/BootstrapVersion": [ { "type": "aws:cdk:logicalId", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/tree.json index f0a34d32d8205..7a3d8eb76de76 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.js.snapshot/tree.json @@ -20,13 +20,13 @@ "aws:cdk:cloudformation:props": {} }, "constructInfo": { - "fqn": "@aws-cdk/aws-sns.CfnTopic", + "fqn": "aws-cdk-lib.aws_sns.CfnTopic", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sns.Topic", + "fqn": "aws-cdk-lib.aws_sns.Topic", "version": "0.0.0" } }, @@ -42,7 +42,7 @@ "aws:cdk:cloudformation:props": {} }, "constructInfo": { - "fqn": "@aws-cdk/aws-sqs.CfnQueue", + "fqn": "aws-cdk-lib.aws_sqs.CfnQueue", "version": "0.0.0" } }, @@ -89,13 +89,13 @@ } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sqs.CfnQueuePolicy", + "fqn": "aws-cdk-lib.aws_sqs.CfnQueuePolicy", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sqs.QueuePolicy", + "fqn": "aws-cdk-lib.aws_sqs.QueuePolicy", "version": "0.0.0" } }, @@ -109,32 +109,32 @@ "attributes": { "aws:cdk:cloudformation:type": "AWS::SNS::Subscription", "aws:cdk:cloudformation:props": { - "protocol": "sqs", - "topicArn": { - "Ref": "cooltopic4736778A" - }, "endpoint": { "Fn::GetAtt": [ "showmethemessages8D16BBDB", "Arn" ] + }, + "protocol": "sqs", + "topicArn": { + "Ref": "cooltopic4736778A" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sns.CfnSubscription", + "fqn": "aws-cdk-lib.aws_sns.CfnSubscription", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sns.Subscription", + "fqn": "aws-cdk-lib.aws_sns.Subscription", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-sqs.Queue", + "fqn": "aws-cdk-lib.aws_sqs.Queue", "version": "0.0.0" } }, @@ -142,7 +142,150 @@ "id": "publish to SNS", "path": "aws-stepfunctions-tasks-sns-publish-integ/publish to SNS", "constructInfo": { - "fqn": "@aws-cdk/aws-stepfunctions-tasks.SnsPublish", + "fqn": "aws-cdk-lib.aws_stepfunctions_tasks.SnsPublish", + "version": "0.0.0" + } + }, + "fifo-topic": { + "id": "fifo-topic", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-topic", + "children": { + "Resource": { + "id": "Resource", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-topic/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::SNS::Topic", + "aws:cdk:cloudformation:props": { + "fifoTopic": true, + "topicName": "awsstepfunctionstaskssnspublishinteg-fifotopic-6FE667F7.fifo" + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sns.CfnTopic", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sns.Topic", + "version": "0.0.0" + } + }, + "fifo-queue": { + "id": "fifo-queue", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue", + "children": { + "Resource": { + "id": "Resource", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::SQS::Queue", + "aws:cdk:cloudformation:props": { + "fifoQueue": true + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sqs.CfnQueue", + "version": "0.0.0" + } + }, + "Policy": { + "id": "Policy", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/Policy", + "children": { + "Resource": { + "id": "Resource", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/Policy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::SQS::QueuePolicy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": "sqs:SendMessage", + "Condition": { + "ArnEquals": { + "aws:SourceArn": { + "Ref": "fifotopicA6114788" + } + } + }, + "Effect": "Allow", + "Principal": { + "Service": "sns.amazonaws.com" + }, + "Resource": { + "Fn::GetAtt": [ + "fifoqueue3F2573B3", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "queues": [ + { + "Ref": "fifoqueue3F2573B3" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sqs.CfnQueuePolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sqs.QueuePolicy", + "version": "0.0.0" + } + }, + "awsstepfunctionstaskssnspublishintegfifotopic6FE667F7": { + "id": "awsstepfunctionstaskssnspublishintegfifotopic6FE667F7", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/awsstepfunctionstaskssnspublishintegfifotopic6FE667F7", + "children": { + "Resource": { + "id": "Resource", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifo-queue/awsstepfunctionstaskssnspublishintegfifotopic6FE667F7/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::SNS::Subscription", + "aws:cdk:cloudformation:props": { + "endpoint": { + "Fn::GetAtt": [ + "fifoqueue3F2573B3", + "Arn" + ] + }, + "protocol": "sqs", + "topicArn": { + "Ref": "fifotopicA6114788" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sns.CfnSubscription", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sns.Subscription", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_sqs.Queue", + "version": "0.0.0" + } + }, + "publish to FIFO SNS": { + "id": "publish to FIFO SNS", + "path": "aws-stepfunctions-tasks-sns-publish-integ/publish to FIFO SNS", + "constructInfo": { + "fqn": "aws-cdk-lib.aws_stepfunctions_tasks.SnsPublish", "version": "0.0.0" } }, @@ -150,7 +293,7 @@ "id": "Final step", "path": "aws-stepfunctions-tasks-sns-publish-integ/Final step", "constructInfo": { - "fqn": "@aws-cdk/aws-stepfunctions.Pass", + "fqn": "aws-cdk-lib.aws_stepfunctions.Pass", "version": "0.0.0" } }, @@ -166,7 +309,7 @@ "id": "ImportRole", "path": "aws-stepfunctions-tasks-sns-publish-integ/StateMachine/Role/ImportRole", "constructInfo": { - "fqn": "@aws-cdk/core.Resource", + "fqn": "aws-cdk-lib.Resource", "version": "0.0.0" } }, @@ -191,7 +334,7 @@ } }, "constructInfo": { - "fqn": "@aws-cdk/aws-iam.CfnRole", + "fqn": "aws-cdk-lib.aws_iam.CfnRole", "version": "0.0.0" } }, @@ -210,9 +353,14 @@ { "Action": "sns:Publish", "Effect": "Allow", - "Resource": { - "Ref": "cooltopic4736778A" - } + "Resource": [ + { + "Ref": "cooltopic4736778A" + }, + { + "Ref": "fifotopicA6114788" + } + ] } ], "Version": "2012-10-17" @@ -226,19 +374,19 @@ } }, "constructInfo": { - "fqn": "@aws-cdk/aws-iam.CfnPolicy", + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-iam.Policy", + "fqn": "aws-cdk-lib.aws_iam.Policy", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-iam.Role", + "fqn": "aws-cdk-lib.aws_iam.Role", "version": "0.0.0" } }, @@ -248,17 +396,11 @@ "attributes": { "aws:cdk:cloudformation:type": "AWS::StepFunctions::StateMachine", "aws:cdk:cloudformation:props": { - "roleArn": { - "Fn::GetAtt": [ - "StateMachineRoleB840431D", - "Arn" - ] - }, "definitionString": { "Fn::Join": [ "", [ - "{\"StartAt\":\"publish to SNS\",\"States\":{\"publish to SNS\":{\"Next\":\"Final step\",\"Type\":\"Task\",\"Resource\":\"arn:", + "{\"StartAt\":\"publish to SNS\",\"States\":{\"publish to SNS\":{\"Next\":\"publish to FIFO SNS\",\"Type\":\"Task\",\"Resource\":\"arn:", { "Ref": "AWS::Partition" }, @@ -266,20 +408,34 @@ { "Ref": "cooltopic4736778A" }, - "\",\"Message\":\"sending message over\"}},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + "\",\"Message\":\"sending message over\"}},\"publish to FIFO SNS\":{\"Next\":\"Final step\",\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::sns:publish\",\"Parameters\":{\"TopicArn\":\"", + { + "Ref": "fifotopicA6114788" + }, + "\",\"Message\":\"sending message over\",\"MessageDeduplicationId\":\"message-deduplication-id\",\"MessageGroupId\":\"message-group-id\"}},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" ] ] + }, + "roleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-stepfunctions.CfnStateMachine", + "fqn": "aws-cdk-lib.aws_stepfunctions.CfnStateMachine", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/aws-stepfunctions.StateMachine", + "fqn": "aws-cdk-lib.aws_stepfunctions.StateMachine", "version": "0.0.0" } }, @@ -287,7 +443,7 @@ "id": "stateMachineArn", "path": "aws-stepfunctions-tasks-sns-publish-integ/stateMachineArn", "constructInfo": { - "fqn": "@aws-cdk/core.CfnOutput", + "fqn": "aws-cdk-lib.CfnOutput", "version": "0.0.0" } }, @@ -295,7 +451,15 @@ "id": "queueUrl", "path": "aws-stepfunctions-tasks-sns-publish-integ/queueUrl", "constructInfo": { - "fqn": "@aws-cdk/core.CfnOutput", + "fqn": "aws-cdk-lib.CfnOutput", + "version": "0.0.0" + } + }, + "fifoQueueUrl": { + "id": "fifoQueueUrl", + "path": "aws-stepfunctions-tasks-sns-publish-integ/fifoQueueUrl", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnOutput", "version": "0.0.0" } }, @@ -303,7 +467,7 @@ "id": "BootstrapVersion", "path": "aws-stepfunctions-tasks-sns-publish-integ/BootstrapVersion", "constructInfo": { - "fqn": "@aws-cdk/core.CfnParameter", + "fqn": "aws-cdk-lib.CfnParameter", "version": "0.0.0" } }, @@ -311,13 +475,13 @@ "id": "CheckBootstrapVersion", "path": "aws-stepfunctions-tasks-sns-publish-integ/CheckBootstrapVersion", "constructInfo": { - "fqn": "@aws-cdk/core.CfnRule", + "fqn": "aws-cdk-lib.CfnRule", "version": "0.0.0" } } }, "constructInfo": { - "fqn": "@aws-cdk/core.Stack", + "fqn": "aws-cdk-lib.Stack", "version": "0.0.0" } }, @@ -326,12 +490,12 @@ "path": "Tree", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.1.237" + "version": "10.2.70" } } }, "constructInfo": { - "fqn": "@aws-cdk/core.App", + "fqn": "aws-cdk-lib.App", "version": "0.0.0" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.ts index 3ddefdb17527c..35438fbe1aa22 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-stepfunctions-tasks/test/sns/integ.publish.ts @@ -32,9 +32,26 @@ const publishTask = new SnsPublish(stack, 'publish to SNS', { message: sfn.TaskInput.fromText('sending message over'), }); +const fifoTopic = new sns.Topic(stack, 'fifo-topic', { + fifo: true, +}); +const fifoQueue = new sqs.Queue(stack, 'fifo-queue', { + fifo: true, +}); + +fifoTopic.addSubscription(new subs.SqsSubscription(fifoQueue)); + +const publishFifoTask = new SnsPublish(stack, 'publish to FIFO SNS', { + topic: fifoTopic, + message: sfn.TaskInput.fromText('sending message over'), + messageGroupId: 'message-group-id', + messageDeduplicationId: 'message-deduplication-id', +}); + const finalStatus = new sfn.Pass(stack, 'Final step'); const chain = sfn.Chain.start(publishTask) + .next(publishFifoTask) .next(finalStatus); const sm = new sfn.StateMachine(stack, 'StateMachine', { @@ -50,4 +67,8 @@ new cdk.CfnOutput(stack, 'queueUrl', { value: queue.queueUrl, }); +new cdk.CfnOutput(stack, 'fifoQueueUrl', { + value: fifoQueue.queueUrl, +}); + app.synth(); diff --git a/packages/aws-cdk-lib/aws-sns/lib/topic-base.ts b/packages/aws-cdk-lib/aws-sns/lib/topic-base.ts index d7a4fb808c5c2..602d83f866369 100644 --- a/packages/aws-cdk-lib/aws-sns/lib/topic-base.ts +++ b/packages/aws-cdk-lib/aws-sns/lib/topic-base.ts @@ -25,6 +25,13 @@ export interface ITopic extends IResource, notifications.INotificationRuleTarget */ readonly topicName: string; + /** + * Enables content-based deduplication for FIFO topics. + * + * @attribute + */ + readonly contentBasedDeduplication: boolean; + /** * Whether this topic is an Amazon SNS FIFO queue. If false, this is a standard topic. * @@ -62,6 +69,8 @@ export abstract class TopicBase extends Resource implements ITopic { public abstract readonly fifo: boolean; + public abstract readonly contentBasedDeduplication: boolean; + /** * Controls automatic creation of policy objects. * diff --git a/packages/aws-cdk-lib/aws-sns/lib/topic.ts b/packages/aws-cdk-lib/aws-sns/lib/topic.ts index d8d585844e986..9619b6889bd2a 100644 --- a/packages/aws-cdk-lib/aws-sns/lib/topic.ts +++ b/packages/aws-cdk-lib/aws-sns/lib/topic.ts @@ -65,6 +65,7 @@ export class Topic extends TopicBase { public readonly topicArn = topicArn; public readonly topicName = Stack.of(scope).splitArn(topicArn, ArnFormat.NO_RESOURCE_NAME).resource; public readonly fifo = this.topicName.endsWith('.fifo'); + public readonly contentBasedDeduplication = false; protected autoCreatePolicy: boolean = false; } @@ -73,6 +74,7 @@ export class Topic extends TopicBase { public readonly topicArn: string; public readonly topicName: string; + public readonly contentBasedDeduplication: boolean; public readonly fifo: boolean; protected readonly autoCreatePolicy: boolean = true; @@ -114,5 +116,6 @@ export class Topic extends TopicBase { }); this.topicName = this.getResourceNameAttribute(resource.attrTopicName); this.fifo = props.fifo || false; + this.contentBasedDeduplication = props.contentBasedDeduplication || false; } } diff --git a/packages/aws-cdk-lib/aws-stepfunctions-tasks/lib/sns/publish.ts b/packages/aws-cdk-lib/aws-stepfunctions-tasks/lib/sns/publish.ts index 4f25f7d81cb3a..63ec97c6efee9 100644 --- a/packages/aws-cdk-lib/aws-stepfunctions-tasks/lib/sns/publish.ts +++ b/packages/aws-cdk-lib/aws-stepfunctions-tasks/lib/sns/publish.ts @@ -112,6 +112,32 @@ export interface SnsPublishProps extends sfn.TaskStateBaseProps { * @default - No subject */ readonly subject?: string; + + /** + * This parameter applies only to FIFO topics. + * + * The MessageGroupId is a tag that specifies that a message belongs to a specific message group. + * Messages that belong to the same message group are processed in a FIFO manner + * (however, messages in different message groups might be processed out of order). + * Every message must include a MessageGroupId. + * + * @default - Not used for standard topics, required for FIFO topics. + */ + readonly messageGroupId?: string; + + /** + * This parameter applies only to FIFO topics. + * + * Every message must have a unique MessageDeduplicationId, which is a token used for deduplication of sent messages. + * If a message with a particular MessageDeduplicationId is sent successfully, any message sent with the same MessageDeduplicationId + * during the 5-minute deduplication interval is treated as a duplicate. + * + * If the topic has ContentBasedDeduplication set, the system generates a MessageDeduplicationId + * based on the contents of the message. Your MessageDeduplicationId overrides the generated one. + * + * @default - Not used for standard topics, required for FIFO topics with ContentBasedDeduplication disabled. + */ + readonly messageDeduplicationId?: string; } /** @@ -142,6 +168,21 @@ export class SnsPublish extends sfn.TaskStateBase { } } + if (props.topic.fifo) { + if (!props.messageGroupId) { + throw new Error('\'messageGroupId\' is required for FIFO topics'); + } + if (props.messageGroupId.length > 128) { + throw new Error(`\'messageGroupId\' must be at most 128 characters long, got ${props.messageGroupId.length}`); + } + if (!props.topic.contentBasedDeduplication && !props.messageDeduplicationId) { + throw new Error('\'messageDeduplicationId\' is required for FIFO topics with \'contentBasedDeduplication\' disabled'); + } + if (props.messageDeduplicationId && props.messageDeduplicationId.length > 128) { + throw new Error(`\'messageDeduplicationId\' must be at most 128 characters long, got ${props.messageDeduplicationId.length}`); + } + } + this.taskPolicies = [ new iam.PolicyStatement({ actions: ['sns:Publish'], @@ -162,6 +203,8 @@ export class SnsPublish extends sfn.TaskStateBase { Parameters: sfn.FieldUtils.renderObject({ TopicArn: this.props.topic.topicArn, Message: this.props.message.value, + MessageDeduplicationId: this.props.messageDeduplicationId, + MessageGroupId: this.props.messageGroupId, MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined, MessageAttributes: renderMessageAttributes(this.props.messageAttributes), Subject: this.props.subject, diff --git a/packages/aws-cdk-lib/aws-stepfunctions-tasks/test/sns/publish.test.ts b/packages/aws-cdk-lib/aws-stepfunctions-tasks/test/sns/publish.test.ts index d9c0ed107bd25..c20aeca1df599 100644 --- a/packages/aws-cdk-lib/aws-stepfunctions-tasks/test/sns/publish.test.ts +++ b/packages/aws-cdk-lib/aws-stepfunctions-tasks/test/sns/publish.test.ts @@ -336,4 +336,157 @@ describe('Publish', () => { }); }).toThrow(/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE,WAIT_FOR_TASK_TOKEN. Received: RUN_JOB/); }); + + test('MessageGroupId and messageDeduplicationId supplied to FIFO topic without contentBasedDeduplication', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + }); + + // WHEN + const task = new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + messageGroupId: 'messageGroupId', + messageDeduplicationId: 'messageDeduplicationId', + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':states:::sns:publish', + ], + ], + }, + End: true, + Parameters: { + TopicArn: { + Ref: 'TopicBFC7AF6E', + }, + Message: 'Publish this message', + MessageGroupId: 'messageGroupId', + MessageDeduplicationId: 'messageDeduplicationId', + }, + }); + }); + + test('MessageGroupId supplied to FIFO topic with contentBasedDeduplication', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + contentBasedDeduplication: true, + }); + + // WHEN + const task = new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + messageGroupId: 'messageGroupId', + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':states:::sns:publish', + ], + ], + }, + End: true, + Parameters: { + TopicArn: { + Ref: 'TopicBFC7AF6E', + }, + Message: 'Publish this message', + MessageGroupId: 'messageGroupId', + }, + }); + }); + + test('messageGroupId is required for FIFO topics', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + }); + + expect(() => { + new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + }); + }).toThrow(/'messageGroupId' is required for FIFO topics/); + }); + + test('messageGroupId length validation', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + }); + + expect(() => { + new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + messageGroupId: 'long'.repeat(100), + }); + }).toThrow(/'messageGroupId' must be at most 128 characters long/); + }); + + test('messageDeduplicationId is required for FIFO topics with contentBasedDeduplication disabled', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + }); + + expect(() => { + new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + messageGroupId: 'messageGroupId', + }); + }).toThrow(/'messageDeduplicationId' is required for FIFO topics with 'contentBasedDeduplication' disabled/); + }); + + test('messageDeduplicationId length validation', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic', { + fifo: true, + }); + + expect(() => { + new SnsPublish(stack, 'Publish', { + topic, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + message: sfn.TaskInput.fromText('Publish this message'), + messageGroupId: 'messageGroupId', + messageDeduplicationId: 'long'.repeat(100), + }); + }).toThrow(/'messageDeduplicationId' must be at most 128 characters long/); + }); });