feat: refactor producer, improve performance and observability (#312)
- improved performance in high-contention scenarios
crimson-gao authored Jan 16, 2025
1 parent 270ed16 commit 0a73cef
Showing 13 changed files with 447 additions and 336 deletions.
44 changes: 3 additions & 41 deletions consumer/shard_monitor.go
@@ -1,23 +1,22 @@
package consumerLibrary

import (
"fmt"
"math"
"time"

"go.uber.org/atomic"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/internal"
"github.com/go-kit/kit/log"
)

type MonitorMetrics struct {
fetchReqFailedCount atomic.Int64
logRawSize atomic.Int64
fetchLogHistogram TimeHistogram // in us
fetchLogHistogram internal.TimeHistogram // in us

processFailedCount atomic.Int64
processHistogram TimeHistogram // in us
processHistogram internal.TimeHistogram // in us
}

type ShardMonitor struct {
@@ -77,40 +76,3 @@ func (m *ShardMonitor) reportByLogger(logger log.Logger) {
"process", metrics.processHistogram.String(),
)
}

type TimeHistogram struct {
Count atomic.Int64
Sum atomic.Float64
SumSquare atomic.Float64
}

func (h *TimeHistogram) AddSample(v float64) {
h.Count.Inc()
h.Sum.Add(v)
h.SumSquare.Add(v * v)
}

func (h *TimeHistogram) String() string {
avg := h.Avg()
stdDev := h.StdDev()
count := h.Count.Load()
return fmt.Sprintf("{avg: %.1fus, stdDev: %.1fus, count: %d}", avg, stdDev, count)
}

func (h *TimeHistogram) Avg() float64 {
count := h.Count.Load()
if count == 0 {
return 0
}
return h.Sum.Load() / float64(count)
}

func (h *TimeHistogram) StdDev() float64 {
count := h.Count.Load()
if count < 2 {
return 0
}
div := float64(count * (count - 1))
num := (float64(count) * h.SumSquare.Load()) - math.Pow(h.Sum.Load(), 2)
return math.Sqrt(num / div)
}
45 changes: 45 additions & 0 deletions internal/histogram.go
@@ -0,0 +1,45 @@
package internal

import (
"fmt"
"math"

"go.uber.org/atomic"
)

type TimeHistogram struct {
Count atomic.Int64
Sum atomic.Float64
SumSquare atomic.Float64
}

func (h *TimeHistogram) AddSample(us float64) {
h.Count.Inc()
h.Sum.Add(us)
h.SumSquare.Add(us * us)
}

func (h *TimeHistogram) String() string {
avg := h.Avg()
stdDev := h.StdDev()
count := h.Count.Load()
return fmt.Sprintf("{avg: %.1fus, stdDev: %.1fus, count: %d}", avg, stdDev, count)
}

func (h *TimeHistogram) Avg() float64 {
count := h.Count.Load()
if count == 0 {
return 0
}
return h.Sum.Load() / float64(count)
}

func (h *TimeHistogram) StdDev() float64 {
count := h.Count.Load()
if count < 2 {
return 0
}
div := float64(count * (count - 1))
num := (float64(count) * h.SumSquare.Load()) - math.Pow(h.Sum.Load(), 2)
return math.Sqrt(num / div)
}
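
Note: the TimeHistogram type above is a verbatim move of the helper previously defined in consumer/shard_monitor.go into the shared internal package, so both consumer and producer code can report timing stats. As an illustration only (not part of this commit), a minimal sketch of how the type might be exercised from code inside the SDK module — the function name and timed work below are placeholders, and internal packages cannot be imported from outside the module:

package consumerLibrary

import (
	"fmt"
	"time"

	"github.com/aliyun/aliyun-log-go-sdk/internal"
)

func exampleHistogramUsage() {
	// The zero value is usable: the atomic counters start at zero.
	var h internal.TimeHistogram

	// Record the duration of a few simulated operations, in microseconds.
	for i := 0; i < 3; i++ {
		start := time.Now()
		time.Sleep(2 * time.Millisecond) // stand-in for a fetch or process call
		h.AddSample(float64(time.Since(start).Microseconds()))
	}

	// Prints something like {avg: 2100.0us, stdDev: 40.0us, count: 3}.
	fmt.Println(h.String())
}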
65 changes: 26 additions & 39 deletions producer/io_thread_pool.go
@@ -1,9 +1,7 @@
package producer

import (
"container/list"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -12,61 +10,50 @@ import (

type IoThreadPool struct {
threadPoolShutDownFlag *atomic.Bool
queue *list.List
lock sync.RWMutex
taskCh chan *ProducerBatch
ioworker *IoWorker
logger log.Logger
stopped *atomic.Bool
}

func initIoThreadPool(ioworker *IoWorker, logger log.Logger) *IoThreadPool {
return &IoThreadPool{
threadPoolShutDownFlag: atomic.NewBool(false),
queue: list.New(),
taskCh: make(chan *ProducerBatch, 100000),
ioworker: ioworker,
logger: logger,
stopped: atomic.NewBool(false),
}
}

func (threadPool *IoThreadPool) addTask(batch *ProducerBatch) {
defer threadPool.lock.Unlock()
threadPool.lock.Lock()
threadPool.queue.PushBack(batch)
}

func (threadPool *IoThreadPool) popTask() *ProducerBatch {
defer threadPool.lock.Unlock()
threadPool.lock.Lock()
if threadPool.queue.Len() <= 0 {
return nil
}
ele := threadPool.queue.Front()
threadPool.queue.Remove(ele)
return ele.Value.(*ProducerBatch)
}

func (threadPool *IoThreadPool) hasTask() bool {
defer threadPool.lock.RUnlock()
threadPool.lock.RLock()
return threadPool.queue.Len() > 0
threadPool.taskCh <- batch
}

func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThreadPoolwait *sync.WaitGroup) {
defer ioThreadPoolwait.Done()
for {
if task := threadPool.popTask(); task != nil {
threadPool.ioworker.startSendTask(ioWorkerWaitGroup)
go func(producerBatch *ProducerBatch) {
defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup)
threadPool.ioworker.sendToServer(producerBatch)
}(task)
} else {
if !threadPool.threadPoolShutDownFlag.Load() {
time.Sleep(100 * time.Millisecond)
} else {
level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent")
break
}
for task := range threadPool.taskCh {
if task == nil {
level.Info(threadPool.logger).Log("msg", "All cache tasks in the thread pool have been successfully sent")
threadPool.stopped.Store(true)
return
}

threadPool.ioworker.startSendTask(ioWorkerWaitGroup)
go func(producerBatch *ProducerBatch) {
defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup)
threadPool.ioworker.sendToServer(producerBatch)
}(task)
}
}

func (threadPool *IoThreadPool) ShutDown() {
old := threadPool.threadPoolShutDownFlag.Swap(true)
if !old {
close(threadPool.taskCh)
}
}

func (threadPool *IoThreadPool) Stopped() bool {
return threadPool.stopped.Load()
}
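
The queue refactor above is the core of the performance claim in the commit message: the mutex-guarded container/list plus a 100ms polling sleep is replaced by a buffered channel (capacity 100000) that addTask sends on and start drains with a range loop, while ShutDown closes the channel exactly once, guarded by the Swap on threadPoolShutDownFlag. A self-contained sketch of the same pattern, with illustrative names rather than the producer's actual types:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// taskPool is a minimal channel-backed pool in the spirit of the refactored
// IoThreadPool: producers send onto a buffered channel, a single drain
// goroutine fans work out, and shutdown closes the channel exactly once.
type taskPool struct {
	shutDown *atomic.Bool
	taskCh   chan string
	wg       sync.WaitGroup
}

func newTaskPool() *taskPool {
	return &taskPool{
		shutDown: atomic.NewBool(false),
		taskCh:   make(chan string, 1024), // buffered, like taskCh in the commit
	}
}

func (p *taskPool) addTask(t string) {
	p.taskCh <- t // no lock, no polling: the channel is the queue
}

func (p *taskPool) start() {
	for t := range p.taskCh { // exits once the channel is closed and drained
		p.wg.Add(1)
		go func(task string) {
			defer p.wg.Done()
			fmt.Println("sending", task) // stand-in for ioworker.sendToServer
		}(t)
	}
	p.wg.Wait()
}

func (p *taskPool) shutDownPool() {
	if !p.shutDown.Swap(true) { // Swap guards against a double close
		close(p.taskCh)
	}
}

func main() {
	p := newTaskPool()
	done := make(chan struct{})
	go func() { p.start(); close(done) }()
	for i := 0; i < 3; i++ {
		p.addTask(fmt.Sprintf("batch-%d", i))
	}
	p.shutDownPool()
	<-done
}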
113 changes: 47 additions & 66 deletions producer/io_worker.go
@@ -1,7 +1,6 @@
package producer

import (
"math"
"sync"
"sync/atomic"
"time"
@@ -43,7 +42,7 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log

func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
beginMs := GetTimeMs(time.Now().UnixNano())
sendBegin := time.Now()
var err error
if producerBatch.isUseMetricStoreUrl() {
// not use compress type now
@@ -57,67 +56,59 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
}
err = ioWorker.client.PostLogStoreLogsV2(producerBatch.getProject(), producerBatch.getLogstore(), req)
}
sendEnd := time.Now()

// send ok
if err == nil {
level.Debug(ioWorker.logger).Log("msg", "sendToServer suecssed,Execute successful callback function")
if producerBatch.attemptCount < producerBatch.maxReservedAttempts {
nowMs := GetTimeMs(time.Now().UnixNano())
attempt := createAttempt(true, "", "", "", nowMs, nowMs-beginMs)
producerBatch.result.attemptList = append(producerBatch.result.attemptList, attempt)
}
producerBatch.result.successful = true
level.Debug(ioWorker.logger).Log("msg", "sendToServer success")
defer ioWorker.producer.monitor.recordSuccess(sendBegin, sendEnd)
producerBatch.OnSuccess(sendBegin)
// After successful delivery, producer removes the batch size sent out
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Success(producerBatch.result)
}
}
} else {
if ioWorker.retryQueueShutDownFlag.Load() {
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
ioWorker.addErrorMessageToBatchAttempt(producerBatch, err, false, beginMs)
callBack.Fail(producerBatch.result)
}
}
return
}
level.Info(ioWorker.logger).Log("msg", "sendToServer failed", "error", err)
if slsError, ok := err.(*sls.Error); ok {
if _, ok := ioWorker.noRetryStatusCodeMap[int(slsError.HTTPCode)]; ok {
ioWorker.addErrorMessageToBatchAttempt(producerBatch, err, false, beginMs)
ioWorker.excuteFailedCallback(producerBatch)
return
}
}
if producerBatch.attemptCount < producerBatch.maxRetryTimes {
ioWorker.addErrorMessageToBatchAttempt(producerBatch, err, true, beginMs)
retryWaitTime := producerBatch.baseRetryBackoffMs * int64(math.Pow(2, float64(producerBatch.attemptCount)-1))
if retryWaitTime < producerBatch.maxRetryIntervalInMs {
producerBatch.nextRetryMs = GetTimeMs(time.Now().UnixNano()) + retryWaitTime
} else {
producerBatch.nextRetryMs = GetTimeMs(time.Now().UnixNano()) + producerBatch.maxRetryIntervalInMs
}
level.Debug(ioWorker.logger).Log("msg", "Submit to the retry queue after meeting the retry criteria。")
ioWorker.retryQueue.sendToRetryQueue(producerBatch, ioWorker.logger)
} else {
ioWorker.excuteFailedCallback(producerBatch)
}
return
}

slsError := parseSlsError(err)
canRetry := ioWorker.canRetry(producerBatch, slsError)
level.Error(ioWorker.logger).Log("msg", "sendToServer failed",
"retryTimes", producerBatch.attemptCount,
"requestId", slsError.RequestID,
"errorCode", slsError.Code,
"errorMessage", slsError.Message,
"logs", len(producerBatch.logGroup.Logs),
"canRetry", canRetry)
if !canRetry {
defer ioWorker.producer.monitor.recordFailure(sendBegin, sendEnd)
producerBatch.OnFail(slsError, sendBegin)
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
return
}

// do retry
ioWorker.producer.monitor.recordRetry(sendEnd.Sub(sendBegin))
producerBatch.addAttempt(slsError, sendBegin)
producerBatch.nextRetryMs = producerBatch.getRetryBackoffIntervalMs() + time.Now().UnixMilli()
level.Debug(ioWorker.logger).Log("msg", "Submit to the retry queue after meeting the retry criteria。")
ioWorker.retryQueue.sendToRetryQueue(producerBatch, ioWorker.logger)
}

func (ioWorker *IoWorker) addErrorMessageToBatchAttempt(producerBatch *ProducerBatch, err error, retryInfo bool, beginMs int64) {
if producerBatch.attemptCount < producerBatch.maxReservedAttempts {
slsError := err.(*sls.Error)
if retryInfo {
level.Info(ioWorker.logger).Log("msg", "sendToServer failed,start retrying", "retry times", producerBatch.attemptCount, "requestId", slsError.RequestID, "error code", slsError.Code, "error message", slsError.Message)
}
nowMs := GetTimeMs(time.Now().UnixNano())
attempt := createAttempt(false, slsError.RequestID, slsError.Code, slsError.Message, nowMs, nowMs-beginMs)
producerBatch.result.attemptList = append(producerBatch.result.attemptList, attempt)
func parseSlsError(err error) *sls.Error {
if slsError, ok := err.(*sls.Error); ok {
return slsError
}
return &sls.Error{
Message: err.Error(),
}
}

func (ioWorker *IoWorker) canRetry(producerBatch *ProducerBatch, err *sls.Error) bool {
if ioWorker.retryQueueShutDownFlag.Load() {
return false
}
producerBatch.result.successful = false
producerBatch.attemptCount += 1
if _, ok := ioWorker.noRetryStatusCodeMap[int(err.HTTPCode)]; ok {
return false
}
return producerBatch.attemptCount < producerBatch.maxRetryTimes
}

func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
@@ -131,13 +122,3 @@ func (ioWorker *IoWorker) startSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
ioWorker.maxIoWorker <- 1
ioWorkerWaitGroup.Add(1)
}

func (ioWorker *IoWorker) excuteFailedCallback(producerBatch *ProducerBatch) {
level.Info(ioWorker.logger).Log("msg", "sendToServer failed,Execute failed callback function")
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Fail(producerBatch.result)
}
}
}
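
The retry path is reshaped as well: the inline backoff computation and the addErrorMessageToBatchAttempt/excuteFailedCallback helpers give way to parseSlsError, canRetry, and the batch's own OnSuccess/OnFail/addAttempt/getRetryBackoffIntervalMs methods, which are defined outside this excerpt. For reference, a hedged sketch of the capped exponential backoff that the removed inline code implemented — the names below are illustrative, and the commit's getRetryBackoffIntervalMs may differ:

package main

import (
	"fmt"
	"math"
)

// retryBackoffMs mirrors the removed inline logic: base * 2^(attempt-1),
// capped at maxMs. It is not the commit's actual helper.
func retryBackoffMs(baseMs, maxMs int64, attempt int64) int64 {
	wait := baseMs * int64(math.Pow(2, float64(attempt)-1))
	if wait > maxMs {
		return maxMs
	}
	return wait
}

func main() {
	// With a 100ms base and a 2000ms cap: 100, 200, 400, 800, 1600, 2000.
	for attempt := int64(1); attempt <= 6; attempt++ {
		fmt.Printf("attempt %d -> wait %dms\n", attempt, retryBackoffMs(100, 2000, attempt))
	}
}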