Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed May 17, 2024
1 parent 9093a3d commit b30e446
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package bus_test

import (
"context"
"log/slog"
"net/http/httptest"
"os"
"sync/atomic"
"testing"
"time"

"ella.to/bus"
"ella.to/bus/client"
"ella.to/bus/server"
"github.com/stretchr/testify/assert"
Expand All @@ -32,3 +36,47 @@ func setupBusServer(t *testing.T) *client.Client {

return c
}

func TestConfirmConsumer(t *testing.T) {
slog.SetLogLoggerLevel(slog.LevelDebug)

c := setupBusServer(t)

var hit int64

go func() {
ctx := context.TODO()

// NOTE: we need to make sure consumer always starts reading from the oldest
// as it could happen that the event can be sent before the consumer starts

for msg, err := range c.Get(ctx, bus.WithFromOldest(), bus.WithSubject("a.b.c"), bus.WithManualAck()) {

assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Len(t, msg.Events, 1)

evt := msg.Events[0]

assert.Equal(t, "a.b.c", evt.Subject)
assert.Equal(t, `"hello"`, string(evt.Data))

atomic.AddInt64(&hit, 1)
err = msg.Ack(ctx)
assert.NoError(t, err)

break
}
}()

evt, err := bus.NewEvent(bus.WithSubject("a.b.c"), bus.WithConfirm(1), bus.WithJsonData("hello"))
assert.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err = c.Put(ctx, evt)

assert.NoError(t, err)
assert.Equal(t, int64(1), atomic.LoadInt64(&hit))
}

0 comments on commit b30e446

Please sign in to comment.