From d25b4a681eb2f669163e887b94a71a172d0bb3ef Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 28 Jan 2025 18:05:59 -0800 Subject: [PATCH] Adding Kafka message guardrail. --- models/event/event.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/models/event/event.go b/models/event/event.go index d11381dc3..ad9a3cd25 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -268,6 +268,14 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali // If the message is Kafka, then we only need the latest one // If it's pubsub, we will store all of them in memory. This is because GCP pub/sub REQUIRES us to ack every single message if message.Kind() == artie.Kafka { + kafkaMessage, ok := td.PartitionsToLastMessage[message.Partition()] + if ok { + // Guardrail to make sure we are not going backwards in the stream. + if kafkaMessage[0].KafkaMsg.Offset > message.KafkaMsg.Offset { + return false, "", fmt.Errorf("kafka message offset is less than the previous message offset") + } + } + td.PartitionsToLastMessage[message.Partition()] = []artie.Message{message} } else { td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message)