Skip to content

Commit

Permalink
refactor: evict cache for connection error (#45)
Browse files Browse the repository at this point in the history
* chore: bump cache

* update pr template

* add error ctx

* return write err msg

* add todo comments
  • Loading branch information
jiacai2050 authored Sep 5, 2023
1 parent 6447e99 commit 63eea2d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 45 deletions.
30 changes: 3 additions & 27 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,7 @@
# Which issue does this PR close?
## Rationale

Closes #

# Rationale for this change

<!---
Why are you proposing this change? If this is already explained clearly in the issue, then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
-->
## Detailed Changes

# What changes are included in this PR?

<!---
There is no need to duplicate the description in the issue here, but it is sometimes worth providing a summary of the individual changes in this PR to help reviewers understand the structure.
-->

# Are there any user-facing changes?

<!---
Please mention if:
- there are user-facing changes that need to update the documentation or configuration.
- this is a breaking change to public APIs
-->

# How does this change test

<!--
Please describe how you test this change (like by unit test case, integration test or some other ways) if this change has touched the code.
-->
## Test Plan
58 changes: 43 additions & 15 deletions ceresdb/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package ceresdb

import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"
)

type clientImpl struct {
Expand All @@ -24,9 +26,22 @@ func newClient(endpoint string, routeMode RouteMode, opts options) (Client, erro
}, nil
}

func shouldClearRoute(err error) bool {
if err != nil {
if ceresdbErr, ok := err.(*CeresdbError); ok && ceresdbErr.ShouldClearRoute() {
return true
} else if strings.Contains(err.Error(), "connection error") {
// TODO: Find a better way to check if err means remote endpoint is down.
return true
}
}

return false
}

func (c *clientImpl) SQLQuery(ctx context.Context, req SQLQueryRequest) (SQLQueryResponse, error) {
if err := c.withDefaultRequestContext(&req.ReqCtx); err != nil {
return SQLQueryResponse{}, err
return SQLQueryResponse{}, errors.Wrap(err, "add request ctx")
}

if len(req.Tables) == 0 {
Expand All @@ -35,54 +50,67 @@ func (c *clientImpl) SQLQuery(ctx context.Context, req SQLQueryRequest) (SQLQuer

routes, err := c.routeClient.RouteFor(req.ReqCtx, req.Tables)
if err != nil {
return SQLQueryResponse{}, fmt.Errorf("route tables failed, tables:%v, err:%v", req.Tables, err)
return SQLQueryResponse{}, errors.Wrapf(err, "route tables failed, names:%v", req.Tables)
}
for _, route := range routes {
queryResponse, err := c.rpcClient.SQLQuery(ctx, route.Endpoint, req)
if ceresdbErr, ok := err.(*CeresdbError); ok && ceresdbErr.ShouldClearRoute() {

var endpoint string
if v, ok := routes[req.Tables[0]]; ok {
endpoint = v.Endpoint
} else {
return SQLQueryResponse{}, errors.Wrapf(ErrEmptyRoute, "failed to route table, name:%s", req.Tables[0])
}

resp, err := c.rpcClient.SQLQuery(ctx, endpoint, req)
if err != nil {
if shouldClearRoute(err) {
c.routeClient.ClearRouteFor(req.Tables)
}
return queryResponse, err

return SQLQueryResponse{}, errors.Wrap(err, "do grpc query")
}
return SQLQueryResponse{}, ErrEmptyRoute

return resp, nil
}

func (c *clientImpl) Write(ctx context.Context, req WriteRequest) (WriteResponse, error) {
if err := c.withDefaultRequestContext(&req.ReqCtx); err != nil {
return WriteResponse{}, err
return WriteResponse{}, errors.Wrap(err, "add request ctx")
}

if len(req.Points) == 0 {
return WriteResponse{}, ErrNullRows
}

tables := getTablesFromPoints(req.Points)

routes, err := c.routeClient.RouteFor(req.ReqCtx, tables)
if err != nil {
return WriteResponse{}, err
return WriteResponse{}, errors.Wrap(err, "route table")
}

pointsByRoute, err := splitPointsByRoute(req.Points, routes)
if err != nil {
return WriteResponse{}, err
return WriteResponse{}, errors.Wrap(err, "split points by route")
}

// TODO
// Convert to parallel write
// TODO(chenxiang): Convert to parallel write
ret := WriteResponse{}
for endpoint, points := range pointsByRoute {
response, err := c.rpcClient.Write(ctx, endpoint, req.ReqCtx, points)
if err != nil {
if ceresdbErr, ok := err.(*CeresdbError); ok && ceresdbErr.ShouldClearRoute() {
if shouldClearRoute(err) {
c.routeClient.ClearRouteFor(getTablesFromPoints(points))
}

// Only return first error message now.
if ret.Message != "" {
ret.Message = err.Error()
}
ret = combineWriteResponse(ret, WriteResponse{Failed: uint32(len(points))})
continue
}
ret = combineWriteResponse(ret, response)
}

return ret, nil
}

Expand Down
1 change: 1 addition & 0 deletions ceresdb/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newRouteClient(endpoint string, routeMode RouteMode, rpcClient *rpcClient,
if err != nil {
return nil, err
}

routeClient.routeCache = routeCache
return routeClient, nil
case Proxy:
Expand Down
1 change: 1 addition & 0 deletions ceresdb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type WriteRequest struct {
type WriteResponse struct {
Success uint32
Failed uint32
Message string
}

type SQLQueryRequest struct {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40
github.com/hashicorp/golang-lru v0.6.0
github.com/hashicorp/golang-lru v1.0.2
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.47.0
)
Expand All @@ -21,6 +21,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
Expand All @@ -84,6 +84,7 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down

0 comments on commit 63eea2d

Please sign in to comment.