Skip to content

Commit

Permalink
Refactor validating event (#925)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 23, 2024
1 parent 0cfeb12 commit 067f4c8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
23 changes: 13 additions & 10 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package event

import (
"cmp"
"errors"
"fmt"
"log/slog"
"sort"
Expand Down Expand Up @@ -99,27 +98,31 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
}, nil
}

func (e *Event) IsValid() bool {
func (e *Event) Validate() error {
// Does it have a PK or table set?
if stringutil.Empty(e.Table) {
return false
return fmt.Errorf("table name is empty")
}

if len(e.PrimaryKeyMap) == 0 {
return false
return fmt.Errorf("primary key map is empty")
}

if len(e.Data) == 0 {
return false
return fmt.Errorf("event has no data")
}

if e.mode == config.History {
// History mode does not have the delete column marker.
return true
return nil
}

// Check if delete flag exists.
_, isOk := e.Data[constants.DeleteColumnMarker]
return isOk
if _, isOk := e.Data[constants.DeleteColumnMarker]; !isOk {
return fmt.Errorf("delete column marker does not exist")
}

return nil
}

// PrimaryKeys is returned in a sorted manner to be safe.
Expand Down Expand Up @@ -148,8 +151,8 @@ func (e *Event) PrimaryKeyValue() string {
// Save will save the event into our in memory event
// It will return (flush bool, flushReason string, err error)
func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkalib.TopicConfig, message artie.Message) (bool, string, error) {
if !e.IsValid() {
return false, "", errors.New("event not valid")
if err := e.Validate(); err != nil {
return false, "", fmt.Errorf("event validation failed: %w", err)
}

// Does the table exist?
Expand Down
12 changes: 6 additions & 6 deletions models/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ var idMap = map[string]any{
"id": 123,
}

func (e *EventsTestSuite) TestEvent_IsValid() {
func (e *EventsTestSuite) TestEvent_Validate() {
{
_evt := Event{Table: "foo"}
assert.False(e.T(), _evt.IsValid())
assert.ErrorContains(e.T(), _evt.Validate(), "primary key map is empty")
}
{
_evt := Event{Table: "foo", PrimaryKeyMap: idMap}
assert.False(e.T(), _evt.IsValid())
assert.ErrorContains(e.T(), _evt.Validate(), "event has no data")
}
{
_evt := Event{
Expand All @@ -30,7 +30,7 @@ func (e *EventsTestSuite) TestEvent_IsValid() {
},
mode: config.History,
}
assert.True(e.T(), _evt.IsValid())
assert.Nil(e.T(), _evt.Validate())
}
{
_evt := Event{
Expand All @@ -40,15 +40,15 @@ func (e *EventsTestSuite) TestEvent_IsValid() {
"foo": "bar",
},
}
assert.False(e.T(), _evt.IsValid())
assert.ErrorContains(e.T(), _evt.Validate(), "")
}
{
_evt := Event{
Table: "foo",
PrimaryKeyMap: idMap,
Data: map[string]any{constants.DeleteColumnMarker: true, constants.OnlySetDeleteColumnMarker: true},
}
assert.True(e.T(), _evt.IsValid())
assert.Nil(e.T(), _evt.Validate())
}
}

Expand Down

0 comments on commit 067f4c8

Please sign in to comment.