Skip to content

Commit

Permalink
Add Offline to the consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed Aug 17, 2024
1 parent 5df6bd3 commit 242ec50
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 3 deletions.
1 change: 1 addition & 0 deletions bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ type Consumer struct {
Id string `json:"id"`
Subject string `json:"subject"`
Type ConsumerType `json:"type"`
Online bool `json:"online"`
QueueName string `json:"queue_name"`
BatchSize int64 `json:"batch_size"`
AckedCount int64 `json:"acked_count"`
Expand Down
18 changes: 18 additions & 0 deletions server/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (s *Server) registerConsumer(ctx context.Context, consumer *bus.Consumer) (
consumer.LastEventId = lastEventId
}

consumer.Online = true

// Need to check if consumer is durable and has been registered before
// Also we need to check if the consumer is alive and a new consumer is registered
// with the same id, which means we need to return an error
Expand Down Expand Up @@ -207,3 +209,19 @@ func (s *Server) deleteExpiredEvents(ctx context.Context) (err error) {

return nil
}

func (s *Server) offlineConsumer(ctx context.Context, consumerId string) (err error) {
consumer, err := s.storage.LoadConsumerById(ctx, consumerId)
if err != nil {
return err
}

consumer.Online = false

err = s.storage.SaveConsumer(ctx, consumer)
if err != nil {
return err
}

return nil
}
25 changes: 25 additions & 0 deletions server/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
AckEvent // AckEvent is used to acknowledge an event
DeleteConsumer // DeleteConsumer is used to delete a consumer
DeleteExpiredEvents // DeleteExpiredEvents is used to delete expired events
OfflineConsumer // OfflineConsumer is used to set a consumer offline
Cleanup // Cleanup is used to clean up the bus for expired events and consumers
)

Expand Down Expand Up @@ -58,6 +59,7 @@ type Dispatcher struct {
ackEventFunc func(ctx context.Context, consumerId string, eventId string) error
deleteConsumerFunc func(ctx context.Context, consumerId string) error
deleteExpiredEvents func(ctx context.Context) error
offlinceConsumerFunc func(ctx context.Context, consumerId string) error
}

func (d *Dispatcher) PutEvent(ctx context.Context, evt *bus.Event) error {
Expand Down Expand Up @@ -130,6 +132,17 @@ func (d *Dispatcher) DeleteExpiredEvents(ctx context.Context) error {
return <-action.Error
}

func (d *Dispatcher) OfflineConsumer(ctx context.Context, consumerId string) error {
action := d.getAction()
action.Type = OfflineConsumer
action.ConsumerId = consumerId
action.Ctx = ctx

d.pushAction(action)

return <-action.Error
}

func (d *Dispatcher) getAction() *Action {
select {
case action := <-d.actionsPool:
Expand Down Expand Up @@ -168,6 +181,8 @@ func (d *Dispatcher) run() {
action.Error <- d.deleteConsumerFunc(action.Ctx, action.ConsumerId)
case DeleteExpiredEvents:
action.Error <- d.deleteExpiredEvents(action.Ctx)
case OfflineConsumer:
action.Error <- d.offlinceConsumerFunc(action.Ctx, action.ConsumerId)
}

action.clean()
Expand Down Expand Up @@ -221,6 +236,12 @@ func withDeleteExpiredEventsFunc(fn func(context.Context) error) dipatcherOpt {
}
}

func withOfflineConsumerFunc(fn func(context.Context, string) error) dipatcherOpt {
return func(d *Dispatcher) {
d.offlinceConsumerFunc = fn
}
}

func NewDispatcher(bufferSize int, poolSize int, fns ...dipatcherOpt) *Dispatcher {
d := &Dispatcher{
actions: make(chan *Action, bufferSize),
Expand Down Expand Up @@ -256,6 +277,10 @@ func NewDispatcher(bufferSize int, poolSize int, fns ...dipatcherOpt) *Dispatche
panic("deleteExpiredEvents is required")
}

if d.offlinceConsumerFunc == nil {
panic("offlinceConsumerFunc is required")
}

for i := 0; i < poolSize; i++ {
d.actionsPool <- newAction(true)
}
Expand Down
4 changes: 4 additions & 0 deletions server/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func TestDispatcher(t *testing.T) {
deleteExpiredEvents := func(ctx context.Context) error {
return nil
}
offlineConsumerFunc := func(ctx context.Context, consumerId string) error {
return nil
}

dispatcher := NewDispatcher(
1,
Expand All @@ -38,6 +41,7 @@ func TestDispatcher(t *testing.T) {
withAckEventFunc(ackEventFunc),
withDeleteConsumerFunc(deleteConsumerFunc),
withDeleteExpiredEventsFunc(deleteExpiredEvents),
withOfflineConsumerFunc(offlineConsumerFunc),
)

n := 100
Expand Down
4 changes: 4 additions & 0 deletions server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (s *Server) consumerHandler(w http.ResponseWriter, r *http.Request) {
defer func() {
// delete consumer if it's ephemeral or queue consumer
if consumer.Id == "" || consumer.Type == bus.Durable {
err := s.dispatcher.OfflineConsumer(context.Background(), consumer.Id)
if err != nil {
slog.Error("failed to offline consumer", "consumer_id", consumer.Id)
}
return
}

Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func New(ctx context.Context, storage storage.Storage, opts ...ServerOpt) *Serve
withAckEventFunc(s.ackEvent),
withDeleteConsumerFunc(s.deleteConsumer),
withDeleteExpiredEventsFunc(s.deleteExpiredEvents),
withOfflineConsumerFunc(s.offlineConsumer),
)

s.RegisterHandlers()
Expand Down
1 change: 1 addition & 0 deletions storage/schema/0001-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CREATE TABLE
id TEXT NOT NULL,
subject TEXT NOT NULL,
type INTEGER NOT NULL,
online INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
queue_name TEXT, -- <- queue name, can be null
acked_count INTEGER NOT NULL DEFAULT 0,
Expand Down
9 changes: 6 additions & 3 deletions storage/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,25 @@ func (s *Sqlite) SaveConsumer(ctx context.Context, c *bus.Consumer) (err error)
id,
subject,
type,
online,
batch_size,
acked_count,
queue_name,
last_event_id,
updated_at
)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
(?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
online = EXCLUDED.online,
acked_count = EXCLUDED.acked_count,
last_event_id = EXCLUDED.last_event_id,
updated_at = EXCLUDED.updated_at
;`,
c.Id,
c.Subject,
int64(c.Type),
c.Online,
c.BatchSize,
c.AckedCount,
queueName,
Expand Down Expand Up @@ -170,7 +173,7 @@ func (s *Sqlite) LoadNextQueueConsumerByName(ctx context.Context, queueName stri
var stmt *sqlite.Stmt
var hasRow bool

stmt, err = conn.Prepare(ctx, `SELECT * FROM consumers WHERE queue_name = ? ORDER BY acked_count ASC LIMIT 1;`, queueName)
stmt, err = conn.Prepare(ctx, `SELECT * FROM consumers WHERE queue_name = ? AND online = 1 ORDER BY acked_count ASC LIMIT 1;`, queueName)
if err != nil {
return
}
Expand Down

0 comments on commit 242ec50

Please sign in to comment.