Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Apr 11, 2024
1 parent 64792a0 commit ccefd70
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions lib/cdc/util/relational_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func (s *SchemaEventPayload) GetTableName() string {
func (s *SchemaEventPayload) GetData(pkMap map[string]any, tc *kafkalib.TopicConfig) map[string]any {
var retMap map[string]any
if len(s.Payload.After) == 0 {
if len(s.Payload.Before) > 0 {
retMap = s.Payload.Before
} else {
retMap = make(map[string]any)
}
// This is a delete payload, so mark it as deleted.
// And we need to reconstruct the data bit since it will be empty.
// We _can_ rely on *before* since even without running replicate identity, it will still copy over
// the PK. We can explore simplifying this interface in the future by leveraging before.
retMap = map[string]any{
constants.DeleteColumnMarker: true,
}

retMap[constants.DeleteColumnMarker] = true
for k, v := range pkMap {
retMap[k] = v
}
Expand Down

0 comments on commit ccefd70

Please sign in to comment.