From e75712d40d5a92a193cef0eb75cefb610528edac Mon Sep 17 00:00:00 2001 From: Crimson <39024757+crimson-gao@users.noreply.github.com> Date: Tue, 19 Nov 2024 16:19:58 +0800 Subject: [PATCH] Fix consumer progress (#296) --- consumer/shard_worker.go | 4 +- consumer/tasks.go | 22 ++++--- consumer/worker_test.go | 36 +++++++++-- log_store.go | 132 +++++++++++++++++---------------------- model.go | 19 +++--- utils.go | 8 +++ 6 files changed, 119 insertions(+), 102 deletions(-) diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 64a9e3f1..adfae01e 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -84,8 +84,8 @@ func (consumer *ShardConsumerWorker) consume() { consumer.updateStatus(false) return } - err := consumer.nextFetchTask() - consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0) + hasProgress, err := consumer.nextFetchTask() + consumer.updateStatus(err == nil && hasProgress) }() case PROCESSING: go func() { diff --git a/consumer/tasks.go b/consumer/tasks.go index 1df4fd66..b2869d19 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -15,7 +15,7 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { if err != nil { return "", err } - if checkpoint != "" && err == nil { + if checkpoint != "" { consumer.consumerCheckPointTracker.initCheckPoint(checkpoint) return checkpoint, nil } @@ -45,23 +45,23 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { return "", errors.New("CursorPositionError") } -func (consumer *ShardConsumerWorker) nextFetchTask() error { +func (consumer *ShardConsumerWorker) nextFetchTask() (hasProgress bool, err error) { // update last fetch time, for control fetch frequency consumer.lastFetchTime = time.Now() - - logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + cursor := consumer.nextFetchCursor + logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, cursor) if err != nil { - return err + return false, err } // set cursors user to decide whether to save according to the execution of `process` consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) consumer.lastFetchLogGroupList = logGroup consumer.nextFetchCursor = pullLogMeta.NextCursor consumer.lastFetchRawSize = pullLogMeta.RawSize - consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) + consumer.lastFetchGroupCount = pullLogMeta.Count if consumer.client.option.Query != "" { consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery - consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery + consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.DataCountBeforeQuery if consumer.lastFetchRawSizeBeforeQuery == -1 { consumer.lastFetchRawSizeBeforeQuery = 0 } @@ -74,13 +74,15 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error { "shardId", consumer.shardId, "fetch log count", consumer.lastFetchGroupCount, ) - if consumer.lastFetchGroupCount == 0 { + + // if cursor == nextCursor, no progress is needed + if cursor == pullLogMeta.NextCursor { consumer.lastFetchLogGroupList = nil - // may no new data can be pulled, no process func can trigger checkpoint saving consumer.saveCheckPointIfNeeded() + return false, nil } - return nil + return true, nil } func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) { diff --git a/consumer/worker_test.go b/consumer/worker_test.go index a835115d..aa46d209 100644 --- a/consumer/worker_test.go +++ b/consumer/worker_test.go @@ -23,15 +23,16 @@ func TestStartAndStop(t *testing.T) { CursorPosition: BEGIN_CURSOR, } - worker := InitConsumerWorker(option, process) + worker := InitConsumerWorkerWithCheckpointTracker(option, process) worker.Start() worker.StopAndWait() } -func process(shardId int, logGroupList *sls.LogGroupList) string { - fmt.Printf("shardId %d processing works sucess", shardId) - return "" +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) { + fmt.Printf("shardId %d processing works sucess, logGroupSize: %d\n", shardId, len(logGroupList.LogGroups)) + checkpointTracker.SaveCheckPoint(true) + return "", nil } func TestStartAndStopCredentialsProvider(t *testing.T) { @@ -46,12 +47,35 @@ func TestStartAndStopCredentialsProvider(t *testing.T) { ConsumerName: "test-consumer-1", // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. - CursorPosition: BEGIN_CURSOR, + CursorPosition: BEGIN_CURSOR, + AutoCommitDisabled: false, } - worker := InitConsumerWorker(option, process) + worker := InitConsumerWorkerWithCheckpointTracker(option, process) worker.Start() time.Sleep(time.Second * 20) worker.StopAndWait() } + +func TestConsumerQueryNoData(t *testing.T) { + option := LogHubConfig{ + Endpoint: os.Getenv("LOG_TEST_ENDPOINT"), + CredentialsProvider: sls.NewStaticCredentialsProvider( + os.Getenv("LOG_TEST_ACCESS_KEY_ID"), + os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""), + Project: os.Getenv("LOG_TEST_PROJECT"), + Logstore: os.Getenv("LOG_TEST_LOGSTORE"), + ConsumerGroupName: "test-consumer", + ConsumerName: "test-consumer-1", + CursorPosition: END_CURSOR, + Query: "* | where \"request_method\" = 'GET'", + } + + worker := InitConsumerWorkerWithCheckpointTracker(option, process) + + worker.Start() + time.Sleep(time.Second * 2000) + worker.StopAndWait() + +} diff --git a/log_store.go b/log_store.go index c70631f7..94c91eb7 100644 --- a/log_store.go +++ b/log_store.go @@ -496,7 +496,7 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) { // GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next curosr can be used to read logs at next time. -func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) { +func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) ([]byte, *PullLogMeta, error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Accept": "application/x-protobuf", @@ -514,102 +514,84 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullL r, err := request(s.project, "GET", uri, h, nil) if err != nil { - return + return nil, nil, err } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { - return + return nil, nil, err } - pullLogMeta = &PullLogMeta{} - pullLogMeta.Netflow = len(buf) if r.StatusCode != http.StatusOK { errMsg := &Error{} err = json.Unmarshal(buf, errMsg) if err != nil { - err = fmt.Errorf("failed to get cursor") dump, _ := httputil.DumpResponse(r, true) if IsDebugLevelMatched(1) { level.Error(Logger).Log("msg", string(dump)) } - return + return nil, nil, fmt.Errorf("failed parse errorCode json: %w", err) } - err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) - return + return nil, nil, fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) } - v, ok := r.Header["X-Log-Compresstype"] - if !ok || len(v) == 0 { - err = fmt.Errorf("can't find 'x-log-compresstype' header") - return + netflow := len(buf) + + nextCursor, err := parseHeaderString(r.Header, "X-Log-Cursor") + if err != nil { + return nil, nil, err } - var compressType = Compress_None - if v[0] == "lz4" { - compressType = Compress_LZ4 - } else if v[0] == "zstd" { - compressType = Compress_ZSTD - } else { - err = fmt.Errorf("unexpected compress type:%v", compressType) - return + rawSize, err := ParseHeaderInt(r, "X-Log-Bodyrawsize") + if err != nil { + return nil, nil, err } - - v, ok = r.Header["X-Log-Cursor"] - if !ok || len(v) == 0 { - err = fmt.Errorf("can't find 'x-log-cursor' header") - return + count, err := ParseHeaderInt(r, "X-Log-Count") + if err != nil { + return nil, nil, err } - pullLogMeta.NextCursor = v[0] - pullLogMeta.RawSize, err = ParseHeaderInt(r, "X-Log-Bodyrawsize") + pullMeta := &PullLogMeta{ + RawSize: rawSize, + NextCursor: nextCursor, + Netflow: netflow, + Count: count, + } + // If query is not nil, extract more headers + if plr.Query != "" { + pullMeta.RawSizeBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatasize") + pullMeta.DataCountBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatacount") + pullMeta.Lines, _ = ParseHeaderInt(r, "X-Log-Resultlines") + pullMeta.LinesBeforeQuery, _ = ParseHeaderInt(r, "X-Log-Rawdatalines") + pullMeta.FailedLines, _ = ParseHeaderInt(r, "X-Log-Failedlines") + } + if rawSize == 0 { + return make([]byte, 0), pullMeta, nil + } + + // decompress data + out := make([]byte, rawSize) + compressType, err := parseHeaderString(r.Header, "X-Log-Compresstype") if err != nil { - return + return nil, nil, err } - if pullLogMeta.RawSize > 0 { - out = make([]byte, pullLogMeta.RawSize) - switch compressType { - case Compress_LZ4: - uncompressedSize := 0 - if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil { - return - } - if uncompressedSize != pullLogMeta.RawSize { - return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, pullLogMeta.RawSize) - } - case Compress_ZSTD: - out, err = slsZstdCompressor.Decompress(buf, out) - if err != nil { - return nil, nil, err - } - if len(out) != pullLogMeta.RawSize { - return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), pullLogMeta.RawSize) - } - default: - return nil, nil, fmt.Errorf("unexpected compress type: %d", compressType) + switch compressType { + case "lz4": + uncompressedSize := 0 + if uncompressedSize, err = lz4.UncompressBlock(buf, out); err != nil { + return nil, nil, err } + if uncompressedSize != rawSize { + return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, rawSize) + } + case "zstd": + out, err = slsZstdCompressor.Decompress(buf, out) + if err != nil { + return nil, nil, err + } + if len(out) != rawSize { + return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", len(out), rawSize) + } + default: + return nil, nil, fmt.Errorf("unexpected compress type: %s", compressType) } - // todo: add query meta - // If query is not nil, extract more headers - // if plr.Query != "" { - // pullLogMeta.RawSizeBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatasize") - // if err != nil { - // return - // } - // pullLogMeta.DataCountBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatacount") - // if err != nil { - // return - // } - // pullLogMeta.Lines, err = ParseHeaderInt(r, "X-Log-Resultlines") - // if err != nil { - // return - // } - // pullLogMeta.LinesBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatalines") - // if err != nil { - // return - // } - // pullLogMeta.FailedLines, err = ParseHeaderInt(r, "X-Log-Failedlines") - // if err != nil { - // return - // } - // } - return + return out, pullMeta, nil } // LogsBytesDecode decodes logs binary data returned by GetLogsBytes API diff --git a/model.go b/model.go index 02ec8fca..f1f25a36 100644 --- a/model.go +++ b/model.go @@ -75,15 +75,16 @@ func (plr *PullLogRequest) ToURLParams() url.Values { } type PullLogMeta struct { - NextCursor string - Netflow int - RawSize int - RawDataCountBeforeQuery int - RawSizeBeforeQuery int - Lines int - LinesBeforeQuery int - FailedLines int - DataCountBeforeQuery int + NextCursor string + Netflow int + RawSize int + Count int + // these fields are only present when query is set + RawSizeBeforeQuery int // processed raw size before query + Lines int // result lines after query + LinesBeforeQuery int // processed lines before query + FailedLines int // failed lines during query + DataCountBeforeQuery int // processed logGroup count before query } // GetHistogramsResponse defines response from GetHistograms call diff --git a/utils.go b/utils.go index f9b1f263..055e63e4 100644 --- a/utils.go +++ b/utils.go @@ -42,3 +42,11 @@ func ParseHeaderInt(r *http.Response, headerName string) (int, error) { } return -1, fmt.Errorf("can't find '%s' header", strings.ToLower(headerName)) } + +func parseHeaderString(header http.Header, headerName string) (string, error) { + v, ok := header[headerName] + if !ok || len(v) == 0 { + return "", fmt.Errorf("can't find '%s' header", strings.ToLower(headerName)) + } + return v[0], nil +}