diff --git a/event/sync.go b/event/sync.go index 68c350f..a5e478c 100644 --- a/event/sync.go +++ b/event/sync.go @@ -29,8 +29,8 @@ type Sync struct { } func (s *Sync) Register(subject string, fn Func) { - if strings.Contains(subject, "*") { - panic("subject should not contains * in event.Sync.Register") + if strings.Contains(subject, "*") || strings.Contains(subject, ">") { + panic("subject should not contains * or > in event.Sync.Register") } if s.IsLocked() { diff --git a/event/sync_test.go b/event/sync_test.go index 6a789aa..c7c03a4 100644 --- a/event/sync_test.go +++ b/event/sync_test.go @@ -2,29 +2,60 @@ package event_test import ( "context" + "log/slog" "testing" "time" + "ella.to/bus" "ella.to/bus/event" "ella.to/bus/internal/testutil" + "ella.to/sqlite" "github.com/stretchr/testify/assert" ) func TestEventSync(t *testing.T) { t.Skip("skipping test becase race test failed on this") - client := testutil.PrepareTestServer(t, testutil.WithDatabasePath("./test.db")) + slog.SetLogLoggerLevel(slog.LevelDebug) - s := event.NewSync(nil, client) + client := testutil.PrepareTestServer( + t, + testutil.WithDatabasePath("./test.db"), + ) + + s := event.NewSync(nil, client, event.WithSubject("test.>")) + + s.Register("test.1", func(ctx context.Context, wdb *sqlite.Worker, evt *bus.Event) error { + return nil + }) s.Lock() err := s.Once(context.Background(), 1*time.Second) assert.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() + ctx := context.Background() + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() + + go func() { + err := s.Continue(ctx) + if err != nil { + slog.Error("continue: ", "err", err) + } + }() + assert.NoError(t, err) + + evt, err := bus.NewEvent( + bus.WithSubject("test.1"), + bus.WithData(struct { + Name string `json:"name"` + }{ + Name: "test name", + }), + ) + assert.NoError(t, err) - err = s.Continue(ctx) + err = client.Put(context.Background(), evt) assert.NoError(t, err) time.Sleep(2 * time.Second) diff --git a/event/test.db b/event/test.db index 3b5f25d..abf1ed9 100644 Binary files a/event/test.db and b/event/test.db differ diff --git a/storage/sqlite.go b/storage/sqlite.go index 2f53976..603b5e6 100644 --- a/storage/sqlite.go +++ b/storage/sqlite.go @@ -67,8 +67,6 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error) queueName = c.QueueName } - c.Subject = changeSubjectToPattern(c.Subject) - stmt, err = conn.Prepare(ctx, `INSERT INTO consumers ( @@ -202,6 +200,8 @@ func (s *Sqlite) LoadEventsByConsumerId(ctx context.Context, consumerId string) return } + consumer.Subject = changeSubjectToPattern(consumer.Subject) + s.wdb.Submit(func(conn *sqlite.Conn) { var stmt *sqlite.Stmt