diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 24fc6beb..7a01000f 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -64,6 +64,7 @@ func (gcs *GCS) Connect(ctx context.Context) error { clientOptions := make([]option.ClientOption, 0) clientOptions = append(clientOptions, option.WithTelemetryDisabled()) endpoint := "https://storage.googleapis.com/storage/v1/" + if gcs.Config.Endpoint != "" { endpoint = gcs.Config.Endpoint clientOptions = append([]option.ClientOption{option.WithoutAuthentication()}, clientOptions...) diff --git a/pkg/storage/object_disk/object_disk.go b/pkg/storage/object_disk/object_disk.go index aff4b4ec..9c00b9bf 100644 --- a/pkg/storage/object_disk/object_disk.go +++ b/pkg/storage/object_disk/object_disk.go @@ -449,6 +449,8 @@ func makeObjectDiskConnection(ctx context.Context, ch *clickhouse.ClickHouse, cf } if creds.S3StorageClass != "" { s3cfg.StorageClass = creds.S3StorageClass + } else { + s3cfg.StorageClass = cfg.S3.StorageClass } if creds.S3AssumeRole != "" { s3cfg.AssumeRoleARN = creds.S3AssumeRole diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 8aa58451..cb373710 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -4,10 +4,6 @@ import ( "context" "crypto/tls" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/config" - "github.com/aws/smithy-go" - awsV2http "github.com/aws/smithy-go/transport/http" - "golang.org/x/sync/semaphore" "io" "net/http" "os" @@ -16,20 +12,23 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - + "github.com/Altinity/clickhouse-backup/pkg/config" apexLog "github.com/apex/log" "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" awsV2Config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials/stscreds" - "github.com/aws/aws-sdk-go-v2/service/sts" - s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/aws-sdk-go-v2/service/sts" + "github.com/aws/smithy-go" awsV2Logging "github.com/aws/smithy-go/logging" + awsV2http "github.com/aws/smithy-go/transport/http" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) type S3LogToApexLogAdapter struct { @@ -51,6 +50,39 @@ func (S3LogToApexLogAdapter S3LogToApexLogAdapter) Logf(severity awsV2Logging.Cl } } +// RecalculateV4Signature allow GCS over S3, remove Accept-Encoding header from sign https://stackoverflow.com/a/74382598/1204665, https://github.com/aws/aws-sdk-go-v2/issues/1816 +type RecalculateV4Signature struct { + next http.RoundTripper + signer *v4.Signer + awsConfig aws.Config +} + +func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error) { + // store for later use + acceptEncodingValue := req.Header.Get("Accept-Encoding") + + // delete the header so the header doesn't account for in the signature + req.Header.Del("Accept-Encoding") + + // sign with the same date + timeString := req.Header.Get("X-Amz-Date") + timeDate, _ := time.Parse("20060102T150405Z", timeString) + + creds, err := lt.awsConfig.Credentials.Retrieve(req.Context()) + if err != nil { + return nil, err + } + err = lt.signer.SignHTTP(req.Context(), creds, req, v4.GetPayloadHash(req.Context()), "s3", lt.awsConfig.Region, timeDate) + if err != nil { + return nil, err + } + // Reset Accept-Encoding if desired + req.Header.Set("Accept-Encoding", acceptEncodingValue) + + // follows up the original round tripper + return lt.next.RoundTrip(req) +} + // S3 - presents methods for manipulate data on s3 type S3 struct { client *s3.Client @@ -115,11 +147,12 @@ func (s *S3) Connect(ctx context.Context) error { awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequestWithBody | aws.LogResponseWithBody } + httpTransport := http.DefaultTransport if s.Config.DisableCertVerification { - tr := &http.Transport{ + httpTransport = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - awsConfig.HTTPClient = &http.Client{Transport: tr} + awsConfig.HTTPClient = &http.Client{Transport: httpTransport} } if s.Config.Endpoint != "" { @@ -134,6 +167,11 @@ func (s *S3) Connect(ctx context.Context) error { }) } + // allow GCS over S3, remove Accept-Encoding header from sign https://stackoverflow.com/a/74382598/1204665, https://github.com/aws/aws-sdk-go-v2/issues/1816 + if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") { + // Assign custom client with our own transport + awsConfig.HTTPClient = &http.Client{Transport: &RecalculateV4Signature{httpTransport, v4.NewSigner(), awsConfig}} + } s.client = s3.NewFromConfig(awsConfig, func(o *s3.Options) { o.UsePathStyle = s.Config.ForcePathStyle o.EndpointOptions.DisableHTTPS = s.Config.DisableSSL @@ -362,6 +400,64 @@ func (s *S3) remotePager(ctx context.Context, s3Path string, recursive bool, pro func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) { dstKey = path.Join(s.Config.ObjectDiskPath, dstKey) + if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") { + params := s3.CopyObjectInput{ + Bucket: aws.String(s.Config.Bucket), + Key: aws.String(dstKey), + CopySource: aws.String(path.Join(srcBucket, srcKey)), + StorageClass: s3types.StorageClass(strings.ToUpper(s.Config.StorageClass)), + } + // https://github.com/Altinity/clickhouse-backup/issues/588 + if len(s.Config.ObjectLabels) > 0 { + tags := "" + for k, v := range s.Config.ObjectLabels { + if tags != "" { + tags += "&" + } + tags += k + "=" + v + } + params.Tagging = aws.String(tags) + } + if s.Config.SSE != "" { + params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE) + } + if s.Config.SSEKMSKeyId != "" { + params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId) + } + if s.Config.SSECustomerAlgorithm != "" { + params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm) + } + if s.Config.SSECustomerKey != "" { + params.SSECustomerKey = aws.String(s.Config.SSECustomerKey) + } + if s.Config.SSECustomerKeyMD5 != "" { + params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5) + } + if s.Config.SSEKMSEncryptionContext != "" { + params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext) + } + _, err := s.client.CopyObject(ctx, ¶ms) + if err != nil { + return 0, err + } + dstObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(s.Config.Bucket), + Key: aws.String(dstKey), + }) + if err != nil { + return 0, err + } + return dstObjResp.ContentLength, nil + } + // Get the size of the source object + sourceObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(srcKey), + }) + if err != nil { + return 0, err + } + srcSize := sourceObjResp.ContentLength // Initiate a multipart upload params := s3.CreateMultipartUploadInput{ Bucket: aws.String(s.Config.Bucket), @@ -398,16 +494,6 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) ( params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext) } - // Get the size of the source object - sourceObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: aws.String(srcBucket), - Key: aws.String(srcKey), - }) - if err != nil { - return 0, err - } - srcSize := sourceObjResp.ContentLength - initResp, err := s.client.CreateMultipartUpload(ctx, ¶ms) if err != nil { return 0, err