Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: implement delay queue (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang authored and ShiKaiWi committed Apr 20, 2023
1 parent 9ee58dc commit 19f90f5
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 0 deletions.
128 changes: 128 additions & 0 deletions server/coordinator/queue/delay_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package queue

import (
"container/heap"
"fmt"
"sync"
"time"

"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/pkg/errors"
)

type procedureScheduleEntry struct {
procedure procedure.Procedure
runAfter time.Time
}

type ProcedureDelayQueue struct {
maxLen int

// This lock is used to protect the following fields.
lock sync.RWMutex
heapQueue *heapPriorityQueue
// existingProcs is used to record procedures has been pushed into the queue,
// and they will be used to verify the addition of duplicate elements.
existingProcs map[uint64]struct{}
}

// heapPriorityQueue is no internal lock,
// and its thread safety is guaranteed by the external caller.
type heapPriorityQueue struct {
procedures []*procedureScheduleEntry
}

func (q *heapPriorityQueue) Len() int {
return len(q.procedures)
}

// The dequeue order of elements is determined by the less method.
// When return procedures[i].runAfter < procedures[j].runAfter, the element with smallest will be pop first.
func (q *heapPriorityQueue) Less(i, j int) bool {
return q.procedures[i].runAfter.Before(q.procedures[j].runAfter)
}

func (q *heapPriorityQueue) Swap(i, j int) {
q.procedures[i], q.procedures[j] = q.procedures[j], q.procedures[i]
}

func (q *heapPriorityQueue) Push(x any) {
item := x.(*procedureScheduleEntry)
q.procedures = append(q.procedures, item)
}

func (q *heapPriorityQueue) Pop() any {
length := len(q.procedures)
if length == 0 {
return nil
}
item := q.procedures[length-1]
q.procedures = q.procedures[:length-1]
return item
}

func (q *heapPriorityQueue) Peek() any {
length := len(q.procedures)
if length == 0 {
return nil
}
item := q.procedures[0]
return item
}

func NewProcedureDelayQueue(maxLen int) *ProcedureDelayQueue {
return &ProcedureDelayQueue{
heapQueue: &heapPriorityQueue{procedures: []*procedureScheduleEntry{}},
existingProcs: map[uint64]struct{}{},
maxLen: maxLen,
}
}

func (q *ProcedureDelayQueue) Len() int {
q.lock.RLock()
defer q.lock.RUnlock()

return q.heapQueue.Len()
}

func (q *ProcedureDelayQueue) Push(p procedure.Procedure, delay time.Duration) error {
q.lock.Lock()
defer q.lock.Unlock()

if q.heapQueue.Len() >= q.maxLen {
return errors.WithMessage(ErrQueueFull, fmt.Sprintf("queue max length is %d", q.maxLen))
}

if _, exists := q.existingProcs[p.ID()]; exists {
return errors.WithMessage(ErrPushDuplicatedProcedure, fmt.Sprintf("procedure has been pushed, %v", p))
}

heap.Push(q.heapQueue, &procedureScheduleEntry{
procedure: p,
runAfter: time.Now().Add(delay),
})
q.existingProcs[p.ID()] = struct{}{}

return nil
}

func (q *ProcedureDelayQueue) Pop() procedure.Procedure {
q.lock.Lock()
defer q.lock.Unlock()

if q.heapQueue.Len() == 0 {
return nil
}

entry := q.heapQueue.Peek().(*procedureScheduleEntry)
if time.Now().Before(entry.runAfter) {
return nil
}

heap.Pop(q.heapQueue)
delete(q.existingProcs, entry.procedure.ID())

return entry.procedure
}
81 changes: 81 additions & 0 deletions server/coordinator/queue/delay_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package queue

import (
"context"
"testing"
"time"

"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/stretchr/testify/require"
)

type TestProcedure struct{ id uint64 }

func (t TestProcedure) ID() uint64 {
return t.id
}

func (t TestProcedure) Typ() procedure.Typ {
return procedure.CreateTable
}

func (t TestProcedure) Start(_ context.Context) error {
return nil
}

func (t TestProcedure) Cancel(_ context.Context) error {
return nil
}

func (t TestProcedure) State() procedure.State {
return procedure.StateInit
}

func TestDelayQueue(t *testing.T) {
re := require.New(t)

testProcedure0 := TestProcedure{0}
testProcedure1 := TestProcedure{1}
testProcedure2 := TestProcedure{2}
testProcedure3 := TestProcedure{3}

queue := NewProcedureDelayQueue(3)
err := queue.Push(testProcedure0, time.Millisecond*40)
re.NoError(err)
err = queue.Push(testProcedure0, time.Millisecond*30)
re.Error(err)
err = queue.Push(testProcedure1, time.Millisecond*10)
re.NoError(err)
err = queue.Push(testProcedure2, time.Millisecond*20)
re.NoError(err)
err = queue.Push(testProcedure3, time.Millisecond*20)
re.Error(err)
re.Equal(3, queue.Len())

po := queue.Pop()
re.Nil(po)

time.Sleep(time.Millisecond * 100)

p0 := queue.Pop()
re.Equal(uint64(1), p0.ID())
p1 := queue.Pop()
re.Equal(uint64(2), p1.ID())
p2 := queue.Pop()
re.Equal(uint64(0), p2.ID())
p := queue.Pop()
re.Nil(p)

err = queue.Push(testProcedure0, time.Millisecond*20)
re.NoError(err)

time.Sleep(time.Millisecond * 10)
p0 = queue.Pop()
re.Nil(p0)

time.Sleep(time.Millisecond * 10)
p0 = queue.Pop()
re.Equal(uint64(0), p0.ID())
}
10 changes: 10 additions & 0 deletions server/coordinator/queue/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package queue

import "github.com/CeresDB/ceresmeta/pkg/coderr"

var (
ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
)

0 comments on commit 19f90f5

Please sign in to comment.