Skip to content

Commit

Permalink
Transport: Add support for request retries
Browse files Browse the repository at this point in the history
This patch adds the configuration parameters and supporting logic for retrying
a failed request: a feature common to official Elasticsearch clients.

Executable example:

```golang
//+build ignore

package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/estransport"

	"github.com/cenkalti/backoff"
)

var (
	_ = fmt.Print
	_ = context.WithTimeout
	_ = math.Exp
	_ = strings.NewReader
	_ = http.DefaultClient
)

func main() {
	log.SetFlags(0)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	aborted := make(chan os.Signal)
	signal.Notify(aborted, os.Interrupt, syscall.SIGTERM)

	retryBackoff := backoff.NewExponentialBackOff()
	retryBackoff.InitialInterval = time.Second
	_ = retryBackoff

	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			// "http://localhost:1000",
			// "http://localhost:2000",
			// "http://localhost:3000",
			"http://localhost:4000",
			"http://localhost:5000",
			// "http://localhost:6000",
			// "http://localhost:7000",
			// "http://localhost:8000",
			// "http://localhost:9200",
			// "http://localhost:9000",
			// "http://localhost:9201",
			// "http://localhost:9202",
		},

		Logger: &estransport.ColorLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},

		// RetryOnStatus:        []int{404},
		// EnableRetryOnTimeout: true,
		// MaxRetries: 10,

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(i) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(math.Exp2(float64(i))) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		RetryBackoff: func(i int) time.Duration {
			if i == 1 {
				retryBackoff.Reset()
			}
			d := retryBackoff.NextBackOff()
			fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
			return d
		},

		// Transport: &http.Transport{
		// 	ResponseHeaderTimeout: time.Millisecond,
		// },
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	log.Println("Client ready with URLs:", es.Transport.(*estransport.Client).URLs())

	go func() {
		<-aborted
		log.Println("\nDone!")
		os.Exit(0)
	}()

	for {
		select {
		case <-ticker.C:
			var (
				res *esapi.Response
				err error
			)
			// res, err = es.Info()

			// ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
			// defer cancel()
			// res, err = es.Info(es.Info.WithContext(ctx))

			// es.Search(es.Search.WithTimeout(time.Nanosecond))

			// res, err = es.Get("test", "MISSING")

			res, err = es.Index("test", strings.NewReader(`{"foo":"bar"}`))

			if err != nil {
				if e, ok := err.(net.Error); ok {
					log.Fatalf("Error getting response: [%T]: %s (timeout:%v)", e, e, e.Timeout())
				} else {
					log.Fatalf("Error getting response: [%T]: %s", err, err)
				}
			}
			if res.IsError() {
				log.Fatalf("Error response: %s", res.Status())
			}
		}
	}
}

```

Closes #67

(cherry picked from commit 7f37b7b)
  • Loading branch information
karmi committed Oct 30, 2019
1 parent 4cfb2d7 commit 3577217
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 73 deletions.
4 changes: 3 additions & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Call the Elasticsearch APIs by invoking the corresponding methods on the client:
log.Println(res)
See the github.com/elastic/go-elasticsearch/esapi package for more information and examples.
See the github.com/elastic/go-elasticsearch/esapi package for more information about using the API.
See the github.com/elastic/go-elasticsearch/estransport package for more information about configuring the transport.
*/
package elasticsearch
14 changes: 14 additions & 0 deletions elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/estransport"
Expand All @@ -36,6 +37,13 @@ type Config struct {
CloudID string // Endpoint for the Elastic Service (https://elastic.co/cloud).
APIKey string // Base64-encoded token for authorization; if set, overrides username and password.

RetryOnStatus []int // List of status codes for retry. Default: 502, 503, 504.
DisableRetry bool // Default: false.
EnableRetryOnTimeout bool // Default: false.
MaxRetries int // Default: 3.

RetryBackoff func(attempt int) time.Duration // Optional backoff duration. Default: nil.

Transport http.RoundTripper // The HTTP transport object.
Logger estransport.Logger // The logger object.
}
Expand Down Expand Up @@ -116,6 +124,12 @@ func NewClient(cfg Config) (*Client, error) {
Password: cfg.Password,
APIKey: cfg.APIKey,

RetryOnStatus: cfg.RetryOnStatus,
DisableRetry: cfg.DisableRetry,
EnableRetryOnTimeout: cfg.EnableRetryOnTimeout,
MaxRetries: cfg.MaxRetries,
RetryBackoff: cfg.RetryBackoff,

Transport: cfg.Transport,
Logger: cfg.Logger,
})
Expand Down
5 changes: 3 additions & 2 deletions esapi/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ about the API endpoints and parameters.
The Go API is generated from the Elasticsearch JSON specification at
https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/api
by the internal package available at
https://github.com/elastic/go-elasticsearch/tree/master/internal/cmd/generate/commands.
https://github.com/elastic/go-elasticsearch/tree/master/internal/cmd/generate/commands/gensource.
The API is tested by integration tests common to all Elasticsearch official clients, generated from the
source at https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/test. The generator is provided by the internal package internal/cmd/generate.
source at https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/test.
The generator is provided by the internal package available at internal/cmd/generate/commands/gentests.
*/
package esapi
31 changes: 1 addition & 30 deletions esapi/esapi.request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
package esapi

