Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Add support for retrying requests #67

Closed
wants to merge 18 commits into from
Closed

[DRAFT] Add support for retrying requests #67

wants to merge 18 commits into from

Conversation

karmi
Copy link
Contributor

@karmi karmi commented Jul 16, 2019

This patch adds a draft of support for retrying the requests automatically by the client.

@karmi karmi marked this pull request as ready for review July 16, 2019 07:11
r.Body = ioutil.NopCloser(body)
if body != nil && req.GetBody == nil {
req.GetBody = func() (io.ReadCloser, error) {
return ioutil.NopCloser(body), nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work, you're just returning the same body over and over. The first attempt will consume it, and then it'll be empty on retries. Instead you'd need to modify req.Body so that it reads into a buffer, and then have GetBody use that buffer. Basically, move the dupReqBodyForLog logic here.

r.ContentLength = int64(b.Len())
default:
r.Body = ioutil.NopCloser(body)
if body != nil && req.GetBody == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil, this will panic. You should just do the usual if err != nil { return nil, err } here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in c78709c.

res *http.Response
err error

dupReqBodyForLog io.ReadCloser
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea behind using GetBody was to get rid of this

if c.logger != nil && c.logger.RequestBodyEnabled() {
if req.Body != nil && req.Body != http.NoBody {
dupReqBody, req.Body, _ = duplicateBody(req.Body)
for i := 1; ; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i := 0; i < maxRetries; i++ { ?

}

// TODO(karmi): If c.DisableRetryOnError => break
// TODO(karmi): Retry on status [502, 503, 504]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you would do something like:

if req.Body != nil && req.Body != http.NoBody {
    body, err := req.GetBody()
    if err != nil {
        return nil, err
    }
    req.Body = body
}

//
if c.logger != nil && c.logger.RequestBodyEnabled() {
if req.Body != nil && req.Body != http.NoBody {
dupReqBodyForLog, req.Body, _ = duplicateBody(req.Body)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use req.GetBody here:

if req.Body != nil && req.Body != http.NoBody {
    body, err := req.GetBody()
    if err != nil {
        return nil, err
    }
    dupReqBodyForLog = body
}

Or you could just let logRoundTrip do that.

@karmi karmi force-pushed the retries branch 2 times, most recently from c78709c to 85713fb Compare July 21, 2019 08:12
@karmi karmi force-pushed the retries branch 2 times, most recently from 9bca87d to 656fe3c Compare July 30, 2019 14:30
@karmi karmi force-pushed the retries branch 6 times, most recently from 85b2d56 to 803eeca Compare October 4, 2019 08:00
@karmi
Copy link
Contributor Author

karmi commented Oct 7, 2019

Executable example:

//+build ignore

package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/estransport"

	"github.com/cenkalti/backoff"
)

var (
	_ = fmt.Print
	_ = context.WithTimeout
	_ = math.Exp
	_ = strings.NewReader
	_ = http.DefaultClient
)

func main() {
	log.SetFlags(0)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	aborted := make(chan os.Signal)
	signal.Notify(aborted, os.Interrupt)

	retryBackoff := backoff.NewExponentialBackOff()
	retryBackoff.InitialInterval = time.Second
	_ = retryBackoff

	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			// "http://localhost:1000",
			// "http://localhost:2000",
			// "http://localhost:3000",
			"http://localhost:4000",
			"http://localhost:5000",
			// "http://localhost:6000",
			// "http://localhost:7000",
			// "http://localhost:8000",
			// "http://localhost:9200",
			// "http://localhost:9000",
			// "http://localhost:9201",
			// "http://localhost:9202",
		},

		Logger: &estransport.ColorLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},

		// RetryOnStatus:        []int{404},
		// EnableRetryOnTimeout: true,
		// MaxRetries: 10,

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(i) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(math.Exp2(float64(i))) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		RetryBackoff: func(i int) time.Duration {
			if i == 1 {
				retryBackoff.Reset()
			}
			d := retryBackoff.NextBackOff()
			fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
			return d
		},

		// Transport: &http.Transport{
		// 	ResponseHeaderTimeout: time.Millisecond,
		// },
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	log.Println("Client ready with URLs:", es.Transport.(*estransport.Client).URLs())

	for {
		select {
		case <-aborted:
			log.Println("\nDone!")
			return
		case <-ticker.C:
			var (
				res *esapi.Response
				err error
			)
			// res, err = es.Info()

			// ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
			// defer cancel()
			// res, err = es.Info(es.Info.WithContext(ctx))

			// es.Search(es.Search.WithTimeout(time.Nanosecond))

			// res, err = es.Get("test", "MISSING")

			res, err = es.Index("test", strings.NewReader(`{"foo":"bar"}`))

			if err != nil {
				if e, ok := err.(net.Error); ok {
					log.Fatalf("Error getting response: [%T]: %s (timeout:%v)", e, e, e.Timeout())
				} else {
					log.Fatalf("Error getting response: [%T]: %s", err, err)
				}
			}
			if res.IsError() {
				log.Fatalf("Error response: %s", res.Status())
			}
		}
	}
}

karmi added 16 commits October 7, 2019 11:28
…bles

Also fix "make lint" for estransport_internal_test.go
Instead of calling break/continue in the `for` loop directly, declare a `shouldRetry`
variable and switch on its value a the end of the loop.

With compliments to @honzakral.
@karmi karmi closed this in 7f37b7b Oct 8, 2019
karmi added a commit that referenced this pull request Oct 30, 2019
This patch adds the configuration parameters and supporting logic for retrying
a failed request: a feature common to official Elasticsearch clients.

Executable example:

```golang
//+build ignore

package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/estransport"

	"github.com/cenkalti/backoff"
)

var (
	_ = fmt.Print
	_ = context.WithTimeout
	_ = math.Exp
	_ = strings.NewReader
	_ = http.DefaultClient
)

func main() {
	log.SetFlags(0)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	aborted := make(chan os.Signal)
	signal.Notify(aborted, os.Interrupt, syscall.SIGTERM)

	retryBackoff := backoff.NewExponentialBackOff()
	retryBackoff.InitialInterval = time.Second
	_ = retryBackoff

	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			// "http://localhost:1000",
			// "http://localhost:2000",
			// "http://localhost:3000",
			"http://localhost:4000",
			"http://localhost:5000",
			// "http://localhost:6000",
			// "http://localhost:7000",
			// "http://localhost:8000",
			// "http://localhost:9200",
			// "http://localhost:9000",
			// "http://localhost:9201",
			// "http://localhost:9202",
		},

		Logger: &estransport.ColorLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},

		// RetryOnStatus:        []int{404},
		// EnableRetryOnTimeout: true,
		// MaxRetries: 10,

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(i) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(math.Exp2(float64(i))) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		RetryBackoff: func(i int) time.Duration {
			if i == 1 {
				retryBackoff.Reset()
			}
			d := retryBackoff.NextBackOff()
			fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
			return d
		},

		// Transport: &http.Transport{
		// 	ResponseHeaderTimeout: time.Millisecond,
		// },
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	log.Println("Client ready with URLs:", es.Transport.(*estransport.Client).URLs())

	go func() {
		<-aborted
		log.Println("\nDone!")
		os.Exit(0)
	}()

	for {
		select {
		case <-ticker.C:
			var (
				res *esapi.Response
				err error
			)
			// res, err = es.Info()

			// ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
			// defer cancel()
			// res, err = es.Info(es.Info.WithContext(ctx))

			// es.Search(es.Search.WithTimeout(time.Nanosecond))

			// res, err = es.Get("test", "MISSING")

			res, err = es.Index("test", strings.NewReader(`{"foo":"bar"}`))

			if err != nil {
				if e, ok := err.(net.Error); ok {
					log.Fatalf("Error getting response: [%T]: %s (timeout:%v)", e, e, e.Timeout())
				} else {
					log.Fatalf("Error getting response: [%T]: %s", err, err)
				}
			}
			if res.IsError() {
				log.Fatalf("Error response: %s", res.Status())
			}
		}
	}
}

```

Closes #67

(cherry picked from commit 7f37b7b)
karmi added a commit that referenced this pull request Oct 30, 2019
This patch adds the configuration parameters and supporting logic for retrying
a failed request: a feature common to official Elasticsearch clients.

Executable example:

```golang
//+build ignore

package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/estransport"

	"github.com/cenkalti/backoff"
)

var (
	_ = fmt.Print
	_ = context.WithTimeout
	_ = math.Exp
	_ = strings.NewReader
	_ = http.DefaultClient
)

func main() {
	log.SetFlags(0)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	aborted := make(chan os.Signal)
	signal.Notify(aborted, os.Interrupt, syscall.SIGTERM)

	retryBackoff := backoff.NewExponentialBackOff()
	retryBackoff.InitialInterval = time.Second
	_ = retryBackoff

	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			// "http://localhost:1000",
			// "http://localhost:2000",
			// "http://localhost:3000",
			"http://localhost:4000",
			"http://localhost:5000",
			// "http://localhost:6000",
			// "http://localhost:7000",
			// "http://localhost:8000",
			// "http://localhost:9200",
			// "http://localhost:9000",
			// "http://localhost:9201",
			// "http://localhost:9202",
		},

		Logger: &estransport.ColorLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},

		// RetryOnStatus:        []int{404},
		// EnableRetryOnTimeout: true,
		// MaxRetries: 10,

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(i) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		// RetryBackoff: func(i int) time.Duration {
		// 	d := time.Duration(math.Exp2(float64(i))) * time.Second
		// 	fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
		// 	return d
		// },

		RetryBackoff: func(i int) time.Duration {
			if i == 1 {
				retryBackoff.Reset()
			}
			d := retryBackoff.NextBackOff()
			fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
			return d
		},

		// Transport: &http.Transport{
		// 	ResponseHeaderTimeout: time.Millisecond,
		// },
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	log.Println("Client ready with URLs:", es.Transport.(*estransport.Client).URLs())

	go func() {
		<-aborted
		log.Println("\nDone!")
		os.Exit(0)
	}()

	for {
		select {
		case <-ticker.C:
			var (
				res *esapi.Response
				err error
			)
			// res, err = es.Info()

			// ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
			// defer cancel()
			// res, err = es.Info(es.Info.WithContext(ctx))

			// es.Search(es.Search.WithTimeout(time.Nanosecond))

			// res, err = es.Get("test", "MISSING")

			res, err = es.Index("test", strings.NewReader(`{"foo":"bar"}`))

			if err != nil {
				if e, ok := err.(net.Error); ok {
					log.Fatalf("Error getting response: [%T]: %s (timeout:%v)", e, e, e.Timeout())
				} else {
					log.Fatalf("Error getting response: [%T]: %s", err, err)
				}
			}
			if res.IsError() {
				log.Fatalf("Error response: %s", res.Status())
			}
		}
	}
}

```

Closes #67

(cherry picked from commit 7f37b7b)
@karmi karmi deleted the retries branch October 31, 2019 10:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants