Skip to content

Commit

Permalink
Adding Kafka message guardrail.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jan 29, 2025
1 parent 3c970ed commit d25b4a6
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
// If the message is Kafka, then we only need the latest one
// If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message
if message.Kind() == artie.Kafka {
kafkaMessage, ok := td.PartitionsToLastMessage[message.Partition()]
if ok {
// Guardrail to make sure we are not going backwards in the stream.
if kafkaMessage[0].KafkaMsg.Offset > message.KafkaMsg.Offset {
return false, "", fmt.Errorf("kafka message offset is less than the previous message offset")
}
}

td.PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
} else {
td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message)
Expand Down

0 comments on commit d25b4a6

Please sign in to comment.