Skip to content

Commit

Permalink
Add one more index for expires_in field and provide a loop to clean u…
Browse files Browse the repository at this point in the history
…p the expired events
  • Loading branch information
alinz committed Apr 23, 2024
1 parent 528803c commit 3f4a25d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 8 deletions.
10 changes: 10 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ func getWorkerBufferSize(serverOpts []server.Opt) []server.Opt {
return append(serverOpts, server.WithWorkerBufferSize(value))
}

func getCleanExpiredEventsFreq(serverOpts []server.Opt) []server.Opt {
value, err := time.ParseDuration(os.Getenv("BUS_CLEAN_EXPIRED_EVENTS_FREQ"))
if err != nil {
return serverOpts
}

return append(serverOpts, server.WithCleanExpiredEventsFreq(value))
}

func main() {
ctx := context.TODO()

Expand All @@ -104,6 +113,7 @@ func main() {
serverOpts = getBatchWindowSize(serverOpts)
serverOpts = getBatchWindowDuration(serverOpts)
serverOpts = getWorkerBufferSize(serverOpts)
serverOpts = getCleanExpiredEventsFreq(serverOpts)

handler, err := server.New(
ctx,
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ services:
- BUS_BATCH_WINDOW_SIZE=100
- BUS_BATCH_WINDOW_DURATION=5s
- BUS_WORKER_BUFFER_SIZE=1000
- BUS_CLEAN_EXPIRED_EVENTS_FREQ=30s

# if ports is removed, then the bus service can
# only be accessed from within the network bus
Expand Down
7 changes: 7 additions & 0 deletions server/server.db.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,10 @@ func (h *Handler) CreateConsumer(ctx context.Context, consumer *bus.Consumer) (e

return
}

func (h *Handler) DeleteExpiredEvents(ctx context.Context) (err error) {
h.dbw.Submit(func(conn *sqlite.Conn) {
err = storage.DeleteExpiredEvents(ctx, conn)
})
return
}
49 changes: 41 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
// OPTIONS

type config struct {
dbOpts []sqlite.OptionFunc
dbPoolSize int
batchWindowSize int
batchWindowDuration time.Duration
workerBufferSize int64
dbOpts []sqlite.OptionFunc
dbPoolSize int
batchWindowSize int
batchWindowDuration time.Duration
workerBufferSize int64
cleanExpiredEventsFreq time.Duration
}

type Opt interface {
Expand Down Expand Up @@ -75,13 +76,21 @@ func WithWorkerBufferSize(size int64) Opt {
})
}

func WithCleanExpiredEventsFreq(freq time.Duration) Opt {
return optFn(func(s *config) error {
s.cleanExpiredEventsFreq = freq
return nil
})
}

// HANDLER

type Handler struct {
mux http.ServeMux
dbw *sqlite.Worker
consumersEventMap *bus.ConsumersEventMap
batch *batch.Sort
closeCh chan struct{}
}

var _ http.Handler = (*Handler)(nil)
Expand Down Expand Up @@ -393,6 +402,26 @@ func (h *Handler) notify(consumerId string, event *bus.Event) {
}
}

func (h *Handler) removeExpiredEventsLoop(ctx context.Context, freq time.Duration) {
for {
select {
case <-ctx.Done():
return
case <-time.After(freq):
err := h.DeleteExpiredEvents(ctx)
if err != nil {
slog.Error("failed to delete expired events", "error", err)
}
case <-h.closeCh:
return
}
}
}

func (h *Handler) Close() {
close(h.closeCh)
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r)
}
Expand All @@ -402,9 +431,10 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) {
dbOpts: []sqlite.OptionFunc{
sqlite.WithMemory(),
},
batchWindowSize: 20,
batchWindowDuration: 500 * time.Millisecond,
workerBufferSize: 1000,
batchWindowSize: 20,
batchWindowDuration: 500 * time.Millisecond,
workerBufferSize: 1000,
cleanExpiredEventsFreq: 30 * time.Second,
}
for _, opt := range opts {
err := opt.configureHandler(conf)
Expand All @@ -419,6 +449,7 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) {

h := &Handler{
consumersEventMap: bus.NewConsumersEventMap(conf.dbPoolSize, 2*time.Second),
closeCh: make(chan struct{}),
}

h.batch = batch.NewSort(conf.batchWindowSize, conf.batchWindowDuration, func(events []*bus.Event) {
Expand All @@ -443,5 +474,7 @@ func New(ctx context.Context, opts ...Opt) (*Handler, error) {
h.mux.HandleFunc("HEAD /", h.ackedHandler) // HEAD /?consumer_id=123&event_id=456
h.mux.HandleFunc("DELETE /", h.deleteConsumerHandler) // DELETE /?consumer_id=123

go h.removeExpiredEventsLoop(ctx, conf.cleanExpiredEventsFreq)

return h, nil
}
25 changes: 25 additions & 0 deletions server/storage/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"ella.to/bus"
"ella.to/sqlite"
Expand Down Expand Up @@ -406,3 +407,27 @@ func AckEvent(ctx context.Context, conn *sqlite.Conn, consumerId, eventId string
_, err = stmt.Step()
return err
}

func DeleteExpiredEventsBeforeDate(ctx context.Context, conn *sqlite.Conn, before time.Time) (err error) {
defer conn.Save(&err)()

const sql = `DELETE FROM events WHERE expires_at < ?;`

stmt, err := conn.Prepare(ctx, sql, before)
if err != nil {
return err
}

defer stmt.Finalize()

_, err = stmt.Step()
if err != nil {
return err
}

return nil
}

func DeleteExpiredEvents(ctx context.Context, conn *sqlite.Conn) (err error) {
return DeleteExpiredEventsBeforeDate(ctx, conn, time.Now())
}
2 changes: 2 additions & 0 deletions server/storage/schema/0001-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ CREATE INDEX IF NOT EXISTS consumers_queue_name ON consumers (queue_name);

CREATE INDEX IF NOT EXISTS consumers_last_event_id ON consumers (last_event_id);

CREATE INDEX IF NOT EXISTS consumers_expires_in ON consumers (expires_in);

--
-- CONSUMERS_EVENTS
--
Expand Down

0 comments on commit 3f4a25d

Please sign in to comment.