diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 836457ca..93876fc9 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -28,6 +28,7 @@ type ShardConsumerWorker struct {
 	logger                        log.Logger
 	lastFetchTimeForForceFlushCpt int64
 	isFlushCheckpointDone         bool
+	rollBackCheckpoint            string
 }
 
 func (consumer *ShardConsumerWorker) setConsumerStatus(status string) {
@@ -55,6 +56,7 @@ func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, do fun
 		isFlushCheckpointDone:         true,
 		logger:                        logger,
 		lastFetchTimeForForceFlushCpt: 0,
+		rollBackCheckpoint:            "",
 	}
 	return shardConsumeWorker
 }
diff --git a/consumer/tasks.go b/consumer/tasks.go
index 479aafb2..f5435409 100644
--- a/consumer/tasks.go
+++ b/consumer/tasks.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/aliyun/aliyun-log-go-sdk"
 	"github.com/go-kit/kit/log/level"
+	"time"
 )
 
 func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
@@ -49,16 +50,49 @@ func (consumer *ShardConsumerWorker) consumerFetchTask() (*sls.LogGroupList, str
 }
 
-func (consumer *ShardConsumerWorker) consumerProcessTask() string {
-	// If the user's consumption function reports a panic error, it will be captured and exited.
-	rollBackCheckpoint := ""
+// consumerProcessTask hands the most recently fetched log groups to the user's
+// process function and returns the rollback checkpoint that function reports,
+// or "" when there is nothing to process.
+//
+// If the process function panics, the panic is recovered and the same batch is
+// retried, pausing two seconds between attempts, until an attempt completes;
+// the checkpoint from the successful attempt is then returned.
+func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string) {
+	// Reset the field so a call that processes nothing does not report the
+	// checkpoint left over from an earlier batch.
+	consumer.rollBackCheckpoint = ""
 	defer func() {
 		if r := recover(); r != nil {
 			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
+			// NOTE(review): the loop only exits on success, so a batch that always
+			// panics blocks this worker indefinitely.
+			for !consumer.consumerRetryProcessTask() {
+				time.Sleep(time.Second * 2)
+			}
+			// The panic skipped the return statement below, so the result must be
+			// assigned here for the caller to see the retried checkpoint.
+			rollBackCheckpoint = consumer.rollBackCheckpoint
 		}
 	}()
 	if consumer.lastFetchLogGroupList != nil {
-		rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
+		consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
 		consumer.consumerCheckPointTracker.flushCheck()
 	}
-	return rollBackCheckpoint
+	return consumer.rollBackCheckpoint
+}
+
+// consumerRetryProcessTask re-runs the process function on the last fetched log
+// groups. It reports true when the attempt completed and false when it panicked
+// again; on success the new rollback checkpoint is left in
+// consumer.rollBackCheckpoint.
+func (consumer *ShardConsumerWorker) consumerRetryProcessTask() (ok bool) {
+	level.Info(consumer.logger).Log("msg", "Start retrying the process function")
+	defer func() {
+		if r := recover(); r != nil {
+			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r)
+			ok = false
+		}
+	}()
+	consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
+	consumer.consumerCheckPointTracker.flushCheck()
+	return true
 }