diff --git a/bus.go b/bus.go index a21494a..85d466b 100644 --- a/bus.go +++ b/bus.go @@ -319,6 +319,7 @@ type Consumer struct { Id string `json:"id"` Subject string `json:"subject"` Type ConsumerType `json:"type"` + Online bool `json:"online"` QueueName string `json:"queue_name"` BatchSize int64 `json:"batch_size"` AckedCount int64 `json:"acked_count"` diff --git a/server/action.go b/server/action.go index c143de3..898eaf9 100644 --- a/server/action.go +++ b/server/action.go @@ -33,6 +33,8 @@ func (s *Server) registerConsumer(ctx context.Context, consumer *bus.Consumer) ( consumer.LastEventId = lastEventId } + consumer.Online = true + // Need to check if consumer is durable and has been registered before // Also we need to check if the consumer is alive and a new consumer is registered // with the same id, which means we need to return an error @@ -207,3 +209,19 @@ func (s *Server) deleteExpiredEvents(ctx context.Context) (err error) { return nil } + +func (s *Server) offlineConsumer(ctx context.Context, consumerId string) (err error) { + consumer, err := s.storage.LoadConsumerById(ctx, consumerId) + if err != nil { + return err + } + + consumer.Online = false + + err = s.storage.SaveConsumer(ctx, consumer) + if err != nil { + return err + } + + return nil +} diff --git a/server/dispatcher.go b/server/dispatcher.go index 6ba22cd..60fc3bb 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -17,6 +17,7 @@ const ( AckEvent // AckEvent is used to acknowledge an event DeleteConsumer // DeleteConsumer is used to delete a consumer DeleteExpiredEvents // DeleteExpiredEvents is used to delete expired events + OfflineConsumer // OfflineConsumer is used to set a consumer offline Cleanup // Cleanup is used to clean up the bus for expired events and consumers ) @@ -58,6 +59,7 @@ type Dispatcher struct { ackEventFunc func(ctx context.Context, consumerId string, eventId string) error deleteConsumerFunc func(ctx context.Context, consumerId string) error deleteExpiredEvents func(ctx context.Context) error + offlinceConsumerFunc func(ctx context.Context, consumerId string) error } func (d *Dispatcher) PutEvent(ctx context.Context, evt *bus.Event) error { @@ -130,6 +132,17 @@ func (d *Dispatcher) DeleteExpiredEvents(ctx context.Context) error { return <-action.Error } +func (d *Dispatcher) OfflineConsumer(ctx context.Context, consumerId string) error { + action := d.getAction() + action.Type = OfflineConsumer + action.ConsumerId = consumerId + action.Ctx = ctx + + d.pushAction(action) + + return <-action.Error +} + func (d *Dispatcher) getAction() *Action { select { case action := <-d.actionsPool: @@ -168,6 +181,8 @@ func (d *Dispatcher) run() { action.Error <- d.deleteConsumerFunc(action.Ctx, action.ConsumerId) case DeleteExpiredEvents: action.Error <- d.deleteExpiredEvents(action.Ctx) + case OfflineConsumer: + action.Error <- d.offlinceConsumerFunc(action.Ctx, action.ConsumerId) } action.clean() @@ -221,6 +236,12 @@ func withDeleteExpiredEventsFunc(fn func(context.Context) error) dipatcherOpt { } } +func withOfflineConsumerFunc(fn func(context.Context, string) error) dipatcherOpt { + return func(d *Dispatcher) { + d.offlinceConsumerFunc = fn + } +} + func NewDispatcher(bufferSize int, poolSize int, fns ...dipatcherOpt) *Dispatcher { d := &Dispatcher{ actions: make(chan *Action, bufferSize), @@ -256,6 +277,10 @@ func NewDispatcher(bufferSize int, poolSize int, fns ...dipatcherOpt) *Dispatche panic("deleteExpiredEvents is required") } + if d.offlinceConsumerFunc == nil { + panic("offlinceConsumerFunc is required") + } + for i := 0; i < poolSize; i++ { d.actionsPool <- newAction(true) } diff --git a/server/dispatcher_test.go b/server/dispatcher_test.go index 65c22a0..e2103bb 100644 --- a/server/dispatcher_test.go +++ b/server/dispatcher_test.go @@ -27,6 +27,9 @@ func TestDispatcher(t *testing.T) { deleteExpiredEvents := func(ctx context.Context) error { return nil } + offlineConsumerFunc := func(ctx context.Context, consumerId string) error { + return nil + } dispatcher := NewDispatcher( 1, @@ -38,6 +41,7 @@ func TestDispatcher(t *testing.T) { withAckEventFunc(ackEventFunc), withDeleteConsumerFunc(deleteConsumerFunc), withDeleteExpiredEventsFunc(deleteExpiredEvents), + withOfflineConsumerFunc(offlineConsumerFunc), ) n := 100 diff --git a/server/http.go b/server/http.go index f25fc02..b13d3f5 100644 --- a/server/http.go +++ b/server/http.go @@ -68,6 +68,10 @@ func (s *Server) consumerHandler(w http.ResponseWriter, r *http.Request) { defer func() { // delete consumer if it's ephemeral or queue consumer if consumer.Id == "" || consumer.Type == bus.Durable { + err := s.dispatcher.OfflineConsumer(context.Background(), consumer.Id) + if err != nil { + slog.Error("failed to offline consumer", "consumer_id", consumer.Id) + } return } diff --git a/server/server.go b/server/server.go index b3cfd4b..e6787e7 100644 --- a/server/server.go +++ b/server/server.go @@ -81,6 +81,7 @@ func New(ctx context.Context, storage storage.Storage, opts ...ServerOpt) *Serve withAckEventFunc(s.ackEvent), withDeleteConsumerFunc(s.deleteConsumer), withDeleteExpiredEventsFunc(s.deleteExpiredEvents), + withOfflineConsumerFunc(s.offlineConsumer), ) s.RegisterHandlers() diff --git a/storage/schema/0001-data.sql b/storage/schema/0001-data.sql index b28af99..d3b0bad 100644 --- a/storage/schema/0001-data.sql +++ b/storage/schema/0001-data.sql @@ -27,6 +27,7 @@ CREATE TABLE id TEXT NOT NULL, subject TEXT NOT NULL, type INTEGER NOT NULL, + online INTEGER NOT NULL, batch_size INTEGER NOT NULL DEFAULT 1, queue_name TEXT, -- <- queue name, can be null acked_count INTEGER NOT NULL DEFAULT 0, diff --git a/storage/sqlite.go b/storage/sqlite.go index 8ddaa63..2f53976 100644 --- a/storage/sqlite.go +++ b/storage/sqlite.go @@ -75,6 +75,7 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error) id, subject, type, + online, batch_size, acked_count, queue_name, @@ -82,8 +83,9 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error) updated_at ) VALUES - (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET + (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + online = EXCLUDED.online, acked_count = EXCLUDED.acked_count, last_event_id = EXCLUDED.last_event_id, updated_at = EXCLUDED.updated_at @@ -91,6 +93,7 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error) c.Id, c.Subject, int64(c.Type), + c.Online, c.BatchSize, c.AckedCount, queueName, @@ -170,7 +173,7 @@ func (s *Sqlite) LoadNextQueueConsumerByName(ctx context.Context, queueName stri var stmt *sqlite.Stmt var hasRow bool - stmt, err = conn.Prepare(ctx, `SELECT * FROM consumers WHERE queue_name = ? ORDER BY acked_count ASC LIMIT 1;`, queueName) + stmt, err = conn.Prepare(ctx, `SELECT * FROM consumers WHERE queue_name = ? AND online = 1 ORDER BY acked_count ASC LIMIT 1;`, queueName) if err != nil { return }