Skip to content

Commit

Permalink
Optimize watch of queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 8, 2025
1 parent 4d4bf25 commit d2968d1
Showing 1 changed file with 70 additions and 57 deletions.
127 changes: 70 additions & 57 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,79 +58,92 @@ type MessageController struct {
messageService *service.MessageService

watchChannelsMut sync.Mutex
watchChannels map[int64][]chan MessageResponse
watchChannels map[int64]map[chan MessageResponse]struct{}

watchListChannelsMut sync.Mutex
watchListChannels []chan MessageResponse
watchListChannels map[chan MessageResponse]struct{}
}

func (mc *MessageController) newWatchChannel(messageID int64) chan MessageResponse {
ch := make(chan MessageResponse, 8)
func (mc *MessageController) getWatchChannel(messageID int64) (chan MessageResponse, func()) {
ch := make(chan MessageResponse, 2)
mc.watchChannelsMut.Lock()
defer mc.watchChannelsMut.Unlock()
mc.watchChannels[messageID] = append(mc.watchChannels[messageID], ch)
return ch
}

func (mc *MessageController) cancelWatchChannel(messageID int64, ch chan MessageResponse) {
mc.watchChannelsMut.Lock()
defer mc.watchChannelsMut.Unlock()
channels := mc.watchChannels[messageID]
for i, channel := range channels {
if channel == ch {
mc.watchChannels[messageID] = append(channels[:i], channels[i+1:]...)
break
}
if mc.watchChannels[messageID] == nil {
mc.watchChannels[messageID] = map[chan MessageResponse]struct{}{}
}

if len(mc.watchChannels[messageID]) == 0 {
delete(mc.watchChannels, messageID)
mc.watchChannels[messageID][ch] = struct{}{}
return ch, func() {
mc.watchChannelsMut.Lock()
defer mc.watchChannelsMut.Unlock()
delete(mc.watchChannels[messageID], ch)
if len(mc.watchChannels[messageID]) == 0 {
delete(mc.watchChannels, messageID)
}
}
}

func (mc *MessageController) updateWatchChannel(messageID int64, mr MessageResponse) {
func (mc *MessageController) appendWatchChannel(messageID int64, mr MessageResponse) {
mc.watchChannelsMut.Lock()
defer mc.watchChannelsMut.Unlock()
for _, ch := range mc.watchChannels[messageID] {

retry := []chan MessageResponse{}
for ch := range mc.watchChannels[messageID] {
select {
case ch <- mr:
case <-time.After(time.Second / 10):
default:
retry = append(retry, ch)
}
}

for _, ch := range retry {
select {
case ch <- mr:
default:
}
}
}

func (mc *MessageController) newWatchListChannel() chan MessageResponse {
ch := make(chan MessageResponse, 32)
func (mc *MessageController) getWatchListChannel() (chan MessageResponse, func()) {
ch := make(chan MessageResponse, 8)
mc.watchListChannelsMut.Lock()
defer mc.watchListChannelsMut.Unlock()
mc.watchListChannels = append(mc.watchListChannels, ch)
return ch

mc.watchListChannels[ch] = struct{}{}
return ch, func() {
mc.watchListChannelsMut.Lock()
defer mc.watchListChannelsMut.Unlock()
delete(mc.watchListChannels, ch)
}
}

func (mc *MessageController) cancelWatchListChannel(ch chan MessageResponse) {
func (mc *MessageController) appendWatchListChannels(mr MessageResponse) {
mc.watchListChannelsMut.Lock()
defer mc.watchListChannelsMut.Unlock()
for i, channel := range mc.watchListChannels {
if channel == ch {
mc.watchListChannels = append(mc.watchListChannels[:i], mc.watchListChannels[i+1:]...)
break

retry := []chan MessageResponse{}
for ch := range mc.watchListChannels {
select {
case ch <- mr:
default:
retry = append(retry, ch)
}
}
}

func (mc *MessageController) updateWatchListChannels(mr MessageResponse) {
mc.watchListChannelsMut.Lock()
defer mc.watchListChannelsMut.Unlock()
for _, ch := range mc.watchListChannels {
for _, ch := range retry {
select {
case ch <- mr:
case <-time.After(time.Second / 10):
default:
}
}
}

func NewMessageController(messageService *service.MessageService) *MessageController {
return &MessageController{messageService: messageService, watchChannels: map[int64][]chan MessageResponse{}}
return &MessageController{
messageService: messageService,
watchChannels: map[int64]map[chan MessageResponse]struct{}{},
watchListChannels: map[chan MessageResponse]struct{}{},
}
}

func (mc *MessageController) RegisterRoutes(ws *restful.WebService) {
Expand Down Expand Up @@ -242,8 +255,8 @@ func (mc *MessageController) Schedule(ctx context.Context, logger *slog.Logger)
logger.Error("ResetToPending", "error", err)
} else {
data := MessageResponse{MessageID: item.MessageID, Content: item.Content, Priority: item.Priority, Status: model.StatusPending}
mc.updateWatchListChannels(data)
mc.updateWatchChannel(item.MessageID, data)
mc.appendWatchChannel(item.MessageID, data)
mc.appendWatchListChannels(data)
}
}
}
Expand Down Expand Up @@ -292,7 +305,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response
return
}
data.Priority = messageRequest.Priority
mc.updateWatchListChannels(data)
mc.appendWatchListChannels(data)
}
resp.WriteHeaderAndEntity(http.StatusOK, data)
return
Expand Down Expand Up @@ -321,7 +334,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response
Data: messageRequest.Data,
}

mc.updateWatchListChannels(data)
mc.appendWatchListChannels(data)
resp.WriteHeaderAndEntity(http.StatusCreated, data)
}

Expand Down Expand Up @@ -356,8 +369,8 @@ func (mc *MessageController) List(req *restful.Request, resp *restful.Response)
resp.Header().Set("Connection", "keep-alive")
resp.WriteHeader(http.StatusOK)

watchCh := mc.newWatchListChannel()
defer mc.cancelWatchListChannel(watchCh)
watchCh, cancel := mc.getWatchListChannel()
defer cancel()

encoder := json.NewEncoder(resp.ResponseWriter)

Expand Down Expand Up @@ -417,10 +430,10 @@ func (mc *MessageController) Get(req *restful.Request, resp *restful.Response) {
resp.Header().Set("Connection", "keep-alive")
resp.WriteHeader(http.StatusOK)

watchCh := mc.newWatchChannel(messageID)
defer mc.cancelWatchChannel(messageID, watchCh)
watchCh, cancel := mc.getWatchChannel(messageID)
defer cancel()

mc.updateWatchChannel(messageID, data)
mc.appendWatchChannel(messageID, data)

encoder := json.NewEncoder(resp.ResponseWriter)

Expand Down Expand Up @@ -477,8 +490,8 @@ func (mc *MessageController) Consume(req *restful.Request, resp *restful.Respons

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)
mc.appendWatchChannel(messageID, data)
mc.appendWatchListChannels(data)

resp.WriteHeaderAndEntity(http.StatusOK, data)
}
Expand Down Expand Up @@ -526,8 +539,8 @@ func (mc *MessageController) Heartbeat(req *restful.Request, resp *restful.Respo

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)
mc.appendWatchChannel(messageID, data)
mc.appendWatchListChannels(data)

resp.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -564,8 +577,8 @@ func (mc *MessageController) Complete(req *restful.Request, resp *restful.Respon

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)
mc.appendWatchChannel(messageID, data)
mc.appendWatchListChannels(data)

resp.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -610,8 +623,8 @@ func (mc *MessageController) Failed(req *restful.Request, resp *restful.Response

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)
mc.appendWatchChannel(messageID, data)
mc.appendWatchListChannels(data)

resp.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -648,8 +661,8 @@ func (mc *MessageController) Cancel(req *restful.Request, resp *restful.Response

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)
mc.appendWatchChannel(messageID, data)
mc.appendWatchListChannels(data)

resp.WriteHeader(http.StatusNoContent)
}

0 comments on commit d2968d1

Please sign in to comment.