import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
)

const (
Expand All @@ -31,30 +27,5 @@ type Request interface {
// newRequest creates an HTTP request.
//
func newRequest(method, path string, body io.Reader) (*http.Request, error) {
r := http.Request{
Method: method,
URL: &url.URL{Path: path},
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
}

if body != nil {
switch b := body.(type) {
case *bytes.Buffer:
r.Body = ioutil.NopCloser(body)
r.ContentLength = int64(b.Len())
case *bytes.Reader:
r.Body = ioutil.NopCloser(body)
r.ContentLength = int64(b.Len())
case *strings.Reader:
r.Body = ioutil.NopCloser(body)
r.ContentLength = int64(b.Len())
default:
r.Body = ioutil.NopCloser(body)
}
}

return &r, nil
return http.NewRequest(method, path, body)
}
13 changes: 12 additions & 1 deletion estransport/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@ Package estransport provides the transport layer for the Elasticsearch client.
It is automatically included in the client provided by the github.com/elastic/go-elasticsearch package
and is not intended for direct use: to configure the client, use the elasticsearch.Config struct.
The default HTTP transport of the client is http.Transport.
The default HTTP transport of the client is http.Transport; use the Transport option to customize it;
see the _examples/customization.go file in this repository for information.
The package defines the "Selector" interface for getting a URL from the list. At the moment,
the implementation is rather minimal: the client takes a slice of url.URL pointers,
and round-robins across them when performing the request.
The package will automatically retry requests on network-related errors, and on specific
response status codes (by default 502, 503, 504). Use the RetryOnStatus option to customize the list.
The transport will not retry a timeout network error, unless enabled by setting EnableRetryOnTimeout to true.
Use the MaxRetries option to configure the number of retries, and set DisableRetry to true
to disable the retry behaviour altogether.
By default, the retry will be performed without any delay; to configure a backoff interval,
implement the RetryBackoff option function; see an example in the package unit tests for information.
The package defines the "Logger" interface for logging information about request and response.
It comes with several bundled loggers for logging in text and JSON.
Expand Down
135 changes: 105 additions & 30 deletions estransport/estransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
Expand All @@ -26,6 +27,9 @@ const Version = version.Client
var (
userAgent string
reGoVersion = regexp.MustCompile(`go(\d+\.\d+\..+)`)

defaultMaxRetries = 3
defaultRetryOnStatus = [...]int{502, 503, 504}
)

func init() {
Expand All @@ -46,6 +50,12 @@ type Config struct {
Password string
APIKey string

RetryOnStatus []int
DisableRetry bool
EnableRetryOnTimeout bool
MaxRetries int
RetryBackoff func(attempt int) time.Duration

Transport http.RoundTripper
Logger Logger
}
Expand All @@ -58,6 +68,12 @@ type Client struct {
password string
apikey string

retryOnStatus []int
disableRetry bool
enableRetryOnTimeout bool
maxRetries int
retryBackoff func(attempt int) time.Duration

transport http.RoundTripper
selector Selector
logger Logger
Expand All @@ -72,12 +88,26 @@ func New(cfg Config) *Client {
cfg.Transport = http.DefaultTransport
}

if len(cfg.RetryOnStatus) == 0 {
cfg.RetryOnStatus = defaultRetryOnStatus[:]
}

if cfg.MaxRetries == 0 {
cfg.MaxRetries = defaultMaxRetries
}

return &Client{
urls: cfg.URLs,
username: cfg.Username,
password: cfg.Password,
apikey: cfg.APIKey,

retryOnStatus: cfg.RetryOnStatus,
disableRetry: cfg.DisableRetry,
enableRetryOnTimeout: cfg.EnableRetryOnTimeout,
maxRetries: cfg.MaxRetries,
retryBackoff: cfg.RetryBackoff,

transport: cfg.Transport,
selector: NewRoundRobinSelector(cfg.URLs...),
logger: cfg.Logger,
Expand All @@ -88,41 +118,86 @@ func New(cfg Config) *Client {
//
func (c *Client) Perform(req *http.Request) (*http.Response, error) {
var (
dupReqBody io.Reader
)
res *http.Response
err error

// Get URL from the Selector
//
u, err := c.getURL()
if err != nil {
// TODO(karmi): Log error
return nil, fmt.Errorf("cannot get URL: %s", err)
}
dupReqBodyForLog io.ReadCloser
)

// Update request
//
c.setURL(u, req)
c.setUserAgent(req)
c.setAuthorization(u, req)
c.setReqUserAgent(req)

for i := 1; i <= c.maxRetries; i++ {
var (
nodeURL *url.URL
shouldRetry bool
)

// Get URL from the Selector
//
nodeURL, err = c.getURL()
if err != nil {
// TODO(karmi): Log error
return nil, fmt.Errorf("cannot get URL: %s", err)
}

// Duplicate request body for logger
//
if c.logger != nil && c.logger.RequestBodyEnabled() {
if req.Body != nil && req.Body != http.NoBody {
dupReqBody, req.Body, _ = duplicateBody(req.Body)
// Update request
//
c.setReqURL(nodeURL, req)
c.setReqAuth(nodeURL, req)

// Duplicate request body for logger
//
if c.logger != nil && c.logger.RequestBodyEnabled() {
if req.Body != nil && req.Body != http.NoBody {
dupReqBodyForLog, req.Body, _ = duplicateBody(req.Body)
}
}
}

// Set up time measures and execute the request
//
start := time.Now().UTC()
res, err := c.transport.RoundTrip(req)
dur := time.Since(start)
// Set up time measures and execute the request
//
start := time.Now().UTC()
res, err = c.transport.RoundTrip(req)
dur := time.Since(start)

// Log request and response
//
if c.logger != nil {
c.logRoundTrip(req, res, dupReqBody, err, start, dur)
// Log request and response
//
if c.logger != nil {
c.logRoundTrip(req, res, dupReqBodyForLog, err, start, dur)
}

// Retry only on network errors, but don't retry on timeout errors, unless configured
//
if err != nil {
if err, ok := err.(net.Error); ok {
if (!err.Timeout() || c.enableRetryOnTimeout) && !c.disableRetry {
shouldRetry = true
}
}
}

// Retry on configured response statuses
//
if res != nil && !c.disableRetry {
for _, code := range c.retryOnStatus {
if res.StatusCode == code {
shouldRetry = true
}
}
}

// Break if retry should not be performed
//
if !shouldRetry {
break
}

// Delay the retry if a backoff function is configured
//
if c.retryBackoff != nil {
time.Sleep(c.retryBackoff(i))
}
}

// TODO(karmi): Wrap error
Expand All @@ -139,7 +214,7 @@ func (c *Client) getURL() (*url.URL, error) {
return c.selector.Select()
}

func (c *Client) setURL(u *url.URL, req *http.Request) *http.Request {
func (c *Client) setReqURL(u *url.URL, req *http.Request) *http.Request {
req.URL.Scheme = u.Scheme
req.URL.Host = u.Host

Expand All @@ -154,7 +229,7 @@ func (c *Client) setURL(u *url.URL, req *http.Request) *http.Request {
return req
}

func (c *Client) setAuthorization(u *url.URL, req *http.Request) *http.Request {
func (c *Client) setReqAuth(u *url.URL, req *http.Request) *http.Request {
if _, ok := req.Header["Authorization"]; !ok {
if u.User != nil {
password, _ := u.User.Password()
Expand All @@ -180,7 +255,7 @@ func (c *Client) setAuthorization(u *url.URL, req *http.Request) *http.Request {
return req
}

func (c *Client) setUserAgent(req *http.Request) *http.Request {
func (c *Client) setReqUserAgent(req *http.Request) *http.Request {
req.Header.Set("User-Agent", userAgent)
return req
}
Expand Down
Loading

0 comments on commit 3577217

Please sign in to comment.