Skip to content

Commit

Permalink
fix: Allow Pipes with Kinesis Targets (#143)
Browse files Browse the repository at this point in the history
Co-authored-by: Anton Babenko <anton@antonbabenko.com>
  • Loading branch information
soisyourface and antonbabenko authored Jan 10, 2025
1 parent c947f01 commit b4b29bf
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
39 changes: 39 additions & 0 deletions examples/with-pipes/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,39 @@ module "eventbridge" {
}
}

# With Kinesis Stream source and Kinesis Stream target
kinesis_source_kinesis_target = {
source = aws_kinesis_stream.source.arn
target = aws_kinesis_stream.target.arn

source_parameters = {
kinesis_stream_parameters = {
batch_size = 7
maximum_batching_window_in_seconds = 90
maximum_record_age_in_seconds = 100
maximum_retry_attempts = 4
on_partial_batch_item_failure = "AUTOMATIC_BISECT"
parallelization_factor = 5
starting_position = "TRIM_HORIZON"
starting_position_timestamp = null
dead_letter_config = {
arn = aws_sqs_queue.dlq.arn
}
}
}

target_parameters = {
kinesis_stream_parameters = {
# Must be a json path and start with $.
partition_key = "$.id"
}
}

tags = {
Pipe = "kinesis_source_kinesis_target"
}
}

# With SQS Queue source and EventBridge target
sqs_source_eventbridge_target = {
source = aws_sqs_queue.source.arn
Expand Down Expand Up @@ -429,6 +462,12 @@ resource "aws_kinesis_stream" "source" {
shard_count = 1
}

resource "aws_kinesis_stream" "target" {
name = "${random_pet.this.id}-target"

shard_count = 1
}

##################################
# CloudWatch Log Group and Stream
##################################
Expand Down
5 changes: 3 additions & 2 deletions iam_pipes.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ locals {

# Enrichment / Target
lambda = {
values = [v.target, try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)],
values = [v.target, try(v.enrichment, null), try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)],
matching_services = ["lambda"]
},
step_functions = {
Expand Down Expand Up @@ -191,7 +191,8 @@ locals {

kinesis_target = {
actions = [
"kinesis:PutRecord"
"kinesis:PutRecord",
"kinesis:PutRecords"
]
}

Expand Down
7 changes: 7 additions & 0 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,13 @@ resource "aws_pipes_pipe" "this" {
message_group_id = try(sqs_queue_parameters.value.message_group_id, null)
}
}
dynamic "kinesis_stream_parameters" {
for_each = try([target_parameters.value.kinesis_stream_parameters], [])

content {
partition_key = try(kinesis_stream_parameters.value.partition_key, null)
}
}

dynamic "cloudwatch_logs_parameters" {
for_each = try([target_parameters.value.cloudwatch_logs_parameters], [])
Expand Down

0 comments on commit b4b29bf

Please sign in to comment.