Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: record offset for target topic instead of src #36

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/relay/relay.go
Original file line number Diff line number Diff line change
@@ -225,7 +225,10 @@ loop:
// Always record the latest offsets before the messages are processed for new connections and
// retries to consume from where it was left off.
// TODO: What if the next step fails? The messages won't be read again?
-re.source.RecordOffsets(rec)
+if err := re.source.RecordOffsets(rec); err != nil {
+re.log.Error("error recording offset", "err", err)
+return err
+}

if err := re.processMessage(ctx, rec); err != nil {
re.log.Error("error processing message", "err", err)
19 changes: 15 additions & 4 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ type SourcePool struct {
log *slog.Logger
metrics *metrics.Set
targetToSrc map[string]string
+srcToTarget map[string]string
srcTopics []string

// targetOffsets is initialized with current topic high watermarks from target.
@@ -103,17 +104,20 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, t
}

var (
+srcToTarg = make(map[string]string, len(topics))
targToSrc = make(map[string]string, len(topics))
srcTopics = make([]string, 0, len(topics))
)
for src, targ := range topics {
srcTopics = append(srcTopics, src)
targToSrc[targ.TargetTopic] = src
+srcToTarg[src] = targ.TargetTopic
}

sp := &SourcePool{
cfg: cfg,
targetToSrc: targToSrc,
+srcToTarget: srcToTarg,
srcTopics: srcTopics,
servers: servers,
log: log,
@@ -223,21 +227,28 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {

// RecordOffsets records the offsets of the latest fetched records per topic.
// This is used to resume consumption on new connections/reconnections from the source during runtime.
-func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
+func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error {
if sp.targetOffsets == nil {
sp.targetOffsets = make(TopicOffsets)
}

-if o, ok := sp.targetOffsets[rec.Topic]; ok {
+topic, ok := sp.srcToTarget[rec.Topic]
+if !ok {
+return fmt.Errorf("target topic not found for src topic %s", rec.Topic)
+}
+
+if o, ok := sp.targetOffsets[topic]; ok {
// If the topic already exists, update the offset for the partition.
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
-sp.targetOffsets[rec.Topic] = o
+sp.targetOffsets[topic] = o
} else {
// If the topic does not exist, create a new map for the topic.
o := make(map[int32]kgo.Offset)
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
-sp.targetOffsets[rec.Topic] = o
+sp.targetOffsets[topic] = o
}
+
+return nil
}

func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error) {