diff --git a/bus_test.go b/bus_test.go index 5f6aea6..83864a5 100644 --- a/bus_test.go +++ b/bus_test.go @@ -2,10 +2,14 @@ package bus_test import ( "context" + "log/slog" "net/http/httptest" "os" + "sync/atomic" "testing" + "time" + "ella.to/bus" "ella.to/bus/client" "ella.to/bus/server" "github.com/stretchr/testify/assert" @@ -32,3 +36,47 @@ func setupBusServer(t *testing.T) *client.Client { return c } + +func TestConfirmConsumer(t *testing.T) { + slog.SetLogLoggerLevel(slog.LevelDebug) + + c := setupBusServer(t) + + var hit int64 + + go func() { + ctx := context.TODO() + + // NOTE: we need to make sure consumer always starts reading from the oldest + // as it could happen that the event can be sent before the consumer starts + + for msg, err := range c.Get(ctx, bus.WithFromOldest(), bus.WithSubject("a.b.c"), bus.WithManualAck()) { + + assert.NoError(t, err) + assert.NotNil(t, msg) + assert.Len(t, msg.Events, 1) + + evt := msg.Events[0] + + assert.Equal(t, "a.b.c", evt.Subject) + assert.Equal(t, `"hello"`, string(evt.Data)) + + atomic.AddInt64(&hit, 1) + err = msg.Ack(ctx) + assert.NoError(t, err) + + break + } + }() + + evt, err := bus.NewEvent(bus.WithSubject("a.b.c"), bus.WithConfirm(1), bus.WithJsonData("hello")) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + err = c.Put(ctx, evt) + + assert.NoError(t, err) + assert.Equal(t, int64(1), atomic.LoadInt64(&hit)) +}