Skip to content

Commit

Permalink
feat(controller): Always retry when IsTransientErr to tolerate tran…
Browse files Browse the repository at this point in the history
…sient errors. Fixes #3217 (#3853)
  • Loading branch information
alexec authored Aug 28, 2020
1 parent 0cf7709 commit 4c18a06
Show file tree
Hide file tree
Showing 14 changed files with 634 additions and 589 deletions.
911 changes: 453 additions & 458 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ type Template struct {
PodSpecPatch string `json:"podSpecPatch,omitempty" protobuf:"bytes,31,opt,name=podSpecPatch"`

// ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission
ResubmitPendingPods *bool `json:"resubmitPendingPods,omitempty" protobuf:"varint,34,opt,name=resubmitPendingPods"`
ResubmitPendingPods bool `json:"resubmitPendingPods,omitempty" protobuf:"varint,34,opt,name=resubmitPendingPods"`

// Metrics are a list of metrics emitted from this template
Metrics *Metrics `json:"metrics,omitempty" protobuf:"bytes,35,opt,name=metrics"`
Expand Down Expand Up @@ -1742,6 +1742,10 @@ func (tmpl *Template) IsLeaf() bool {
return false
}

// IsResubmitPendingPods reports whether the template has the
// ResubmitPendingPods flag set. It is safe to call on a nil receiver, in
// which case it returns false.
func (tmpl *Template) IsResubmitPendingPods() bool {
	if tmpl == nil {
		return false
	}
	return tmpl.ResubmitPendingPods
}

// DAGTemplate is a template subtype for directed acyclic graph templates
type DAGTemplate struct {
// Target are one or more names of targets to execute in a DAG
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func TestArtifact_GetArchive(t *testing.T) {
assert.Equal(t, &ArchiveStrategy{None: &NoneStrategy{}}, (&Artifact{Archive: &ArchiveStrategy{None: &NoneStrategy{}}}).GetArchive())
}

// TestTemplate_IsResubmitAllowed checks that IsResubmitPendingPods mirrors the
// ResubmitPendingPods field: false for a zero-value template, true once set.
func TestTemplate_IsResubmitAllowed(t *testing.T) {
	zeroValue := &Template{}
	assert.False(t, zeroValue.IsResubmitPendingPods())

	enabled := &Template{ResubmitPendingPods: true}
	assert.True(t, enabled.IsResubmitPendingPods())
}

func TestNodes_FindByDisplayName(t *testing.T) {
assert.Nil(t, Nodes{}.FindByDisplayName(""))
assert.NotNil(t, Nodes{"": NodeStatus{DisplayName: "foo"}}.FindByDisplayName("foo"))
Expand Down
5 changes: 0 additions & 5 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 23 additions & 19 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,13 @@ func (s *ArgoServerSuite) TestCreateWorkflowDryRun() {
}

func (s *ArgoServerSuite) TestWorkflowService() {

var name string
s.Run("Create", func() {
s.e().POST("/api/v1/workflows/argo").
name = s.e().POST("/api/v1/workflows/argo").
WithBytes([]byte(`{
"workflow": {
"metadata": {
"name": "test",
"generateName": "test-",
"labels": {
"argo-e2e": "subject"
}
Expand All @@ -696,15 +696,19 @@ func (s *ArgoServerSuite) TestWorkflowService() {
}
}`)).
Expect().
Status(200)
Status(200).
JSON().
Path("$.metadata.name").
NotNull().
String().
Raw()
})

s.Run("List", func() {
s.Given().
WorkflowName("test").
When().
WaitForWorkflowToStart(20 * time.Second)
s.Given().
When().
WaitForWorkflowToStart(20 * time.Second)

s.Run("List", func() {
j := s.e().GET("/api/v1/workflows/argo").
WithQuery("listOptions.labelSelector", "argo-e2e=subject").
Expect().
Expand Down Expand Up @@ -737,7 +741,7 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Get", func() {
j := s.e().GET("/api/v1/workflows/argo/test").
j := s.e().GET("/api/v1/workflows/argo/" + name).
Expect().
Status(200).
JSON()
Expand All @@ -749,7 +753,7 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("GetWithFields", func() {
j := s.e().GET("/api/v1/workflows/argo/test").
j := s.e().GET("/api/v1/workflows/argo/"+name).
WithQuery("fields", "status.phase").
Expect().
Status(200).
Expand All @@ -758,11 +762,11 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Suspend", func() {
s.e().PUT("/api/v1/workflows/argo/test/suspend").
s.e().PUT("/api/v1/workflows/argo/" + name + "/suspend").
Expect().
Status(200)

s.e().GET("/api/v1/workflows/argo/test").
s.e().GET("/api/v1/workflows/argo/" + name).
Expect().
Status(200).
JSON().
Expand All @@ -771,11 +775,11 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Resume", func() {
s.e().PUT("/api/v1/workflows/argo/test/resume").
s.e().PUT("/api/v1/workflows/argo/" + name + "/resume").
Expect().
Status(200)

s.e().GET("/api/v1/workflows/argo/test").
s.e().GET("/api/v1/workflows/argo/" + name).
Expect().
Status(200).
JSON().
Expand All @@ -785,14 +789,14 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Terminate", func() {
s.e().PUT("/api/v1/workflows/argo/test/terminate").
s.e().PUT("/api/v1/workflows/argo/" + name + "/terminate").
Expect().
Status(200)

// sleep in a test is bad practice
time.Sleep(3 * time.Second)

s.e().GET("/api/v1/workflows/argo/test").
s.e().GET("/api/v1/workflows/argo/" + name).
Expect().
Status(200).
JSON().
Expand All @@ -801,7 +805,7 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Resubmit", func() {
s.e().PUT("/api/v1/workflows/argo/test/resubmit").
s.e().PUT("/api/v1/workflows/argo/" + name + "/resubmit").
WithBytes([]byte(`{"memoized": true}`)).
Expect().
Status(200).
Expand All @@ -811,7 +815,7 @@ func (s *ArgoServerSuite) TestWorkflowService() {
})

s.Run("Delete", func() {
s.e().DELETE("/api/v1/workflows/argo/test").
s.e().DELETE("/api/v1/workflows/argo/" + name).
Expect().
Status(200)
s.e().DELETE("/api/v1/workflows/argo/not-found").
Expand Down
53 changes: 53 additions & 0 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package errors

import (
"net"
"net/url"
"strings"

apierr "k8s.io/apimachinery/pkg/api/errors"

argoerrs "github.com/argoproj/argo/errors"
)

// IsTransientErr reports whether err matches one of the conditions this
// package treats as transient: an "exceeded quota" Forbidden error, a
// TooManyRequests error, a resource-quota update conflict, or a transient
// network error. A nil error is never transient.
func IsTransientErr(err error) bool {
	if err == nil {
		return false
	}
	// Inspect the underlying cause rather than any wrapper around it.
	cause := argoerrs.Cause(err)
	switch {
	case isExceededQuotaErr(cause),
		apierr.IsTooManyRequests(cause),
		isResourceQuotaConflictErr(cause),
		isTransientNetworkErr(cause):
		return true
	}
	return false
}

// isExceededQuotaErr reports whether err is a Forbidden API error whose
// message contains "exceeded quota".
func isExceededQuotaErr(err error) bool {
	if !apierr.IsForbidden(err) {
		return false
	}
	return strings.Contains(err.Error(), "exceeded quota")
}

// isResourceQuotaConflictErr reports whether err is a Conflict API error
// whose message shows the conflict happened on a resourcequota object.
func isResourceQuotaConflictErr(err error) bool {
	if !apierr.IsConflict(err) {
		return false
	}
	return strings.Contains(err.Error(), "Operation cannot be fulfilled on resourcequota")
}

// isTransientNetworkErr reports whether err is a network error that is worth
// retrying. Only values implementing net.Error are considered:
//   - *net.DNSError, *net.OpError and net.UnknownNetworkError are always transient;
//   - *url.Error is transient only when its message contains
//     "Connection closed by foreign host";
//   - any other net.Error is transient when its message contains one of the
//     known timeout markers.
func isTransientNetworkErr(err error) bool {
	if _, isNetErr := err.(net.Error); !isNetErr {
		return false
	}

	switch err.(type) {
	case *net.DNSError, *net.OpError, net.UnknownNetworkError:
		return true
	case *url.Error:
		// For a URL error, retry only when the peer reported a closed connection.
		return strings.Contains(err.Error(), "Connection closed by foreign host")
	}

	// Remaining net.Error implementations are classified by message:
	// TLS handshake timeout, TCP i/o timeout, and net.Dial timeout.
	timeoutMarkers := []string{
		"net/http: TLS handshake timeout",
		"i/o timeout",
		"connection timed out",
	}
	msg := err.Error()
	for _, marker := range timeoutMarkers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
61 changes: 61 additions & 0 deletions util/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package errors

import (
"errors"
"net"
"net/url"
"testing"

"github.com/stretchr/testify/assert"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// netError is a minimal net.Error implementation for tests: its message is the
// string value itself and it never reports itself as a timeout or temporary.
type netError string

// Error returns the underlying string as the error message.
func (e netError) Error() string {
	return string(e)
}

// Timeout always reports false.
func (e netError) Timeout() bool {
	return false
}

// Temporary always reports false.
func (e netError) Temporary() bool {
	return false
}

// Fixtures whose messages match the timeout markers checked by IsTransientErr.
var (
	tlsHandshakeTimeoutErr net.Error = netError("net/http: TLS handshake timeout")
	ioTimeoutErr           net.Error = netError("i/o timeout")
	connectionTimedout     net.Error = netError("connection timed out")
)

func TestIsTransientErr(t *testing.T) {
t.Run("Nil", func(t *testing.T) {
assert.False(t, IsTransientErr(nil))
})
t.Run("ResourceQuotaConflictErr", func(t *testing.T) {
assert.False(t, IsTransientErr(apierr.NewConflict(schema.GroupResource{}, "", nil)))
assert.True(t, IsTransientErr(apierr.NewConflict(schema.GroupResource{Group: "v1", Resource: "resourcequotas"}, "", nil)))
})
t.Run("ExceededQuotaErr", func(t *testing.T) {
assert.False(t, IsTransientErr(apierr.NewForbidden(schema.GroupResource{}, "", nil)))
assert.True(t, IsTransientErr(apierr.NewForbidden(schema.GroupResource{Group: "v1", Resource: "pods"}, "", errors.New("exceeded quota"))))
})
t.Run("TooManyRequestsDNS", func(t *testing.T) {
assert.True(t, IsTransientErr(apierr.NewTooManyRequests("", 0)))
})
t.Run("DNSError", func(t *testing.T) {
assert.True(t, IsTransientErr(&net.DNSError{}))
})
t.Run("OpError", func(t *testing.T) {
assert.True(t, IsTransientErr(&net.OpError{}))
})
t.Run("UnknownNetworkError", func(t *testing.T) {
assert.True(t, IsTransientErr(net.UnknownNetworkError("")))
})
t.Run("ConnectionClosedErr", func(t *testing.T) {
assert.False(t, IsTransientErr(&url.Error{Err: errors.New("")}))
assert.True(t, IsTransientErr(&url.Error{Err: errors.New("Connection closed by foreign host")}))
})
t.Run("TLSHandshakeTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(tlsHandshakeTimeoutErr))
})
t.Run("IOHandshakeTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(ioTimeoutErr))
})
t.Run("ConnectionTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionTimedout))
})
}
59 changes: 0 additions & 59 deletions util/retry/retry.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,14 @@
package retry

import (
"net"
"net/url"
"strings"
"time"

apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"

argoerrs "github.com/argoproj/argo/errors"
)

// DefaultRetry is the default backoff configuration used when retrying API calls.
// It is set to 5 steps, a 10ms base duration, a factor of 1.0 and a jitter of 0.1.
// NOTE(review): with Factor 1.0 the delay presumably does not grow between
// steps — confirm against the wait.Backoff documentation.
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}

// IsRetryableKubeAPIError reports whether err is a retryable Kubernetes API
// error. NotFound, Forbidden, TooManyRequests, resource-quota conflict,
// Invalid and MethodNotSupported errors return false; every other error
// (including nil) returns true.
func IsRetryableKubeAPIError(err error) bool {
	// Classify the original error if it was wrapped.
	cause := argoerrs.Cause(err)
	switch {
	case apierr.IsNotFound(cause),
		apierr.IsForbidden(cause),
		apierr.IsTooManyRequests(cause),
		IsResourceQuotaConflictErr(cause),
		apierr.IsInvalid(cause),
		apierr.IsMethodNotSupported(cause):
		return false
	default:
		return true
	}
}

// IsResourceQuotaConflictErr reports whether err is a Conflict API error
// raised while updating a resource quota.
//
// Creating a pod can be prevented by a conflict updating the resource quota;
// this func identifies those errors so they can be handled much like
// `Forbidden` or `TooManyRequests` errors.
func IsResourceQuotaConflictErr(err error) bool {
	if !apierr.IsConflict(err) {
		return false
	}
	return strings.Contains(err.Error(), "Operation cannot be fulfilled on resourcequota")
}

// IsRetryableNetworkError returns whether or not the error is a retryable
// network error. A nil error is not retryable. Only values implementing
// net.Error are considered:
//   - *net.DNSError, *net.OpError and net.UnknownNetworkError are always retryable;
//   - *url.Error is retryable only when its message contains
//     "Connection closed by foreign host";
//   - any other net.Error is retryable when its message contains one of the
//     known timeout markers.
func IsRetryableNetworkError(err error) bool {
	if err == nil {
		return false
	}
	// Classify the original error if it was wrapped.
	cause := argoerrs.Cause(err)
	msg := cause.Error()

	if _, isNetErr := cause.(net.Error); !isNetErr {
		return false
	}

	switch cause.(type) {
	case *net.DNSError, *net.OpError, net.UnknownNetworkError:
		return true
	case *url.Error:
		// For a URL error, retry only when the peer reported a closed connection.
		return strings.Contains(msg, "Connection closed by foreign host")
	}

	// Remaining net.Error implementations are classified by message:
	// TLS handshake timeout, TCP i/o timeout, and net.Dial timeout.
	timeoutMarkers := []string{
		"net/http: TLS handshake timeout",
		"i/o timeout",
		"connection timed out",
	}
	for _, marker := range timeoutMarkers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
14 changes: 0 additions & 14 deletions util/retry/retry_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/util/retry"
)

Expand All @@ -39,7 +40,7 @@ func GetSecrets(clientSet kubernetes.Interface, namespace, name, key string) ([]
secret, err = secretsIf.Get(name, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get secret '%s': %v", name, err)
if !retry.IsRetryableKubeAPIError(err) {
if !errorsutil.IsTransientErr(err) {
return false, err
}
return false, nil
Expand Down
Loading

0 comments on commit 4c18a06

Please sign in to comment.