Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:flyteorg/flyteadmin into fix-336
Browse files Browse the repository at this point in the history
  • Loading branch information
yindia committed Aug 16, 2021
2 parents e780eef + e6e6576 commit 49ca1e1
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 68 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flyteidl v0.19.19
github.com/flyteorg/flyteplugins v0.5.59
github.com/flyteorg/flytepropeller v0.13.3
github.com/flyteorg/flytestdlib v0.3.27
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o=
github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo=
github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw=
github.com/flyteorg/flytepropeller v0.13.3 h1:nnO4d9w6UbgLCF9kn0M6LTkYpS/F5jEoEF22YcRmLYI=
Expand Down
74 changes: 50 additions & 24 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,45 +210,51 @@ func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.L
}

func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
systemResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet {
configResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet {
// The values below should never be used (deduce it from the request; request should be set by the time we get here).
// Setting them here just in case we end up with requests not set. We are not adding to config because it would add
// more confusion as its mostly not used.
cpuLimit := "500m"
memoryLimit := "500Mi"
resourceEntries := task.Template.GetContainer().Resources.Requests
var cpuIndex, memoryIndex = -1, -1
for idx, entry := range resourceEntries {
resourceRequestEntries := task.Template.GetContainer().Resources.Requests
var cpuIndex, memoryIndex, ephemeralStorageIndex = -1, -1, -1
for idx, entry := range resourceRequestEntries {
switch entry.Name {
case core.Resources_CPU:
cpuIndex = idx

case core.Resources_MEMORY:
memoryIndex = idx
case core.Resources_EPHEMERAL_STORAGE:
ephemeralStorageIndex = idx
}
}

if cpuIndex < 0 || memoryIndex < 0 {
logger.Errorf(ctx, "Cpu request and Memory request missing for %s", task.Template.Id)
}
taskResourceLimits := runtimeInterfaces.TaskResourceSet{}

if cpuIndex >= 0 {
cpuLimit = resourceEntries[cpuIndex].Value
// For resource values, we prefer to use the limits set in the application config over the set resource values.
if len(configResourceLimits.CPU) > 0 {
cpuLimit = configResourceLimits.CPU
} else if cpuIndex >= 0 {
cpuLimit = resourceRequestEntries[cpuIndex].Value
}
if memoryIndex >= 0 {
memoryLimit = resourceEntries[memoryIndex].Value
taskResourceLimits.CPU = cpuLimit
if len(configResourceLimits.Memory) > 0 {
memoryLimit = configResourceLimits.Memory
} else if memoryIndex >= 0 {
memoryLimit = resourceRequestEntries[memoryIndex].Value
}

taskResourceLimits := runtimeInterfaces.TaskResourceSet{CPU: cpuLimit, Memory: memoryLimit}
// Use the limits from config
if systemResourceLimits.CPU != "" {
taskResourceLimits.CPU = systemResourceLimits.CPU
taskResourceLimits.Memory = memoryLimit
if len(taskResourceLimits.GPU) == 0 && len(configResourceLimits.GPU) > 0 {
// When a platform default for GPU exists, but one isn't set in the task resources, use the platform value.
taskResourceLimits.GPU = configResourceLimits.GPU
}
if systemResourceLimits.Memory != "" {
taskResourceLimits.Memory = systemResourceLimits.Memory
}
if systemResourceLimits.GPU != "" {
taskResourceLimits.GPU = systemResourceLimits.GPU
if len(configResourceLimits.EphemeralStorage) > 0 {
taskResourceLimits.EphemeralStorage = configResourceLimits.EphemeralStorage
} else if ephemeralStorageIndex >= 0 {
taskResourceLimits.EphemeralStorage = resourceRequestEntries[ephemeralStorageIndex].Value
}

return taskResourceLimits
Expand All @@ -257,21 +263,23 @@ func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
platformValues runtimeInterfaces.TaskResourceSet,
resourceEntries []*core.Resources_ResourceEntry, taskResourceSpec *admin.TaskResourceSpec) []*core.Resources_ResourceEntry {
var cpuIndex, memoryIndex = -1, -1
var cpuIndex, memoryIndex, ephemeralStorageindex = -1, -1, -1
for idx, entry := range resourceEntries {
switch entry.Name {
case core.Resources_CPU:
cpuIndex = idx
case core.Resources_MEMORY:
memoryIndex = idx
case core.Resources_EPHEMERAL_STORAGE:
ephemeralStorageindex = idx
}
}
if cpuIndex > 0 && memoryIndex > 0 {
if cpuIndex > 0 && memoryIndex > 0 && ephemeralStorageindex > 0 {
// nothing to do
return resourceEntries
}

if cpuIndex < 0 && platformValues.CPU != "" {
if cpuIndex < 0 && len(platformValues.CPU) > 0 {
logger.Debugf(ctx, "Setting 'cpu' for [%+v] to %s", identifier, platformValues.CPU)
cpuValue := platformValues.CPU
if taskResourceSpec != nil && len(taskResourceSpec.Cpu) > 0 {
Expand All @@ -284,7 +292,7 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
}
resourceEntries = append(resourceEntries, cpuResource)
}
if memoryIndex < 0 && platformValues.Memory != "" {
if memoryIndex < 0 && len(platformValues.Memory) > 0 {
memoryValue := platformValues.Memory
if taskResourceSpec != nil && len(taskResourceSpec.Memory) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
Expand All @@ -297,6 +305,23 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
logger.Debugf(ctx, "Setting 'memory' for [%+v] to %s", identifier, platformValues.Memory)
resourceEntries = append(resourceEntries, memoryResource)
}
if ephemeralStorageindex < 0 {
var ephemeralStorageValue string
if taskResourceSpec != nil && len(taskResourceSpec.EphemeralStorage) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
ephemeralStorageValue = taskResourceSpec.EphemeralStorage
} else if len(platformValues.EphemeralStorage) > 0 {
ephemeralStorageValue = platformValues.EphemeralStorage
}
if len(ephemeralStorageValue) > 0 {
ephemeralStorageResource := &core.Resources_ResourceEntry{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: ephemeralStorageValue,
}
logger.Debugf(ctx, "Setting 'ephemeralStorage' for [%+v] to %s", identifier, platformValues.EphemeralStorage)
resourceEntries = append(resourceEntries, ephemeralStorageResource)
}
}
return resourceEntries
}

Expand Down Expand Up @@ -383,8 +408,9 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
if resource != nil && resource.Attributes != nil && resource.Attributes.GetTaskResourceAttributes() != nil {
taskResourceSpec = resource.Attributes.GetTaskResourceAttributes().Limits
}

task.Template.GetContainer().Resources.Limits = assignResourcesIfUnset(
ctx, task.Template.Id, createTaskDefaultLimits(ctx, task, m.config.TaskResourceConfiguration().GetLimits()), task.Template.GetContainer().Resources.Limits,
ctx, task.Template.Id, createTaskDefaultLimits(ctx, task, m.config.TaskResourceConfiguration().GetDefaults()), task.Template.GetContainer().Resources.Limits,
taskResourceSpec)
checkTaskRequestsLessThanLimits(ctx, task.Template.Id, task.Template.GetContainer().Resources)
}
Expand Down
Loading

0 comments on commit 49ca1e1

Please sign in to comment.