Skip to content

Commit

Permalink
feat(pipes-targets): add step function target (#29987)
Browse files Browse the repository at this point in the history
### Issue #29665

Closes #29665

### Reason for this change
Step Function target is not supported yet by pipes-targets.


### Description of changes
- Added step function as a pipes target.
- I've decided to make the `invocationType` a required parameter, since this made the code clearer and make users aware of how they want the step function to be invoked. 
The [AWS::Pipes::Pipe PipeTargetStateMachineParameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetstatemachineparameters.html) has this as an optional parameter (defaulting to Request-Response), which can lead the user unknowingly in a broken pipe, because cdk's StateMachines [default](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions.StateMachine.html#statemachinetype) to Standard Workflow, which is not compatible with Request-Response Invocation Type.
- Currently there seems no way to prevent users from creating a pipe with an imported Standard StateMachine and InvocationType Request-Response as the stateMachineType cant be read (or I dont know how :D)


### Description of how you validated changes
- Added unit tests
- Added integration test


### Checklist
- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----
I've talked with @RaphaelManke and he was fine for me opening up a PR (put him as a co-author nevertheless) :)

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
WtfJoke authored May 14, 2024
1 parent 205163f commit b0975e4
Show file tree
Hide file tree
Showing 16 changed files with 34,071 additions and 1 deletion.
61 changes: 60 additions & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ For more details see the service documentation:

Pipe targets are the end point of a EventBridge Pipe.

The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)

### Amazon SQS

A SQS message queue can be used as a target for a pipe. Messages will be pushed to the queue.
Expand All @@ -43,7 +48,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

The target configuration can be transformed:
The target input can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
Expand All @@ -63,3 +68,57 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: pipeTarget
});
```

### AWS Step Functions State Machine

A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,{});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

Specifying the Invocation Type when the target State Machine is invoked:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
{
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
}
);


const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The input to the target State Machine can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
{
inputTransformation: pipes.InputTransformation.fromObject({ body: '<$.body>' }),
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
}
);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './sqs';
export * from './stepfunctions';
84 changes: 84 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/stepfunctions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import { StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';

/**
* Parameters for the SfnStateMachine target
*/
export interface SfnStateMachineParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Specify whether to invoke the State Machine synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
* @default StateMachineInvocationType.FIRE_AND_FORGET
*/
readonly invocationType?: StateMachineInvocationType;
}

/**
* InvocationType for invoking the State Machine.
* @see https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetStateMachineParameters.html
*/
export enum StateMachineInvocationType {
/**
* Invoke StepFunction asynchronously (`StartExecution`). See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html for more details.
*/
FIRE_AND_FORGET = 'FIRE_AND_FORGET',

/**
* Invoke StepFunction synchronously (`StartSyncExecution`) and wait for the execution to complete. See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartSyncExecution.html for more details.
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
}

/**
* An EventBridge Pipes target that sends messages to an AWS Step Functions State Machine.
*/
export class SfnStateMachine implements ITarget {
public readonly targetArn: string;

private readonly stateMachine: sfn.IStateMachine;
private readonly invocationType: StateMachineInvocationType;
private readonly inputTemplate?: IInputTransformation;

constructor(stateMachine: sfn.IStateMachine, parameters: SfnStateMachineParameters) {
this.stateMachine = stateMachine;
this.targetArn = stateMachine.stateMachineArn;
this.invocationType = parameters.invocationType?? StateMachineInvocationType.FIRE_AND_FORGET;
this.inputTemplate = parameters.inputTransformation;

if (this.stateMachine instanceof StateMachine
&& this.stateMachine.stateMachineType === StateMachineType.STANDARD
&& this.invocationType === StateMachineInvocationType.REQUEST_RESPONSE) {
throw new Error('STANDARD state machine workflows do not support the REQUEST_RESPONSE invocation type. Use FIRE_AND_FORGET instead.');
}
}

grantPush(grantee: IRole): void {
if (this.invocationType === StateMachineInvocationType.FIRE_AND_FORGET) {
this.stateMachine.grantStartExecution(grantee);
} else {
this.stateMachine.grantStartSyncExecution(grantee);
}
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
stepFunctionStateMachineParameters: {
invocationType: this.invocationType,
},
},
};
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Fixture with packages imported, but nothing else
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`step-function should grant pipe role push access (StartAsyncExecution) with default invocation type (FIRE_AND_FORGET) 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`step-function should grant pipe role push access (StartAsyncExecution) with invocation type FIRE_AND_FORGET 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`step-function should grant pipe role push access (StartSyncExecution) with invocation type REQUEST-RESPONSE 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;
Loading

0 comments on commit b0975e4

Please sign in to comment.