Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuboxa committed Jun 13, 2024
1 parent e33b68d commit aebb318
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion source/logrepl/internal/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func CreateSubscription(
slotLSN, err = pglogrepl.ParseLSN(result.ConsistentPoint)
if err != nil {
if startLSN == 0 {
return nil, fmt.Errorf("failed to parse consistent wal lsn: %w", err)
sdk.Logger(ctx).Warn().
Msg("start LSN is zero, using existing replication slot without position")
}

slotLSN = startLSN
Expand Down Expand Up @@ -232,6 +233,12 @@ func (s *Subscription) handleXLogData(ctx context.Context, copyDataMsg *pgproto3
return fmt.Errorf("handler error: %w", err)
}

// When starting on an existing slot without having a consistent slot LSN,
// set the flushed WAL LSN to just before the received LSN
if s.walWritten == 0 && s.ConsistentSlotLSN == 0 {
atomic.StoreUint64((*uint64)(&s.walFlushed), uint64(writtenLSN-1))
}

if writtenLSN > 0 {
s.walWritten = writtenLSN
}
Expand Down

0 comments on commit aebb318

Please sign in to comment.