From dadf5d8296c79b29dee1ec33aac8a15cbfc1c0c5 Mon Sep 17 00:00:00 2001 From: Ketan Umare <16888709+kumare3@users.noreply.github.com> Date: Wed, 31 Mar 2021 08:57:15 -0700 Subject: [PATCH] Bug Fix and Perf Tweak #minor (#248) * Bug Fix and Perf Tweak - Building a cache from Flyteadmin executions, should stop querying flyteadmin, if the workflow is in terminal state - Default for Max streak length can be optimized to 8 (number of rounds needed to complete one workflow start to end, with inmemory execution) - Other defaults set that have been hardened over time. Signed-off-by: Ketan Umare * Updated testcase Signed-off-by: Ketan Umare * lint fix Signed-off-by: Ketan Umare --- go.mod | 2 +- go.sum | 2 + pkg/controller/config/config.go | 91 +++++++++++++++---- .../nodes/subworkflow/launchplan/admin.go | 22 +++++ .../subworkflow/launchplan/admin_test.go | 35 +++++++ .../subworkflow/launchplan/adminconfig.go | 4 +- pkg/controller/workflowstore/config.go | 13 ++- pkg/controller/workflowstore/iface.go | 1 + 8 files changed, 147 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 6c9eecae75..b0f40662d9 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.25 - github.com/flyteorg/flyteplugins v0.5.41 + github.com/flyteorg/flyteplugins v0.5.42 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 8cc5cb2b33..f592a2b755 100644 --- a/go.sum +++ b/go.sum @@ -234,6 +234,8 @@ github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+Naj github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= github.com/flyteorg/flyteplugins v0.5.41 h1:8n1Z55P59ICV4453Dk7fhaUbB944j3BMZ+ozywHczgU= github.com/flyteorg/flyteplugins v0.5.41/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= 
+github.com/flyteorg/flyteplugins v0.5.42 h1:G4DRR2r8LlmkV+orXloDi1ly+M5WuvAaNlWFgGGyy3A= +github.com/flyteorg/flyteplugins v0.5.42/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 79801ce4b6..0af70af171 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -1,3 +1,33 @@ +// Package config contains the core configuration for FlytePropeller. This configuration can be added under the ``propeller`` section. +// Example config: +// ---------------- +// propeller: +// rawoutput-prefix: s3://my-container/test/ +// metadata-prefix: metadata/propeller/sandbox +// workers: 4 +// workflow-reeval-duration: 10s +// downstream-eval-duration: 5s +// limit-namespace: "all" +// prof-port: 11254 +// metrics-prefix: flyte +// enable-admin-launcher: true +// max-ttl-hours: 1 +// gc-interval: 500m +// queue: +// type: batch +// queue: +// type: bucket +// rate: 20 +// capacity: 100 +// sub-queue: +// type: bucket +// rate: 100 +// capacity: 1000 +// # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container +// kube-config: "$HOME/kubeconfig/k3s/k3s.yaml" +// publish-k8s-events: true +// workflowStore: +// policy: "ResourceVersionCache" package config import ( @@ -15,22 +45,38 @@ var ( configSection = config.MustRegisterSection(configSectionKey, defaultConfig) defaultConfig = &Config{ - MaxWorkflowRetries: 5, + Workers: 20, + WorkflowReEval: config.Duration{ + Duration: 10 * time.Second, + }, + DownstreamEval: config.Duration{ + Duration: 30 * time.Second, + }, + MaxWorkflowRetries: 10, MaxDatasetSizeBytes: 10 * 1024 * 1024, Queue: CompositeQueueConfig{ 
- Type: CompositeQueueSimple, + Type: CompositeQueueBatch, + BatchingInterval: config.Duration{ + Duration: time.Second, + }, + BatchSize: -1, Queue: WorkqueueConfig{ - Type: WorkqueueTypeDefault, - BaseDelay: config.Duration{Duration: time.Second * 10}, - MaxDelay: config.Duration{Duration: time.Second * 10}, - Rate: 10, - Capacity: 100, + Type: WorkqueueTypeMaxOfRateLimiter, + BaseDelay: config.Duration{Duration: time.Second * 5}, + MaxDelay: config.Duration{Duration: time.Second * 60}, + Rate: 100, + Capacity: 1000, + }, + Sub: WorkqueueConfig{ + Type: WorkqueueTypeBucketRateLimiter, + Rate: 100, + Capacity: 1000, }, }, KubeConfig: KubeClientConfig{ - QPS: 5, - Burst: 10, - Timeout: config.Duration{Duration: 0}, + QPS: 100, + Burst: 25, + Timeout: config.Duration{Duration: 30 * time.Second}, }, LeaderElection: LeaderElectionConfig{ Enabled: false, @@ -47,13 +93,20 @@ var ( MaxNodeRetriesOnSystemFailures: 3, InterruptibleFailureThreshold: 1, }, - MaxStreakLength: 5, // Turbo mode is enabled by default + MaxStreakLength: 8, // Turbo mode is enabled by default + ProfilerPort: config.Port{ + Port: 10254, + }, + LimitNamespace: "all", + MetadataPrefix: "metadata/propeller", + EnableAdminLauncher: true, + MetricsPrefix: "flyte", } ) -// NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables. // Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is // the base configuration to start propeller +// NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables. 
type Config struct { KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` MasterURL string `json:"master"` @@ -78,6 +131,7 @@ type Config struct { MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."` } +// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. type KubeClientConfig struct { // QPS indicates the maximum QPS to the master from this client. // If it's zero, the created RESTClient will use DefaultQPS: 5 @@ -96,6 +150,7 @@ const ( CompositeQueueBatch CompositeQueueType = "batch" ) +// CompositeQueueConfig contains configuration for the controller queue and the downstream resource queue type CompositeQueueConfig struct { Type CompositeQueueType `json:"type" pflag:",Type of composite queue to use for the WorkQueue"` Queue WorkqueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` @@ -113,7 +168,7 @@ const ( WorkqueueTypeMaxOfRateLimiter WorkqueueType = "maxof" ) -// prototypical configuration to configure a workqueue. We may want to generalize this in a package like k8sutils +// WorkqueueConfig has the configuration to configure a workqueue. 
We may want to generalize this in a package like k8sutils type WorkqueueConfig struct { // Refer to https://github.com/kubernetes/client-go/tree/master/util/workqueue Type WorkqueueType `json:"type" pflag:",Type of RateLimiter to use for the WorkQueue"` @@ -123,21 +178,21 @@ type WorkqueueConfig struct { Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"` } -// configuration for a node +// NodeConfig contains configuration that is useful for every node execution type NodeConfig struct { DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"` MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"` InterruptibleFailureThreshold int64 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible'"` } -// Contains default values for timeouts +// DefaultDeadlines contains default values for timeouts type DefaultDeadlines struct { DefaultNodeExecutionDeadline config.Duration `json:"node-execution-deadline" pflag:",Default value of node execution timeout"` DefaultNodeActiveDeadline config.Duration `json:"node-active-deadline" pflag:",Default value of node timeout"` DefaultWorkflowActiveDeadline config.Duration `json:"workflow-active-deadline" pflag:",Default value of workflow timeout"` } -// Contains leader election configuration. +// LeaderElectionConfig contains leader election configuration. type LeaderElectionConfig struct { // Enable or disable leader election. 
Enabled bool `json:"enabled" pflag:",Enables/Disables leader election."` @@ -156,12 +211,12 @@ type LeaderElectionConfig struct { RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."` } -// Extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object. -// TODO What if the type is incorrect? +// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object. func GetConfig() *Config { return configSection.GetConfig().(*Config) } +// MustRegisterSubSection can be used to configure any subsections of the propeller configuration func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section { return configSection.MustRegisterSection(subSectionKey, section) } diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 3fa4bcbc21..9ac4420e66 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -22,6 +22,12 @@ import ( "google.golang.org/grpc/status" ) +// IsWorkflowTerminated returns true if the Workflow Phase is a terminal phase, otherwise false +func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool { + return p == core.WorkflowExecution_ABORTED || p == core.WorkflowExecution_FAILED || + p == core.WorkflowExecution_SUCCEEDED || p == core.WorkflowExecution_TIMED_OUT +} + // Executor for Launchplans that executes on a remote FlyteAdmin service (if configured) type adminLaunchPlanExecutor struct { adminClient service.AdminServiceClient @@ -141,6 +147,21 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc resp = make([]cache.ItemSyncResponse, 0, len(batch)) for _, obj := range batch { exec := obj.GetItem().(executionCacheItem) + + // If the workflow is already 
terminated, then there is no need to fetch information; the item could also be dropped from the cache + if exec.ExecutionClosure != nil { + if IsWorkflowTerminated(exec.ExecutionClosure.Phase) { + logger.Debugf(ctx, "Workflow [%s] is already completed, will not fetch execution information", exec.ExecutionClosure.WorkflowId) + resp = append(resp, cache.ItemSyncResponse{ + ID: obj.GetID(), + Item: exec, + Action: cache.Unchanged, + }) + continue + } + } + + // Workflow is not already terminated, let's check the status req := &admin.WorkflowExecutionGetRequest{ Id: &exec.WorkflowExecutionIdentifier, } @@ -167,6 +188,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc continue } + // Update the cache with the retrieved status resp = append(resp, cache.ItemSyncResponse{ ID: obj.GetID(), Item: executionCacheItem{ diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 69a24ff750..f52a5bf829 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "github.com/flyteorg/flytestdlib/cache" + mocks2 "github.com/flyteorg/flytestdlib/cache/mocks" + "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flyteidl/clients/go/admin/mocks" @@ -39,6 +42,25 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { assert.Equal(t, result, s) }) + t.Run("terminal-sync", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope()) + assert.NoError(t, err) + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + adminExec := 
exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{ + iwMock, + }) + assert.NoError(t, err) + assert.NotNil(t, v) + assert.Len(t, v, 1) + assert.Equal(t, v[0].ID, "id") + assert.Equal(t, v[0].Item, i) + }) + t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} @@ -130,6 +152,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { assert.Nil(t, s) assert.False(t, IsNotFound(err)) }) + } func TestAdminLaunchPlanExecutor_Launch(t *testing.T) { @@ -313,3 +336,15 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { assert.Error(t, err) }) } + +func TestIsWorkflowTerminated(t *testing.T) { + assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDED)) + assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_ABORTED)) + assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_FAILED)) + assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_TIMED_OUT)) + assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDING)) + assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_FAILING)) + assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_RUNNING)) + assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_QUEUED)) + assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_UNDEFINED)) +} diff --git a/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go b/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go index 76fce2a0e0..a88a96b30d 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go +++ b/pkg/controller/nodes/subworkflow/launchplan/adminconfig.go @@ -8,7 +8,7 @@ import ( var ( defaultAdminConfig = &AdminConfig{ - TPS: 5, + TPS: 100, Burst: 10, MaxCacheSize: 10000, Workers: 10, @@ -17,6 +17,8 @@ var ( adminConfigSection = ctrlConfig.MustRegisterSubSection("admin-launcher", defaultAdminConfig) ) +// AdminConfig provides a "admin-launcher" section in core Flytepropeller configuration and can be used to configure +// 
the rate at which Flytepropeller can query for status of workflows in flyteadmin or create new executions type AdminConfig struct { // TPS indicates the maximum transactions per second to flyte admin from this client. // If it's zero, the created client will use DefaultTPS: 5 diff --git a/pkg/controller/workflowstore/config.go b/pkg/controller/workflowstore/config.go index aa04af0962..aa70f3810e 100644 --- a/pkg/controller/workflowstore/config.go +++ b/pkg/controller/workflowstore/config.go @@ -9,19 +9,26 @@ import ( type Policy = string const ( - PolicyInMemory = "InMemory" - PolicyPassThrough = "PassThrough" + // PolicyInMemory provides an in-memory Workflow store which is useful for testing + PolicyInMemory = "InMemory" + // PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow + PolicyPassThrough = "PassThrough" + // PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the in-memory copy + // of the workflow is stale PolicyResourceVersionCache = "ResourceVersionCache" ) +// By default we use the ResourceVersionCache policy var ( defaultConfig = &Config{ - Policy: PolicyPassThrough, + Policy: PolicyResourceVersionCache, } configSection = ctrlConfig.MustRegisterSubSection("workflowStore", defaultConfig) ) +// Config for Workflow access in the controller. +// Various policies are available like - InMemory, PassThrough, ResourceVersionCache type Config struct { Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"` } diff --git a/pkg/controller/workflowstore/iface.go b/pkg/controller/workflowstore/iface.go index 84477450a9..f6c9eb4454 100644 --- a/pkg/controller/workflowstore/iface.go +++ b/pkg/controller/workflowstore/iface.go @@ -15,6 +15,7 @@ const ( PriorityClassRegular ) +// FlyteWorkflow store interface provides an abstraction for accessing the actual FlyteWorkflow object. 
type FlyteWorkflow interface { Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (