Skip to content

Commit

Permalink
allow GCS over S3, remove Accept-Encoding header from sign https://st…
Browse files Browse the repository at this point in the history
…ackoverflow.com/a/74382598/1204665, aws/aws-sdk-go-v2#1816, GCS over S3 has no 5 GB restriction, TestIntegrationGCS passes again, TestIntegrationEmbedded still needs to be fixed
  • Loading branch information
Slach committed Jul 24, 2023
1 parent d45f3f5 commit aaa5c89
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (gcs *GCS) Connect(ctx context.Context) error {
clientOptions := make([]option.ClientOption, 0)
clientOptions = append(clientOptions, option.WithTelemetryDisabled())
endpoint := "https://storage.googleapis.com/storage/v1/"

if gcs.Config.Endpoint != "" {
endpoint = gcs.Config.Endpoint
clientOptions = append([]option.ClientOption{option.WithoutAuthentication()}, clientOptions...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/object_disk/object_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ func makeObjectDiskConnection(ctx context.Context, ch *clickhouse.ClickHouse, cf
}
if creds.S3StorageClass != "" {
s3cfg.StorageClass = creds.S3StorageClass
} else {
s3cfg.StorageClass = cfg.S3.StorageClass
}
if creds.S3AssumeRole != "" {
s3cfg.AssumeRoleARN = creds.S3AssumeRole
Expand Down
126 changes: 106 additions & 20 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/aws/smithy-go"
awsV2http "github.com/aws/smithy-go/transport/http"
"golang.org/x/sync/semaphore"
"io"
"net/http"
"os"
Expand All @@ -16,20 +12,23 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/Altinity/clickhouse-backup/pkg/config"
apexLog "github.com/apex/log"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
awsV2Config "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/sts"

s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/aws/smithy-go"
awsV2Logging "github.com/aws/smithy-go/logging"
awsV2http "github.com/aws/smithy-go/transport/http"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

type S3LogToApexLogAdapter struct {
Expand All @@ -51,6 +50,39 @@ func (S3LogToApexLogAdapter S3LogToApexLogAdapter) Logf(severity awsV2Logging.Cl
}
}

// RecalculateV4Signature is an http.RoundTripper wrapper that re-signs each
// request after removing the Accept-Encoding header, which allows the AWS SDK
// v2 S3 client to talk to GCS's S3-compatible endpoint (GCS rejects signatures
// that include Accept-Encoding).
// See https://stackoverflow.com/a/74382598/1204665 and
// https://github.com/aws/aws-sdk-go-v2/issues/1816.
type RecalculateV4Signature struct {
	next      http.RoundTripper // wrapped transport that performs the actual request
	signer    *v4.Signer        // SigV4 signer used to recompute the signature
	awsConfig aws.Config        // supplies credentials and region for re-signing
}

// RoundTrip implements http.RoundTripper. It removes the Accept-Encoding
// header, re-signs the request with SigV4 using the date the SDK already
// stamped into X-Amz-Date, restores the header, and then delegates to the
// wrapped transport. This works around GCS rejecting SigV4 signatures that
// cover Accept-Encoding; see https://stackoverflow.com/a/74382598/1204665
// and https://github.com/aws/aws-sdk-go-v2/issues/1816.
func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error) {
	// Remember the original header value so it can be restored after signing.
	acceptEncodingValue := req.Header.Get("Accept-Encoding")

	// Delete the header so it is not accounted for in the signature.
	req.Header.Del("Accept-Encoding")

	// Re-sign with the same date the SDK used for the original signature.
	// The original code discarded this parse error; a missing or malformed
	// X-Amz-Date would silently sign with the zero time and fail later with
	// an opaque auth error, so surface it here instead.
	timeString := req.Header.Get("X-Amz-Date")
	timeDate, err := time.Parse("20060102T150405Z", timeString)
	if err != nil {
		return nil, fmt.Errorf("parsing X-Amz-Date header %q: %w", timeString, err)
	}

	creds, err := lt.awsConfig.Credentials.Retrieve(req.Context())
	if err != nil {
		return nil, err
	}
	err = lt.signer.SignHTTP(req.Context(), creds, req, v4.GetPayloadHash(req.Context()), "s3", lt.awsConfig.Region, timeDate)
	if err != nil {
		return nil, err
	}
	// Restore Accept-Encoding only if the original request carried it;
	// unconditionally calling Set would inject an empty-valued header.
	if acceptEncodingValue != "" {
		req.Header.Set("Accept-Encoding", acceptEncodingValue)
	}

	// Follow up with the original round tripper.
	return lt.next.RoundTrip(req)
}

// S3 - provides methods to manipulate data on S3
type S3 struct {
client *s3.Client
Expand Down Expand Up @@ -115,11 +147,12 @@ func (s *S3) Connect(ctx context.Context) error {
awsConfig.ClientLogMode = aws.LogRetries | aws.LogRequestWithBody | aws.LogResponseWithBody
}

httpTransport := http.DefaultTransport
if s.Config.DisableCertVerification {
tr := &http.Transport{
httpTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
awsConfig.HTTPClient = &http.Client{Transport: tr}
awsConfig.HTTPClient = &http.Client{Transport: httpTransport}
}

if s.Config.Endpoint != "" {
Expand All @@ -134,6 +167,11 @@ func (s *S3) Connect(ctx context.Context) error {
})

}
// allow GCS over S3, remove Accept-Encoding header from sign https://stackoverflow.com/a/74382598/1204665, https://github.com/aws/aws-sdk-go-v2/issues/1816
if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") {
// Assign custom client with our own transport
awsConfig.HTTPClient = &http.Client{Transport: &RecalculateV4Signature{httpTransport, v4.NewSigner(), awsConfig}}
}
s.client = s3.NewFromConfig(awsConfig, func(o *s3.Options) {
o.UsePathStyle = s.Config.ForcePathStyle
o.EndpointOptions.DisableHTTPS = s.Config.DisableSSL
Expand Down Expand Up @@ -362,6 +400,64 @@ func (s *S3) remotePager(ctx context.Context, s3Path string, recursive bool, pro

func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (int64, error) {
dstKey = path.Join(s.Config.ObjectDiskPath, dstKey)
if strings.Contains(s.Config.Endpoint, "storage.googleapis.com") {
params := s3.CopyObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
CopySource: aws.String(path.Join(srcBucket, srcKey)),
StorageClass: s3types.StorageClass(strings.ToUpper(s.Config.StorageClass)),
}
// https://github.com/Altinity/clickhouse-backup/issues/588
if len(s.Config.ObjectLabels) > 0 {
tags := ""
for k, v := range s.Config.ObjectLabels {
if tags != "" {
tags += "&"
}
tags += k + "=" + v
}
params.Tagging = aws.String(tags)
}
if s.Config.SSE != "" {
params.ServerSideEncryption = s3types.ServerSideEncryption(s.Config.SSE)
}
if s.Config.SSEKMSKeyId != "" {
params.SSEKMSKeyId = aws.String(s.Config.SSEKMSKeyId)
}
if s.Config.SSECustomerAlgorithm != "" {
params.SSECustomerAlgorithm = aws.String(s.Config.SSECustomerAlgorithm)
}
if s.Config.SSECustomerKey != "" {
params.SSECustomerKey = aws.String(s.Config.SSECustomerKey)
}
if s.Config.SSECustomerKeyMD5 != "" {
params.SSECustomerKeyMD5 = aws.String(s.Config.SSECustomerKeyMD5)
}
if s.Config.SSEKMSEncryptionContext != "" {
params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
}
_, err := s.client.CopyObject(ctx, &params)
if err != nil {
return 0, err
}
dstObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(dstKey),
})
if err != nil {
return 0, err
}
return dstObjResp.ContentLength, nil
}
// Get the size of the source object
sourceObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(srcKey),
})
if err != nil {
return 0, err
}
srcSize := sourceObjResp.ContentLength
// Initiate a multipart upload
params := s3.CreateMultipartUploadInput{
Bucket: aws.String(s.Config.Bucket),
Expand Down Expand Up @@ -398,16 +494,6 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
params.SSEKMSEncryptionContext = aws.String(s.Config.SSEKMSEncryptionContext)
}

// Get the size of the source object
sourceObjResp, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(srcKey),
})
if err != nil {
return 0, err
}
srcSize := sourceObjResp.ContentLength

initResp, err := s.client.CreateMultipartUpload(ctx, &params)
if err != nil {
return 0, err
Expand Down

0 comments on commit aaa5c89

Please sign in to comment.