From a1a1fd860a8f112cb8f754c1285cde55b691c13c Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Fri, 29 May 2020 19:15:04 -0700 Subject: [PATCH 1/4] Trim error messages read from errors.pb to avoid spaming Etcd. and Admin --- pkg/controller/nodes/task/config/config.go | 2 ++ .../nodes/task/config/config_flags.go | 1 + .../nodes/task/config/config_flags_test.go | 22 +++++++++++++++++++ .../nodes/task/pre_post_execution.go | 3 +++ pkg/controller/nodes/task/transformer.go | 8 +++++++ pkg/controller/nodes/task/transformer_test.go | 22 +++++++++++++++++++ 6 files changed, 58 insertions(+) diff --git a/pkg/controller/nodes/task/config/config.go b/pkg/controller/nodes/task/config/config.go index 668920c8d..b4f45d473 100644 --- a/pkg/controller/nodes/task/config/config.go +++ b/pkg/controller/nodes/task/config/config.go @@ -25,6 +25,7 @@ var ( BaseSecond: 2, MaxDuration: config.Duration{Duration: time.Minute * 10}, }, + MaxErrorMessageLength: 2048, } section = config.MustRegisterSection(SectionKey, defaultConfig) @@ -35,6 +36,7 @@ type Config struct { MaxPluginPhaseVersions int32 `json:"max-plugin-phase-versions" pflag:",Maximum number of plugin phase versions allowed for one phase."` BarrierConfig BarrierConfig `json:"barrier" pflag:",Config for Barrier implementation"` BackOffConfig BackOffConfig `json:"backoff" pflag:",Config for Exponential BackOff implementation"` + MaxErrorMessageLength int `json:"maxLogMessageLength" pflag:",Max length of error message."` } type BarrierConfig struct { diff --git a/pkg/controller/nodes/task/config/config_flags.go b/pkg/controller/nodes/task/config/config_flags.go index 6a7a97224..76707946d 100755 --- a/pkg/controller/nodes/task/config/config_flags.go +++ b/pkg/controller/nodes/task/config/config_flags.go @@ -48,5 +48,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "barrier.cache-ttl"), defaultConfig.BarrierConfig.CacheTTL.String(), " Max duration that a barrier would be respected if the process is not restarted. This should account for time required to store the record into persistent storage (across multiple rounds.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "backoff.base-second"), defaultConfig.BackOffConfig.BaseSecond, "The number of seconds representing the base duration of the exponential backoff") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff.max-duration"), defaultConfig.BackOffConfig.MaxDuration.String(), "The cap of the backoff duration") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "maxLogMessageLength"), defaultConfig.MaxErrorMessageLength, "Max length of error message.") return cmdFlags } diff --git a/pkg/controller/nodes/task/config/config_flags_test.go b/pkg/controller/nodes/task/config/config_flags_test.go index 1aa93762c..b5ebaf283 100755 --- a/pkg/controller/nodes/task/config/config_flags_test.go +++ b/pkg/controller/nodes/task/config/config_flags_test.go @@ -253,4 +253,26 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_maxLogMessageLength", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vInt, err := cmdFlags.GetInt("maxLogMessageLength"); err == nil { + assert.Equal(t, int(defaultConfig.MaxErrorMessageLength), vInt) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("maxLogMessageLength", testValue) + if vInt, err := cmdFlags.GetInt("maxLogMessageLength"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxErrorMessageLength) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index 7e0c36331..f89827d84 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -94,6 +94,9 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1 if err != nil { return nil, err } + + // Errors can be arbitrary long since they are written by + taskErr.Message = trimErrorMessage(taskErr.Message, t.cfg.MaxErrorMessageLength) return &taskErr, nil } diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index 4dcccb45b..3e51b805e 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -42,6 +42,14 @@ func ToTaskEventPhase(p pluginCore.Phase) core.TaskExecution_Phase { } } +func trimErrorMessage(original string, maxLength int) string { + if len(original) <= maxLength { + return original + } + + return original[0:maxLength/2] + original[len(original)-maxLength/2:] +} + func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo) (*event.TaskExecutionEvent, error) { // Transitions to a new phase diff --git a/pkg/controller/nodes/task/transformer_test.go b/pkg/controller/nodes/task/transformer_test.go index 04b8b5a22..6d3a02501 100644 --- a/pkg/controller/nodes/task/transformer_test.go +++ b/pkg/controller/nodes/task/transformer_test.go @@ -27,6 +27,28 @@ func TestToTaskEventPhase(t *testing.T) { assert.Equal(t, core.TaskExecution_QUEUED, ToTaskEventPhase(pluginCore.PhaseQueued)) } +func Test_trimErrorMessage(t *testing.T) { + t.Run("Length less or equal than max", func(t *testing.T) { + input := "0123456789" + assert.Equal(t, input, trimErrorMessage(input, 10)) + }) + + t.Run("Length > max", func(t *testing.T) { + input := "0123456789" + assert.Equal(t, "01236789", trimErrorMessage(input, 8)) + }) + + t.Run("Odd Max", func(t *testing.T) { + input := "0123456789" + assert.Equal(t, "01236789", trimErrorMessage(input, 9)) + }) + + t.Run("Odd input", func(t *testing.T) { + input := "012345678" + assert.Equal(t, "012345678", trimErrorMessage(input, 9)) + }) +} + func TestToTaskExecutionEvent(t *testing.T) { tkID := &core.Identifier{} nodeID := &core.NodeExecutionIdentifier{} From 35f7fb0ccacf513c4e094f4a5a6625c322ebc1b7 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Fri, 29 May 2020 19:19:32 -0700 Subject: [PATCH 2/4] doc --- pkg/controller/nodes/task/pre_post_execution.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index f89827d84..1b4547d30 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -95,7 +95,9 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1 return nil, err } - // Errors can be arbitrary long since they are written by + // Errors can be arbitrary long since they are written by containers/potentially 3rd party plugins. This ensures + // the error message length will never be big enough to cause write failures to Etcd. or spam Admin DB with huge + // objects. taskErr.Message = trimErrorMessage(taskErr.Message, t.cfg.MaxErrorMessageLength) return &taskErr, nil } From 765f1a3ba32b729046bbe6772e595f4c2c66a604 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sat, 30 May 2020 12:17:08 -0700 Subject: [PATCH 3/4] Linter fix --- pkg/controller/nodes/task/transformer_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/controller/nodes/task/transformer_test.go b/pkg/controller/nodes/task/transformer_test.go index 6d3a02501..618417f27 100644 --- a/pkg/controller/nodes/task/transformer_test.go +++ b/pkg/controller/nodes/task/transformer_test.go @@ -28,18 +28,19 @@ func TestToTaskEventPhase(t *testing.T) { } func Test_trimErrorMessage(t *testing.T) { + const inputStr = "0123456789" t.Run("Length less or equal than max", func(t *testing.T) { - input := "0123456789" + input := inputStr assert.Equal(t, input, trimErrorMessage(input, 10)) }) t.Run("Length > max", func(t *testing.T) { - input := "0123456789" + input := inputStr assert.Equal(t, "01236789", trimErrorMessage(input, 8)) }) t.Run("Odd Max", func(t *testing.T) { - input := "0123456789" + input := inputStr assert.Equal(t, "01236789", trimErrorMessage(input, 9)) }) From 149b7462ef44d871bbc9480adf722b8c4f6609f0 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sat, 30 May 2020 12:31:53 -0700 Subject: [PATCH 4/4] Fixed linter and unit test --- pkg/controller/nodes/task/handler_test.go | 1 + pkg/controller/nodes/task/pre_post_execution.go | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 60740edfc..fe10fa340 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -574,6 +574,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { nCtx := createNodeContext(tt.args.startingPluginPhase, uint32(tt.args.startingPluginPhaseVersion), tt.args.expectedState, ev, "test", state) c := &pluginCatalogMocks.Client{} tk := Handler{ + cfg: &config.Config{MaxErrorMessageLength: 100}, plugins: map[pluginCore.TaskType]pluginCore.Plugin{ "test": fakeplugins.NewPhaseBasedPlugin(), }, diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index 1b4547d30..b302cae4c 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -7,12 +7,13 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - errors2 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" "github.com/lyft/flytestdlib/logger" "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + errors2 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" ) func (t *Handler) CheckCatalogCache(ctx context.Context, tr pluginCore.TaskReader, inputReader io.InputReader, outputWriter io.OutputWriter) (bool, error) { @@ -95,6 +96,9 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1 return nil, err } + if taskErr.ExecutionError == nil { + taskErr.ExecutionError = &core.ExecutionError{Kind: core.ExecutionError_UNKNOWN, Code: "Unknown", Message: "Unknown"} + } // Errors can be arbitrary long since they are written by containers/potentially 3rd party plugins. This ensures // the error message length will never be big enough to cause write failures to Etcd. or spam Admin DB with huge // objects.