Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Trim error messages read from errors.pb to avoid spamming Etcd and Ad…
Browse files Browse the repository at this point in the history
…min (#141)


Co-authored-by: Ketan Umare <kumare@lyft.com>
  • Loading branch information
EngHabu and Ketan Umare authored May 30, 2020
1 parent 7c5df0d commit 4416aee
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
BaseSecond: 2,
MaxDuration: config.Duration{Duration: time.Minute * 10},
},
MaxErrorMessageLength: 2048,
}

section = config.MustRegisterSection(SectionKey, defaultConfig)
Expand All @@ -35,6 +36,7 @@ type Config struct {
MaxPluginPhaseVersions int32 `json:"max-plugin-phase-versions" pflag:",Maximum number of plugin phase versions allowed for one phase."`
BarrierConfig BarrierConfig `json:"barrier" pflag:",Config for Barrier implementation"`
BackOffConfig BackOffConfig `json:"backoff" pflag:",Config for Exponential BackOff implementation"`
MaxErrorMessageLength int `json:"maxLogMessageLength" pflag:",Max length of error message."`
}

type BarrierConfig struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/task/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/controller/nodes/task/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) {
nCtx := createNodeContext(tt.args.startingPluginPhase, uint32(tt.args.startingPluginPhaseVersion), tt.args.expectedState, ev, "test", state)
c := &pluginCatalogMocks.Client{}
tk := Handler{
cfg: &config.Config{MaxErrorMessageLength: 100},
plugins: map[pluginCore.TaskType]pluginCore.Plugin{
"test": fakeplugins.NewPhaseBasedPlugin(),
},
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/nodes/task/pre_post_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
errors2 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
errors2 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
)

func (t *Handler) CheckCatalogCache(ctx context.Context, tr pluginCore.TaskReader, inputReader io.InputReader, outputWriter io.OutputWriter) (bool, error) {
Expand Down Expand Up @@ -94,6 +95,14 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1
if err != nil {
return nil, err
}

if taskErr.ExecutionError == nil {
taskErr.ExecutionError = &core.ExecutionError{Kind: core.ExecutionError_UNKNOWN, Code: "Unknown", Message: "Unknown"}
}
// Errors can be arbitrarily long since they are written by containers/potentially 3rd party plugins. This ensures
// the error message length will never be big enough to cause write failures to Etcd or spam Admin DB with huge
// objects.
taskErr.Message = trimErrorMessage(taskErr.Message, t.cfg.MaxErrorMessageLength)
return &taskErr, nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ func ToTaskEventPhase(p pluginCore.Phase) core.TaskExecution_Phase {
}
}

// trimErrorMessage shortens original so that it is at most maxLength bytes long by keeping its
// beginning and its end and dropping the middle.
//
// If original already fits within maxLength it is returned unchanged. Otherwise maxLength/2 bytes
// are kept from each end, so the result can be shorter than maxLength (for example when maxLength
// is odd). A non-positive maxLength yields an empty string instead of panicking on a negative
// slice bound.
//
// Both cut points are moved off UTF-8 continuation bytes (towards the nearer end of the string),
// so a multi-byte character is never split in half; cutting inside one would leave invalid UTF-8
// in the returned message. This adjustment can only make the result shorter, never longer.
func trimErrorMessage(original string, maxLength int) string {
	if len(original) <= maxLength {
		return original
	}

	if maxLength <= 0 {
		return ""
	}

	half := maxLength / 2

	// End of the kept prefix: back up while the first dropped byte is a continuation byte
	// (bit pattern 10xxxxxx), i.e. while the cut would land inside a character.
	headEnd := half
	for headEnd > 0 && original[headEnd]&0xC0 == 0x80 {
		headEnd--
	}

	// Start of the kept suffix: advance while it would begin on a continuation byte.
	tailStart := len(original) - half
	for tailStart < len(original) && original[tailStart]&0xC0 == 0x80 {
		tailStart++
	}

	return original[:headEnd] + original[tailStart:]
}

func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/nodes/task/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ func TestToTaskEventPhase(t *testing.T) {
assert.Equal(t, core.TaskExecution_QUEUED, ToTaskEventPhase(pluginCore.PhaseQueued))
}

// Test_trimErrorMessage exercises trimErrorMessage with inputs that fit within the limit and
// inputs that exceed it, for both even and odd limits.
func Test_trimErrorMessage(t *testing.T) {
	const inputStr = "0123456789"

	scenarios := []struct {
		name     string
		input    string
		maxLen   int
		expected string
	}{
		{name: "Length less or equal than max", input: inputStr, maxLen: 10, expected: inputStr},
		{name: "Length > max", input: inputStr, maxLen: 8, expected: "01236789"},
		{name: "Odd Max", input: inputStr, maxLen: 9, expected: "01236789"},
		{name: "Odd input", input: "012345678", maxLen: 9, expected: "012345678"},
	}

	for _, sc := range scenarios {
		sc := sc // per-iteration copy for the closure (needed before Go 1.22)
		t.Run(sc.name, func(t *testing.T) {
			assert.Equal(t, sc.expected, trimErrorMessage(sc.input, sc.maxLen))
		})
	}
}

func TestToTaskExecutionEvent(t *testing.T) {
tkID := &core.Identifier{}
nodeID := &core.NodeExecutionIdentifier{}
Expand Down

0 comments on commit 4416aee

Please sign in to comment.