From 882c359f05085e7d31c57c42da97ed93074ae990 Mon Sep 17 00:00:00 2001 From: Cam Date: Wed, 18 Dec 2019 15:43:46 -0800 Subject: [PATCH 1/4] Add custom context to logs 1. added ability to create custom loggers for structs and add custom context 2. implements custom loggers for a few selected structs --- agent/api/container/container.go | 4 +- agent/api/task/json.go | 10 + agent/api/task/task.go | 110 ++++---- agent/dockerclient/dockerapi/types.go | 12 + agent/engine/docker_task_engine.go | 2 +- agent/engine/task_manager.go | 234 +++++++++--------- agent/logger/log.go | 62 +++-- agent/logger/log_test.go | 57 ++++- agent/taskresource/cgroup/cgroup.go | 28 ++- agent/taskresource/firelens/firelens_unix.go | 26 +- .../firelens/firelensconfig_unix.go | 3 +- agent/taskresource/firelens/json_unix.go | 2 + agent/taskresource/volume/dockervolume.go | 26 +- 13 files changed, 371 insertions(+), 205 deletions(-) diff --git a/agent/api/container/container.go b/agent/api/container/container.go index 97f3210b605..d78d8c3e223 100644 --- a/agent/api/container/container.go +++ b/agent/api/container/container.go @@ -1001,7 +1001,7 @@ func (c *Container) GetLogDriver() string { hostConfig := &dockercontainer.HostConfig{} err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig) if err != nil { - seelog.Warnf("Encountered error when trying to get log driver for container %s: %v", err) + seelog.Warnf("Encountered error when trying to get log driver for container %s: %v", c.String(), err) return "" } @@ -1021,7 +1021,7 @@ func (c *Container) GetNetworkModeFromHostConfig() string { // TODO return error to differentiate between error and default mode . err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig) if err != nil { - seelog.Warnf("Encountered error when trying to get network mode for container %s: %v", err) + seelog.Warnf("Encountered error when trying to get network mode for container %s: %v", c.String(), err) return "" } diff --git a/agent/api/task/json.go b/agent/api/task/json.go index ad46e56d066..83a1c5eb5d2 100644 --- a/agent/api/task/json.go +++ b/agent/api/task/json.go @@ -28,3 +28,13 @@ func (t *Task) MarshalJSON() ([]byte, error) { return json.Marshal((*jTask)(t)) } + +// UnmarshalJSON wraps Go's unmarshalling logic to guarantee that the logger gets created +func (t *Task) UnmarshalJSON(data []byte) error { + err := json.Unmarshal(data, (*jTask)(t)) + if err != nil { + return err + } + t.initLog() + return nil +} diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 11a598c2231..5a72c082cd9 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/utils/ttime" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" @@ -253,6 +254,9 @@ type Task struct { // lock is for protecting all fields in the task struct lock sync.RWMutex + + // log is a custom logger with extra context specific to the task struct + log seelog.LoggerInterface } // TaskFromACS translates ecsacs.Task to apitask.Task by first marshaling the received @@ -282,9 +286,21 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, //initialize resources map for task task.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource) + task.initLog() return task, nil } +func (task *Task) initLog() { + if task.log == nil { + task.log = logger.InitLogger() + task.log.SetContext(map[string]string{ + "taskARN": task.Arn, + 
"taskFamily": task.Family, + "taskVersion": task.Version, + }) + } +} + func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error { err := task.initializeDockerLocalVolumes(dockerClient, ctx) if err != nil { @@ -309,21 +325,22 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error { // TODO, add rudimentary plugin support and call any plugins that want to // hook into this + task.initLog() task.adjustForPlatform(cfg) if task.MemoryCPULimitsEnabled { if err := task.initializeCgroupResourceSpec(cfg.CgroupPath, cfg.CgroupCPUPeriod, resourceFields); err != nil { - seelog.Errorf("Task [%s]: could not intialize resource: %v", task.Arn, err) + task.log.Errorf("could not intialize resource: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } } if err := task.initializeContainerOrderingForVolumes(); err != nil { - seelog.Errorf("Task [%s]: could not initialize volumes dependency for container: %v", task.Arn, err) + task.log.Errorf("could not initialize volumes dependency for container: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } if err := task.initializeContainerOrderingForLinks(); err != nil { - seelog.Errorf("Task [%s]: could not initialize links dependency for container: %v", task.Arn, err) + task.log.Errorf("could not initialize links dependency for container: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } @@ -344,14 +361,14 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config, } if err := task.addGPUResource(cfg); err != nil { - seelog.Errorf("Task [%s]: could not initialize GPU associations: %v", task.Arn, err) + task.log.Errorf("could not initialize GPU associations: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } task.initializeCredentialsEndpoint(credentialsManager) task.initializeContainersV3MetadataEndpoint(utils.NewDynamicUUIDProvider()) if err := task.addNetworkResourceProvisioningDependency(cfg); err != nil { - seelog.Errorf("Task [%s]: could not provision network resource: %v", task.Arn, err) + task.log.Errorf("could not provision network resource: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } // Adds necessary Pause containers for sharing PID or IPC namespaces @@ -363,7 +380,7 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config, if task.requiresCredentialSpecResource() { if err := task.initializeCredentialSpecResource(cfg, credentialsManager, resourceFields); err != nil { - seelog.Errorf("Task [%s]: could not initialize credentialspec resource: %v", task.Arn, err) + task.log.Errorf("could not initialize credentialspec resource: %v", err) return apierrors.NewResourceInitError(task.Arn, err) } } @@ -619,7 +636,7 @@ func (task *Task) addSharedVolumes(SharedVolumeMatchFullConfig bool, ctx context return volumeMetadata.Error } - seelog.Infof("initialize volume: Task [%s]: non-autoprovisioned volume not found, adding to task resource %q", task.Arn, vol.Name) + task.log.Infof("initialize volume: non-autoprovisioned volume not found, adding to task resource %q", vol.Name) // this resource should be created by agent volumeResource, err := taskresourcevolume.NewVolumeResource( ctx, @@ -637,22 +654,22 @@ func (task *Task) addSharedVolumes(SharedVolumeMatchFullConfig bool, ctx context return nil } - seelog.Infof("initialize volume: Task [%s]: volume [%s] already exists", task.Arn, volumeConfig.DockerVolumeName) + task.log.Infof("initialize volume: volume [%s] already 
exists", volumeConfig.DockerVolumeName) if !SharedVolumeMatchFullConfig { - seelog.Infof("initialize volume: Task [%s]: ECS_SHARED_VOLUME_MATCH_FULL_CONFIG is set to false and volume with name [%s] is found", task.Arn, volumeConfig.DockerVolumeName) + task.log.Infof("initialize volume: ECS_SHARED_VOLUME_MATCH_FULL_CONFIG is set to false and volume with name [%s] is found", volumeConfig.DockerVolumeName) return nil } // validate all the volume metadata fields match to the configuration if len(volumeMetadata.DockerVolume.Labels) == 0 && len(volumeMetadata.DockerVolume.Labels) == len(volumeConfig.Labels) { - seelog.Infof("labels are both empty or null: Task [%s]: volume [%s]", task.Arn, volumeConfig.DockerVolumeName) + task.log.Infof("labels are both empty or null: volume [%s]", volumeConfig.DockerVolumeName) } else if !reflect.DeepEqual(volumeMetadata.DockerVolume.Labels, volumeConfig.Labels) { return errors.Errorf("intialize volume: non-autoprovisioned volume does not match existing volume labels: existing: %v, expected: %v", volumeMetadata.DockerVolume.Labels, volumeConfig.Labels) } if len(volumeMetadata.DockerVolume.Options) == 0 && len(volumeMetadata.DockerVolume.Options) == len(volumeConfig.DriverOpts) { - seelog.Infof("driver options are both empty or null: Task [%s]: volume [%s]", task.Arn, volumeConfig.DockerVolumeName) + task.log.Infof("driver options are both empty or null: volume [%s]", volumeConfig.DockerVolumeName) } else if !reflect.DeepEqual(volumeMetadata.DockerVolume.Options, volumeConfig.DriverOpts) { return errors.Errorf("initialize volume: non-autoprovisioned volume does not match existing volume options: existing: %v, expected: %v", volumeMetadata.DockerVolume.Options, volumeConfig.DriverOpts) @@ -689,7 +706,7 @@ func (task *Task) initializeCredentialsEndpoint(credentialsManager credentials.M // the id. This should never happen as the payload handler sets // credentialsId for the task after adding credentials to the // credentials manager - seelog.Errorf("Unable to get credentials for task: %s", task.Arn) + task.log.Errorf("Unable to get credentials for task") return } @@ -953,7 +970,7 @@ func (task *Task) addFirelensContainerDependency() error { if firelensContainer.HasContainerDependencies() { // If firelens container has any container dependency, we don't add internal container dependency that depends // on it in order to be safe (otherwise we need to deal with circular dependency). - seelog.Warnf("Not adding container dependency to let firelens container %s start first, because it has dependency on other containers.", firelensContainer.Name) + task.log.Warnf("Not adding container dependency to let firelens container %s start first, because it has dependency on other containers.", firelensContainer.Name) return nil } @@ -977,7 +994,7 @@ func (task *Task) addFirelensContainerDependency() error { // If there's no dependency between the app container and the firelens container, make firelens container // start first to be the default behavior by adding a START container depdendency. 
if !container.DependsOnContainer(firelensContainer.Name) {
-		seelog.Infof("Adding a START container dependency on firelens container %s for container %s",
+		task.log.Infof("Adding a START container dependency on firelens container %s for container %s",
 			firelensContainer.Name, container.Name)
 		container.AddContainerDependency(firelensContainer.Name, ContainerOrderingStartCondition)
 	}
@@ -1278,7 +1295,7 @@ func (task *Task) UpdateMountPoints(cont *apicontainer.Container, vols []types.M
 // there was no change
 // Invariant: task known status is the minimum of container known status
 func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) {
-	seelog.Debugf("api/task: Updating task's known status, task: %s", task.String())
+	task.log.Debugf("api/task: Updating task's known status")
 	// Set to a large 'impossible' status that can't be the min
 	containerEarliestKnownStatus := apicontainerstatus.ContainerZombie
 	var earliestKnownStatusContainer *apicontainer.Container
@@ -1294,19 +1311,17 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) {
 		}
 	}
 	if earliestKnownStatusContainer == nil {
-		seelog.Criticalf(
-			"Impossible state found while updating tasks's known status, earliest state recorded as %s for task [%v]",
-			containerEarliestKnownStatus.String(), task)
+		task.log.Criticalf(
+			"Impossible state found while updating task's known status, earliest state recorded as %s",
+			containerEarliestKnownStatus.String())
 		return apitaskstatus.TaskStatusNone
 	}
-	seelog.Debugf("api/task: Container with earliest known container is [%s] for task: %s",
-		earliestKnownStatusContainer.String(), task.String())
+	task.log.Debugf("api/task: Container with the earliest known status is [%s]",
+		earliestKnownStatusContainer.String())
 	// If the essential container is stopped while other containers may be running
 	// don't update the task status until the other containers are stopped.
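The invariant stated above — the task's known status is the minimum of its containers' known statuses, with `ContainerZombie` as an impossible-high sentinel — reduces to a simple scan. A simplified sketch with stand-in types, not the agent's real status enums:

```go
package main

import "fmt"

// ContainerStatus is a stand-in for apicontainerstatus.ContainerStatus.
type ContainerStatus int

const (
	ContainerStatusNone ContainerStatus = iota
	ContainerPulled
	ContainerCreated
	ContainerRunning
	ContainerStopped
	ContainerZombie // sentinel "impossible" high value, as in the agent
)

// earliestKnownStatus mirrors the scan in updateTaskKnownStatus.
func earliestKnownStatus(statuses []ContainerStatus) ContainerStatus {
	earliest := ContainerZombie
	for _, s := range statuses {
		if s < earliest {
			earliest = s
		}
	}
	return earliest
}

func main() {
	// One container still in CREATED holds the task's known status back.
	fmt.Println(earliestKnownStatus([]ContainerStatus{
		ContainerRunning, ContainerCreated, ContainerRunning,
	})) // 2 (ContainerCreated)
}
```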
if earliestKnownStatusContainer.IsKnownSteadyState() && essentialContainerStopped { - seelog.Debugf( - "Essential container is stopped while other containers are running, not updating task status for task: %s", - task.String()) + task.log.Debugf("Essential container is stopped while other containers are running, not updating task status") return apitaskstatus.TaskStatusNone } // We can't rely on earliest container known status alone for determining if the @@ -1315,8 +1330,8 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) { // statuses and compute the min of this earliestKnownTaskStatus := task.getEarliestKnownTaskStatusForContainers() if task.GetKnownStatus() < earliestKnownTaskStatus { - seelog.Infof("api/task: Updating task's known status to: %s, task: %s", - earliestKnownTaskStatus.String(), task.String()) + task.log.Infof("api/task: Updating task's known status to: %s", + earliestKnownTaskStatus.String()) task.SetKnownStatus(earliestKnownTaskStatus) return task.GetKnownStatus() } @@ -1327,7 +1342,7 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) { // based on the known statuses of all containers in the task func (task *Task) getEarliestKnownTaskStatusForContainers() apitaskstatus.TaskStatus { if len(task.Containers) == 0 { - seelog.Criticalf("No containers in the task: %s", task.String()) + task.log.Criticalf("No containers in the task") return apitaskstatus.TaskStatusNone } // Set earliest container status to an impossible to reach 'high' task status @@ -1462,7 +1477,7 @@ func (task *Task) dockerHostConfig(container *apicontainer.Container, dockerCont if task.NvidiaRuntime == "" { return nil, &apierrors.HostConfigError{Msg: "Runtime is not set for GPU containers"} } - seelog.Debugf("Setting runtime as %s for container %s", task.NvidiaRuntime, container.Name) + task.log.Debugf("Setting runtime as %s for container %s", task.NvidiaRuntime, container.Name) hostConfig.Runtime = task.NvidiaRuntime } @@ -1512,8 +1527,8 @@ func (task *Task) getDockerResources(container *apicontainer.Container) dockerco // Convert MB to B and set Memory dockerMem := int64(container.Memory * 1024 * 1024) if dockerMem != 0 && dockerMem < apicontainer.DockerContainerMinimumMemoryInBytes { - seelog.Warnf("Task %s container %s memory setting is too low, increasing to %d bytes", - task.Arn, container.Name, apicontainer.DockerContainerMinimumMemoryInBytes) + task.log.Warnf("container %s memory setting is too low, increasing to %d bytes", + container.Name, apicontainer.DockerContainerMinimumMemoryInBytes) dockerMem = apicontainer.DockerContainerMinimumMemoryInBytes } // Set CPUShares @@ -1556,14 +1571,14 @@ func (task *Task) shouldOverrideNetworkMode(container *apicontainer.Container, d } } if pauseContName == "" { - seelog.Critical("Pause container required, but not found in the task: %s", task.String()) + task.log.Critical("Pause container required, but not found in the task") return false, "" } pauseContainer, ok := dockerContainerMap[pauseContName] if !ok || pauseContainer == nil { // This should never be the case and implies a code-bug. 
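For context on the override that `shouldOverrideNetworkMode` returns: the `dockerMappingContainerPrefix + pauseContainer.DockerID` string becomes the application container's Docker `NetworkMode`, joining it to the pause container's network namespace. An illustrative sketch, not the agent's exact wiring:

```go
package main

import (
	"fmt"

	dockercontainer "github.com/docker/docker/api/types/container"
)

// hostConfigSharingNetns shows how a "container:<id>" network mode is
// consumed: Docker places the new container into the pause container's
// network namespace instead of creating a fresh one.
func hostConfigSharingNetns(pauseDockerID string) *dockercontainer.HostConfig {
	return &dockercontainer.HostConfig{
		// "container:" is the dockerMappingContainerPrefix used above.
		NetworkMode: dockercontainer.NetworkMode("container:" + pauseDockerID),
	}
}

func main() {
	hc := hostConfigSharingNetns("0123456789ab") // hypothetical Docker ID
	fmt.Println(hc.NetworkMode)                  // container:0123456789ab
}
```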
- seelog.Criticalf("Pause container required, but not found in container map for container: [%s] in task: %s", - container.String(), task.String()) + task.log.Criticalf("Pause container required, but not found in container map for container: [%s]", + container.String()) return false, "" } return true, dockerMappingContainerPrefix + pauseContainer.DockerID @@ -1645,14 +1660,14 @@ func (task *Task) shouldOverridePIDMode(container *apicontainer.Container, docke case pidModeTask: pauseCont, ok := task.ContainerByName(NamespacePauseContainerName) if !ok { - seelog.Criticalf("Namespace Pause container not found in the task: %s; Setting Task's Desired Status to Stopped", task.Arn) + task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } pauseDockerID, ok := dockerContainerMap[pauseCont.Name] if !ok || pauseDockerID == nil { // Docker container shouldn't be nil or not exist if the Container definition within task exists; implies code-bug - seelog.Criticalf("Namespace Pause docker container not found in the task: %s; Setting Task's Desired Status to Stopped", task.Arn) + task.log.Criticalf("Namespace Pause docker container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } @@ -1698,14 +1713,14 @@ func (task *Task) shouldOverrideIPCMode(container *apicontainer.Container, docke case ipcModeTask: pauseCont, ok := task.ContainerByName(NamespacePauseContainerName) if !ok { - seelog.Criticalf("Namespace Pause container not found in the task: %s; Setting Task's Desired Status to Stopped", task.Arn) + task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } pauseDockerID, ok := dockerContainerMap[pauseCont.Name] if !ok || pauseDockerID == nil { // Docker container shouldn't be nill or not exist if the Container definition within task exists; implies code-bug - seelog.Criticalf("Namespace Pause container not found in the task: %s; Setting Task's Desired Status to Stopped", task.Arn) + task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } @@ -1764,8 +1779,8 @@ func (task *Task) dockerLinks(container *apicontainer.Container, dockerContainer if len(linkParts) == 2 { linkAlias = linkParts[1] } else { - seelog.Warnf("Link name [%s] found with no linkalias for container: [%s] in task: [%s]", - linkName, container.String(), task.String()) + task.log.Warnf("Link name [%s] found with no linkalias for container: [%s]", + linkName, container.String()) linkAlias = linkName } @@ -1824,9 +1839,9 @@ func (task *Task) dockerHostBinds(container *apicontainer.Container) ([]string, } if hv.Source() == "" || mountPoint.ContainerPath == "" { - seelog.Errorf( - "Unable to resolve volume mounts for container [%s]; invalid path: [%s]; [%s] -> [%s] in task: [%s]", - container.Name, mountPoint.SourceVolume, hv.Source(), mountPoint.ContainerPath, task.String()) + task.log.Errorf( + "Unable to resolve volume mounts for container [%s]; invalid path: [%s]; [%s] -> [%s]", + container.Name, mountPoint.SourceVolume, hv.Source(), mountPoint.ContainerPath) return []string{}, errors.Errorf("Unable to resolve volume mounts; invalid path: %s %s; %s -> %s", container.Name, mountPoint.SourceVolume, 
hv.Source(), mountPoint.ContainerPath) } @@ -1863,14 +1878,13 @@ func (task *Task) UpdateDesiredStatus() { // updateTaskDesiredStatusUnsafe determines what status the task should properly be at based on the containers' statuses // Invariant: task desired status must be stopped if any essential container is stopped func (task *Task) updateTaskDesiredStatusUnsafe() { - seelog.Debugf("Updating task: [%s]", task.stringUnsafe()) + task.log.Debugf("Updating task") // A task's desired status is stopped if any essential container is stopped // Otherwise, the task's desired status is unchanged (typically running, but no need to change) for _, cont := range task.Containers { if cont.Essential && (cont.KnownTerminal() || cont.DesiredTerminal()) { - seelog.Infof("api/task: Updating task desired status to stopped because of container: [%s]; task: [%s]", - cont.Name, task.stringUnsafe()) + task.log.Infof("api/task: Updating task desired status to stopped because of container: [%s]", cont.Name) task.DesiredStatusUnsafe = apitaskstatus.TaskStopped } } @@ -2204,8 +2218,8 @@ func (task *Task) RecordExecutionStoppedAt(container *apicontainer.Container) { // ExecutionStoppedAt was already recorded. Nothing to left to do here return } - seelog.Infof("Task [%s]: recording execution stopped time. Essential container [%s] stopped at: %s", - task.Arn, container.Name, now.String()) + task.log.Infof("recording execution stopped time. Essential container [%s] stopped at: %s", + container.Name, now.String()) } // GetResources returns the list of task resources from ResourcesMap @@ -2234,9 +2248,9 @@ func (task *Task) AddResource(resourceType string, resource taskresource.TaskRes // SetTerminalReason sets the terminalReason string and this can only be set // once per the task's lifecycle. This field does not accept updates. 
func (task *Task) SetTerminalReason(reason string) { - seelog.Infof("Task [%s]: attempting to set terminal reason for task [%s]", task.Arn, reason) + task.log.Infof("attempting to set terminal reason for task [%s]", reason) task.terminalReasonOnce.Do(func() { - seelog.Infof("Task [%s]: setting terminal reason for task [%s]", task.Arn, reason) + task.log.Infof("setting terminal reason for task [%s]", reason) // Converts the first letter of terminal reason into capital letter words := strings.Fields(reason) diff --git a/agent/dockerclient/dockerapi/types.go b/agent/dockerclient/dockerapi/types.go index a7a5c960d61..2d29d8e7254 100644 --- a/agent/dockerclient/dockerapi/types.go +++ b/agent/dockerclient/dockerapi/types.go @@ -153,3 +153,15 @@ func (event *DockerContainerChangeEvent) String() string { return res } + +// String returns a short human readable string of the container change event +func (event *DockerContainerChangeEvent) ShortString() string { + res := fmt.Sprintf("event type: %s, event container status: %s, docker ID: %s", + event.Type.String(), event.Status.String(), event.DockerID) + + if event.ExitCode != nil { + res += fmt.Sprintf(", ExitCode: %d", aws.IntValue(event.ExitCode)) + } + + return res +} diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 14b8875daab..0363934c6da 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -1392,7 +1392,7 @@ func (engine *DockerTaskEngine) applyContainerState(task *apitask.Task, containe } metadata := transitionFunction(task, container) if metadata.Error != nil { - seelog.Infof("Task engine [%s]: error transitioning container [%s] to [%s]: %v", + seelog.Errorf("Task engine [%s]: error transitioning container [%s] to [%s]: %v", task.Arn, container.Name, nextState.String(), metadata.Error) } else { seelog.Debugf("Task engine [%s]: transitioned container [%s] to [%s]", diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index 39a3a961f6c..86dc66388be 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -32,6 +32,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/ecscni" "github.com/aws/amazon-ecs-agent/agent/engine/dependencygraph" "github.com/aws/amazon-ecs-agent/agent/eventstream" + "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/statemanager" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -126,6 +127,7 @@ type managedTask struct { *apitask.Task ctx context.Context cancel context.CancelFunc + log seelog.LoggerInterface engine *DockerTaskEngine cfg *config.Config @@ -165,11 +167,18 @@ type managedTask struct { // This method must only be called when the engine.processTasks write lock is // already held. 
func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask { + log := logger.InitLogger() + log.SetContext(map[string]string{ + "taskARN": task.Arn, + "taskFamily": task.Family, + "taskVersion": task.Version, + }) ctx, cancel := context.WithCancel(engine.ctx) t := &managedTask{ ctx: ctx, cancel: cancel, Task: task, + log: log, acsMessages: make(chan acsTransition), dockerMessages: make(chan dockerContainerChange), resourceStateChangeEvent: make(chan resourceStateChange), @@ -205,7 +214,7 @@ func (mtask *managedTask) overseeTask() { for { select { case <-mtask.ctx.Done(): - seelog.Infof("Managed task [%s]: parent context cancelled, exit", mtask.Arn) + mtask.log.Infof("parent context cancelled, exit") return default: } @@ -218,8 +227,7 @@ func (mtask *managedTask) overseeTask() { if !mtask.GetKnownStatus().Terminal() { // If we aren't terminal and we aren't steady state, we should be // able to move some containers along. - seelog.Infof("Managed task [%s]: task not steady state or terminal; progressing it", - mtask.Arn) + mtask.log.Infof("task not steady state or terminal; progressing it") mtask.progressTask() } @@ -230,8 +238,7 @@ func (mtask *managedTask) overseeTask() { // be sufficient to capture state changes. err := mtask.saver.Save() if err != nil { - seelog.Warnf("Managed task [%s]: unable to checkpoint task's states to disk: %v", - mtask.Arn, err) + mtask.log.Warnf("unable to checkpoint task's states to disk: %v", err) } if mtask.GetKnownStatus().Terminal() { @@ -240,11 +247,11 @@ func (mtask *managedTask) overseeTask() { } // We only break out of the above if this task is known to be stopped. Do // onetime cleanup here, including removing the task after a timeout - seelog.Infof("Managed task [%s]: task has reached stopped. Waiting for container cleanup", mtask.Arn) + mtask.log.Infof("task has reached stopped. Waiting for container cleanup") mtask.cleanupCredentials() if mtask.StopSequenceNumber != 0 { - seelog.Debugf("Managed task [%s]: marking done for this sequence: %d", - mtask.Arn, mtask.StopSequenceNumber) + mtask.log.Debugf("marking done for this sequence: %d", + mtask.StopSequenceNumber) mtask.taskStopWG.Done(mtask.StopSequenceNumber) } // TODO: make this idempotent on agent restart @@ -274,8 +281,8 @@ func (mtask *managedTask) waitForHostResources() { return } - seelog.Infof("Managed task [%s]: waiting for any previous stops to complete. Sequence number: %d", - mtask.Arn, mtask.StartSequenceNumber) + mtask.log.Infof("waiting for any previous stops to complete. Sequence number: %d", + mtask.StartSequenceNumber) othersStoppedCtx, cancel := context.WithCancel(mtask.ctx) defer cancel() @@ -293,21 +300,21 @@ func (mtask *managedTask) waitForHostResources() { break } } - seelog.Infof("Managed task [%s]: wait over; ready to move towards status: %s", - mtask.Arn, mtask.GetDesiredStatus().String()) + mtask.log.Infof("wait over; ready to move towards status: %s", + mtask.GetDesiredStatus().String()) } // waitSteady waits for a task to leave steady-state by waiting for a new // event, or a timeout. 
func (mtask *managedTask) waitSteady() { - seelog.Infof("Managed task [%s]: task at steady state: %s", mtask.Arn, mtask.GetKnownStatus().String()) + mtask.log.Infof("task at steady state: %s", mtask.GetKnownStatus().String()) timeoutCtx, cancel := context.WithTimeout(mtask.ctx, mtask.steadyStatePollInterval) defer cancel() timedOut := mtask.waitEvent(timeoutCtx.Done()) if timedOut { - seelog.Debugf("Managed task [%s]: checking to make sure it's still at steadystate", mtask.Arn) + mtask.log.Debugf("checking to make sure it's still at steadystate") go mtask.engine.checkTaskState(mtask.Task) } } @@ -317,7 +324,7 @@ func (mtask *managedTask) waitSteady() { func (mtask *managedTask) steadyState() bool { select { case <-mtask.ctx.Done(): - seelog.Info("Context expired. No longer steady.") + mtask.log.Info("Context expired. No longer steady.") return false default: taskKnownStatus := mtask.GetKnownStatus() @@ -338,25 +345,24 @@ func (mtask *managedTask) cleanupCredentials() { // channel. When the Done channel is signalled by the context, waitEvent will // return true. func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool { - seelog.Infof("Managed task [%s]: waiting for event for task", mtask.Arn) + mtask.log.Infof("waiting for event for task") select { case acsTransition := <-mtask.acsMessages: - seelog.Infof("Managed task [%s]: got acs event", mtask.Arn) mtask.handleDesiredStatusChange(acsTransition.desiredStatus, acsTransition.seqnum) return false case dockerChange := <-mtask.dockerMessages: - seelog.Infof("Managed task [%s]: got container [%s] event: [%s]", - mtask.Arn, dockerChange.container.Name, dockerChange.event.Status.String()) + mtask.log.Infof("got container change event [%s] for container [%s]", + dockerChange.event.ShortString(), dockerChange.container.Name) mtask.handleContainerChange(dockerChange) return false case resChange := <-mtask.resourceStateChangeEvent: res := resChange.resource - seelog.Infof("Managed task [%s]: got resource [%s] event: [%s]", - mtask.Arn, res.GetName(), res.StatusString(resChange.nextState)) + mtask.log.Infof("got resource change event [%s] for resource [%s]", + res.StatusString(resChange.nextState), res.GetName()) mtask.handleResourceStateChange(resChange) return false case <-stopWaiting: - seelog.Infof("Managed task [%s]: no longer waiting", mtask.Arn) + mtask.log.Infof("no longer waiting for events") return true } } @@ -368,16 +374,16 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool { func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) { // Handle acs message changes this task's desired status to whatever // acs says it should be if it is compatible - seelog.Infof("Managed task [%s]: new acs transition to: %s; sequence number: %d; task stop sequence number: %d", - mtask.Arn, desiredStatus.String(), seqnum, mtask.StopSequenceNumber) + mtask.log.Infof("new acs transition to: %s; sequence number: %d; task stop sequence number: %d", + desiredStatus.String(), seqnum, mtask.StopSequenceNumber) if desiredStatus <= mtask.GetDesiredStatus() { - seelog.Infof("Managed task [%s]: redundant task transition from [%s] to [%s], ignoring", - mtask.Arn, mtask.GetDesiredStatus().String(), desiredStatus.String()) + mtask.log.Infof("redundant task transition from [%s] to [%s], ignoring", + mtask.GetDesiredStatus().String(), desiredStatus.String()) return } if desiredStatus == apitaskstatus.TaskStopped && seqnum != 0 && mtask.GetStopSequenceNumber() == 0 { - seelog.Infof("Managed 
task [%s]: task moving to stopped, adding to stopgroup with sequence number: %d",
-			mtask.Arn, seqnum)
+		mtask.log.Infof("task moving to stopped, adding to stopgroup with sequence number: %d",
+			seqnum)
 		mtask.SetStopSequenceNumber(seqnum)
 		mtask.taskStopWG.Add(seqnum, 1)
 	}
@@ -393,22 +399,20 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
 	container := containerChange.container
 	found := mtask.isContainerFound(container)
 	if !found {
-		seelog.Criticalf("Managed task [%s]: state error; invoked with another task's container [%s]!",
-			mtask.Arn, container.Name)
+		mtask.log.Errorf("state error; invoked with another task's container [%s]!",
+			container.Name)
 		return
 	}
 	event := containerChange.event
-	seelog.Infof("Managed task [%s]: handling container change [%v] for container [%s]",
-		mtask.Arn, event, container.Name)
 
 	// If this is a backwards transition stopped->running, the first time set it
 	// to be known running so it will be stopped. Subsequently ignore these backward transitions
 	containerKnownStatus := container.GetKnownStatus()
 	mtask.handleStoppedToRunningContainerTransition(event.Status, container)
 	if event.Status <= containerKnownStatus {
-		seelog.Infof("Managed task [%s]: redundant container state change. %s to %s, but already %s",
-			mtask.Arn, container.Name, event.Status.String(), containerKnownStatus.String())
+		mtask.log.Infof("redundant container state change. %s to %s, but already %s",
+			container.Name, event.Status.String(), containerKnownStatus.String())
 		// Only update container metadata when status stays RUNNING
 		if event.Status == containerKnownStatus &&
 			event.Status == apicontainerstatus.ContainerRunning {
@@ -430,18 +434,18 @@
 	}
 	mtask.RecordExecutionStoppedAt(container)
-	seelog.Debugf("Managed task [%s]: sending container change event to tcs, container: [%s(%s)], status: %s",
-		mtask.Arn, container.Name, event.DockerID, event.Status.String())
+	mtask.log.Debugf("sending container change event to tcs, container: [%s(%s)], status: %s",
+		container.Name, event.DockerID, event.Status.String())
 	err := mtask.containerChangeEventStream.WriteToEventStream(event)
 	if err != nil {
-		seelog.Warnf("Managed task [%s]: failed to write container [%s] change event to tcs event stream: %v",
-			mtask.Arn, container.Name, err)
+		mtask.log.Warnf("failed to write container [%s] change event to tcs event stream: %v",
+			container.Name, err)
 	}
 	mtask.emitContainerEvent(mtask.Task, container, "")
 	if mtask.UpdateStatus() {
-		seelog.Infof("Managed task [%s]: container change also resulted in task change [%s]: [%s]",
-			mtask.Arn, container.Name, mtask.GetDesiredStatus().String())
+		mtask.log.Infof("container change also resulted in task change [%s]: [%s]",
+			container.Name, mtask.GetDesiredStatus().String())
 		// If knownStatus changed, let it be known
 		var taskStateChangeReason string
 		if mtask.GetKnownStatus().Terminal() {
@@ -457,8 +461,8 @@ func (mtask *managedTask) handleResourceStateChange(resChange resourceStateChang
 	// locate the resource
 	res := resChange.resource
 	if !mtask.isResourceFound(res) {
-		seelog.Criticalf("Managed task [%s]: state error; invoked with another task's resource [%s]",
-			mtask.Arn, res.GetName())
+		mtask.log.Errorf("state error; invoked with another task's resource [%s]",
+			res.GetName())
 		return
 	}
 
 	currentKnownStatus := res.GetKnownStatus()
 	if status <= 
currentKnownStatus { - seelog.Infof("Managed task [%s]: redundant resource state change. %s to %s, but already %s", - mtask.Arn, res.GetName(), res.StatusString(status), res.StatusString(currentKnownStatus)) + mtask.log.Infof("redundant resource state change. %s to %s, but already %s", + res.GetName(), res.StatusString(status), res.StatusString(currentKnownStatus)) return } @@ -477,11 +480,11 @@ func (mtask *managedTask) handleResourceStateChange(resChange resourceStateChang mtask.engine.saver.Save() return } - seelog.Infof("Managed task [%s]: unable to transition resource %s to %s: %v", - mtask.Arn, res.GetName(), res.StatusString(status), err) + mtask.log.Infof("unable to transition resource %s to %s: %v", + res.GetName(), res.StatusString(status), err) if status == res.SteadyState() { - seelog.Errorf("Managed task [%s]: error while creating resource %s, setting the task's desired status to STOPPED", - mtask.Arn, res.GetName()) + mtask.log.Errorf("error while creating resource %s, setting the task's desired status to STOPPED", + res.GetName()) mtask.SetDesiredStatus(apitaskstatus.TaskStopped) mtask.Task.SetTerminalReason(res.GetTerminalReason()) mtask.engine.saver.Save() @@ -490,8 +493,8 @@ func (mtask *managedTask) handleResourceStateChange(resChange resourceStateChang func (mtask *managedTask) emitResourceChange(change resourceStateChange) { if mtask.ctx.Err() != nil { - seelog.Infof("Managed task [%s]: unable to emit resource state change due to closed context: %v", - mtask.Arn, mtask.ctx.Err()) + mtask.log.Infof("unable to emit resource state change due to closed context: %v", + mtask.ctx.Err()) } mtask.resourceStateChangeEvent <- change } @@ -499,13 +502,13 @@ func (mtask *managedTask) emitResourceChange(change resourceStateChange) { func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { event, err := api.NewTaskStateChangeEvent(task, reason) if err != nil { - seelog.Infof("Managed task [%s]: unable to create task state change event [%s]: %v", + mtask.log.Infof("unable to create task state change event [%s]: %v", task.Arn, reason, err) return } - seelog.Infof("Managed task [%s]: sending task change event [%s]", mtask.Arn, event.String()) + mtask.log.Infof("sending task change event [%s]", event.String()) mtask.stateChangeEvents <- event - seelog.Infof("Managed task [%s]: sent task change event [%s]", mtask.Arn, event.String()) + mtask.log.Infof("sent task change event [%s]", event.String()) } // emitContainerEvent passes a given event up through the containerEvents channel if necessary. 
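`Task.initLog` and `newManagedTask` assemble identical context maps by hand. A hypothetical helper in `agent/logger` — not part of this patch — could keep the shared key names from drifting:

```go
package logger

import "github.com/cihub/seelog"

// NewTaskLogger returns a logger tagged with the task identifiers that
// api/task and engine currently set by hand in two places.
func NewTaskLogger(arn, family, version string) seelog.LoggerInterface {
	log := InitLogger()
	log.SetContext(map[string]string{
		"taskARN":     arn,
		"taskFamily":  family,
		"taskVersion": version,
	})
	return log
}
```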
@@ -513,30 +516,30 @@ func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { func (mtask *managedTask) emitContainerEvent(task *apitask.Task, cont *apicontainer.Container, reason string) { event, err := api.NewContainerStateChangeEvent(task, cont, reason) if err != nil { - seelog.Infof("Managed task [%s]: unable to create state change event for container [%s]: %v", + mtask.log.Infof("unable to create state change event for container [%s]: %v", task.Arn, cont.Name, err) return } - seelog.Infof("Managed task [%s]: sending container change event [%s]: %s", - mtask.Arn, cont.Name, event.String()) + mtask.log.Infof("sending container change event [%s]: %s", + cont.Name, event.String()) mtask.stateChangeEvents <- event - seelog.Infof("Managed task [%s]: sent container change event [%s]: %s", - mtask.Arn, cont.Name, event.String()) + mtask.log.Infof("sent container change event [%s]: %s", + cont.Name, event.String()) } func (mtask *managedTask) emitDockerContainerChange(change dockerContainerChange) { if mtask.ctx.Err() != nil { - seelog.Infof("Managed task [%s]: unable to emit docker container change due to closed context: %v", - mtask.Arn, mtask.ctx.Err()) + mtask.log.Infof("unable to emit docker container change due to closed context: %v", + mtask.ctx.Err()) } mtask.dockerMessages <- change } func (mtask *managedTask) emitACSTransition(transition acsTransition) { if mtask.ctx.Err() != nil { - seelog.Infof("Managed task [%s]: unable to emit acs transition due to closed context: %v", - mtask.Arn, mtask.ctx.Err()) + mtask.log.Infof("unable to emit acs transition due to closed context: %v", + mtask.ctx.Err()) } mtask.acsMessages <- transition } @@ -566,20 +569,20 @@ func (mtask *managedTask) releaseIPInIPAM() { if !mtask.IsNetworkModeAWSVPC() { return } - seelog.Infof("Managed task [%s]: IPAM releasing ip for task eni", mtask.Arn) + mtask.log.Infof("IPAM releasing ip for task eni") cfg, err := mtask.BuildCNIConfig(true, &ecscni.Config{ MinSupportedCNIVersion: config.DefaultMinSupportedCNIVersion, }) if err != nil { - seelog.Errorf("Managed task [%s]: failed to release ip; unable to build cni configuration: %v", - mtask.Arn, err) + mtask.log.Errorf("failed to release ip; unable to build cni configuration: %v", + err) return } err = mtask.cniClient.ReleaseIPResource(mtask.ctx, cfg, ipamCleanupTmeout) if err != nil { - seelog.Errorf("Managed task [%s]: failed to release ip; IPAM error: %v", - mtask.Arn, err) + mtask.log.Errorf("failed to release ip; IPAM error: %v", + err) return } } @@ -607,8 +610,8 @@ func (mtask *managedTask) handleStoppedToRunningContainerTransition(status apico // because we got an error running it and it ran anyways), the first time // update it to 'known running' so that it will be driven back to stopped mtask.unexpectedStart.Do(func() { - seelog.Warnf("Managed task [%s]: stopped container [%s] came back; re-stopping it once", - mtask.Arn, container.Name) + mtask.log.Warnf("stopped container [%s] came back; re-stopping it once", + container.Name) go mtask.engine.transitionContainer(mtask.Task, container, apicontainerstatus.ContainerStopped) // This will not proceed afterwards because status <= knownstatus below }) @@ -632,8 +635,8 @@ func (mtask *managedTask) handleEventError(containerChange dockerContainerChange // don't want to use cached image for both cases. 
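The branch below encodes the rule the comments describe: a pull failure is fatal to the task only when the configured pull behavior forbids falling back to a cached image. A simplified sketch of that decision with stand-in types:

```go
package main

import "fmt"

// ImagePullBehavior is a stand-in for the agent's config type.
type ImagePullBehavior int

const (
	ImagePullDefaultBehavior ImagePullBehavior = iota
	ImagePullAlwaysBehavior
	ImagePullOnceBehavior
	ImagePullPreferCachedBehavior
)

// shouldStopTaskOnPullError mirrors the branch in handleEventError: with
// "always" or "once" a cached image must not be used, so the task stops;
// otherwise the agent proceeds and lets container creation surface any error.
func shouldStopTaskOnPullError(b ImagePullBehavior) bool {
	return b == ImagePullAlwaysBehavior || b == ImagePullOnceBehavior
}

func main() {
	fmt.Println(shouldStopTaskOnPullError(ImagePullAlwaysBehavior))  // true
	fmt.Println(shouldStopTaskOnPullError(ImagePullDefaultBehavior)) // false
}
```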
if mtask.cfg.ImagePullBehavior == config.ImagePullAlwaysBehavior || mtask.cfg.ImagePullBehavior == config.ImagePullOnceBehavior { - seelog.Errorf("Managed task [%s]: error while pulling image %s for container %s , moving task to STOPPED: %v", - mtask.Arn, container.Image, container.Name, event.Error) + mtask.log.Errorf("error while pulling image %s for container %s , moving task to STOPPED: %v", + container.Image, container.Name, event.Error) // The task should be stopped regardless of whether this container is // essential or non-essential. mtask.SetDesiredStatus(apitaskstatus.TaskStopped) @@ -644,8 +647,8 @@ func (mtask *managedTask) handleEventError(containerChange dockerContainerChange // the task fail here, will let create container handle it instead. // If the agent pull behavior is default, use local image cache directly, // assuming it exists. - seelog.Errorf("Managed task [%s]: error while pulling container %s and image %s, will try to run anyway: %v", - mtask.Arn, container.Name, container.Image, event.Error) + mtask.log.Errorf("error while pulling container %s and image %s, will try to run anyway: %v", + container.Name, container.Image, event.Error) // proceed anyway return true case apicontainerstatus.ContainerStopped: @@ -655,16 +658,16 @@ func (mtask *managedTask) handleEventError(containerChange dockerContainerChange fallthrough case apicontainerstatus.ContainerCreated: // No need to explicitly stop containers if this is a * -> NONE/CREATED transition - seelog.Warnf("Managed task [%s]: error creating container [%s]; marking its desired status as STOPPED: %v", - mtask.Arn, container.Name, event.Error) + mtask.log.Errorf("error creating container [%s]; marking its desired status as STOPPED: %v", + container.Name, event.Error) container.SetKnownStatus(currentKnownStatus) container.SetDesiredStatus(apicontainerstatus.ContainerStopped) return false default: // If this is a * -> RUNNING / RESOURCES_PROVISIONED transition, we need to stop // the container. - seelog.Warnf("Managed task [%s]: error starting/provisioning container[%s]; marking its desired status as STOPPED: %v", - mtask.Arn, container.Name, event.Error) + mtask.log.Errorf("error starting/provisioning container [%s]; marking its desired status as STOPPED: %v", + container.Name, event.Error) container.SetKnownStatus(currentKnownStatus) container.SetDesiredStatus(apicontainerstatus.ContainerStopped) errorName := event.Error.ErrorName() @@ -682,8 +685,8 @@ func (mtask *managedTask) handleEventError(containerChange dockerContainerChange } if shouldForceStop { - seelog.Warnf("Managed task [%s]: forcing container [%s] to stop", - mtask.Arn, container.Name) + mtask.log.Warnf("forcing container [%s] to stop", + container.Name) go mtask.engine.transitionContainer(mtask.Task, container, apicontainerstatus.ContainerStopped) } // Container known status not changed, no need for further processing @@ -705,8 +708,8 @@ func (mtask *managedTask) handleContainerStoppedTransitionError(event dockerapi. // could also trigger the progress and have another go at stopping the // container if event.Error.ErrorName() == dockerapi.DockerTimeoutErrorName { - seelog.Infof("Managed task [%s]: '%s' error stopping container [%s]. Ignoring state change: %v", - mtask.Arn, dockerapi.DockerTimeoutErrorName, container.Name, event.Error.Error()) + mtask.log.Infof("'%s' error stopping container [%s]. 
Ignoring state change: %v", + dockerapi.DockerTimeoutErrorName, container.Name, event.Error.Error()) container.SetKnownStatus(currentKnownStatus) return false } @@ -714,8 +717,8 @@ func (mtask *managedTask) handleContainerStoppedTransitionError(event dockerapi. // reset the known status to the current status and return cannotStopContainerError, ok := event.Error.(cannotStopContainerError) if ok && cannotStopContainerError.IsRetriableError() { - seelog.Infof("Managed task [%s]: error stopping the container [%s]. Ignoring state change: %v", - mtask.Arn, container.Name, cannotStopContainerError.Error()) + mtask.log.Infof("error stopping the container [%s]. Ignoring state change: %v", + container.Name, cannotStopContainerError.Error()) container.SetKnownStatus(currentKnownStatus) return false } @@ -726,8 +729,8 @@ func (mtask *managedTask) handleContainerStoppedTransitionError(event dockerapi. // enough) and get on with it // This can happen in cases where the container we tried to stop // was already stopped or did not exist at all. - seelog.Warnf("Managed task [%s]: 'docker stop' for container [%s] returned %s: %s", - mtask.Arn, container.Name, event.Error.ErrorName(), event.Error.Error()) + mtask.log.Warnf("'docker stop' for container [%s] returned %s: %s", + container.Name, event.Error.ErrorName(), event.Error.Error()) container.SetKnownStatus(apicontainerstatus.ContainerStopped) container.SetDesiredStatus(apicontainerstatus.ContainerStopped) return true @@ -740,7 +743,7 @@ func (mtask *managedTask) handleContainerStoppedTransitionError(event dockerapi. // docker completes. // Container changes may also prompt the task status to change as well. func (mtask *managedTask) progressTask() { - seelog.Debugf("Managed task [%s]: progressing containers and resources in task", mtask.Arn) + mtask.log.Debugf("progressing containers and resources in task") // max number of transitions length to ensure writes will never block on // these and if we exit early transitions can exit the goroutine and it'll // get GC'd eventually @@ -809,7 +812,7 @@ func (mtask *managedTask) progressTask() { mtask.waitForTransition(transitions, transitionChange, transitionChangeEntity) // update the task status if mtask.UpdateStatus() { - seelog.Infof("Managed task [%s]: container or resource change also resulted in task change", mtask.Arn) + mtask.log.Infof("container or resource change also resulted in task change") // If knownStatus changed, let it be known var taskStateChangeReason string @@ -825,14 +828,14 @@ func (mtask *managedTask) progressTask() { func (mtask *managedTask) isWaitingForACSExecutionCredentials(reasons []error) bool { for _, reason := range reasons { if reason == dependencygraph.CredentialsNotResolvedErr { - seelog.Infof("Managed task [%s]: waiting for credentials to pull from ECR", mtask.Arn) + mtask.log.Infof("waiting for credentials to pull from ECR") timeoutCtx, timeoutCancel := context.WithTimeout(mtask.ctx, waitForPullCredentialsTimeout) defer timeoutCancel() timedOut := mtask.waitEvent(timeoutCtx.Done()) if timedOut { - seelog.Infof("Managed task [%s]: timed out waiting for acs credentials message", mtask.Arn) + mtask.log.Infof("timed out waiting for acs credentials message") } return true } @@ -899,8 +902,8 @@ func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransi knownStatus := res.GetKnownStatus() desiredStatus := res.GetDesiredStatus() if knownStatus >= desiredStatus { - seelog.Debugf("Managed task [%s]: resource [%s] has already transitioned to or beyond the 
desired status %s; current known is %s", - mtask.Arn, res.GetName(), res.StatusString(desiredStatus), res.StatusString(knownStatus)) + mtask.log.Debugf("resource [%s] has already transitioned to or beyond the desired status %s; current known is %s", + res.GetName(), res.StatusString(desiredStatus), res.StatusString(knownStatus)) continue } anyCanTransition = true @@ -950,12 +953,12 @@ func (mtask *managedTask) applyResourceState(resource taskresource.TaskResource, resStatus := resource.StatusString(nextState) err := resource.ApplyTransition(nextState) if err != nil { - seelog.Infof("Managed task [%s]: error transitioning resource [%s] to [%s]: %v", - mtask.Arn, resName, resStatus, err) + mtask.log.Infof("error transitioning resource [%s] to [%s]: %v", + resName, resStatus, err) return err } - seelog.Infof("Managed task [%s]: transitioned resource [%s] to [%s]", - mtask.Arn, resName, resStatus) + mtask.log.Infof("transitioned resource [%s] to [%s]", + resName, resStatus) return nil } @@ -981,8 +984,8 @@ func (mtask *managedTask) containerNextState(container *apicontainer.Container) containerDesiredStatus := container.GetDesiredStatus() if containerKnownStatus == containerDesiredStatus { - seelog.Debugf("Managed task [%s]: container [%s] at desired status: %s", - mtask.Arn, container.Name, containerDesiredStatus.String()) + mtask.log.Debugf("container [%s] at desired status: %s", + container.Name, containerDesiredStatus.String()) return &containerTransition{ nextState: apicontainerstatus.ContainerStatusNone, actionRequired: false, @@ -991,8 +994,8 @@ func (mtask *managedTask) containerNextState(container *apicontainer.Container) } if containerKnownStatus > containerDesiredStatus { - seelog.Debugf("Managed task [%s]: container [%s] has already transitioned beyond desired status(%s): %s", - mtask.Arn, container.Name, containerKnownStatus.String(), containerDesiredStatus.String()) + mtask.log.Debugf("container [%s] has already transitioned beyond desired status(%s): %s", + container.Name, containerKnownStatus.String(), containerDesiredStatus.String()) return &containerTransition{ nextState: apicontainerstatus.ContainerStatusNone, actionRequired: false, @@ -1001,8 +1004,8 @@ func (mtask *managedTask) containerNextState(container *apicontainer.Container) } if blocked, err := dependencygraph.DependenciesAreResolved(container, mtask.Containers, mtask.Task.GetExecutionCredentialsID(), mtask.credentialsManager, mtask.GetResources()); err != nil { - seelog.Debugf("Managed task [%s]: can't apply state to container [%s] yet due to unresolved dependencies: %v", - mtask.Arn, container.Name, err) + mtask.log.Debugf("can't apply state to container [%s] yet due to unresolved dependencies: %v", + container.Name, err) return &containerTransition{ nextState: apicontainerstatus.ContainerStatusNone, actionRequired: false, @@ -1063,17 +1066,16 @@ func (mtask *managedTask) resourceNextState(resource taskresource.TaskResource) } func (mtask *managedTask) handleContainersUnableToTransitionState() { - seelog.Criticalf("Managed task [%s]: task in a bad state; it's not steadystate but no containers want to transition", - mtask.Arn) + mtask.log.Error("task in a bad state; it's not steadystate but no containers want to transition") if mtask.GetDesiredStatus().Terminal() { // Ack, really bad. We want it to stop but the containers don't think // that's possible. let's just break out and hope for the best! 
- seelog.Criticalf("Managed task [%s]: The state is so bad that we're just giving up on it", mtask.Arn) + mtask.log.Error("The state is so bad that we're just giving up on it") mtask.SetKnownStatus(apitaskstatus.TaskStopped) mtask.emitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason) // TODO we should probably panic here } else { - seelog.Criticalf("Managed task [%s]: moving task to stopped due to bad state", mtask.Arn) + mtask.log.Error("moving task to stopped due to bad state") mtask.handleDesiredStatusChange(apitaskstatus.TaskStopped, 0) } } @@ -1085,14 +1087,14 @@ func (mtask *managedTask) waitForTransition(transitions map[string]string, // to ensure that there is at least one container or resource can be processed in the next // progressTask call. This is done by waiting for one transition/acs/docker message. if !mtask.waitEvent(transition) { - seelog.Debugf("Managed task [%s]: received non-transition events", mtask.Arn) + mtask.log.Debugf("received non-transition events") return } transitionedEntity := <-transitionChangeEntity - seelog.Debugf("Managed task [%s]: transition for [%s] finished", - mtask.Arn, transitionedEntity) + mtask.log.Debugf("transition for [%s] finished", + transitionedEntity) delete(transitions, transitionedEntity) - seelog.Debugf("Managed task [%s]: still waiting for: %v", mtask.Arn, transitions) + mtask.log.Debugf("still waiting for: %v", transitions) } func (mtask *managedTask) time() ttime.Time { @@ -1108,7 +1110,7 @@ func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) { cleanupTimeDuration := mtask.GetKnownStatusTime().Add(taskStoppedDuration).Sub(ttime.Now()) cleanupTime := make(<-chan time.Time) if cleanupTimeDuration < 0 { - seelog.Infof("Managed task [%s]: Cleanup Duration has been exceeded. Starting cleanup now ", mtask.Arn) + mtask.log.Infof("Cleanup Duration has been exceeded. Starting cleanup now ") cleanupTime = mtask.time().After(time.Nanosecond) } else { cleanupTime = mtask.time().After(cleanupTimeDuration) @@ -1125,12 +1127,12 @@ func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) { // wait for apitaskstatus.TaskStopped to be sent ok := mtask.waitForStopReported() if !ok { - seelog.Errorf("Managed task [%s]: aborting cleanup for task as it is not reported as stopped. SentStatus: %s", - mtask.Arn, mtask.GetSentStatus().String()) + mtask.log.Errorf("aborting cleanup for task as it is not reported as stopped. SentStatus: %s", + mtask.GetSentStatus().String()) return } - seelog.Infof("Managed task [%s]: cleaning up task's containers and data", mtask.Arn) + mtask.log.Infof("cleaning up task's containers and data") // For the duration of this, simply discard any task events; this ensures the // speedy processing of other events for other tasks @@ -1174,8 +1176,8 @@ func (mtask *managedTask) waitForStopReported() bool { taskStopped = true break } - seelog.Warnf("Managed task [%s]: blocking cleanup until the task has been reported stopped. SentStatus: %s (%d/%d)", - mtask.Arn, sentStatus.String(), i+1, _maxStoppedWaitTimes) + mtask.log.Warnf("blocking cleanup until the task has been reported stopped. 
SentStatus: %s (%d/%d)", + sentStatus.String(), i+1, _maxStoppedWaitTimes) mtask._time.Sleep(_stoppedSentWaitInterval) } stoppedSentBool <- struct{}{} diff --git a/agent/logger/log.go b/agent/logger/log.go index c74a570f53a..f358c95f9fd 100644 --- a/agent/logger/log.go +++ b/agent/logger/log.go @@ -16,6 +16,7 @@ package logger import ( "fmt" "os" + "sort" "strconv" "strings" "sync" @@ -53,24 +54,32 @@ var Config *logConfig func logfmtFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { - return fmt.Sprintf(`level=%s time=%s msg=%q module=%s -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName()) + cc, ok := context.CustomContext().(map[string]string) + var customContext string + if ok && len(cc) > 0 { + var sortedContext []string + for k, v := range cc { + sortedContext = append(sortedContext, k+"="+v) + } + sort.Strings(sortedContext) + customContext = " " + strings.Join(sortedContext, " ") + } + return fmt.Sprintf(`level=%s time=%s msg=%q module=%s%s +`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext) } } func jsonFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { - return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q, "module": %q} -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName()) - } -} - -func reloadConfig() { - logger, err := seelog.LoggerFromConfigAsString(seelogConfig()) - if err == nil { - seelog.ReplaceLogger(logger) - } else { - seelog.Error(err) + cc, ok := context.CustomContext().(map[string]string) + var customContext string + if ok && len(cc) > 0 { + for k, v := range cc { + customContext += fmt.Sprintf(", %q: %q", k, v) + } + } + return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q, "module": %q%s} +`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext) } } @@ -117,7 +126,7 @@ func SetLevel(logLevel string) { Config.lock.Lock() defer Config.lock.Unlock() Config.level = parsedLevel - reloadConfig() + reloadMainConfig() } } @@ -129,6 +138,24 @@ func GetLevel() string { return Config.level } +func InitLogger() seelog.LoggerInterface { + logger, err := seelog.LoggerFromConfigAsString(seelogConfig()) + if err != nil { + seelog.Errorf("Error creating seelog logger: %s", err) + return seelog.Default + } + return logger +} + +func reloadMainConfig() { + logger, err := seelog.LoggerFromConfigAsString(seelogConfig()) + if err == nil { + seelog.ReplaceLogger(logger) + } else { + seelog.Error(err) + } +} + func init() { Config = &logConfig{ logfile: os.Getenv(LOGFILE_ENV_VAR), @@ -139,7 +166,9 @@ func init() { MaxRollCount: DEFAULT_MAX_ROLL_COUNT, } - SetLevel(os.Getenv(LOGLEVEL_ENV_VAR)) + if level := os.Getenv(LOGLEVEL_ENV_VAR); level != "" { + SetLevel(level) + } if RolloverType := os.Getenv(LOG_ROLLOVER_TYPE_ENV_VAR); RolloverType != "" { Config.RolloverType = RolloverType } @@ -169,7 +198,6 @@ func init() { if err := seelog.RegisterCustomFormatter("EcsAgentJson", jsonFormatter); err != nil { seelog.Error(err) } - registerPlatformLogger() - reloadConfig() + seelog.ReplaceLogger(InitLogger()) } diff --git a/agent/logger/log_test.go b/agent/logger/log_test.go index 789b04ce0da..7f81b95f0a9 100644 --- a/agent/logger/log_test.go +++ b/agent/logger/log_test.go @@ -32,12 +32,53 @@ func 
TestLogfmtFormat(t *testing.T) { `, s) } +func TestLogfmtFormat_context(t *testing.T) { + logfmt := logfmtFormatter("") + out := logfmt("This is my log message", seelog.InfoLvl, &LogContextMock{ + context: map[string]string{ + "myID": "12345", + "myARN": "arn:12345:/abc", + }, + }) + s, ok := out.(string) + require.True(t, ok) + require.Equal(t, `level=info time=2018-10-01T01:02:03Z msg="This is my log message" module=mytestmodule.go myARN=arn:12345:/abc myID=12345 +`, s) +} + func TestJSONFormat(t *testing.T) { jsonF := jsonFormatter("") out := jsonF("This is my log message", seelog.InfoLvl, &LogContextMock{}) s, ok := out.(string) require.True(t, ok) - require.JSONEq(t, `{"level": "info", "time": "2018-10-01T01:02:03Z", "msg": "This is my log message", "module": "mytestmodule.go"}`, s) + require.JSONEq(t, ` + { + "level": "info", + "time": "2018-10-01T01:02:03Z", + "msg": "This is my log message", + "module": "mytestmodule.go" + }`, s) +} + +func TestJSONFormat_context(t *testing.T) { + jsonF := jsonFormatter("") + out := jsonF("This is my log message", seelog.InfoLvl, &LogContextMock{ + context: map[string]string{ + "myID": "12345", + "myARN": "arn:12345:/abc", + }, + }) + s, ok := out.(string) + require.True(t, ok) + require.JSONEq(t, ` + { + "level": "info", + "time": "2018-10-01T01:02:03Z", + "msg": "This is my log message", + "module": "mytestmodule.go", + "myARN":"arn:12345:/abc", + "myID":"12345" + }`, s) } func TestLogfmtFormat_debug(t *testing.T) { @@ -54,7 +95,13 @@ func TestJSONFormat_debug(t *testing.T) { out := jsonF("This is my log message", seelog.DebugLvl, &LogContextMock{}) s, ok := out.(string) require.True(t, ok) - require.JSONEq(t, `{"level": "debug", "time": "2018-10-01T01:02:03Z", "msg": "This is my log message", "module": "mytestmodule.go"}`, s) + require.JSONEq(t, ` + { + "level": "debug", + "time": "2018-10-01T01:02:03Z", + "msg": "This is my log message", + "module": "mytestmodule.go" + }`, s) } func TestSeelogConfig_Default(t *testing.T) { @@ -201,7 +248,9 @@ func TestSeelogConfig_JSONOutput(t *testing.T) { `, c) } -type LogContextMock struct{} +type LogContextMock struct { + context map[string]string +} // Caller's function name. 
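One detail worth noting in the formatters above: `logfmtFormatter` sorts its context keys, so its output is deterministic, while `jsonFormatter` ranges over the map directly, so key order varies run to run — which is why these tests compare with `require.JSONEq` rather than string equality. A minimal illustration of the sorted rendering:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// renderContext mimics logfmtFormatter's handling of custom context:
// collect key=value pairs, then sort them for stable output.
func renderContext(cc map[string]string) string {
	pairs := make([]string, 0, len(cc))
	for k, v := range cc {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs) // without this, Go randomizes map iteration order
	return strings.Join(pairs, " ")
}

func main() {
	fmt.Println(renderContext(map[string]string{
		"myID":  "12345",
		"myARN": "arn:12345:/abc",
	})) // always: myARN=arn:12345:/abc myID=12345
}
```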
func (l *LogContextMock) Func() string { @@ -242,5 +291,5 @@ func (l *LogContextMock) CallTime() time.Time { // Custom context that can be set by calling logger.SetContext func (l *LogContextMock) CustomContext() interface{} { - return map[string]string{} + return l.context } diff --git a/agent/taskresource/cgroup/cgroup.go b/agent/taskresource/cgroup/cgroup.go index 8f1d341dd9d..111b9b67142 100644 --- a/agent/taskresource/cgroup/cgroup.go +++ b/agent/taskresource/cgroup/cgroup.go @@ -23,6 +23,7 @@ import ( "time" "github.com/aws/amazon-ecs-agent/agent/api/task/status" + "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/taskresource" control "github.com/aws/amazon-ecs-agent/agent/taskresource/cgroup/control" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" @@ -64,6 +65,8 @@ type CgroupResource struct { statusToTransitions map[resourcestatus.ResourceStatus]func() error // lock is used for fields that are accessed and updated concurrently lock sync.RWMutex + // log is a custom logger with extra context specific to the cgroup struct + log seelog.LoggerInterface } // NewCgroupResource is used to return an object that implements the Resource interface @@ -82,9 +85,22 @@ func NewCgroupResource(taskARN string, resourceSpec: resourceSpec, } c.initializeResourceStatusToTransitionFunction() + c.initLog() return c } +func (cgroup *CgroupResource) initLog() { + if cgroup.log == nil { + cgroup.log = logger.InitLogger() + cgroup.log.SetContext(map[string]string{ + "taskARN": cgroup.taskARN, + "cgroupRoot": cgroup.cgroupRoot, + "cgroupMountPath": cgroup.cgroupMountPath, + "resourceName": resourceName, + }) + } +} + // GetTerminalReason returns an error string to propagate up through to task // state change messages func (cgroup *CgroupResource) GetTerminalReason() string { @@ -159,8 +175,7 @@ func (cgroup *CgroupResource) NextKnownState() resourcestatus.ResourceStatus { func (cgroup *CgroupResource) ApplyTransition(nextState resourcestatus.ResourceStatus) error { transitionFunc, ok := cgroup.statusToTransitions[nextState] if !ok { - seelog.Errorf("Cgroup Resource [%s]: unsupported desired state transition [%s]: %s", - cgroup.taskARN, cgroup.GetName(), cgroup.StatusString(nextState)) + cgroup.log.Errorf("unsupported desired state transition %s", cgroup.StatusString(nextState)) return errors.Errorf("resource [%s]: transition to %s impossible", cgroup.GetName(), cgroup.StatusString(nextState)) } @@ -244,7 +259,7 @@ func (cgroup *CgroupResource) GetCreatedAt() time.Time { func (cgroup *CgroupResource) Create() error { err := cgroup.setupTaskCgroup() if err != nil { - seelog.Criticalf("Cgroup resource [%s]: unable to setup cgroup root: %v", cgroup.taskARN, err) + cgroup.log.Errorf("unable to setup cgroup root: %v", err) return err } return nil @@ -252,10 +267,10 @@ func (cgroup *CgroupResource) Create() error { func (cgroup *CgroupResource) setupTaskCgroup() error { cgroupRoot := cgroup.cgroupRoot - seelog.Debugf("Cgroup resource [%s]: setting up cgroup at: %s", cgroup.taskARN, cgroupRoot) + cgroup.log.Info("setting up cgroup") if cgroup.control.Exists(cgroupRoot) { - seelog.Debugf("Cgroup resource [%s]: cgroup at %s already exists, skipping creation", cgroup.taskARN, cgroupRoot) + cgroup.log.Infof("cgroup at root already exists, skipping creation") return nil } @@ -285,7 +300,7 @@ func (cgroup *CgroupResource) Cleanup() error { // Explicitly handle cgroup deleted error if err != nil { if err == cgroups.ErrCgroupDeleted { - seelog.Warnf("Cgroup at 
%s has already been removed: %v", cgroup.cgroupRoot, err) + cgroup.log.Warnf("Cgroup at root has already been removed: %v", err) return nil } return errors.Wrapf(err, "resource: cleanup cgroup: unable to remove cgroup at %s", cgroup.cgroupRoot) @@ -343,6 +358,7 @@ func (cgroup *CgroupResource) UnmarshalJSON(b []byte) error { if temp.KnownStatus != nil { cgroup.SetKnownStatus(resourcestatus.ResourceStatus(*temp.KnownStatus)) } + cgroup.initLog() return nil } diff --git a/agent/taskresource/firelens/firelens_unix.go b/agent/taskresource/firelens/firelens_unix.go index 835c297129c..52cd958c3a1 100644 --- a/agent/taskresource/firelens/firelens_unix.go +++ b/agent/taskresource/firelens/firelens_unix.go @@ -28,6 +28,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/api/task/status" "github.com/aws/amazon-ecs-agent/agent/credentials" + "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/s3" "github.com/aws/amazon-ecs-agent/agent/s3/factory" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -90,6 +91,8 @@ type FirelensResource struct { terminalReason string terminalReasonOnce sync.Once lock sync.RWMutex + // log is a custom logger with extra context specific to the firelens struct + log seelog.LoggerInterface } // NewFirelensResource returns a new FirelensResource. @@ -122,15 +125,27 @@ func NewFirelensResource(cluster, taskARN, taskDefinition, ec2InstanceID, dataDi } firelensResource.initStatusToTransition() + firelensResource.initLog() return firelensResource, nil } +func (firelens *FirelensResource) initLog() { + if firelens.log == nil { + firelens.log = logger.InitLogger() + firelens.log.SetContext(map[string]string{ + "taskARN": firelens.taskARN, + "configType": firelens.firelensConfigType, + "resourceName": ResourceName, + }) + } +} + func (firelens *FirelensResource) parseOptions(options map[string]string) error { if _, ok := options[ecsLogMetadataEnableOption]; ok { val := options[ecsLogMetadataEnableOption] b, err := strconv.ParseBool(val) if err != nil { - seelog.Warnf("Invalid value for firelens container option %s was specified: %s. Ignoring it.", ecsLogMetadataEnableOption, val) + firelens.log.Warnf("Invalid value for firelens container option %s was specified: %s. Ignoring it.", ecsLogMetadataEnableOption, val) } else { firelens.ecsMetadataEnabled = b } @@ -217,6 +232,7 @@ func (firelens *FirelensResource) Initialize(resourceFields *taskresource.Resour firelens.ioutil = ioutilwrapper.NewIOUtil() firelens.s3ClientCreator = factory.NewS3ClientCreator() firelens.credentialsManager = resourceFields.CredentialsManager + firelens.initLog() } // GetNetworkMode returns the network mode of the task. 
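As a concrete illustration, once this context is attached the logfmt formatter from the first patch renders the resource's log lines with the custom context appended in sorted key order after the module field. A sketch of the expected output, where the timestamp, file path, ARN, and fluentd config type are hypothetical values and the ResourceName constant is assumed to be "firelens":

	firelens.log.Infof("Generated firelens config file at: %s", confFilePath)
	// level=info time=2019-12-20T23:37:38Z msg="Generated firelens config file at: /data/firelens/config/fluent.conf" module=firelens_unix.go configType=fluentd resourceName=firelens taskARN=arn:aws:ecs:us-west-2:123456789012:task/example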
@@ -246,7 +262,7 @@ func (firelens *FirelensResource) DesiredTerminal() bool { func (firelens *FirelensResource) setTerminalReason(reason string) { firelens.terminalReasonOnce.Do(func() { - seelog.Infof("firelens resource: setting terminal reason for task: [%s]", firelens.taskARN) + firelens.log.Infof("firelens resource: setting terminal reason") firelens.terminalReason = reason }) } @@ -473,7 +489,7 @@ func (firelens *FirelensResource) generateConfigFile() error { return errors.Wrapf(err, "unable to generate firelens config file") } - seelog.Infof("Generated firelens config file at: %s", confFilePath) + firelens.log.Infof("Generated firelens config file at: %s", confFilePath) return nil } @@ -504,7 +520,7 @@ func (firelens *FirelensResource) downloadConfigFromS3() error { return errors.Wrapf(err, "unable to download s3 config %s from bucket %s", key, bucket) } - seelog.Debugf("Downloaded firelens config file from s3 and saved to: %s", confFilePath) + firelens.log.Debugf("Downloaded firelens config file from s3 and saved to: %s", confFilePath) return nil } @@ -547,6 +563,6 @@ func (firelens *FirelensResource) Cleanup() error { return fmt.Errorf("unable to remove firelens resource directory %s: %v", firelens.resourceDir, err) } - seelog.Infof("Removed firelens resource directory at %s", firelens.resourceDir) + firelens.log.Infof("Removed firelens resource directory at %s", firelens.resourceDir) return nil } diff --git a/agent/taskresource/firelens/firelensconfig_unix.go b/agent/taskresource/firelens/firelensconfig_unix.go index e24de698a89..9ba84b9a5e3 100644 --- a/agent/taskresource/firelens/firelensconfig_unix.go +++ b/agent/taskresource/firelens/firelensconfig_unix.go @@ -17,7 +17,6 @@ package firelens import ( "fmt" - "github.com/cihub/seelog" "github.com/pkg/errors" generator "github.com/awslabs/go-config-generator-for-fluentd-and-fluentbit" @@ -195,7 +194,7 @@ func (firelens *FirelensResource) generateConfig() (generator.FluentConfig, erro } config.AddExternalConfig(s3ConfPath, generator.AfterFilters) } - seelog.Infof("Included external firelens config file at: %s", firelens.externalConfigValue) + firelens.log.Infof("Included external firelens config file at: %s", firelens.externalConfigValue) return config, nil } diff --git a/agent/taskresource/firelens/json_unix.go b/agent/taskresource/firelens/json_unix.go index 1c96d274828..161bf3acb73 100644 --- a/agent/taskresource/firelens/json_unix.go +++ b/agent/taskresource/firelens/json_unix.go @@ -121,5 +121,7 @@ func (firelens *FirelensResource) UnmarshalJSON(b []byte) error { firelens.appliedStatusUnsafe = resourcestatus.ResourceStatus(*temp.AppliedStatus) firelens.networkMode = temp.NetworkMode + firelens.initLog() + return nil } diff --git a/agent/taskresource/volume/dockervolume.go b/agent/taskresource/volume/dockervolume.go index fb214af2999..733975d76c5 100644 --- a/agent/taskresource/volume/dockervolume.go +++ b/agent/taskresource/volume/dockervolume.go @@ -22,6 +22,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/api/task/status" "github.com/aws/amazon-ecs-agent/agent/dockerclient" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" + "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" "github.com/cihub/seelog" @@ -63,6 +64,9 @@ type VolumeResource struct { // lock is used for fields that are accessed and updated concurrently lock sync.RWMutex + + // log is a custom logger with extra context 
specific to the volume resource struct + log seelog.LoggerInterface } // DockerVolumeConfig represents docker volume configuration @@ -119,9 +123,22 @@ func NewVolumeResource(ctx context.Context, ctx: ctx, } v.initStatusToTransitions() + v.initLog() return v, nil } +func (vol *VolumeResource) initLog() { + if vol.log == nil { + vol.log = logger.InitLogger() + vol.log.SetContext(map[string]string{ + "volumeName": vol.Name, + "dockerVolumeName": vol.VolumeConfig.DockerVolumeName, + "dockerScope": vol.VolumeConfig.Scope, + "resourceName": "volume", + }) + } +} + func (vol *VolumeResource) Initialize(resourceFields *taskresource.ResourceFields, taskKnownStatus status.TaskStatus, taskDesiredStatus status.TaskStatus) { @@ -173,7 +190,7 @@ func (vol *VolumeResource) GetTerminalReason() string { func (vol *VolumeResource) setTerminalReason(reason string) { vol.terminalReasonOnce.Do(func() { - seelog.Infof("Volume Resource [%s]: setting terminal reason for volume resource", vol.Name) + vol.log.Infof("setting terminal reason [%s] for volume resource", reason) vol.terminalReason = reason }) } @@ -309,7 +326,7 @@ func (vol *VolumeResource) SourcePath() string { // Create performs resource creation func (vol *VolumeResource) Create() error { - seelog.Debugf("Creating volume with name %s using driver %s", vol.VolumeConfig.DockerVolumeName, vol.VolumeConfig.Driver) + vol.log.Infof("Creating volume using driver %s", vol.VolumeConfig.Driver) volumeResponse := vol.client.CreateVolume( vol.ctx, vol.VolumeConfig.DockerVolumeName, @@ -332,11 +349,11 @@ func (vol *VolumeResource) Create() error { func (vol *VolumeResource) Cleanup() error { // Enable volume clean up if it's task scoped if vol.VolumeConfig.Scope != TaskScope { - seelog.Debugf("Volume [%s] is shared, not removing", vol.Name) + vol.log.Infof("Volume is shared, not removing") return nil } - seelog.Debugf("Removing volume with name %s", vol.Name) + vol.log.Infof("Removing volume") err := vol.client.RemoveVolume(vol.ctx, vol.VolumeConfig.DockerVolumeName, dockerclient.RemoveVolumeTimeout) if err != nil { @@ -385,5 +402,6 @@ func (vol *VolumeResource) UnmarshalJSON(b []byte) error { if temp.KnownStatus != nil { vol.SetKnownStatus(resourcestatus.ResourceStatus(*temp.KnownStatus)) } + vol.initLog() return nil } From 1a0c755772dd5a2bea83a53966e8d2c1b6650fb2 Mon Sep 17 00:00:00 2001 From: Cam Date: Fri, 20 Dec 2019 15:37:38 -0800 Subject: [PATCH 2/4] Use logger.Contextual for custom struct logs --- agent/api/task/json.go | 6 +- agent/api/task/task.go | 36 +++--- agent/engine/task_manager.go | 27 ++--- agent/logger/contextual_logger.go | 119 +++++++++++++++++++ agent/logger/log.go | 43 ++++--- agent/taskresource/cgroup/cgroup.go | 29 ++--- agent/taskresource/firelens/firelens_unix.go | 16 +-- agent/taskresource/volume/dockervolume.go | 18 ++- 8 files changed, 200 insertions(+), 94 deletions(-) create mode 100644 agent/logger/contextual_logger.go diff --git a/agent/api/task/json.go b/agent/api/task/json.go index 83a1c5eb5d2..8f318426802 100644 --- a/agent/api/task/json.go +++ b/agent/api/task/json.go @@ -35,6 +35,10 @@ func (t *Task) UnmarshalJSON(data []byte) error { if err != nil { return err } - t.initLog() + t.log.SetContext(map[string]string{ + "taskARN": t.Arn, + "taskFamily": t.Family, + "taskVersion": t.Version, + }) return nil } diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 5a72c082cd9..18049071773 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -256,7 +256,7 @@ type Task struct { lock sync.RWMutex // log is 
a custom logger with extra context specific to the task struct - log seelog.LoggerInterface + log logger.Contextual } // TaskFromACS translates ecsacs.Task to apitask.Task by first marshaling the received @@ -270,6 +270,11 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, if err := json.Unmarshal(data, task); err != nil { return nil, err } + task.log.SetContext(map[string]string{ + "taskARN": task.Arn, + "taskFamily": task.Family, + "taskVersion": task.Version, + }) if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil { task.StartSequenceNumber = *envelope.SeqNum } else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil { @@ -286,21 +291,9 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, //initialize resources map for task task.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource) - task.initLog() return task, nil } -func (task *Task) initLog() { - if task.log == nil { - task.log = logger.InitLogger() - task.log.SetContext(map[string]string{ - "taskARN": task.Arn, - "taskFamily": task.Family, - "taskVersion": task.Version, - }) - } -} - func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error { err := task.initializeDockerLocalVolumes(dockerClient, ctx) if err != nil { @@ -325,7 +318,6 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error { // TODO, add rudimentary plugin support and call any plugins that want to // hook into this - task.initLog() task.adjustForPlatform(cfg) if task.MemoryCPULimitsEnabled { if err := task.initializeCgroupResourceSpec(cfg.CgroupPath, cfg.CgroupCPUPeriod, resourceFields); err != nil { @@ -1311,7 +1303,7 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) { } } if earliestKnownStatusContainer == nil { - task.log.Criticalf( + task.log.Errorf( "Impossible state found while updating task's known status, earliest state recorded as %s", containerEarliestKnownStatus.String()) return apitaskstatus.TaskStatusNone @@ -1342,7 +1334,7 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) { // based on the known statuses of all containers in the task func (task *Task) getEarliestKnownTaskStatusForContainers() apitaskstatus.TaskStatus { if len(task.Containers) == 0 { - task.log.Criticalf("No containers in the task") + task.log.Errorf("No containers in the task") return apitaskstatus.TaskStatusNone } // Set earliest container status to an impossible to reach 'high' task status @@ -1571,13 +1563,13 @@ func (task *Task) shouldOverrideNetworkMode(container *apicontainer.Container, d } } if pauseContName == "" { - task.log.Critical("Pause container required, but not found in the task") + task.log.Error("Pause container required, but not found in the task") return false, "" } pauseContainer, ok := dockerContainerMap[pauseContName] if !ok || pauseContainer == nil { // This should never be the case and implies a code-bug. 
- task.log.Criticalf("Pause container required, but not found in container map for container: [%s]", + task.log.Errorf("Pause container required, but not found in container map for container: [%s]", container.String()) return false, "" } @@ -1660,14 +1652,14 @@ func (task *Task) shouldOverridePIDMode(container *apicontainer.Container, docke case pidModeTask: pauseCont, ok := task.ContainerByName(NamespacePauseContainerName) if !ok { - task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") + task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } pauseDockerID, ok := dockerContainerMap[pauseCont.Name] if !ok || pauseDockerID == nil { // Docker container shouldn't be nil or not exist if the Container definition within task exists; implies code-bug - task.log.Criticalf("Namespace Pause docker container not found in the task; Setting Task's Desired Status to Stopped") + task.log.Errorf("Namespace Pause docker container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } @@ -1713,14 +1705,14 @@ func (task *Task) shouldOverrideIPCMode(container *apicontainer.Container, docke case ipcModeTask: pauseCont, ok := task.ContainerByName(NamespacePauseContainerName) if !ok { - task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") + task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } pauseDockerID, ok := dockerContainerMap[pauseCont.Name] if !ok || pauseDockerID == nil { // Docker container shouldn't be nil or not exist if the Container definition within task exists; implies code-bug - task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") + task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped") task.SetDesiredStatus(apitaskstatus.TaskStopped) return false, "" } diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index 86dc66388be..26df8082969 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -40,7 +40,6 @@ import ( utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/agent/utils/ttime" - "github.com/cihub/seelog" "github.com/pkg/errors" ) @@ -127,7 +126,7 @@ type managedTask struct { *apitask.Task ctx context.Context cancel context.CancelFunc - log seelog.LoggerInterface + log logger.Contextual engine *DockerTaskEngine cfg *config.Config @@ -167,18 +166,11 @@ type managedTask struct { // This method must only be called when the engine.processTasks write lock is // already held. 
func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask { - log := logger.InitLogger() - log.SetContext(map[string]string{ - "taskARN": task.Arn, - "taskFamily": task.Family, - "taskVersion": task.Version, - }) ctx, cancel := context.WithCancel(engine.ctx) t := &managedTask{ ctx: ctx, cancel: cancel, Task: task, - log: log, acsMessages: make(chan acsTransition), dockerMessages: make(chan dockerContainerChange), resourceStateChangeEvent: make(chan resourceStateChange), @@ -192,6 +184,11 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask taskStopWG: engine.taskStopGroup, steadyStatePollInterval: engine.taskSteadyStatePollInterval, } + t.log.SetContext(map[string]string{ + "taskARN": task.Arn, + "taskFamily": task.Family, + "taskVersion": task.Version, + }) engine.managedTasks[task.Arn] = t return t } @@ -502,8 +499,7 @@ func (mtask *managedTask) emitResourceChange(change resourceStateChange) { func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { event, err := api.NewTaskStateChangeEvent(task, reason) if err != nil { - mtask.log.Infof("unable to create task state change event [%s]: %v", - task.Arn, reason, err) + mtask.log.Infof("unable to create task state change event [%s]: %v", reason, err) return } mtask.log.Infof("sending task change event [%s]", event.String()) @@ -516,8 +512,7 @@ func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { func (mtask *managedTask) emitContainerEvent(task *apitask.Task, cont *apicontainer.Container, reason string) { event, err := api.NewContainerStateChangeEvent(task, cont, reason) if err != nil { - mtask.log.Infof("unable to create state change event for container [%s]: %v", - task.Arn, cont.Name, err) + mtask.log.Infof("unable to create state change event for container [%s]: %v", cont.Name, err) return } @@ -575,14 +570,12 @@ func (mtask *managedTask) releaseIPInIPAM() { MinSupportedCNIVersion: config.DefaultMinSupportedCNIVersion, }) if err != nil { - mtask.log.Errorf("failed to release ip; unable to build cni configuration: %v", - err) + mtask.log.Errorf("failed to release ip; unable to build cni configuration: %v", err) return } err = mtask.cniClient.ReleaseIPResource(mtask.ctx, cfg, ipamCleanupTmeout) if err != nil { - mtask.log.Errorf("failed to release ip; IPAM error: %v", - err) + mtask.log.Errorf("failed to release ip; IPAM error: %v", err) return } } diff --git a/agent/logger/contextual_logger.go b/agent/logger/contextual_logger.go new file mode 100644 index 00000000000..897c34f3b81 --- /dev/null +++ b/agent/logger/contextual_logger.go @@ -0,0 +1,119 @@ +// Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package logger + +import ( + "path/filepath" + "runtime" + + "github.com/cihub/seelog" +) + +// Contextual is a logger that can have custom context added to it. Once +// SetContext is called, it will print log messages with the additional context +// appended. 
Before SetContext is called, it will print messages using the +// default agent logger. +type Contextual struct { + log seelog.LoggerInterface + context map[string]string +} + +// Debugf formats message according to format specifier +// and writes to log with level = Debug. +func (c *Contextual) Debugf(format string, params ...interface{}) { + if c.log == nil { + seelog.Debugf(format, params...) + } else { + c.log.Debugf(format, params...) + } +} + +// Infof formats message according to format specifier +// and writes to log with level = Info. +func (c *Contextual) Infof(format string, params ...interface{}) { + if c.log == nil { + seelog.Infof(format, params...) + } else { + c.log.Infof(format, params...) + } +} + +// Warnf formats message according to format specifier +// and writes to log with level = Warn. +func (c *Contextual) Warnf(format string, params ...interface{}) error { + if c.log == nil { + return seelog.Warnf(format, params...) + } else { + return c.log.Warnf(format, params...) + } +} + +// Errorf formats message according to format specifier +// and writes to log with level = Error. +func (c *Contextual) Errorf(format string, params ...interface{}) error { + if c.log == nil { + return seelog.Errorf(format, params...) + } else { + return c.log.Errorf(format, params...) + } +} + +// Debug formats message using the default formats for its operands +// and writes to log with level = Debug +func (c *Contextual) Debug(v ...interface{}) { + if c.log == nil { + seelog.Debug(v...) + } else { + c.log.Debug(v...) + } +} + +// Info formats message using the default formats for its operands +// and writes to log with level = Info +func (c *Contextual) Info(v ...interface{}) { + if c.log == nil { + seelog.Info(v...) + } else { + c.log.Info(v...) + } +} + +// Warn formats message using the default formats for its operands +// and writes to log with level = Warn +func (c *Contextual) Warn(v ...interface{}) error { + if c.log == nil { + return seelog.Warn(v...) + } else { + return c.log.Warn(v...) + } +} + +// Error formats message using the default formats for its operands +// and writes to log with level = Error +func (c *Contextual) Error(v ...interface{}) error { + if c.log == nil { + return seelog.Error(v...) + } else { + return c.log.Error(v...) 
+ } +} + +func (c *Contextual) SetContext(context map[string]string) { + if c.log == nil { + c.log = InitLogger() + _, f, _, _ := runtime.Caller(1) + context["module"] = filepath.Base(f) + c.log.SetContext(context) + } +} diff --git a/agent/logger/log.go b/agent/logger/log.go index f358c95f9fd..de29c17015a 100644 --- a/agent/logger/log.go +++ b/agent/logger/log.go @@ -55,31 +55,40 @@ var Config *logConfig func logfmtFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { cc, ok := context.CustomContext().(map[string]string) - var customContext string - if ok && len(cc) > 0 { - var sortedContext []string - for k, v := range cc { - sortedContext = append(sortedContext, k+"="+v) - } - sort.Strings(sortedContext) - customContext = " " + strings.Join(sortedContext, " ") + if !ok { + cc = map[string]string{} } - return fmt.Sprintf(`level=%s time=%s msg=%q module=%s%s -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext) + if _, ok = cc["module"]; !ok { + cc["module"] = context.FileName() + } + + var ccStr string + var ccSorted []string + for k, v := range cc { + ccSorted = append(ccSorted, k+"="+v) + } + sort.Strings(ccSorted) + ccStr = " " + strings.Join(ccSorted, " ") + return fmt.Sprintf(`level=%s time=%s msg=%q%s +`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr) } } func jsonFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { cc, ok := context.CustomContext().(map[string]string) - var customContext string - if ok && len(cc) > 0 { - for k, v := range cc { - customContext += fmt.Sprintf(", %q: %q", k, v) - } + if !ok { + cc = map[string]string{} + } + if _, ok = cc["module"]; !ok { + cc["module"] = context.FileName() + } + var ccStr string + for k, v := range cc { + ccStr += fmt.Sprintf(", %q: %q", k, v) } - return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q, "module": %q%s} -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext) + return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q%s} +`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr) } } diff --git a/agent/taskresource/cgroup/cgroup.go b/agent/taskresource/cgroup/cgroup.go index 111b9b67142..808f560bead 100644 --- a/agent/taskresource/cgroup/cgroup.go +++ b/agent/taskresource/cgroup/cgroup.go @@ -28,7 +28,6 @@ import ( control "github.com/aws/amazon-ecs-agent/agent/taskresource/cgroup/control" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" "github.com/aws/amazon-ecs-agent/agent/utils/ioutilwrapper" - "github.com/cihub/seelog" "github.com/containerd/cgroups" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" @@ -66,7 +65,7 @@ type CgroupResource struct { // lock is used for fields that are accessed and updated concurrently lock sync.RWMutex // log is a custom logger with extra context specific to the cgroup struct - log seelog.LoggerInterface + log logger.Contextual } // NewCgroupResource is used to return an object that implements the Resource interface @@ -85,22 +84,15 @@ func NewCgroupResource(taskARN string, resourceSpec: resourceSpec, } c.initializeResourceStatusToTransitionFunction() - c.initLog() + c.log.SetContext(map[string]string{ + "taskARN": taskARN, + "cgroupRoot": cgroupRoot, + "cgroupMountPath": 
cgroupMountPath, + "resourceName": resourceName, + }) return c } -func (cgroup *CgroupResource) initLog() { - if cgroup.log == nil { - cgroup.log = logger.InitLogger() - cgroup.log.SetContext(map[string]string{ - "taskARN": cgroup.taskARN, - "cgroupRoot": cgroup.cgroupRoot, - "cgroupMountPath": cgroup.cgroupMountPath, - "resourceName": resourceName, - }) - } -} - // GetTerminalReason returns an error string to propagate up through to task // state change messages func (cgroup *CgroupResource) GetTerminalReason() string { @@ -358,7 +350,12 @@ func (cgroup *CgroupResource) UnmarshalJSON(b []byte) error { if temp.KnownStatus != nil { cgroup.SetKnownStatus(resourcestatus.ResourceStatus(*temp.KnownStatus)) } - cgroup.initLog() + cgroup.log.SetContext(map[string]string{ + "taskARN": cgroup.taskARN, + "cgroupRoot": cgroup.cgroupRoot, + "cgroupMountPath": cgroup.cgroupMountPath, + "resourceName": resourceName, + }) return nil } diff --git a/agent/taskresource/firelens/firelens_unix.go b/agent/taskresource/firelens/firelens_unix.go index 52cd958c3a1..d5d94f83d82 100644 --- a/agent/taskresource/firelens/firelens_unix.go +++ b/agent/taskresource/firelens/firelens_unix.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/cihub/seelog" "github.com/pkg/errors" "github.com/aws/amazon-ecs-agent/agent/api/task/status" @@ -92,7 +91,7 @@ type FirelensResource struct { terminalReasonOnce sync.Once lock sync.RWMutex // log is a custom logger with extra context specific to the firelens struct - log seelog.LoggerInterface + log logger.Contextual } // NewFirelensResource returns a new FirelensResource. @@ -130,14 +129,11 @@ func NewFirelensResource(cluster, taskARN, taskDefinition, ec2InstanceID, dataDi } func (firelens *FirelensResource) initLog() { - if firelens.log == nil { - firelens.log = logger.InitLogger() - firelens.log.SetContext(map[string]string{ - "taskARN": firelens.taskARN, - "configType": firelens.firelensConfigType, - "resourceName": ResourceName, - }) - } + firelens.log.SetContext(map[string]string{ + "taskARN": firelens.taskARN, + "configType": firelens.firelensConfigType, + "resourceName": ResourceName, + }) } func (firelens *FirelensResource) parseOptions(options map[string]string) error { diff --git a/agent/taskresource/volume/dockervolume.go b/agent/taskresource/volume/dockervolume.go index 733975d76c5..2adb867af53 100644 --- a/agent/taskresource/volume/dockervolume.go +++ b/agent/taskresource/volume/dockervolume.go @@ -25,7 +25,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" - "github.com/cihub/seelog" "github.com/pkg/errors" ) @@ -66,7 +65,7 @@ type VolumeResource struct { lock sync.RWMutex // log is a custom logger with extra context specific to the volume resource struct - log seelog.LoggerInterface + log logger.Contextual } // DockerVolumeConfig represents docker volume configuration @@ -128,15 +127,12 @@ func NewVolumeResource(ctx context.Context, } func (vol *VolumeResource) initLog() { - if vol.log == nil { - vol.log = logger.InitLogger() - vol.log.SetContext(map[string]string{ - "volumeName": vol.Name, - "dockerVolumeName": vol.VolumeConfig.DockerVolumeName, - "dockerScope": vol.VolumeConfig.Scope, - "resourceName": "volume", - }) - } + vol.log.SetContext(map[string]string{ + "volumeName": vol.Name, + "dockerVolumeName": vol.VolumeConfig.DockerVolumeName, + "dockerScope": vol.VolumeConfig.Scope, + "resourceName": "volume", + }) } func (vol 
*VolumeResource) Initialize(resourceFields *taskresource.ResourceFields, From 2c8f46c9c1b3d5e7012afcd7027c4b71cb88f456 Mon Sep 17 00:00:00 2001 From: Cam Date: Fri, 20 Dec 2019 22:28:11 -0800 Subject: [PATCH 3/4] Fixing broken tests --- agent/acs/handler/payload_handler_test.go | 96 +++++++++++++---------- agent/api/task/task_test.go | 1 + agent/logger/log.go | 7 +- agent/logger/log_test.go | 20 +++-- 4 files changed, 71 insertions(+), 53 deletions(-) diff --git a/agent/acs/handler/payload_handler_test.go index f33fdecf89d..3c38fe0ee3f 100644 --- a/agent/acs/handler/payload_handler_test.go +++ b/agent/acs/handler/payload_handler_test.go @@ -16,9 +16,10 @@ package handler import ( "context" + "encoding/json" "errors" "fmt" - "reflect" + "runtime" "sync" "testing" @@ -33,11 +34,11 @@ import ( "github.com/aws/amazon-ecs-agent/agent/eventhandler" "github.com/aws/amazon-ecs-agent/agent/statemanager" mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" - "github.com/aws/amazon-ecs-agent/agent/taskresource" mock_wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient/mock" "github.com/aws/aws-sdk-go/aws" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -147,13 +148,7 @@ func TestHandlePayloadMessageStateSaveError(t *testing.T) { }) assert.Error(t, err, "Expected error while adding a task from statemanager") - // We expect task to be added to the engine even though it hasn't been saved - expectedTask := &apitask.Task{ - Arn: "t1", - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), - } - - assert.Equal(t, addedTask, expectedTask, "added task is not expected") + validateTask(t, addedTask, "t1") } // TestHandlePayloadMessageAckedWhenTaskAdded tests if the handler generates an ack @@ -194,12 +189,7 @@ func TestHandlePayloadMessageAckedWhenTaskAdded(t *testing.T) { // Verify the message id acked assert.Equal(t, aws.StringValue(ackRequested.MessageId), payloadMessageId, "received message is not expected") - // Verify if task added == expected task - expectedTask := &apitask.Task{ - Arn: "t1", - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), - } - assert.Equal(t, addedTask, expectedTask, "received task is not expected") + validateTask(t, addedTask, "t1") } // TestHandlePayloadMessageCredentialsAckedWhenTaskAdded tests if the handler generates @@ -290,8 +280,7 @@ func TestHandlePayloadMessageCredentialsAckedWhenTaskAdded(t *testing.T) { SessionToken: credentialsSessionToken, CredentialsID: credentialsId, } - err = validateTaskAndCredentials(taskCredentialsAckRequested, expectedCredentialsAck, addedTask, taskArn, expectedCredentials) - assert.NoError(t, err, "error validating added task or credentials ack for the same") + validateTaskAndCredentials(t, taskCredentialsAckRequested, expectedCredentialsAck, addedTask, taskArn, expectedCredentials, "t1") } // TestAddPayloadTaskAddsNonStoppedTasksAfterStoppedTasks tests if tasks with desired status @@ -371,12 +360,7 @@ func TestPayloadBufferHandler(t *testing.T) { // Verify if payloadMessageId read from the ack buffer is correct assert.Equal(t, aws.StringValue(ackRequested.MessageId), payloadMessageId, "received task is not expected") - // Verify if the task added to the engine is correct - expectedTask := &apitask.Task{ - Arn: taskArn, - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), - } - assert.Equal(t, addedTask, expectedTask, "received task is not expected") + 
validateTask(t, addedTask, "t1") } // TestPayloadBufferHandlerWithCredentials tests if the async payloadBufferHandler routine @@ -495,8 +479,7 @@ func TestPayloadBufferHandlerWithCredentials(t *testing.T) { SessionToken: firstTaskCredentialsSessionToken, CredentialsID: firstTaskCredentialsId, } - err := validateTaskAndCredentials(firstTaskCredentialsAckRequested, expectedCredentialsAckForFirstTask, firstAddedTask, firstTaskArn, expectedCredentialsForFirstTask) - assert.NoError(t, err, "error validating added task or credentials ack for the same") + validateTaskAndCredentials(t, firstTaskCredentialsAckRequested, expectedCredentialsAckForFirstTask, firstAddedTask, firstTaskArn, expectedCredentialsForFirstTask, "t1") // Verify the correctness of the second task added to the engine and the // credentials ack generated for it @@ -513,8 +496,7 @@ func TestPayloadBufferHandlerWithCredentials(t *testing.T) { SessionToken: secondTaskCredentialsSessionToken, CredentialsID: secondTaskCredentialsId, } - err = validateTaskAndCredentials(secondTaskCredentialsAckRequested, expectedCredentialsAckForSecondTask, secondAddedTask, secondTaskArn, expectedCredentialsForSecondTask) - assert.NoError(t, err, "error validating added task or credentials ack for the same") + validateTaskAndCredentials(t, secondTaskCredentialsAckRequested, expectedCredentialsAckForSecondTask, secondAddedTask, secondTaskArn, expectedCredentialsForSecondTask, "t2") } // TestAddPayloadTaskAddsExecutionRoles tests the payload handler will add @@ -596,24 +578,18 @@ func TestAddPayloadTaskAddsExecutionRoles(t *testing.T) { // validateTaskAndCredentials compares a task and a credentials ack object // against expected values. It returns an error if either of the // comparisons fails -func validateTaskAndCredentials(taskCredentialsAck, expectedCredentialsAckForTask *ecsacs.IAMRoleCredentialsAckRequest, +func validateTaskAndCredentials( + t *testing.T, + taskCredentialsAck *ecsacs.IAMRoleCredentialsAckRequest, + expectedCredentialsAckForTask *ecsacs.IAMRoleCredentialsAckRequest, addedTask *apitask.Task, expectedTaskArn string, - expectedTaskCredentials credentials.IAMRoleCredentials) error { - if !reflect.DeepEqual(taskCredentialsAck, expectedCredentialsAckForTask) { - return fmt.Errorf("Mismatch between expected and received credentials ack requests, expected: %s, got: %s", expectedCredentialsAckForTask.String(), taskCredentialsAck.String()) - } - - expectedTask := &apitask.Task{ - Arn: expectedTaskArn, - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), - } - expectedTask.SetCredentialsID(expectedTaskCredentials.CredentialsID) - - if !reflect.DeepEqual(addedTask, expectedTask) { - return fmt.Errorf("Mismatch between expected and added tasks, expected: %v, added: %v", expectedTask, addedTask) - } - return nil + expectedTaskCredentials credentials.IAMRoleCredentials, + taskName string, +) { + require.Equal(t, expectedCredentialsAckForTask, taskCredentialsAck) + require.Equal(t, expectedTaskCredentials.CredentialsID, addedTask.GetCredentialsID()) + validateTask(t, addedTask, taskName) } func TestPayloadHandlerAddedENIToTask(t *testing.T) { @@ -949,3 +925,37 @@ func TestPayloadHandlerAddedFirelensData(t *testing.T) { assert.NotNil(t, actual.Options) assert.Equal(t, aws.StringValue(expected.Options["enable-ecs-log-metadata"]), actual.Options["enable-ecs-log-metadata"]) } + +func validateTask(t *testing.T, addedTask *apitask.Task, expectedTaskName string) { + // We expect task to be added to the engine even though it hasn't been 
saved + addedTaskJSON, err := json.Marshal(addedTask) + require.NoError(t, err) + platformFields := "{}" + if runtime.GOOS == "windows" { + platformFields = `{"cpuUnbounded": false, "memoryUnbounded": false}` + } + expectedTaskJSON := fmt.Sprintf(` + { + "Arn": "%s", + "Family": "", + "Version": "", + "Containers": null, + "associations": null, + "resources": {}, + "volumes": null, + "DesiredStatus": "NONE", + "KnownStatus": "NONE", + "KnownTime": "0001-01-01T00:00:00Z", + "PullStartedAt": "0001-01-01T00:00:00Z", + "PullStoppedAt": "0001-01-01T00:00:00Z", + "ExecutionStoppedAt": "0001-01-01T00:00:00Z", + "SentStatus": "NONE", + "StartSequenceNumber": 0, + "StopSequenceNumber": 0, + "executionCredentialsID": "", + "ENI": null, + "AppMesh": null, + "PlatformFields": %s + }`, expectedTaskName, platformFields) + require.JSONEq(t, expectedTaskJSON, string(addedTaskJSON)) +} diff --git a/agent/api/task/task_test.go b/agent/api/task/task_test.go index fd6e6393579..aeca7e3c1e9 100644 --- a/agent/api/task/task_test.go +++ b/agent/api/task/task_test.go @@ -1277,6 +1277,7 @@ func TestTaskFromACS(t *testing.T) { seqNum := int64(42) task, err := TaskFromACS(&taskFromAcs, &ecsacs.PayloadMessage{SeqNum: &seqNum}) + expectedTask.log = task.log assert.NoError(t, err) assert.EqualValues(t, expectedTask, task) diff --git a/agent/logger/log.go b/agent/logger/log.go index de29c17015a..0d8c210488c 100644 --- a/agent/logger/log.go +++ b/agent/logger/log.go @@ -55,7 +55,7 @@ var Config *logConfig func logfmtFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { cc, ok := context.CustomContext().(map[string]string) - if !ok { + if !ok || len(cc) == 0 { cc = map[string]string{} } if _, ok = cc["module"]; !ok { @@ -77,7 +77,7 @@ func logfmtFormatter(params string) seelog.FormatterFunc { func jsonFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { cc, ok := context.CustomContext().(map[string]string) - if !ok { + if !ok || len(cc) == 0 { cc = map[string]string{} } if _, ok = cc["module"]; !ok { @@ -94,7 +94,7 @@ func jsonFormatter(params string) seelog.FormatterFunc { func seelogConfig() string { c := ` - + ` c += platformLogConfig() @@ -114,6 +114,7 @@ func seelogConfig() string { + ` return c diff --git a/agent/logger/log_test.go b/agent/logger/log_test.go index 7f81b95f0a9..fbe0ae0df56 100644 --- a/agent/logger/log_test.go +++ b/agent/logger/log_test.go @@ -1,4 +1,4 @@ -// +build !windows +// +build !windows,unit // Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// @@ -115,7 +115,7 @@ func TestSeelogConfig_Default(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } @@ -139,7 +140,7 @@ func TestSeelogConfig_DebugLevel(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } @@ -163,7 +165,7 @@ func TestSeelogConfig_SizeRollover(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } @@ -187,7 +190,7 @@ func TestSeelogConfig_SizeRolloverFileSizeChange(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } @@ -211,7 +215,7 @@ func TestSeelogConfig_SizeRolloverRollCountChange(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } @@ -235,7 +240,7 @@ func TestSeelogConfig_JSONOutput(t *testing.T) { } c := seelogConfig() require.Equal(t, ` - + + `, c) } From ea5ba11d7b878ac7c34f150d574cd17e637741a8 Mon Sep 17 00:00:00 2001 From: Cam Date: Mon, 6 Jan 2020 10:40:29 -0800 Subject: [PATCH 4/4] Factor reusable code out of formatters --- agent/logger/log.go | 52 ++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/agent/logger/log.go index 0d8c210488c..d65409d2b5c 100644 --- a/agent/logger/log.go +++ b/agent/logger/log.go @@ -54,44 +54,42 @@ var Config *logConfig func logfmtFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { - cc, ok := context.CustomContext().(map[string]string) - if !ok || len(cc) == 0 { - cc = map[string]string{} + c := getContext(context) + var cSorted []string + for k, v := range c { + cSorted = append(cSorted, k+"="+v) } - if _, ok = cc["module"]; !ok { - cc["module"] = context.FileName() - } - - var ccStr string - var ccSorted []string - for k, v := range cc { - ccSorted = append(ccSorted, k+"="+v) - } - sort.Strings(ccSorted) - ccStr = " " + strings.Join(ccSorted, " ") - return fmt.Sprintf(`level=%s time=%s msg=%q%s -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr) + sort.Strings(cSorted) + return fmt.Sprintf(`level=%s time=%s msg=%q %s `, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, strings.Join(cSorted, " ")) } } func jsonFormatter(params string) seelog.FormatterFunc { return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} { - cc, ok := context.CustomContext().(map[string]string) - if !ok || len(cc) == 0 { - cc = map[string]string{} - } - if _, ok = cc["module"]; !ok { - cc["module"] = context.FileName() - } - var ccStr string - for k, v := range cc { - ccStr += fmt.Sprintf(", %q: %q", k, v) + c := getContext(context) + var cStr string + for k, v := range c { + cStr += fmt.Sprintf(", %q: %q", k, v) } return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q%s} -`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr) +`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, cStr) } } +// getContext returns any custom context that has been added to this logger as a +// map, setting the 'module' key to the caller's file name if it has not been set yet. +func getContext(context seelog.LogContextInterface) map[string]string { + c, ok := context.CustomContext().(map[string]string) + if !ok || c == nil { + c = map[string]string{} + } + if _, ok = c["module"]; !ok { + c["module"] = context.FileName() + } + return c +} + func seelogConfig() string { c := `