From 7e1c08e5d23bc8674bd23cf5a5f1ccfaeb65365c Mon Sep 17 00:00:00 2001
From: Crimson <39024757+crimson-gao@users.noreply.github.com>
Date: Thu, 28 Nov 2024 12:56:02 +0800
Subject: [PATCH] feat: refactor consumer, add runtime metrics (#302)

---
 consumer/config.go                       |   2 +
 consumer/shard_monitor.go                | 116 ++++++++
 consumer/shard_monitor_benchmark_test.go |  33 +++
 consumer/shard_worker.go                 | 349 +++++++++++++----------
 consumer/tasks.go                        |  71 +----
 consumer/worker.go                       |  15 +-
 consumer/worker_test.go                  |   7 +-
 7 files changed, 367 insertions(+), 226 deletions(-)
 create mode 100644 consumer/shard_monitor.go
 create mode 100644 consumer/shard_monitor_benchmark_test.go

diff --git a/consumer/config.go b/consumer/config.go
index fb08b14e..bfe7ca67 100644
--- a/consumer/config.go
+++ b/consumer/config.go
@@ -51,6 +51,7 @@ type LogHubConfig struct {
 	//:param AutoCommitIntervalInSec: default auto commit interval, default is 30
 	//:param AuthVersion: signature algorithm version, default is sls.AuthV1
 	//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
+	//:param DisableRuntimeMetrics: disable runtime metrics; runtime metrics are printed to the local log.
 	Endpoint        string
 	AccessKeyID     string
 	AccessKeySecret string
@@ -81,6 +82,7 @@ type LogHubConfig struct {
 	AutoCommitIntervalInMS int64
 	AuthVersion            sls.AuthVersionType
 	Region                 string
+	DisableRuntimeMetrics  bool
 }
 
 const (
diff --git a/consumer/shard_monitor.go b/consumer/shard_monitor.go
new file mode 100644
index 00000000..c697382d
--- /dev/null
+++ b/consumer/shard_monitor.go
@@ -0,0 +1,116 @@
+package consumerLibrary
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	"go.uber.org/atomic"
+
+	sls "github.com/aliyun/aliyun-log-go-sdk"
+	"github.com/go-kit/kit/log"
+)
+
+type MonitorMetrics struct {
+	fetchReqFailedCount atomic.Int64
+	logRawSize          atomic.Int64
+	fetchLogHistogram   TimeHistogram // in us
+
+	processFailedCount atomic.Int64
+	processHistogram   TimeHistogram // in us
+}
+
+type ShardMonitor struct {
+	shard          int
+	reportInterval time.Duration
+	lastReportTime time.Time
+	metrics        atomic.Value // *MonitorMetrics
+}
+
+func newShardMonitor(shard int, reportInterval time.Duration) *ShardMonitor {
+	monitor := &ShardMonitor{
+		shard:          shard,
+		reportInterval: reportInterval,
+		lastReportTime: time.Now(),
+	}
+	monitor.metrics.Store(&MonitorMetrics{})
+	return monitor
+}
+
+func (m *ShardMonitor) RecordFetchRequest(plm *sls.PullLogMeta, err error, start time.Time) {
+	metrics := m.metrics.Load().(*MonitorMetrics)
+	if err != nil {
+		metrics.fetchReqFailedCount.Inc()
+	} else {
+		metrics.logRawSize.Add(int64(plm.RawSize))
+	}
+	metrics.fetchLogHistogram.AddSample(float64(time.Since(start).Microseconds()))
+}
+
+func (m *ShardMonitor) RecordProcess(err error, start time.Time) {
+	metrics := m.metrics.Load().(*MonitorMetrics)
+	if err != nil {
+		metrics.processFailedCount.Inc()
+	}
+	metrics.processHistogram.AddSample(float64(time.Since(start).Microseconds()))
+}
+
+func (m *ShardMonitor) getAndResetMetrics() *MonitorMetrics {
+	// we don't need compare-and-swap: only one goroutine calls m.metrics.Store
+	old := m.metrics.Load().(*MonitorMetrics)
+	m.metrics.Store(&MonitorMetrics{})
+	return old
+}
+
+func (m *ShardMonitor) shouldReport() bool {
+	return time.Since(m.lastReportTime) >= m.reportInterval
+}
+
+func (m *ShardMonitor) reportByLogger(logger log.Logger) {
+	m.lastReportTime = time.Now()
+	metrics := m.getAndResetMetrics()
+	logger.Log("msg", "report status",
+		"fetchFailed", metrics.fetchReqFailedCount.Load(),
+		"logRawSize",
metrics.logRawSize.Load(), + "processFailed", metrics.processFailedCount.Load(), + "fetch", metrics.fetchLogHistogram.String(), + "process", metrics.processHistogram.String(), + ) +} + +type TimeHistogram struct { + Count atomic.Int64 + Sum atomic.Float64 + SumSquare atomic.Float64 +} + +func (h *TimeHistogram) AddSample(v float64) { + h.Count.Inc() + h.Sum.Add(v) + h.SumSquare.Add(v * v) +} + +func (h *TimeHistogram) String() string { + avg := h.Avg() + stdDev := h.StdDev() + count := h.Count.Load() + return fmt.Sprintf("{avg: %.1fus, stdDev: %.1fus, count: %d}", avg, stdDev, count) +} + +func (h *TimeHistogram) Avg() float64 { + count := h.Count.Load() + if count == 0 { + return 0 + } + return h.Sum.Load() / float64(count) +} + +func (h *TimeHistogram) StdDev() float64 { + count := h.Count.Load() + if count < 2 { + return 0 + } + div := float64(count * (count - 1)) + num := (float64(count) * h.SumSquare.Load()) - math.Pow(h.Sum.Load(), 2) + return math.Sqrt(num / div) +} diff --git a/consumer/shard_monitor_benchmark_test.go b/consumer/shard_monitor_benchmark_test.go new file mode 100644 index 00000000..e747ea45 --- /dev/null +++ b/consumer/shard_monitor_benchmark_test.go @@ -0,0 +1,33 @@ +package consumerLibrary + +import ( + "testing" + "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" +) + +// BenchmarkRecordFetchRequest +// BenchmarkRecordFetchRequest-12 29816072 40.05 ns/op 0 B/op 0 allocs/op +func BenchmarkRecordFetchRequest(b *testing.B) { + shardMonitor := newShardMonitor(1, time.Second) + start := time.Now() + plm := &sls.PullLogMeta{RawSize: 1} + b.ResetTimer() + + for i := 0; i < b.N; i++ { + shardMonitor.RecordFetchRequest(plm, nil, start) + } +} + +// BenchmarkRecordProcess +// BenchmarkRecordProcess-12 33092797 35.15 ns/op 0 B/op 0 allocs/op +func BenchmarkRecordProcess(b *testing.B) { + shardMonitor := newShardMonitor(1, time.Second) + start := time.Now() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + shardMonitor.RecordProcess(nil, start) + } +} diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index adfae01e..0f0cbc87 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -1,202 +1,249 @@ package consumerLibrary import ( + "fmt" + "runtime" "sync" "time" + "go.uber.org/atomic" + sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ) -type ShardConsumerWorker struct { - client *ConsumerClient - consumerCheckPointTracker *DefaultCheckPointTracker - shutdownFlag bool - lastFetchLogGroupList *sls.LogGroupList - nextFetchCursor string - lastFetchGroupCount int - lastFetchGroupCountBeforeQuery int - lastFetchTime time.Time - lastFetchRawSize int - lastFetchRawSizeBeforeQuery int - consumerStatus string - processor Processor - shardId int - // TODO: refine to channel - isCurrentDone bool - logger log.Logger - // unix time - lastCheckpointSaveTime time.Time - - taskLock sync.RWMutex - statusLock sync.RWMutex -} +// todo: refine the sleep time +const ( + noProgressSleepTime = 500 * time.Millisecond + processFailedSleepTime = 50 * time.Millisecond + fetchFailedSleepTime = 100 * time.Millisecond // todo: use backoff interval, [1, 2, 4, 8, ...] 
+ shutdownFailedSleepTime = 100 * time.Millisecond + flushCheckPointFailedSleepTime = 100 * time.Millisecond +) -func (consumer *ShardConsumerWorker) setConsumerStatus(status string) { - consumer.statusLock.Lock() - defer consumer.statusLock.Unlock() - consumer.consumerStatus = status -} +type ShardConsumerWorker struct { + client *ConsumerClient + consumerCheckPointTracker *DefaultCheckPointTracker + processor Processor + shardId int + monitor *ShardMonitor -func (consumer *ShardConsumerWorker) getConsumerStatus() string { - consumer.statusLock.RLock() - defer consumer.statusLock.RUnlock() - return consumer.consumerStatus + logger log.Logger + lastCheckpointSaveTime time.Time + shutDownFlag *atomic.Bool + stopped *atomic.Bool + startOnceFlag sync.Once } -func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { +func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { shardConsumeWorker := &ShardConsumerWorker{ - shutdownFlag: false, processor: processor, consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), client: consumerClient, - consumerStatus: INITIALIZING, shardId: shardId, - lastFetchTime: time.Now(), - isCurrentDone: true, - logger: logger, + logger: log.With(logger, "shard", shardId), + shutDownFlag: atomic.NewBool(false), + stopped: atomic.NewBool(false), + lastCheckpointSaveTime: time.Now(), + monitor: newShardMonitor(shardId, time.Minute), } return shardConsumeWorker } -func (consumer *ShardConsumerWorker) consume() { - if !consumer.isTaskDone() { - return +func (c *ShardConsumerWorker) ensureStarted() { + c.startOnceFlag.Do(func() { + go c.runLoop() + }) +} + +func (c *ShardConsumerWorker) runLoop() { + level.Info(c.logger).Log("msg", "runLoop started") + defer func() { + c.recoverIfPanic("runLoop panic") + c.doShutDown() + }() + + cursor := c.getInitCursor() + level.Info(c.logger).Log("msg", "runLoop got init cursor", "cursor", cursor) + + for !c.shutDownFlag.Load() { + lastFetchTime := time.Now() + shouldCallProcess, logGroupList, plm := c.fetchLogs(cursor) + if !shouldCallProcess { + continue + } + + cursor = c.callProcess(logGroupList, plm) + if c.shutDownFlag.Load() { + break + } + + c.sleepUtilNextFetch(lastFetchTime, plm) } +} - // start a new task - // initial task / fetch data task / processing task / shutdown task - consumer.setTaskDoneFlag(false) - switch consumer.getConsumerStatus() { - case INITIALIZING: - go func() { - cursor, err := consumer.consumerInitializeTask() - if err == nil { - consumer.nextFetchCursor = cursor - } - consumer.updateStatus(err == nil) - }() - case PULLING: - go func() { - if !consumer.shouldFetch() { - level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log") - consumer.updateStatus(false) - return - } - hasProgress, err := consumer.nextFetchTask() - consumer.updateStatus(err == nil && hasProgress) - }() - case PROCESSING: - go func() { - rollBackCheckpoint, err := consumer.consumerProcessTask() - if err != nil { - level.Warn(consumer.logger).Log("messge", "process failed", "err", err) - } - if rollBackCheckpoint != "" { - consumer.nextFetchCursor = rollBackCheckpoint - level.Info(consumer.logger).Log( - "msg", "Checkpoints set for users have been reset", - "shardId", consumer.shardId, - "rollBackCheckpoint", 
rollBackCheckpoint, - ) - } - consumer.updateStatus(err == nil) - }() - case SHUTTING_DOWN: - go func() { - err := consumer.processor.Shutdown(consumer.consumerCheckPointTracker) - if err != nil { - level.Error(consumer.logger).Log("msg", "failed to call processor shutdown", "err", err) - consumer.updateStatus(false) - return - } - - err = consumer.consumerCheckPointTracker.flushCheckPoint() - if err == nil { - level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId) - } else { - level.Warn(consumer.logger).Log("msg", "failed to flush checkpoint when shutdown", "err", err) - } - - consumer.updateStatus(err == nil) - }() - default: - consumer.setTaskDoneFlag(true) - } -} - -func (consumer *ShardConsumerWorker) updateStatus(success bool) { - status := consumer.getConsumerStatus() - if status == SHUTTING_DOWN { - if success { - consumer.setConsumerStatus(SHUTDOWN_COMPLETE) +func (consumer *ShardConsumerWorker) getInitCursor() string { + for !consumer.shutDownFlag.Load() { + initCursor, err := consumer.consumerInitializeTask() + if err == nil { + return initCursor } - } else if consumer.shutdownFlag { - consumer.setConsumerStatus(SHUTTING_DOWN) - } else if success { - switch status { - case PULLING: - consumer.setConsumerStatus(PROCESSING) - case INITIALIZING, PROCESSING: - consumer.setConsumerStatus(PULLING) + time.Sleep(100 * time.Millisecond) + } + return "" +} + +func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) { + + start := time.Now() + logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor) + c.monitor.RecordFetchRequest(plm, err, start) + + if err != nil { + time.Sleep(fetchFailedSleepTime) + return false, nil, nil + } + + c.consumerCheckPointTracker.setCurrentCursor(cursor) + c.consumerCheckPointTracker.setNextCursor(plm.NextCursor) + + if cursor == plm.NextCursor { // already reach end of shard + c.saveCheckPointIfNeeded() + time.Sleep(noProgressSleepTime) + return false, nil, nil + } + return true, logGroupList, plm +} + +func (c *ShardConsumerWorker) callProcess(logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) (nextCursor string) { + for { + start := time.Now() + rollBackCheckpoint, err := c.processInternal(logGroupList) + c.monitor.RecordProcess(err, start) + + c.saveCheckPointIfNeeded() + if err != nil { + level.Error(c.logger).Log("msg", "process func returns an error", "err", err) + } + if rollBackCheckpoint != "" { + level.Warn(c.logger).Log("msg", "Rollback checkpoint by user", + "rollBackCheckpoint", rollBackCheckpoint) + return rollBackCheckpoint + } + if err == nil { + return plm.NextCursor + } + // if process failed and shutting down, just quit + if c.shutDownFlag.Load() { + level.Warn(c.logger).Log("msg", "shutting down and last process failed, just quit") + return plm.NextCursor } + time.Sleep(processFailedSleepTime) } +} + +func (c *ShardConsumerWorker) processInternal(logGroup *sls.LogGroupList) (rollBackCheckpoint string, err error) { + defer func() { + if r := c.recoverIfPanic("panic in your process function"); r != nil { + err = fmt.Errorf("panic when process: %v", r) + } + }() - consumer.setTaskDoneFlag(true) + return c.processor.Process(c.shardId, logGroup, c.consumerCheckPointTracker) } -func (consumer *ShardConsumerWorker) shouldFetch() bool { - lastFetchRawSize := consumer.lastFetchRawSize - lastFetchGroupCount := consumer.lastFetchGroupCount +// call user shutdown func and flush checkpoint +func (c 
*ShardConsumerWorker) doShutDown() {
+	level.Info(c.logger).Log("msg", "begin to shut down, invoking processor.shutdown")
+	for {
+		err := c.processor.Shutdown(c.consumerCheckPointTracker) // todo: should we catch panic here?
+		if err == nil {
+			break
+		}
+		level.Error(c.logger).Log("msg", "processor.shutdown finished with error", "err", err)
+		time.Sleep(shutdownFailedSleepTime)
+	}
-	if consumer.client.option.Query != "" {
-		lastFetchRawSize = consumer.lastFetchRawSizeBeforeQuery
-		lastFetchGroupCount = consumer.lastFetchGroupCountBeforeQuery
+	level.Info(c.logger).Log("msg", "processor.shutdown succeeded, begin to flush checkpoint")
+
+	for {
+		err := c.consumerCheckPointTracker.flushCheckPoint()
+		if err == nil {
+			break
+		}
+		level.Error(c.logger).Log("msg", "failed to flush checkpoint when shutting down", "err", err)
+		time.Sleep(flushCheckPointFailedSleepTime)
 	}
-	if lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || lastFetchRawSize >= 4*1024*1024 {
-		return true
+	level.Info(c.logger).Log("msg", "shutdown completed, bye")
+	c.stopped.Store(true)
+}
+
+// todo: refine sleep time, make it more reasonable
+func (c *ShardConsumerWorker) sleepUtilNextFetch(lastFetchSuccessTime time.Time, plm *sls.PullLogMeta) {
+	sinceLastFetch := time.Since(lastFetchSuccessTime)
+	if sinceLastFetch > time.Duration(c.client.option.DataFetchIntervalInMs)*time.Millisecond {
+		return
 	}
-	duration := time.Since(consumer.lastFetchTime)
+
+	lastFetchRawSize := plm.RawSize
+	lastFetchGroupCount := plm.Count
+	if c.client.option.Query != "" {
+		lastFetchRawSize = plm.RawSizeBeforeQuery
+		lastFetchGroupCount = plm.DataCountBeforeQuery
+	}
+
+	if lastFetchGroupCount >= c.client.option.MaxFetchLogGroupCount || lastFetchRawSize >= 4*1024*1024 {
+		return
+	}
+	// negative or zero sleepTime is ok
 	if lastFetchGroupCount < 100 && lastFetchRawSize < 1024*1024 {
-		// The time used here is in milliseconds.
- return duration > 500*time.Millisecond - } else if lastFetchGroupCount < 500 && lastFetchRawSize < 2*1024*1024 { - return duration > 200*time.Millisecond - } else { - return duration > 50*time.Millisecond + time.Sleep(500*time.Millisecond - sinceLastFetch) + return } + if lastFetchGroupCount < 500 && lastFetchRawSize < 2*1024*1024 { + time.Sleep(200*time.Millisecond - sinceLastFetch) + return + } + + time.Sleep(50*time.Millisecond - sinceLastFetch) } -func (consumer *ShardConsumerWorker) saveCheckPointIfNeeded() { - if consumer.client.option.AutoCommitDisabled { +func (c *ShardConsumerWorker) saveCheckPointIfNeeded() { + if c.client.option.AutoCommitDisabled { return } - if time.Since(consumer.lastCheckpointSaveTime) > time.Millisecond*time.Duration(consumer.client.option.AutoCommitIntervalInMS) { - consumer.consumerCheckPointTracker.flushCheckPoint() - consumer.lastCheckpointSaveTime = time.Now() + if time.Since(c.lastCheckpointSaveTime) > time.Millisecond*time.Duration(c.client.option.AutoCommitIntervalInMS) { + c.consumerCheckPointTracker.flushCheckPoint() + c.lastCheckpointSaveTime = time.Now() } } -func (consumer *ShardConsumerWorker) consumerShutDown() { - consumer.shutdownFlag = true - if !consumer.isShutDownComplete() { - consumer.consume() - } +func (c *ShardConsumerWorker) shutdown() { + level.Info(c.logger).Log("msg", "shutting down by others") + c.shutDownFlag.Store(true) } -func (consumer *ShardConsumerWorker) isShutDownComplete() bool { - return consumer.getConsumerStatus() == SHUTDOWN_COMPLETE +func (c *ShardConsumerWorker) isStopped() bool { + return c.stopped.Load() +} + +func (c *ShardConsumerWorker) recoverIfPanic(reason string) any { + if r := recover(); r != nil { + stackBuf := make([]byte, 1<<16) + n := runtime.Stack(stackBuf, false) + level.Error(c.logger).Log("msg", "get panic in shard consumer worker", + "reason", reason, + "error", r, "stack", stackBuf[:n]) + return r + } + return nil } -func (consumer *ShardConsumerWorker) setTaskDoneFlag(done bool) { - consumer.taskLock.Lock() - defer consumer.taskLock.Unlock() - consumer.isCurrentDone = done +func (c *ShardConsumerWorker) shouldReportMetrics() bool { + return !c.client.option.DisableRuntimeMetrics && c.monitor.shouldReport() } -func (consumer *ShardConsumerWorker) isTaskDone() bool { - consumer.taskLock.RLock() - defer consumer.taskLock.RUnlock() - return consumer.isCurrentDone +func (c *ShardConsumerWorker) reportMetrics() { + c.monitor.reportByLogger(c.logger) } diff --git a/consumer/tasks.go b/consumer/tasks.go index b2869d19..1028be65 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -3,12 +3,11 @@ package consumerLibrary import ( "errors" "fmt" - "runtime" - "time" "github.com/go-kit/kit/log/level" ) +// todo: move to shard_worker.go func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { // read checkpoint firstly checkpoint, err := consumer.client.getCheckPoint(consumer.shardId) @@ -23,86 +22,24 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { if consumer.client.option.CursorPosition == BEGIN_CURSOR { cursor, err := consumer.client.getCursor(consumer.shardId, "begin") if err != nil { - level.Warn(consumer.logger).Log("msg", "get beginCursor error", "shard", consumer.shardId, "error", err) + level.Warn(consumer.logger).Log("msg", "get beginCursor error", "error", err) } return cursor, err } if consumer.client.option.CursorPosition == END_CURSOR { cursor, err := consumer.client.getCursor(consumer.shardId, "end") if err != nil { - 
level.Warn(consumer.logger).Log("msg", "get endCursor error", "shard", consumer.shardId, "error", err) + level.Warn(consumer.logger).Log("msg", "get endCursor error", "error", err) } return cursor, err } if consumer.client.option.CursorPosition == SPECIAL_TIMER_CURSOR { cursor, err := consumer.client.getCursor(consumer.shardId, fmt.Sprintf("%v", consumer.client.option.CursorStartTime)) if err != nil { - level.Warn(consumer.logger).Log("msg", "get specialCursor error", "shard", consumer.shardId, "error", err) + level.Warn(consumer.logger).Log("msg", "get specialCursor error", "error", err) } return cursor, err } level.Warn(consumer.logger).Log("msg", "CursorPosition setting error, please reset with BEGIN_CURSOR or END_CURSOR or SPECIAL_TIMER_CURSOR") return "", errors.New("CursorPositionError") } - -func (consumer *ShardConsumerWorker) nextFetchTask() (hasProgress bool, err error) { - // update last fetch time, for control fetch frequency - consumer.lastFetchTime = time.Now() - cursor := consumer.nextFetchCursor - logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, cursor) - if err != nil { - return false, err - } - // set cursors user to decide whether to save according to the execution of `process` - consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) - consumer.lastFetchLogGroupList = logGroup - consumer.nextFetchCursor = pullLogMeta.NextCursor - consumer.lastFetchRawSize = pullLogMeta.RawSize - consumer.lastFetchGroupCount = pullLogMeta.Count - if consumer.client.option.Query != "" { - consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery - consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.DataCountBeforeQuery - if consumer.lastFetchRawSizeBeforeQuery == -1 { - consumer.lastFetchRawSizeBeforeQuery = 0 - } - if consumer.lastFetchGroupCountBeforeQuery == -1 { - consumer.lastFetchGroupCountBeforeQuery = 0 - } - } - consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor) - level.Debug(consumer.logger).Log( - "shardId", consumer.shardId, - "fetch log count", consumer.lastFetchGroupCount, - ) - - // if cursor == nextCursor, no progress is needed - if cursor == pullLogMeta.NextCursor { - consumer.lastFetchLogGroupList = nil - consumer.saveCheckPointIfNeeded() - return false, nil - } - - return true, nil -} - -func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) { - // If the user's consumption function reports a panic error, it will be captured and retry until sucessed. 
- defer func() { - if r := recover(); r != nil { - stackBuf := make([]byte, 1<<16) - n := runtime.Stack(stackBuf, false) - level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", stackBuf[:n]) - err = fmt.Errorf("get a panic when process: %v", r) - } - }() - if consumer.lastFetchLogGroupList != nil { - rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) - consumer.saveCheckPointIfNeeded() - if err != nil { - return - } - consumer.lastFetchLogGroupList = nil - } - - return -} diff --git a/consumer/worker.go b/consumer/worker.go index 884853e8..308674a6 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -103,7 +103,10 @@ func (consumerWorker *ConsumerWorker) run() { break } shardConsumer := consumerWorker.getShardConsumer(shard) - shardConsumer.consume() + shardConsumer.ensureStarted() + if shardConsumer.shouldReportMetrics() { + shardConsumer.reportMetrics() + } } consumerWorker.cleanShardConsumer(heldShards) TimeToSleepInMillsecond(consumerWorker.client.option.DataFetchIntervalInMs, lastFetchTime, consumerWorker.workerShutDownFlag.Load()) @@ -121,8 +124,8 @@ func (consumerWorker *ConsumerWorker) shutDownAndWait() { func(key, value interface{}) bool { count++ consumer := value.(*ShardConsumerWorker) - if !consumer.isShutDownComplete() { - consumer.consumerShutDown() + if !consumer.isStopped() { + consumer.shutdown() } else { consumerWorker.shardConsumer.Delete(key) } @@ -141,7 +144,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum if ok { return consumer.(*ShardConsumerWorker) } - consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger) + consumerIns := newShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger) consumerWorker.shardConsumer.Store(shardId, consumerIns) return consumerIns @@ -156,11 +159,11 @@ func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) { if !Contain(shard, owned_shards) { level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard) - consumer.consumerShutDown() + consumer.shutdown() level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard) } - if consumer.isShutDownComplete() { + if consumer.isStopped() { isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard) if isDeleteShard { level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard) diff --git a/consumer/worker_test.go b/consumer/worker_test.go index aa46d209..3569a4a3 100644 --- a/consumer/worker_test.go +++ b/consumer/worker_test.go @@ -30,7 +30,10 @@ func TestStartAndStop(t *testing.T) { } func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) { - fmt.Printf("shardId %d processing works sucess, logGroupSize: %d\n", shardId, len(logGroupList.LogGroups)) + fmt.Printf("time: %s, shardId %d processing works success, logGroupSize: %d, currentCursor: %s\n", + time.Now().Format("2006-01-02 15:04:05 000"), + shardId, len(logGroupList.LogGroups), + checkpointTracker.GetCurrentCursor()) checkpointTracker.SaveCheckPoint(true) return "", nil } @@ -69,7 +72,7 @@ func TestConsumerQueryNoData(t *testing.T) { ConsumerGroupName: 
"test-consumer", ConsumerName: "test-consumer-1", CursorPosition: END_CURSOR, - Query: "* | where \"request_method\" = 'GET'", + Query: "* | where \"Shard\" = '0'", } worker := InitConsumerWorkerWithCheckpointTracker(option, process)