diff --git a/cmd/server/main.go b/cmd/server/main.go index 3983574..49d07d2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -90,6 +90,15 @@ func getWorkerBufferSize(serverOpts []server.Opt) []server.Opt { return append(serverOpts, server.WithWorkerBufferSize(value)) } +func getCleanExpiredEventsFreq(serverOpts []server.Opt) []server.Opt { + value, err := time.ParseDuration(os.Getenv("BUS_CLEAN_EXPIRED_EVENTS_FREQ")) + if err != nil { + return serverOpts + } + + return append(serverOpts, server.WithCleanExpiredEventsFreq(value)) +} + func main() { ctx := context.TODO() @@ -104,6 +113,7 @@ func main() { serverOpts = getBatchWindowSize(serverOpts) serverOpts = getBatchWindowDuration(serverOpts) serverOpts = getWorkerBufferSize(serverOpts) + serverOpts = getCleanExpiredEventsFreq(serverOpts) handler, err := server.New( ctx, diff --git a/docker-compose.yml b/docker-compose.yml index 448a70a..3b43671 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: - BUS_BATCH_WINDOW_SIZE=100 - BUS_BATCH_WINDOW_DURATION=5s - BUS_WORKER_BUFFER_SIZE=1000 + - BUS_CLEAN_EXPIRED_EVENTS_FREQ=30s # if ports is removed, then the bus service can # only be accessed from within the network bus diff --git a/server/server.db.go b/server/server.db.go index 9ba3a6b..27ab826 100644 --- a/server/server.db.go +++ b/server/server.db.go @@ -99,3 +99,10 @@ func (h *Handler) CreateConsumer(ctx context.Context, consumer *bus.Consumer) (e return } + +func (h *Handler) DeleteExpiredEvents(ctx context.Context) (err error) { + h.dbw.Submit(func(conn *sqlite.Conn) { + err = storage.DeleteExpiredEvents(ctx, conn) + }) + return +} diff --git a/server/server.go b/server/server.go index 785608f..9f5ea28 100644 --- a/server/server.go +++ b/server/server.go @@ -20,11 +20,12 @@ import ( // OPTIONS type config struct { - dbOpts []sqlite.OptionFunc - dbPoolSize int - batchWindowSize int - batchWindowDuration time.Duration - workerBufferSize int64 + dbOpts []sqlite.OptionFunc + dbPoolSize int + batchWindowSize int + batchWindowDuration time.Duration + workerBufferSize int64 + cleanExpiredEventsFreq time.Duration } type Opt interface { @@ -75,6 +76,13 @@ func WithWorkerBufferSize(size int64) Opt { }) } +func WithCleanExpiredEventsFreq(freq time.Duration) Opt { + return optFn(func(s *config) error { + s.cleanExpiredEventsFreq = freq + return nil + }) +} + // HANDLER type Handler struct { @@ -82,6 +90,7 @@ type Handler struct { dbw *sqlite.Worker consumersEventMap *bus.ConsumersEventMap batch *batch.Sort + closeCh chan struct{} } var _ http.Handler = (*Handler)(nil) @@ -393,6 +402,26 @@ func (h *Handler) notify(consumerId string, event *bus.Event) { } } +func (h *Handler) removeExpiredEventsLoop(ctx context.Context, freq time.Duration) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(freq): + err := h.DeleteExpiredEvents(ctx) + if err != nil { + slog.Error("failed to delete expired events", "error", err) + } + case <-h.closeCh: + return + } + } +} + +func (h *Handler) Close() { + close(h.closeCh) +} + func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.mux.ServeHTTP(w, r) } @@ -402,9 +431,10 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) { dbOpts: []sqlite.OptionFunc{ sqlite.WithMemory(), }, - batchWindowSize: 20, - batchWindowDuration: 500 * time.Millisecond, - workerBufferSize: 1000, + batchWindowSize: 20, + batchWindowDuration: 500 * time.Millisecond, + workerBufferSize: 1000, + cleanExpiredEventsFreq: 30 * time.Second, } for _, opt := range opts { err := opt.configureHandler(conf) @@ -419,6 +449,7 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) { h := &Handler{ consumersEventMap: bus.NewConsumersEventMap(conf.dbPoolSize, 2*time.Second), + closeCh: make(chan struct{}), } h.batch = batch.NewSort(conf.batchWindowSize, conf.batchWindowDuration, func(events []*bus.Event) { @@ -443,5 +474,7 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) { h.mux.HandleFunc("HEAD /", h.ackedHandler) // HEAD /?consumer_id=123&event_id=456 h.mux.HandleFunc("DELETE /", h.deleteConsumerHandler) // DELETE /?consumer_id=123 + go h.removeExpiredEventsLoop(ctx, conf.cleanExpiredEventsFreq) + return h, nil } diff --git a/server/storage/queries.go b/server/storage/queries.go index 2245d85..198439f 100644 --- a/server/storage/queries.go +++ b/server/storage/queries.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "time" "ella.to/bus" "ella.to/sqlite" @@ -406,3 +407,27 @@ func AckEvent(ctx context.Context, conn *sqlite.Conn, consumerId, eventId string _, err = stmt.Step() return err } + +func DeleteExpiredEventsBeforeDate(ctx context.Context, conn *sqlite.Conn, before time.Time) (err error) { + defer conn.Save(&err)() + + const sql = `DELETE FROM events WHERE expires_at < ?;` + + stmt, err := conn.Prepare(ctx, sql, before) + if err != nil { + return err + } + + defer stmt.Finalize() + + _, err = stmt.Step() + if err != nil { + return err + } + + return nil +} + +func DeleteExpiredEvents(ctx context.Context, conn *sqlite.Conn) (err error) { + return DeleteExpiredEventsBeforeDate(ctx, conn, time.Now()) +} diff --git a/server/storage/schema/0001-data.sql b/server/storage/schema/0001-data.sql index b42f732..bc7899f 100644 --- a/server/storage/schema/0001-data.sql +++ b/server/storage/schema/0001-data.sql @@ -63,6 +63,8 @@ CREATE INDEX IF NOT EXISTS consumers_queue_name ON consumers (queue_name); CREATE INDEX IF NOT EXISTS consumers_last_event_id ON consumers (last_event_id); +CREATE INDEX IF NOT EXISTS consumers_expires_in ON consumers (expires_in); + -- -- CONSUMERS_EVENTS --