From 3fd303df4b0de71520a771b53c85b8dbd68eb72d Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Mon, 30 Sep 2024 09:08:35 +0200 Subject: [PATCH] improve async error reporting (#4550) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve async error reporting of executions from compute nodes back to orchestrators and job store, such as errors related to docker executor, s3 publisher and input source. The PR does the following: 1. Enriches S3 errors with AWS error code and more metadata 2. Use the new bacerrors.Error for docker returned errors 3. Add new `ErrorCode` to `models.Event` details, and populate that value with bacerrors `{Component}:{ErrorCode}`, such as `S3Publisher:NoSuchBucket` and `Docker:ImageNotFound` 4. Introduced new `Details` field to executions compute state, which will hold additional metadata about the latest state of the execution, mainly the `ErrorCode` 5. Publish ErrorCode to otel analytics ### Examples: #### Bad docker image ``` → bacalhau docker run non_existent_image Job successfully submitted. Job ID: j-29a81940-18a2-44b7-b0da-807d45946f45 Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running): TIME EXEC. ID TOPIC EVENT 22:37:32.323 Submission Job submitted 22:37:32.340 e-640f0876 Scheduling Requested execution on n-7c5b7d69 * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 22:37:34.569 e-640f0876 Exec Scanning Error: image not available: "non_existent_image" Hint: To resolve this, either: 1. Check if the image exists in the registry and the name is correct 2. 
If the image is private, supply the node with valid Docker login credentials using the DOCKER_USERNAME and DOCKER_PASSWORD environment variables * ErrorCode: Docker:ImageNotFound * Image: non_existent_image 22:37:34.585 e-a3a3afe2 Scheduling Requested execution on n-7c5b7d69 * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 22:37:36.732 e-a3a3afe2 Exec Scanning Error: image not available: "non_existent_image" Hint: To resolve this, either: 1. Check if the image exists in the registry and the name is correct 2. If the image is private, supply the node with valid Docker login credentials using the DOCKER_USERNAME and DOCKER_PASSWORD environment variables * ErrorCode: Docker:ImageNotFound * Image: non_existent_image Error: job failed To get more details about the run, execute: bacalhau job describe j-29a81940-18a2-44b7-b0da-807d45946f45 To get more details about the run executions, execute: bacalhau job executions j-29a81940-18a2-44b7-b0da-807d45946f45 bacalhau job executions j-29a81940-18a2-44b7-b0da-807d45946f45 --output yaml - AllocatedResources: Tasks: {} ComputeState: Message: 'image not available: "non_existent_image"' StateType: 8 CreateTime: 1727642252340926000 DesiredState: Message: execution failed StateType: 2 EvalID: ecad787d-e72a-4987-b353-cd6552d546bf FollowupEvalID: "" ID: e-640f0876-119b-40ac-883e-b2126b5a40f3 JobID: j-29a81940-18a2-44b7-b0da-807d45946f45 ModifyTime: 1727642254570170000 Name: "" Namespace: default NextExecution: "" NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 PreviousExecution: "" PublishedResult: Type: "" Revision: 3 RunOutput: null - AllocatedResources: Tasks: {} ComputeState: Message: 'image not available: "non_existent_image"' StateType: 8 CreateTime: 1727642254585495000 DesiredState: Message: execution failed StateType: 2 EvalID: ef3bae6f-54fb-4f48-9b83-98364049e685 FollowupEvalID: "" ID: e-a3a3afe2-5d12-498f-ad19-86ea00425d30 JobID: j-29a81940-18a2-44b7-b0da-807d45946f45 ModifyTime: 1727642256732971000 Name: "" Namespace: 
default NextExecution: "" NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 PreviousExecution: "" PublishedResult: Type: "" Revision: 3 RunOutput: null ``` #### Bad S3 bucket ``` → bacalhau job run docker-s3.yaml Job successfully submitted. Job ID: j-036bc69b-7b81-489b-a714-d1349d6e6f5b Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running): TIME EXEC. ID TOPIC EVENT 22:36:57.853 Submission Job submitted 22:36:57.868 e-ad0ab10c Scheduling Requested execution on n-7c5b7d69 * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 22:36:57.929 e-ad0ab10c Execution Running 22:37:03.414 e-ad0ab10c Publishing Error: failed to publish s3 result: operation error S3: PutObject, https response error StatusCode: Results 404, RequestID: 62FSTZ2400AA0782, api error NoSuchBucket: The specified bucket does not exist * AWSRequestID: 62FSTZ2400AA0782 * ErrorCode: S3Publisher:NoSuchBucket * Operation: PutObject * Service: S3 22:37:03.432 e-995b726b Scheduling Requested execution on n-7c5b7d69 * NodeID: n-7c5b7d69-c42d-493e-ade0-7d6feeedc507 22:37:03.482 e-995b726b Execution Running 22:37:07.085 e-995b726b Publishing Error: failed to publish s3 result: operation error S3: PutObject, https response error StatusCode: Results 404, RequestID: YNJQY666GB15CT3K, api error NoSuchBucket: The specified bucket does not exist * Operation: PutObject * Service: S3 * AWSRequestID: YNJQY666GB15CT3K * ErrorCode: S3Publisher:NoSuchBucket Error: job failed To get more details about the run, execute: bacalhau job describe j-036bc69b-7b81-489b-a714-d1349d6e6f5b To get more details about the run executions, execute: bacalhau job executions j-036bc69b-7b81-489b-a714-d1349d6e6f5b To download the results, execute: bacalhau job get j-036bc69b-7b81-489b-a714-d1349d6e6f5b ``` --- cmd/cli/docker/docker_run_test.go | 8 +- pkg/analytics/execution_compute_message.go | 24 ++- pkg/analytics/executions.go | 25 ++- pkg/bacerrors/codes.go | 1 + pkg/bacerrors/error.go | 21 ++- 
pkg/bacerrors/error_test.go | 73 +++++++++ pkg/compute/bidder.go | 4 +- pkg/compute/capacity/disk/calculator.go | 4 +- pkg/compute/executor.go | 3 +- pkg/docker/docker.go | 56 +------ pkg/docker/errors.go | 136 ++++++++++------ pkg/executor/docker/executor.go | 2 +- pkg/jobstore/boltdb/store.go | 24 +-- pkg/models/event.go | 47 ++++-- pkg/models/event_test.go | 3 +- pkg/models/state.go | 23 +++ pkg/orchestrator/callback.go | 8 +- pkg/orchestrator/planner/state_updater.go | 1 + pkg/publisher/s3/publisher.go | 2 +- pkg/s3/errors.go | 111 ++++++++++++- pkg/s3/errors_test.go | 180 +++++++++++++++++++++ pkg/s3/result_signer.go | 5 +- pkg/s3/types.go | 20 +-- pkg/storage/s3/storage.go | 9 +- pkg/storage/s3/types.go | 6 +- 25 files changed, 624 insertions(+), 172 deletions(-) create mode 100644 pkg/s3/errors_test.go diff --git a/cmd/cli/docker/docker_run_test.go b/cmd/cli/docker/docker_run_test.go index 28ba90b89c..eeb8df148c 100644 --- a/cmd/cli/docker/docker_run_test.go +++ b/cmd/cli/docker/docker_run_test.go @@ -431,7 +431,7 @@ func (s *DockerRunSuite) TestRun_BadExecutables() { imageName: "badimage", // Bad image executable: "ls", // Good executable isValid: false, - errStringContains: "Could not inspect image", + errStringContains: "image not available", }, "good-image-bad-executable": { imageName: "ubuntu", // Good image // TODO we consider an untagged image poor practice, fix this @@ -443,7 +443,7 @@ func (s *DockerRunSuite) TestRun_BadExecutables() { imageName: "badimage", // Bad image executable: "BADEXECUTABLE", // Bad executable isValid: false, - errStringContains: "Could not inspect image", + errStringContains: "image not available", }, } @@ -491,8 +491,8 @@ func (s *DockerRunSuite) TestRun_InvalidImage() { // test. 
Alternatively, we could reduce the complexity and assert the job // simply failed which is the expected behaviour for an invalid image s.Require().Len(info.Executions.Items, 2) - s.Contains(info.Executions.Items[0].ComputeState.Message, `Could not inspect image "@" - could be due to repo/image not existing`) - s.Contains(info.Executions.Items[1].ComputeState.Message, `Could not inspect image "@" - could be due to repo/image not existing`) + s.Contains(info.Executions.Items[0].ComputeState.Message, `invalid image format: "@"`) + s.Contains(info.Executions.Items[1].ComputeState.Message, `invalid image format: "@"`) } func (s *DockerRunSuite) TestRun_Timeout_DefaultValue() { diff --git a/pkg/analytics/execution_compute_message.go b/pkg/analytics/execution_compute_message.go index 6f0aaa6040..b6762d0fa3 100644 --- a/pkg/analytics/execution_compute_message.go +++ b/pkg/analytics/execution_compute_message.go @@ -1,17 +1,27 @@ package analytics +import ( + "github.com/bacalhau-project/bacalhau/pkg/models" +) + const ComputeMessageExecutionEventType = "bacalhau.execution_v1.compute_message" type ExecutionComputeMessage struct { - JobID string `json:"job_id,omitempty"` - ExecutionID string `json:"execution_id,omitempty"` - ComputeMessage string `json:"compute_message,omitempty"` + JobID string `json:"job_id,omitempty"` + ExecutionID string `json:"execution_id,omitempty"` + ComputeMessage string `json:"compute_message,omitempty"` + ComputeErrorCode string `json:"compute_state_error_code,omitempty"` } -func NewComputeMessageExecutionEvent(jobID string, executionID string, computeMessage string) *Event { +func NewComputeMessageExecutionEvent(e models.Execution) *Event { + var errorCode string + if e.ComputeState.Details != nil { + errorCode = e.ComputeState.Details[models.DetailsKeyErrorCode] + } return NewEvent(ComputeMessageExecutionEventType, ExecutionComputeMessage{ - JobID: jobID, - ExecutionID: executionID, - ComputeMessage: computeMessage, + JobID: e.JobID, + 
ExecutionID: e.ID, + ComputeMessage: e.ComputeState.Message, + ComputeErrorCode: errorCode, }) } diff --git a/pkg/analytics/executions.go b/pkg/analytics/executions.go index ea16289a18..aea29fc9b4 100644 --- a/pkg/analytics/executions.go +++ b/pkg/analytics/executions.go @@ -20,8 +20,11 @@ type ExecutionEvent struct { Resources map[string]Resource `json:"resources,omitempty"` - DesiredState string `json:"desired_state,omitempty"` - ComputeState string `json:"compute_state,omitempty"` + DesiredState string `json:"desired_state,omitempty"` + DesiredStateErrorCode string `json:"desired_state_error_code,omitempty"` + + ComputeState string `json:"compute_state,omitempty"` + ComputeStateErrorCode string `json:"compute_state_error_code,omitempty"` PublishedResultType string `json:"publisher_type,omitempty"` @@ -77,6 +80,18 @@ func newExecutionEvent(e models.Execution) ExecutionEvent { exitCode = e.RunOutput.ExitCode } + var ( + desiredStateErrorCode string + computeStateErrorCode string + ) + + if e.DesiredState.Details != nil { + desiredStateErrorCode = e.DesiredState.Details[models.DetailsKeyErrorCode] + } + if e.ComputeState.Details != nil { + computeStateErrorCode = e.ComputeState.Details[models.DetailsKeyErrorCode] + } + return ExecutionEvent{ // ID fields. JobID: e.JobID, @@ -93,8 +108,10 @@ func newExecutionEvent(e models.Execution) ExecutionEvent { Resources: resources, // states. - DesiredState: e.DesiredState.StateType.String(), - ComputeState: e.ComputeState.StateType.String(), + DesiredState: e.DesiredState.StateType.String(), + DesiredStateErrorCode: desiredStateErrorCode, + ComputeState: e.ComputeState.StateType.String(), + ComputeStateErrorCode: computeStateErrorCode, // publisher if any. 
PublishedResultType: e.PublishedResult.Type, diff --git a/pkg/bacerrors/codes.go b/pkg/bacerrors/codes.go index f3b4c96d6e..498c5077e5 100644 --- a/pkg/bacerrors/codes.go +++ b/pkg/bacerrors/codes.go @@ -19,6 +19,7 @@ const ( ConfigurationError ErrorCode = "ConfigurationError" DatastoreFailure ErrorCode = "DatastoreFailure" RequestCancelled ErrorCode = "RequestCancelled" + UnknownError ErrorCode = "UnknownError" ) func Code(code string) ErrorCode { diff --git a/pkg/bacerrors/error.go b/pkg/bacerrors/error.go index 8ac9f8a2ef..80dee3a220 100644 --- a/pkg/bacerrors/error.go +++ b/pkg/bacerrors/error.go @@ -59,6 +59,9 @@ type Error interface { // WithDetails adds or updates the details associated with the error. WithDetails(details map[string]string) Error + // WithDetail adds or updates a single detail associated with the error. + WithDetail(key, value string) Error + // WithCode sets the ErrorCode for this error. WithCode(code ErrorCode) Error @@ -143,7 +146,23 @@ func (e *errorImpl) WithFailsExecution() Error { // WithDetails sets the details field of Error and // returns the Error itself for chaining. func (e *errorImpl) WithDetails(details map[string]string) Error { - e.details = details + // merge the new details with the existing details + if e.details == nil { + e.details = make(map[string]string) + } + for k, v := range details { + e.details[k] = v + } + return e +} + +// WithDetail adds a single detail to the details field of Error and +// returns the Error itself for chaining. 
+func (e *errorImpl) WithDetail(key, value string) Error { + if e.details == nil { + e.details = make(map[string]string) + } + e.details[key] = value return e } diff --git a/pkg/bacerrors/error_test.go b/pkg/bacerrors/error_test.go index d8780ee26e..73b8c06b9d 100644 --- a/pkg/bacerrors/error_test.go +++ b/pkg/bacerrors/error_test.go @@ -81,6 +81,79 @@ func (suite *ErrorTestSuite) TestErrorWithDetails() { suite.False(err.Retryable()) suite.False(err.FailsExecution()) suite.Equal(details, err.Details()) + + // Test appending details + additionalDetails := map[string]string{"key3": "value3", "key2": "newvalue2"} + err = err.WithDetails(additionalDetails) + + expectedDetails := map[string]string{"key1": "value1", "key2": "newvalue2", "key3": "value3"} + suite.Equal(expectedDetails, err.Details()) +} + +func (suite *ErrorTestSuite) TestErrorWithDetail() { + message := "TestMessage" + err := New(message).WithDetail("key1", "value1") + + suite.Equal(message, err.Error()) + suite.Empty(err.Hint()) + suite.False(err.Retryable()) + suite.False(err.FailsExecution()) + suite.Equal(map[string]string{"key1": "value1"}, err.Details()) + + // Test adding another detail + err = err.WithDetail("key2", "value2") + expectedDetails := map[string]string{"key1": "value1", "key2": "value2"} + suite.Equal(expectedDetails, err.Details()) + + // Test overwriting an existing detail + err = err.WithDetail("key1", "newvalue1") + expectedDetails = map[string]string{"key1": "newvalue1", "key2": "value2"} + suite.Equal(expectedDetails, err.Details()) +} + +func (suite *ErrorTestSuite) TestErrorWithCode() { + message := "TestMessage" + err := New(message).WithCode(BadRequestError) + + suite.Equal(message, err.Error()) + suite.Equal(BadRequestError, err.Code()) + suite.Equal(400, err.HTTPStatusCode()) // BadRequestError should map to 400 +} + +func (suite *ErrorTestSuite) TestErrorWithHTTPStatusCode() { + message := "TestMessage" + err := New(message).WithHTTPStatusCode(418) // I'm a teapot + + 
suite.Equal(message, err.Error()) + suite.Equal(418, err.HTTPStatusCode()) +} + +func (suite *ErrorTestSuite) TestErrorWithComponent() { + message := "TestMessage" + err := New(message).WithComponent("TestComponent") + + suite.Equal(message, err.Error()) + suite.Equal("TestComponent", err.Component()) +} + +func (suite *ErrorTestSuite) TestErrorChaining() { + err := New("TestMessage"). + WithHint("TestHint"). + WithRetryable(). + WithFailsExecution(). + WithDetail("key1", "value1"). + WithCode(NotFoundError). + WithHTTPStatusCode(404). + WithComponent("TestComponent") + + suite.Equal("TestMessage", err.Error()) + suite.Equal("TestHint", err.Hint()) + suite.True(err.Retryable()) + suite.True(err.FailsExecution()) + suite.Equal(map[string]string{"key1": "value1"}, err.Details()) + suite.Equal(NotFoundError, err.Code()) + suite.Equal(404, err.HTTPStatusCode()) + suite.Equal("TestComponent", err.Component()) } func (suite *ErrorTestSuite) TestWrapNonBacerror() { diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index 0d4cfe138a..5dd5835ecb 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/models" @@ -314,8 +315,7 @@ func (b Bidder) runResourceBidding( // calculate resource usage of the job, failure here represents a compute failure. 
resourceUsage, err := b.usageCalculator.Calculate(ctx, *job, *resources) if err != nil { - log.Ctx(ctx).Error().Err(err).Msg("Error calculating resource requirements for job") - return nil, fmt.Errorf("calculating resource usage of job: %w", err) + return nil, bacerrors.Wrap(err, "calculating resource usage of job") } // ask the bidding strategy if we should bid on this job diff --git a/pkg/compute/capacity/disk/calculator.go b/pkg/compute/capacity/disk/calculator.go index f613cdabb4..70afa3ce22 100644 --- a/pkg/compute/capacity/disk/calculator.go +++ b/pkg/compute/capacity/disk/calculator.go @@ -2,8 +2,8 @@ package disk import ( "context" - "fmt" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/storage" @@ -34,7 +34,7 @@ func (c *DiskUsageCalculator) Calculate(ctx context.Context, job models.Job, par } volumeSize, err := strg.GetVolumeSize(ctx, *input) if err != nil { - return nil, fmt.Errorf("error getting job disk space requirements: %w", err) + return nil, bacerrors.Wrap(err, "error getting job disk space requirements") } totalDiskRequirements += volumeSize } diff --git a/pkg/compute/executor.go b/pkg/compute/executor.go index 9fd7788db4..1f2f2cdfa6 100644 --- a/pkg/compute/executor.go +++ b/pkg/compute/executor.go @@ -435,9 +435,8 @@ func (e *BaseExecutor) publish(ctx context.Context, localExecutionState store.Lo } publishedResult, err := jobPublisher.PublishResult(ctx, execution, resultFolder) if err != nil { - return nil, fmt.Errorf("failed to publish result: %w", err) + return nil, bacerrors.Wrap(err, "failed to publish result") } - log.Ctx(ctx).Debug(). Str("execution", execution.ID). 
Msg("Execution published") diff --git a/pkg/docker/docker.go b/pkg/docker/docker.go index af3de4b3d9..45d6b6d20c 100644 --- a/pkg/docker/docker.go +++ b/pkg/docker/docker.go @@ -36,50 +36,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/util/closer" ) -type ImageUnavailableError struct { - Verb string - Image string - Creds config_legacy.DockerCredentials - Err error -} - -func (die ImageUnavailableError) Error() string { - return pkgerrors.Wrapf(die.Err, - "Could not %s image %q - could be due to repo/image not existing, "+ - "or registry needing authorization", - die.Verb, - die.Image, - ).Error() -} - -func (die ImageUnavailableError) Hint() string { - if !die.Creds.IsValid() { - return "If the image is private, supply the node with valid Docker login credentials " + - "using the " + config_legacy.DockerUsernameEnvVar + " and " + config_legacy.DockerPasswordEnvVar + - " environment variables" - } - - return "" -} - -func NewImageInspectError(image string, creds config_legacy.DockerCredentials, err error) error { - return ImageUnavailableError{ - Verb: "inspect", - Image: image, - Creds: creds, - Err: err, - } -} - -func NewImagePullError(image string, creds config_legacy.DockerCredentials, err error) error { - return ImageUnavailableError{ - Verb: "pull", - Image: image, - Creds: creds, - Err: err, - } -} - type Client struct { tracing.TracedClient } @@ -105,7 +61,7 @@ func (c *Client) HostGatewayIP(ctx context.Context) (net.IP, error) { return net.IP{}, NewDockerError(err) } if configs := response.IPAM.Config; len(configs) < 1 { - return net.IP{}, NewCustomDockerError(DockerBridgeNetworkUnattached, "bridge network unattached") + return net.IP{}, NewCustomDockerError(BridgeNetworkUnattached, "bridge network unattached") } else { return net.ParseIP(configs[0].Gateway), nil } @@ -166,7 +122,7 @@ func (c *Client) FindContainer(ctx context.Context, label string, value string) } } - return "", NewCustomDockerError(DockerContainerNotFound, fmt.Sprintf("unable 
to find container for %s=%s", label, value)) + return "", NewCustomDockerError(ContainerNotFound, fmt.Sprintf("unable to find container for %s=%s", label, value)) } func (c *Client) FollowLogs(ctx context.Context, id string) (stdout, stderr io.Reader, err error) { @@ -214,7 +170,7 @@ func (c *Client) GetOutputStream(ctx context.Context, id string, since string, f } if !cont.State.Running { - return nil, NewCustomDockerError(DockerContainerNotRunning, "cannot get logs when container is not running") + return nil, NewCustomDockerError(ContainerNotRunning, "cannot get logs when container is not running") } logOptions := container.LogsOptions{ @@ -268,7 +224,7 @@ func (c *Client) ImagePlatforms(ctx context.Context, image string, dockerCreds c distribution, err := c.DistributionInspect(ctx, image, authToken) if err != nil { - return nil, NewImageInspectError(image, dockerCreds, err) + return nil, NewDockerImageError(err, image) } return distribution.Platforms, nil @@ -363,7 +319,7 @@ func (c *Client) ImageDistribution( digestParts := strings.Split(repos[0], "@") digest, err := digest.Parse(digestParts[1]) if err != nil { - return nil, NewCustomDockerError(DockerImageDigestMismatch, "image digest mismatch") + return nil, NewCustomDockerError(ImageDigestMismatch, "image digest mismatch") } return &ImageManifest{ @@ -382,7 +338,7 @@ func (c *Client) ImageDistribution( authToken := getAuthToken(ctx, image, creds) dist, err := c.DistributionInspect(ctx, image, authToken) if err != nil { - return nil, NewImageInspectError(image, creds, err) + return nil, NewDockerImageError(err, image) } obj := dist.Descriptor.Digest diff --git a/pkg/docker/errors.go b/pkg/docker/errors.go index a4563c6dfb..3b5e286bfc 100644 --- a/pkg/docker/errors.go +++ b/pkg/docker/errors.go @@ -1,114 +1,154 @@ package docker import ( + "fmt" "net/http" "strings" "github.com/docker/docker/errdefs" "github.com/bacalhau-project/bacalhau/pkg/bacerrors" + 
"github.com/bacalhau-project/bacalhau/pkg/config_legacy" ) -const DockerComponent = "Docker" +const Component = "Docker" // Docker-specific error codes const ( - DockerContainerNotFound = "DockerContainerNotFound" - DockerImageNotFound = "DockerImageNotFound" - DockerNetworkNotFound = "DockerNetworkNotFound" - DockerVolumeNotFound = "DockerVolumeNotFound" - DockerConflict = "DockerConflict" - DockerUnauthorized = "DockerUnauthorized" - DockerForbidden = "DockerForbidden" - DockerDataLoss = "DockerDataLoss" - DockerDeadline = "DockerDeadline" - DockerCancelled = "DockerCancelled" - DockerUnavailable = "DockerUnavailable" - DockerSystemError = "DockerSystemError" - DockerNotImplemented = "DockerNotImplemented" - DockerUnknownError = "DockerUnknownError" + ContainerNotFound = "ContainerNotFound" + ImageNotFound = "ImageNotFound" + ImageInvalid = "ImageInvalid" + NotFound = "NotFound" + Conflict = "Conflict" + Unauthorized = "Unauthorized" + Forbidden = "Forbidden" + DataLoss = "DataLoss" + Deadline = "Deadline" + Cancelled = "Cancelled" + Unavailable = "Unavailable" + SystemError = "SystemError" + NotImplemented = "NotImplemented" + UnknownError = "UnknownError" ) // Custom Docker error codes const ( - DockerBridgeNetworkUnattached = "DockerBridgeNetworkUnattached" - DockerContainerNotRunning = "DockerContainerNotRunning" - DockerImageDigestMismatch = "DockerImageDigestMismatch" + BridgeNetworkUnattached = "BridgeNetworkUnattached" + ContainerNotRunning = "ContainerNotRunning" + ImageDigestMismatch = "ImageDigestMismatch" ) -func NewDockerError(err error) bacerrors.Error { +func NewDockerError(err error) (bacErr bacerrors.Error) { + defer func() { + if bacErr != nil { + bacErr = bacErr.WithComponent(Component) + } + }() switch { case errdefs.IsNotFound(err): return handleNotFoundError(err) case errdefs.IsConflict(err): return bacerrors.New(err.Error()). - WithCode(DockerConflict). - WithHTTPStatusCode(http.StatusConflict). 
- WithComponent(DockerComponent) + WithCode(Conflict). + WithHTTPStatusCode(http.StatusConflict) case errdefs.IsUnauthorized(err): return bacerrors.New(err.Error()). - WithCode(DockerUnauthorized). + WithCode(Unauthorized). WithHTTPStatusCode(http.StatusUnauthorized). - WithComponent(DockerComponent) + WithHint("Ensure you have the necessary permissions and that your credentials are correct. " + + "You may need to log in to Docker again.") case errdefs.IsForbidden(err): return bacerrors.New(err.Error()). - WithCode(DockerForbidden). + WithCode(Forbidden). WithHTTPStatusCode(http.StatusForbidden). - WithComponent(DockerComponent) + WithHint(fmt.Sprintf("You don't have permission to perform this action. "+ + "Supply the node with valid Docker login credentials using the %s and %s environment variables", + config_legacy.DockerUsernameEnvVar, config_legacy.DockerPasswordEnvVar)) case errdefs.IsDataLoss(err): return bacerrors.New(err.Error()). - WithCode(DockerDataLoss). + WithCode(DataLoss). WithHTTPStatusCode(http.StatusInternalServerError). - WithComponent(DockerComponent) + WithFailsExecution() case errdefs.IsDeadline(err): return bacerrors.New(err.Error()). - WithCode(DockerDeadline). + WithCode(Deadline). WithHTTPStatusCode(http.StatusGatewayTimeout). - WithComponent(DockerComponent) + WithHint("The operation timed out. This could be due to network issues or high system load. " + + "Try again later or check your network connection."). + WithRetryable() case errdefs.IsCancelled(err): return bacerrors.New(err.Error()). - WithCode(DockerCancelled). + WithCode(Cancelled). WithHTTPStatusCode(http.StatusRequestTimeout). - WithComponent(DockerComponent) + WithHint("The operation was cancelled. " + + "This is often due to user intervention or a competing operation.") case errdefs.IsUnavailable(err): return bacerrors.New(err.Error()). - WithCode(DockerUnavailable). + WithCode(Unavailable). WithHTTPStatusCode(http.StatusServiceUnavailable). 
- WithComponent(DockerComponent) + WithHint("The Docker daemon or a required service is unavailable. " + + "Check if the Docker daemon is running and healthy."). + WithRetryable() case errdefs.IsSystem(err): return bacerrors.New(err.Error()). - WithCode(DockerSystemError). + WithCode(SystemError). WithHTTPStatusCode(http.StatusInternalServerError). - WithComponent(DockerComponent) + WithHint("An internal system error occurred. This could be due to resource constraints. " + + "Check system resources and Docker logs for more information."). + WithFailsExecution() case errdefs.IsNotImplemented(err): return bacerrors.New(err.Error()). - WithCode(DockerNotImplemented). + WithCode(NotImplemented). WithHTTPStatusCode(http.StatusNotImplemented). - WithComponent(DockerComponent) + WithHint("This feature is not implemented in your version of Docker. " + + "Check Docker documentation for feature availability and consider upgrading if necessary.") default: return bacerrors.New(err.Error()). - WithCode(DockerUnknownError). - WithHTTPStatusCode(http.StatusInternalServerError). - WithComponent(DockerComponent) + WithCode(UnknownError). + WithHTTPStatusCode(http.StatusInternalServerError) + } +} + +func NewDockerImageError(err error, image string) (bacErr bacerrors.Error) { + defer func() { + if bacErr != nil { + bacErr = bacErr. + WithComponent(Component). + WithDetail("Image", image) + } + }() + + switch { + case errdefs.IsNotFound(err) || errdefs.IsForbidden(err): + return bacerrors.New("image not available: %q", image). + WithHint(fmt.Sprintf(`To resolve this, either: +1. Check if the image exists in the registry and the name is correct +2. If the image is private, supply the node with valid Docker login credentials using the %s and %s environment variables`, + config_legacy.DockerUsernameEnvVar, config_legacy.DockerPasswordEnvVar)). + WithCode(ImageNotFound) + case errdefs.IsInvalidParameter(err): + return bacerrors.New("invalid image format: %q", image). 
+ WithHint("Ensure the image name is valid and the image is available in the registry"). + WithCode(ImageInvalid) + default: + return NewDockerError(err) + } } func NewCustomDockerError(code bacerrors.ErrorCode, message string) bacerrors.Error { return bacerrors.New(message). WithCode(code). - WithComponent(DockerComponent) + WithComponent(Component) } func handleNotFoundError(err error) bacerrors.Error { errorLower := strings.ToLower(err.Error()) if strings.Contains(errorLower, "no such container") { return bacerrors.New(err.Error()). - WithCode(DockerContainerNotFound). - WithHTTPStatusCode(http.StatusNotFound). - WithComponent(DockerComponent) + WithCode(ContainerNotFound). + WithHTTPStatusCode(http.StatusNotFound) } return bacerrors.New(err.Error()). - WithCode(DockerUnknownError). - WithHTTPStatusCode(http.StatusNotFound). - WithComponent(DockerComponent) + WithCode(NotFound). + WithHTTPStatusCode(http.StatusNotFound) } diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index 0dbedcf657..434932d8ad 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -365,7 +365,7 @@ func (e *Executor) newDockerJobContainer(ctx context.Context, params *dockerJobC if _, set := os.LookupEnv("SKIP_IMAGE_PULL"); !set { dockerCreds := config_legacy.GetDockerCredentials() if pullErr := e.client.PullImage(ctx, dockerArgs.Image, dockerCreds); pullErr != nil { - return container.CreateResponse{}, docker.NewImagePullError(dockerArgs.Image, dockerCreds, pullErr) + return container.CreateResponse{}, docker.NewDockerImageError(pullErr, dockerArgs.Image) } } log.Ctx(ctx).Trace().Msgf("Container: %+v %+v", containerConfig, mounts) diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 2a55903313..4d5865327e 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -1031,8 +1031,10 @@ func (b *BoltJobStore) updateJobState(tx *bolt.Tx, request jobstore.UpdateJobSta } if job.IsTerminal() { - //
TODO to include execution telemetry - analytics.EmitEvent(context.TODO(), analytics.NewJobTerminalEvent(job)) + tx.OnCommit(func() { + // TODO to include execution telemetry + analytics.EmitEvent(context.TODO(), analytics.NewJobTerminalEvent(job)) + }) // Remove the job from the in progress index, first checking for legacy items // and then removing the composite. Once we are confident no legacy items // are left in the old index we can stick to just the composite @@ -1160,7 +1162,9 @@ func (b *BoltJobStore) createExecution(tx *bolt.Tx, execution models.Execution) } } - analytics.EmitEvent(context.TODO(), analytics.NewCreatedExecutionEvent(execution)) + tx.OnCommit(func() { + analytics.EmitEvent(context.TODO(), analytics.NewCreatedExecutionEvent(execution)) + }) return nil } @@ -1222,12 +1226,14 @@ func (b *BoltJobStore) updateExecution(tx *bolt.Tx, request jobstore.UpdateExecu } } - if newExecution.IsTerminalState() { - analytics.EmitEvent(context.TODO(), analytics.NewTerminalExecutionEvent(newExecution)) - } - if newExecution.IsDiscarded() { - analytics.EmitEvent(context.TODO(), analytics.NewComputeMessageExecutionEvent(newExecution.JobID, newExecution.ID, newExecution.ComputeState.Message)) - } + tx.OnCommit(func() { + if newExecution.IsTerminalState() { + analytics.EmitEvent(context.TODO(), analytics.NewTerminalExecutionEvent(newExecution)) + } + if newExecution.IsDiscarded() { + analytics.EmitEvent(context.TODO(), analytics.NewComputeMessageExecutionEvent(newExecution)) + } + }) return nil } diff --git a/pkg/models/event.go b/pkg/models/event.go index cb99185fb7..c4b6946e76 100644 --- a/pkg/models/event.go +++ b/pkg/models/event.go @@ -1,8 +1,12 @@ package models import ( + "errors" + "fmt" "maps" "time" + + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" ) const ( @@ -11,6 +15,7 @@ const ( DetailsKeyRetryable = "Retryable" DetailsKeyFailsExecution = "FailsExecution" DetailsKeyNewState = "NewState" + DetailsKeyErrorCode = "ErrorCode" ) type HasHint 
interface { @@ -125,6 +130,14 @@ func (e *Event) WithFailsExecution(failsExecution bool) *Event { return e } +// WithErrorCode returns a new Event with the given error code. +func (e *Event) WithErrorCode(errorCode string) *Event { + if errorCode != "" { + return e.WithDetail(DetailsKeyErrorCode, errorCode) + } + return e +} + // WithDetails returns a new Event with the given details and topic. func (e *Event) WithDetails(details map[string]string) *Event { maps.Copy(e.Details, details) @@ -176,18 +189,28 @@ func (e *Event) GetJobStateIfPresent() (JobStateType, error) { func EventFromError(topic EventTopic, err error) Event { event := NewEvent(topic).WithError(err) - if hasDetails, ok := err.(HasDetails); ok { - event = event.WithDetails(hasDetails.Details()) + // if error is bacerrors + var bacErr bacerrors.Error + if errors.As(err, &bacErr) { + event = event. + WithDetails(bacErr.Details()). + WithHint(bacErr.Hint()). + WithRetryable(bacErr.Retryable()). + WithFailsExecution(bacErr.FailsExecution()). 
+ WithErrorCode(fmt.Sprintf("%s:%s", bacErr.Component(), bacErr.Code())) + } else { + if hasDetails, ok := err.(HasDetails); ok { + event = event.WithDetails(hasDetails.Details()) + } + if hasHint, ok := err.(HasHint); ok { + event = event.WithHint(hasHint.Hint()) + } + if hasRetryable, ok := err.(HasRetryable); ok { + event = event.WithRetryable(hasRetryable.Retryable()) + } + if hasFailsExecution, ok := err.(HasFailsExecution); ok { + event = event.WithFailsExecution(hasFailsExecution.FailsExecution()) + } } - if hasHint, ok := err.(HasHint); ok { - event = event.WithHint(hasHint.Hint()) - } - if hasRetryable, ok := err.(HasRetryable); ok { - event = event.WithRetryable(hasRetryable.Retryable()) - } - if hasFailsExecution, ok := err.(HasFailsExecution); ok { - event = event.WithFailsExecution(hasFailsExecution.FailsExecution()) - } - return *event } diff --git a/pkg/models/event_test.go b/pkg/models/event_test.go index 0da79ee5d5..7d739f2de7 100644 --- a/pkg/models/event_test.go +++ b/pkg/models/event_test.go @@ -106,7 +106,8 @@ func (suite *EventTestSuite) TestEventFromErrorNoDetails() { suite.Equal(errMessage, event.Message) suite.Equal(suite.topic, event.Topic) suite.Equal("true", event.Details[models.DetailsKeyIsError]) - suite.Len(event.Details, 1) + suite.Contains(event.Details, models.DetailsKeyErrorCode) + suite.Len(event.Details, 2) } func (suite *EventTestSuite) TestEventFromSimpleError() { diff --git a/pkg/models/state.go b/pkg/models/state.go index e0c8c5bd41..27f878a19e 100644 --- a/pkg/models/state.go +++ b/pkg/models/state.go @@ -8,6 +8,9 @@ type State[T any] struct { // Message is a human readable message describing the state. Message string `json:"Message,omitempty"` + + // Details is a map of additional details about the state. + Details map[string]string `json:"Details,omitempty"` } // WithMessage returns a new State with the specified message. 
@@ -16,6 +19,26 @@ func (s State[T]) WithMessage(message string) State[T] { return s } +// WithDetails returns a new State with the specified details. +func (s State[T]) WithDetails(details map[string]string) State[T] { + if s.Details == nil { + s.Details = make(map[string]string) + } + for k, v := range details { + s.Details[k] = v + } + return s +} + +// WithDetail returns a new State with the specified detail. +func (s State[T]) WithDetail(key, value string) State[T] { + if s.Details == nil { + s.Details = make(map[string]string) + } + s.Details[key] = value + return s +} + // NewJobState returns a new JobState with the specified state type func NewJobState(stateType JobStateType) State[JobStateType] { return State[JobStateType]{ diff --git a/pkg/orchestrator/callback.go b/pkg/orchestrator/callback.go index 5b55d078cc..4550bf8369 100644 --- a/pkg/orchestrator/callback.go +++ b/pkg/orchestrator/callback.go @@ -52,7 +52,9 @@ func (e *Callback) OnBidComplete(ctx context.Context, response compute.BidResult }, }, NewValues: models.Execution{ - ComputeState: models.NewExecutionState(models.ExecutionStateAskForBidAccepted).WithMessage(response.Event.Message), + ComputeState: models.NewExecutionState(models.ExecutionStateAskForBidAccepted). + WithMessage(response.Event.Message). + WithDetails(response.Event.Details), }, } @@ -199,7 +201,9 @@ func (e *Callback) OnComputeFailure(ctx context.Context, result compute.ComputeE }, }, NewValues: models.Execution{ - ComputeState: models.NewExecutionState(models.ExecutionStateFailed).WithMessage(result.Error()), + ComputeState: models.NewExecutionState(models.ExecutionStateFailed). + WithMessage(result.Error()). 
+ WithDetails(result.Event.Details), DesiredState: models.NewExecutionDesiredState(models.ExecutionDesiredStateStopped).WithMessage("execution failed"), }, }); err != nil { diff --git a/pkg/orchestrator/planner/state_updater.go b/pkg/orchestrator/planner/state_updater.go index c81346e2d7..c4338316be 100644 --- a/pkg/orchestrator/planner/state_updater.go +++ b/pkg/orchestrator/planner/state_updater.go @@ -63,6 +63,7 @@ func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) error { DesiredState: models.State[models.ExecutionDesiredStateType]{ StateType: u.DesiredState, Message: u.Event.Message, + Details: u.Event.Details, }, }, Condition: jobstore.UpdateExecutionCondition{ diff --git a/pkg/publisher/s3/publisher.go b/pkg/publisher/s3/publisher.go index c0d60eff54..1e5d018e4a 100644 --- a/pkg/publisher/s3/publisher.go +++ b/pkg/publisher/s3/publisher.go @@ -94,7 +94,7 @@ func (publisher *Publisher) PublishResult( // Upload the GZIP archive to S3. res, err := client.Uploader.Upload(ctx, putObjectInput) if err != nil { - return models.SpecConfig{}, err + return models.SpecConfig{}, s3helper.NewS3PublisherServiceError(err) } log.Debug().Msgf("Uploaded s3://%s/%s", spec.Bucket, aws.ToString(res.Key)) diff --git a/pkg/s3/errors.go b/pkg/s3/errors.go index 7ae6249549..cbeaee1b04 100644 --- a/pkg/s3/errors.go +++ b/pkg/s3/errors.go @@ -1,31 +1,126 @@ package s3 import ( + "errors" + "regexp" + "strings" + + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/smithy-go" + pkgerrors "github.com/pkg/errors" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" + "github.com/bacalhau-project/bacalhau/pkg/models" ) -const S3_PUBLISHER = "S3Publisher" -const S3_INPUT_SPEC = "S3InputSpec" -const S3_DOWNLOADER = "S3Downloader" +const PublisherComponent = "S3Publisher" +const InputSourceComponent = "S3InputSource" +const DownloadComponent = "S3Downloader" +const ResultSignerComponent = "S3ResultSigner" const ( - S3BadRequest = "S3BadRequest" + 
BadRequestErrorCode = "S3BadRequest" ) func NewS3PublisherError(code bacerrors.ErrorCode, message string) bacerrors.Error { return bacerrors.New(message). WithCode(code). - WithComponent(S3_PUBLISHER) + WithComponent(PublisherComponent) } -func NewS3InputSpecError(code bacerrors.ErrorCode, message string) bacerrors.Error { +func NewS3InputSourceError(code bacerrors.ErrorCode, message string) bacerrors.Error { return bacerrors.New(message). WithCode(code). - WithComponent(S3_INPUT_SPEC) + WithComponent(InputSourceComponent) } func NewS3DownloaderError(code bacerrors.ErrorCode, message string) bacerrors.Error { return bacerrors.New(message). WithCode(code). - WithComponent(S3_DOWNLOADER) + WithComponent(DownloadComponent) +} + +func NewS3PublisherServiceError(err error) bacerrors.Error { + return newS3ServiceError(pkgerrors.Wrap(err, "failed to publish s3 result"), PublisherComponent) +} + +func NewS3InputSourceServiceError(err error) bacerrors.Error { + return newS3ServiceError(pkgerrors.Wrap(err, "failed to fetch s3 input"), InputSourceComponent) +} + +func NewS3ResultSignerServiceError(err error) bacerrors.Error { + return newS3ServiceError(pkgerrors.Wrap(err, "failed to fetch s3 result"), ResultSignerComponent) +} + +func newS3ServiceError(err error, component string) bacerrors.Error { + errMetadata := extractErrorMetadata(err) + return bacerrors.New(errMetadata.message). + WithComponent(component). + WithCode(errMetadata.errorCode). + WithHTTPStatusCode(errMetadata.statusCode). + WithDetails(errMetadata.toDetails()) +} + +type errorMetadata struct { + service string + errorCode bacerrors.ErrorCode + statusCode int + requestID string + operation string + message string +} + +// toDetails converts the error metadata to a map of details. 
+func (m errorMetadata) toDetails() map[string]string { + details := map[string]string{ + models.DetailsKeyErrorCode: string(m.errorCode), + "Service": m.service, + } + if m.requestID != "" { + details["AWSRequestID"] = m.requestID + } + if m.operation != "" { + details["Operation"] = m.operation + } + return details +} + +// extractErrorMetadata extracts the error code and message from the error. +// It tries to unwrap AWS SDK (smithy) errors to recover the operation, HTTP status code, request ID and API error code. +func extractErrorMetadata(err error) errorMetadata { + metadata := errorMetadata{ + service: "S3", + errorCode: bacerrors.UnknownError, + message: err.Error(), + } + + // Parse the error message and remove the HostID if present as it is noisy. + errMsg := err.Error() + // Regular expression to match and remove the HostID + re := regexp.MustCompile(`, HostID: [^,]+`) + cleanedErrMsg := re.ReplaceAllString(errMsg, "") + // Remove any double commas that might result from removing HostID + cleanedErrMsg = strings.ReplaceAll(cleanedErrMsg, ",,", ",") + // Trim any leading or trailing whitespace and commas + cleanedErrMsg = strings.Trim(cleanedErrMsg, " ,") + metadata.message = cleanedErrMsg + + var opErr *smithy.OperationError + if errors.As(err, &opErr) { + metadata.operation = opErr.Operation() + metadata.service = opErr.Service() + + var respError *awshttp.ResponseError + if errors.As(opErr.Err, &respError) { + metadata.statusCode = respError.HTTPStatusCode() + metadata.requestID = respError.ServiceRequestID() + } + + var apiErr smithy.APIError + if errors.As(opErr.Err, &apiErr) { + metadata.errorCode = bacerrors.Code(apiErr.ErrorCode()) + } + } + + return metadata +} diff --git a/pkg/s3/errors_test.go b/pkg/s3/errors_test.go new file mode 100644 index 0000000000..3e72863698 --- /dev/null +++ b/pkg/s3/errors_test.go @@ -0,0 +1,180 @@ +//go:build unit || !integration + +package s3 + +import ( + "errors" + "fmt" + "net/http" + "testing" + + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/smithy-go" + smithyhttp 
"github.com/aws/smithy-go/transport/http" + pkgerrors "github.com/pkg/errors" + + "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +type S3ErrorTestSuite struct { + suite.Suite +} + +func TestS3ErrorTestSuite(t *testing.T) { + suite.Run(t, new(S3ErrorTestSuite)) +} + +func (suite *S3ErrorTestSuite) TestExtractErrorMetadata() { + tests := []struct { + name string + err error + expected errorMetadata + }{ + { + name: "operation error with HostID", + err: pkgerrors.Wrap(&smithy.OperationError{ + ServiceID: "S3", + OperationName: "ListObjectsV2", + Err: &awshttp.ResponseError{ + ResponseError: &smithyhttp.ResponseError{ + Response: &smithyhttp.Response{ + Response: &http.Response{ + StatusCode: 404, + }, + }, + Err: fmt.Errorf("HostID: lSs3QvEo/W6rdJsL4Y6iw2t8upy5V0uRzByCgjRQ 84yRBkbcXPU2iC3HVclnx1v811K0h2a9WA=, api error NoSuchBucket: The specified bucket does not exist"), + }, + RequestID: "YRSWGFERH6VN7DK0", + }, + }, "failed to publish s3 results"), + expected: errorMetadata{ + service: "S3", + operation: "ListObjectsV2", + statusCode: 404, + requestID: "YRSWGFERH6VN7DK0", + errorCode: bacerrors.UnknownError, + message: "failed to publish s3 results: operation error S3: ListObjectsV2, https response error StatusCode: 404, RequestID: YRSWGFERH6VN7DK0, api error NoSuchBucket: The specified bucket does not exist", + }, + }, + { + name: "Simple error", + err: errors.New("simple error"), + expected: errorMetadata{ + service: "S3", + errorCode: bacerrors.UnknownError, + message: "simple error", + }, + }, + { + name: "Error with HostID", + err: errors.New("operation error S3: GetObject, https response error StatusCode: 403, RequestID: ABC123, HostID: XYZ789, ForbiddenError: Access Denied"), + expected: errorMetadata{ + service: "S3", + errorCode: bacerrors.UnknownError, + message: "operation error S3: GetObject, https response error StatusCode: 403, RequestID: ABC123, 
ForbiddenError: Access Denied", + }, + }, + } + + for _, tt := range tests { + suite.Run(tt.name, func() { + result := extractErrorMetadata(tt.err) + suite.Equal(tt.expected, result) + }) + } +} + +func (suite *S3ErrorTestSuite) TestNewS3PublisherError() { + err := NewS3PublisherError(bacerrors.BadRequestError, "test message") + suite.Equal(bacerrors.BadRequestError, err.Code()) + suite.Equal(PublisherComponent, err.Component()) + suite.Equal("test message", err.Error()) +} + +func (suite *S3ErrorTestSuite) TestNewS3InputSpecError() { + err := NewS3InputSourceError(bacerrors.BadRequestError, "test message") + suite.Equal(bacerrors.BadRequestError, err.Code()) + suite.Equal(InputSourceComponent, err.Component()) + suite.Equal("test message", err.Error()) +} + +func (suite *S3ErrorTestSuite) TestNewS3DownloaderError() { + err := NewS3DownloaderError(bacerrors.BadRequestError, "test message") + suite.Equal(bacerrors.BadRequestError, err.Code()) + suite.Equal(DownloadComponent, err.Component()) + suite.Equal("test message", err.Error()) +} + +func (suite *S3ErrorTestSuite) TestNewS3PublisherServiceError() { + originalErr := errors.New("test error") + err := NewS3PublisherServiceError(originalErr) + suite.Equal(PublisherComponent, err.Component()) + suite.Contains(err.Error(), "failed to publish s3 result") + suite.Contains(err.Error(), "test error") +} + +func (suite *S3ErrorTestSuite) TestNewS3InputSourceServiceError() { + originalErr := errors.New("test error") + err := NewS3InputSourceServiceError(originalErr) + suite.Equal(InputSourceComponent, err.Component()) + suite.Contains(err.Error(), "failed to fetch s3 input") + suite.Contains(err.Error(), "test error") +} + +func (suite *S3ErrorTestSuite) TestNewS3ResultSignerServiceError() { + originalErr := errors.New("test error") + err := NewS3ResultSignerServiceError(originalErr) + suite.Equal(ResultSignerComponent, err.Component()) + suite.Contains(err.Error(), "failed to fetch s3 result") + 
suite.Contains(err.Error(), "test error") +} + +func (suite *S3ErrorTestSuite) TestErrorMetadataToDetails() { + metadata := errorMetadata{ + service: "S3", + errorCode: bacerrors.BadRequestError, + statusCode: 400, + requestID: "TEST123", + operation: "GetObject", + message: "Bad Request", + } + + details := metadata.toDetails() + + suite.Equal("S3", details["Service"]) + suite.Equal(string(bacerrors.BadRequestError), details[models.DetailsKeyErrorCode]) + suite.Equal("TEST123", details["AWSRequestID"]) + suite.Equal("GetObject", details["Operation"]) +} + +func (suite *S3ErrorTestSuite) TestNewS3ServiceError() { + originalErr := &smithy.OperationError{ + ServiceID: "S3", + OperationName: "GetObject", + Err: &awshttp.ResponseError{ + ResponseError: &smithyhttp.ResponseError{ + Response: &smithyhttp.Response{ + Response: &http.Response{ + StatusCode: 403, + }, + }, + Err: fmt.Errorf("ForbiddenError: Access Denied"), + }, + RequestID: "TEST123", + }, + } + + err := newS3ServiceError(originalErr, PublisherComponent) + + suite.Equal(PublisherComponent, err.Component()) + suite.Equal(403, err.HTTPStatusCode()) + suite.Contains(err.Error(), "ForbiddenError: Access Denied") + + details := err.Details() + suite.Equal("S3", details["Service"]) + suite.Equal("TEST123", details["AWSRequestID"]) + suite.Equal("GetObject", details["Operation"]) +} diff --git a/pkg/s3/result_signer.go b/pkg/s3/result_signer.go index 24c4e8be2d..1646ca0308 100644 --- a/pkg/s3/result_signer.go +++ b/pkg/s3/result_signer.go @@ -6,8 +6,9 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/rs/zerolog/log" + + "github.com/bacalhau-project/bacalhau/pkg/models" ) type ResultSignerParams struct { @@ -56,7 +57,7 @@ func (signer *ResultSigner) Transform(ctx context.Context, spec *models.SpecConf resp, err := client.PresignClient().PresignGetObject(ctx, request, s3.WithPresignExpires(signer.expiration)) if err != nil { - return err + 
return NewS3ResultSignerServiceError(err) } spec.Type = models.StorageSourceS3PreSigned spec.Params = PreSignedResultSpec{ diff --git a/pkg/s3/types.go b/pkg/s3/types.go index 00dc7bfa88..5c38896d5a 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -21,7 +21,7 @@ type SourceSpec struct { func (c SourceSpec) Validate() error { if c.Bucket == "" { - return NewS3InputSpecError(S3BadRequest, "invalid s3 storage params: bucket cannot be empty") + return NewS3InputSourceError(BadRequestErrorCode, "invalid s3 storage params: bucket cannot be empty") } return nil } @@ -37,7 +37,7 @@ type PreSignedResultSpec struct { func (c PreSignedResultSpec) Validate() error { if c.PreSignedURL == "" { - return NewS3DownloaderError(S3BadRequest, "invalid s3 signed storage params: signed url cannot be empty") + return NewS3DownloaderError(BadRequestErrorCode, "invalid s3 signed storage params: signed url cannot be empty") } return c.SourceSpec.Validate() } @@ -48,11 +48,11 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, NewS3InputSpecError(S3BadRequest, "invalid storage source type. expected "+models.StorageSourceS3+" but received: "+spec.Type) + return SourceSpec{}, NewS3InputSourceError(BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3+" but received: "+spec.Type) } inputParams := spec.Params if inputParams == nil { - return SourceSpec{}, NewS3InputSpecError(S3BadRequest, "invalid storage source params. cannot be nil") + return SourceSpec{}, NewS3InputSourceError(BadRequestErrorCode, "invalid storage source params. 
cannot be nil") } var c SourceSpec @@ -65,13 +65,13 @@ func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, error) { if !spec.IsType(models.StorageSourceS3PreSigned) { - return PreSignedResultSpec{}, NewS3InputSpecError(S3BadRequest, + return PreSignedResultSpec{}, NewS3InputSourceError(BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3PreSigned+" but received: "+spec.Type) } inputParams := spec.Params if inputParams == nil { - return PreSignedResultSpec{}, NewS3InputSpecError(S3BadRequest, "invalid signed result params. cannot be nil") + return PreSignedResultSpec{}, NewS3InputSourceError(BadRequestErrorCode, "invalid signed result params. cannot be nil") } var c PreSignedResultSpec @@ -91,10 +91,10 @@ type PublisherSpec struct { func (c PublisherSpec) Validate() error { if c.Bucket == "" { - return NewS3PublisherError(S3BadRequest, "invalid s3 params. bucket cannot be empty") + return NewS3PublisherError(BadRequestErrorCode, "invalid s3 params. bucket cannot be empty") } if c.Key == "" { - return NewS3PublisherError(S3BadRequest, "invalid s3 params. key cannot be empty") + return NewS3PublisherError(BadRequestErrorCode, "invalid s3 params. key cannot be empty") } return nil } @@ -105,13 +105,13 @@ func (c PublisherSpec) ToMap() map[string]interface{} { func DecodePublisherSpec(spec *models.SpecConfig) (PublisherSpec, error) { if !spec.IsType(models.PublisherS3) { - return PublisherSpec{}, NewS3PublisherError(S3BadRequest, + return PublisherSpec{}, NewS3PublisherError(BadRequestErrorCode, fmt.Sprintf("invalid publisher type. expected %s, but received: %s", models.PublisherS3, spec.Type)) } inputParams := spec.Params if inputParams == nil { - return PublisherSpec{}, NewS3PublisherError(S3BadRequest, "invalid publisher params. cannot be nil") + return PublisherSpec{}, NewS3PublisherError(BadRequestErrorCode, "invalid publisher params. 
cannot be nil") } var c PublisherSpec diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 083d5b78bb..4733f311cb 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -179,7 +179,10 @@ func (s *StorageProvider) downloadObject(ctx context.Context, VersionId: object.versionID, IfMatch: object.eTag, }) - return err + if err != nil { + return s3helper.NewS3InputSourceServiceError(err) + } + return nil } func (s *StorageProvider) CleanupStorage(_ context.Context, _ models.InputSource, volume storage.StorageVolume) error { @@ -216,7 +219,7 @@ func (s *StorageProvider) explodeKey( headResp, err := client.S3.HeadObject(ctx, request) if err != nil { - return nil, err + return nil, s3helper.NewS3InputSourceServiceError(err) } if storageSpec.ChecksumSHA256 != "" && storageSpec.ChecksumSHA256 != aws.ToString(headResp.ChecksumSHA256) { @@ -253,7 +256,7 @@ func (s *StorageProvider) explodeKey( ContinuationToken: continuationToken, }) if err != nil { - return nil, err + return nil, s3helper.NewS3InputSourceServiceError(err) } for _, object := range resp.Contents { if storageSpec.Filter != "" { diff --git a/pkg/storage/s3/types.go b/pkg/storage/s3/types.go index 90d1832092..9a5eef2086 100644 --- a/pkg/storage/s3/types.go +++ b/pkg/storage/s3/types.go @@ -33,7 +33,7 @@ func (c SourceSpec) ToMap() map[string]interface{} { func DecodeSourceSpec(spec *models.SpecConfig) (SourceSpec, error) { if !spec.IsType(models.StorageSourceS3) { - return SourceSpec{}, s3.NewS3InputSpecError(s3.S3BadRequest, "invalid storage source type. expected "+models.StorageSourceS3+", but received: "+spec.Type) + return SourceSpec{}, s3.NewS3InputSourceError(s3.BadRequestErrorCode, "invalid storage source type. 
expected "+models.StorageSourceS3+", but received: "+spec.Type) } inputParams := spec.Params if inputParams == nil { @@ -66,12 +66,12 @@ func (c PreSignedResultSpec) ToMap() map[string]interface{} { func DecodePreSignedResultSpec(spec *models.SpecConfig) (PreSignedResultSpec, error) { if !spec.IsType(models.StorageSourceS3PreSigned) { - return PreSignedResultSpec{}, s3.NewS3InputSpecError(s3.S3BadRequest, "invalid storage source type. expected "+models.StorageSourceS3PreSigned+", but received: "+spec.Type) + return PreSignedResultSpec{}, s3.NewS3InputSourceError(s3.BadRequestErrorCode, "invalid storage source type. expected "+models.StorageSourceS3PreSigned+", but received: "+spec.Type) } inputParams := spec.Params if inputParams == nil { - return PreSignedResultSpec{}, s3.NewS3InputSpecError(s3.S3BadRequest, "invalid signed result params. cannot be nil") + return PreSignedResultSpec{}, s3.NewS3InputSourceError(s3.BadRequestErrorCode, "invalid signed result params. cannot be nil") } var c PreSignedResultSpec