Skip to content

Commit

Permalink
Fix event deleivery issue
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed Aug 18, 2024
1 parent 7ce1c54 commit b0454ab
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
4 changes: 2 additions & 2 deletions event/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Sync struct {
}

func (s *Sync) Register(subject string, fn Func) {
if strings.Contains(subject, "*") {
panic("subject should not contains * in event.Sync.Register")
if strings.Contains(subject, "*") || strings.Contains(subject, ">") {
panic("subject should not contains * or > in event.Sync.Register")
}

if s.IsLocked() {
Expand Down
41 changes: 36 additions & 5 deletions event/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,60 @@ package event_test

import (
"context"
"log/slog"
"testing"
"time"

"ella.to/bus"
"ella.to/bus/event"
"ella.to/bus/internal/testutil"
"ella.to/sqlite"
"github.com/stretchr/testify/assert"
)

func TestEventSync(t *testing.T) {
t.Skip("skipping test becase race test failed on this")

client := testutil.PrepareTestServer(t, testutil.WithDatabasePath("./test.db"))
slog.SetLogLoggerLevel(slog.LevelDebug)

s := event.NewSync(nil, client)
client := testutil.PrepareTestServer(
t,
testutil.WithDatabasePath("./test.db"),
)

s := event.NewSync(nil, client, event.WithSubject("test.>"))

s.Register("test.1", func(ctx context.Context, wdb *sqlite.Worker, evt *bus.Event) error {
return nil
})

s.Lock()
err := s.Once(context.Background(), 1*time.Second)
assert.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ctx := context.Background()
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()

go func() {
err := s.Continue(ctx)
if err != nil {
slog.Error("continue: ", "err", err)
}
}()
assert.NoError(t, err)

evt, err := bus.NewEvent(
bus.WithSubject("test.1"),
bus.WithData(struct {
Name string `json:"name"`
}{
Name: "test name",
}),
)
assert.NoError(t, err)

err = s.Continue(ctx)
err = client.Put(context.Background(), evt)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
Expand Down
Binary file modified event/test.db
Binary file not shown.
4 changes: 2 additions & 2 deletions storage/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error)
queueName = c.QueueName
}

c.Subject = changeSubjectToPattern(c.Subject)

stmt, err = conn.Prepare(ctx,
`INSERT INTO consumers
(
Expand Down Expand Up @@ -202,6 +200,8 @@ func (s *Sqlite) LoadEventsByConsumerId(ctx context.Context, consumerId string)
return
}

consumer.Subject = changeSubjectToPattern(consumer.Subject)

s.wdb.Submit(func(conn *sqlite.Conn) {
var stmt *sqlite.Stmt

Expand Down

0 comments on commit b0454ab

Please sign in to comment.