Skip to content

Commit

Permalink
fix clickhouse error handling (in many cases context canceled)
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Aug 26, 2022
1 parent d3c3f04 commit 6c2bf3c
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 48 deletions.
2 changes: 2 additions & 0 deletions cmd/e2e-test/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func verifyRender(address string, check *RenderCheck) []string {
errStr := strings.TrimRight(err.Error(), "\n")
if check.errorRegexp == nil || !check.errorRegexp.MatchString(errStr) {
errors = append(errors, url+": "+errStr)
} else {
fmt.Printf("EXPECTED ERROR, SUCCESS : %s\n", errStr)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion finder/tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/lomik/graphite-clickhouse/helper/clickhouse"
chtest "github.com/lomik/graphite-clickhouse/helper/tests/clickhouse"
)

func TestTagsMakeSQL(t *testing.T) {
Expand Down Expand Up @@ -96,7 +97,7 @@ func _TestTags(t *testing.T) {
for _, test := range table {
testName := fmt.Sprintf("query: %#v", test.query)

srv := clickhouse.NewTestServer()
srv := chtest.NewTestServer()

m := NewMockFinder(mockData)
f := WrapTag(m, srv.URL, "graphite_tag", clickhouse.Options{Timeout: time.Second, ConnectTimeout: time.Second})
Expand Down
73 changes: 44 additions & 29 deletions helper/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand All @@ -22,20 +23,20 @@ import (
"go.uber.org/zap/zapcore"
)

type ErrDataParse struct {
type ErrWithDescr struct {
err string
data string
}

func NewErrDataParse(err string, data string) error {
return &ErrDataParse{err, data}
func NewErrWithDescr(err string, data string) error {
return &ErrWithDescr{err, data}
}

func (e *ErrDataParse) Error() string {
return fmt.Sprintf("%s: %s", e.err, e.data)
func (e *ErrWithDescr) Error() string {
return e.err + ": " + e.data
}

func (e *ErrDataParse) PrependDescription(test string) {
func (e *ErrWithDescr) PrependDescription(test string) {
e.data = test + e.data
}

Expand All @@ -55,52 +56,66 @@ var ErrUvarintRead = errors.New("ReadUvarint: Malformed array")
var ErrUvarintOverflow = errors.New("ReadUvarint: varint overflows a 64-bit integer")
var ErrClickHouseResponse = errors.New("Malformed response from clickhouse")

// extractClickhouseError maps a raw clickhouse error string to an HTTP status
// and a sanitized message safe to return to clients. Unrecognized errors are
// reported as a generic internal "Storage error".
func extractClickhouseError(e string) (int, string) {
	// Only messages that come from a clickhouse HTTP 500 response or from a
	// malformed-response wrapper are inspected further.
	if !strings.HasPrefix(e, "clickhouse response status 500: Code:") &&
		!strings.HasPrefix(e, "Malformed response from clickhouse") {
		return http.StatusInternalServerError, "Storage error"
	}
	if start := strings.Index(e, ": Limit for "); start != -1 {
		// Keep the descriptive tail beginning at "for ..." (skip ": Limit ",
		// 8 bytes) and drop the trailing " (version ...)" suffix if present.
		detail := e[start+len(": Limit "):]
		if end := strings.Index(detail, " (version "); end != -1 {
			detail = detail[:end]
		}
		return http.StatusForbidden, "Storage read limit " + detail
	}
	if strings.Contains(e, ": Memory limit ") {
		return http.StatusForbidden, "Storage read limit for memory"
	}
	if strings.HasPrefix(e, "clickhouse response status 500: Code: 170,") {
		// Distributed table configuration error, e.g.
		// clickhouse response status 500: Code: 170, e.displayText() = DB::Exception: Requested cluster 'cluster' not found
		return http.StatusServiceUnavailable, "Storage configuration error"
	}
	return http.StatusInternalServerError, "Storage error"
}

func HandleError(w http.ResponseWriter, err error) {
errStr := err.Error()
if err == ErrInvalidTimeRange {
http.Error(w, err.Error(), http.StatusBadRequest)
http.Error(w, errStr, http.StatusBadRequest)
return
}
if errors.Is(err, context.Canceled) {
http.Error(w, "Storage read context canceled", http.StatusGatewayTimeout)
if _, ok := err.(*ErrWithDescr); ok {
status, message := extractClickhouseError(errStr)
http.Error(w, message, status)
return
}
netErr, ok := err.(net.Error)
if ok {
if netErr.Timeout() {
http.Error(w, "Storage read timeout", http.StatusGatewayTimeout)
} else if strings.HasSuffix(err.Error(), "connect: no route to host") ||
strings.HasSuffix(err.Error(), "connect: connection refused") ||
strings.HasSuffix(err.Error(), ": connection reset by peer") ||
strings.HasPrefix(err.Error(), "dial tcp: lookup ") { // DNS lookup
http.Error(w, "Storage error", http.StatusServiceUnavailable)
} else if strings.HasSuffix(errStr, "connect: no route to host") ||
strings.HasPrefix(errStr, "dial tcp: lookup ") { // DNS lookup
http.Error(w, "Storage route error", http.StatusServiceUnavailable)
} else if strings.HasSuffix(errStr, "connect: connection refused") ||
strings.HasSuffix(errStr, ": connection reset by peer") {
http.Error(w, "Storage connect error", http.StatusServiceUnavailable)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, "Storage network error", http.StatusServiceUnavailable)
}
return
}
errCode, ok := err.(*ErrorWithCode)
if ok {
if (errCode.Code > 500 && errCode.Code < 512) ||
errCode.Code == http.StatusBadRequest || errCode.Code == http.StatusForbidden {
http.Error(w, html.EscapeString(errCode.Error()), errCode.Code)
http.Error(w, html.EscapeString(errStr), errCode.Code)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, html.EscapeString(errStr), http.StatusInternalServerError)
}
return
}
_, ok = err.(*ErrDataParse)
if ok || strings.HasPrefix(err.Error(), "clickhouse response status 500: Code:") {
if strings.Contains(err.Error(), ": Limit for ") {
//logger.Info("limit", zap.Error(err))
http.Error(w, "Storage read limit", http.StatusForbidden)
} else if !ok && strings.HasPrefix(err.Error(), "clickhouse response status 500: Code: 170,") {
// distributed table configuration error
// clickhouse response status 500: Code: 170, e.displayText() = DB::Exception: Requested cluster 'cluster' not found
http.Error(w, "Storage configuration error", http.StatusServiceUnavailable)
}
if errors.Is(err, context.Canceled) {
http.Error(w, "Storage read context canceled", http.StatusGatewayTimeout)
} else {
//logger.Debug("query", zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, html.EscapeString(errStr), http.StatusInternalServerError)
}
}

Expand Down Expand Up @@ -278,7 +293,7 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
} else if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
err = fmt.Errorf("clickhouse response status %d: %s", resp.StatusCode, string(body))
err = NewErrWithDescr("clickhouse response status "+strconv.Itoa(resp.StatusCode), string(body))
return
}

Expand Down
50 changes: 50 additions & 0 deletions helper/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package clickhouse

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
)

// Test_extractClickhouseError verifies the mapping from raw clickhouse error
// strings to HTTP status codes and client-facing messages.
func Test_extractClickhouseError(t *testing.T) {
	type testCase struct {
		errStr      string
		wantStatus  int
		wantMessage string
	}
	cases := []testCase{
		{
			errStr:      "clickhouse response status 500: Code: 158. DB::Exception: Received from host:9000. DB::Exception: Limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: 10.00, current rows: 8.19 thousand. (TOO_MANY_ROWS) (version 22.2.2.1)\n",
			wantStatus:  http.StatusForbidden,
			wantMessage: "Storage read limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: 10.00, current rows: 8.19 thousand. (TOO_MANY_ROWS)",
		},
		{
			errStr:      "clickhouse response status 500: Code: 158. DB::Exception: Limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: 1.00, current rows: 50.00. (TOO_MANY_ROWS) (version 22.1.3.7 (official build))\n",
			wantStatus:  http.StatusForbidden,
			wantMessage: "Storage read limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: 1.00, current rows: 50.00. (TOO_MANY_ROWS)",
		},
		{
			errStr:      "Malformed response from clickhouse: Code: 241. DB::Exception: Received from host:9000. DB::Exception: Memory limit (for query) exceeded: would use 77.20 GiB (attempt to allocate chunk of 13421776 bytes), maximum: 4.51 GiB: While executing AggregatingTransform. (MEMORY_LIMIT_EXCEEDED) (version 22.2.2.1)\n",
			wantStatus:  http.StatusForbidden,
			wantMessage: "Storage read limit for memory",
		},
		{
			errStr:      "Malformed response from clickhouse : Code: 241. DB::Exception: Received from host:9000. DB::Exception: Memory limit (for query) exceeded: would use 6.66 GiB (attempt to allocate chunk of 8537964 bytes), maximum: 4.51 GiB: (avg_value_size_hint = 208.48085594177246, avg_chars_size = 240.57702713012694, limit = 32768): ... : While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)",
			wantStatus:  http.StatusForbidden,
			wantMessage: "Storage read limit for memory",
		},
		{
			// Anything unrecognized falls through to a generic storage error.
			errStr:      "Other error",
			wantStatus:  http.StatusInternalServerError,
			wantMessage: "Storage error",
		},
	}
	for i := range cases {
		tc := cases[i]
		t.Run(tc.errStr, func(t *testing.T) {
			gotStatus, gotMessage := extractClickhouseError(tc.errStr)
			assert.Equal(t, tc.wantStatus, gotStatus)
			assert.Equal(t, tc.wantMessage, gotMessage)
		})
	}
}
4 changes: 2 additions & 2 deletions helper/clickhouse/external-data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func getTestCases() (tables []ExternalTable) {
},
},
Format: "TSV",
Data: []byte(`f 3`),
Data: []byte(`f 3`),
},
{
Name: "test2",
Expand All @@ -45,7 +45,7 @@ func getTestCases() (tables []ExternalTable) {
},
},
Format: "TSKV",
Data: []byte(`aFloat=13.13 aDate=2013-12-13`),
Data: []byte(`aFloat=13.13 aDate=2013-12-13`),
},
}
return
Expand Down
File renamed without changes.
12 changes: 6 additions & 6 deletions render/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func (d *data) setSteps(cond *conditions) {
func splitErrorHandler(data *[]byte, atEOF bool, tokenLen int, err error) (int, []byte, error) {
if err == clickhouse.ErrUvarintRead {
if atEOF {
return 0, nil, clickhouse.NewErrDataParse(errClickHouseResponse.Error(), string(*data))
return 0, nil, clickhouse.NewErrWithDescr(errClickHouseResponse.Error(), string(*data))
}
// signal for read more
return 0, nil, nil
} else if err != nil || (len(*data) < tokenLen && atEOF) {
return 0, nil, clickhouse.NewErrDataParse(errClickHouseResponse.Error(), string(*data))
return 0, nil, clickhouse.NewErrWithDescr(errClickHouseResponse.Error(), string(*data))
}
// signal for read more
return 0, nil, nil
Expand Down Expand Up @@ -174,7 +174,7 @@ func dataSplitAggregated(data []byte, atEOF bool) (advance int, token []byte, er
}

if timeLen != valueLen {
return 0, nil, clickhouse.NewErrDataParse(errClickHouseResponse.Error()+": Different amount of Times and Values", string(data))
return 0, nil, clickhouse.NewErrWithDescr(errClickHouseResponse.Error()+": Different amount of Times and Values", string(data))
}

return tokenLen, data[:tokenLen], nil
Expand Down Expand Up @@ -212,7 +212,7 @@ func dataSplitUnaggregated(data []byte, atEOF bool) (advance int, token []byte,
}

if timeLen != valueLen || timeLen != timestampLen {
return 0, nil, clickhouse.NewErrDataParse(errClickHouseResponse.Error()+": Different amount of Values, Times and Timestamps", string(data))
return 0, nil, clickhouse.NewErrWithDescr(errClickHouseResponse.Error()+": Different amount of Values, Times and Timestamps", string(data))
}

return tokenLen, data[:tokenLen], nil
Expand Down Expand Up @@ -316,9 +316,9 @@ func (d *data) parseResponse(ctx context.Context, bodyReader io.ReadCloser, cond

err := scanner.Err()
if err != nil {
dataErr, ok := err.(*clickhouse.ErrDataParse)
dataErr, ok := err.(*clickhouse.ErrWithDescr)
if ok {
// format full error string
// format the full error string; sometimes parsing does not fail at the start of the error string
dataErr.PrependDescription(string(rowStart))
}
bodyReader.Close()
Expand Down
15 changes: 8 additions & 7 deletions render/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,17 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
},
extData,
)
if err != nil {
if err == nil {
err = data.parseResponse(queryContext, body, cond)
if err != nil {
logger.Error("reader", zap.Error(err))
data.e <- err
queryCancel()
}
} else {
logger.Error("reader", zap.Error(err))
queryCancel()
data.e <- err
}
err = data.parseResponse(queryContext, body, cond)
if err != nil {
logger.Error("reader", zap.Error(err))
queryCancel()
data.e <- err
}
}()
}
Expand Down
5 changes: 5 additions & 0 deletions tests/error_handling/graphite-clickhouse.conf.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ query-params = [
duration = "1h",
url = "{{ .PROXY_URL }}/?max_rows_to_read=1&max_result_bytes=1&readonly=2&log_queries=1",
data-timeout = "5s"
},
{
duration = "2h",
url = "{{ .PROXY_URL }}/?max_memory_usage=1&max_memory_usage_for_user=1&readonly=2&log_queries=1",
data-timeout = "5s"
}
]

Expand Down
14 changes: 11 additions & 3 deletions tests/error_handling/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,25 @@ values = [1.0, 2.0]

# Check additional query param (storage read limit)
[[test.render_checks]]
from = "now-14400"
from = "now-7190"
until = "now"
targets = [ "test.long" ]
timeout = "5s"
error_regexp = "^403: Storage read limit"
error_regexp = "^403: Storage read limit for rows"

# Check data-timeout on additional query param
[[test.render_checks]]
from = "now-14400"
from = "now-7190"
until = "now"
targets = [ "test.long" ]
timeout = "2s"
proxy_delay = "1500ms"
error_regexp = "^504: Storage read timeout"

# Check additional query param (storage memory limit)
[[test.render_checks]]
from = "now-14400"
until = "now"
targets = [ "test.long" ]
timeout = "5s"
error_regexp = "^403: Storage read limit for memory"

0 comments on commit 6c2bf3c

Please sign in to comment.