diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index f03bcaa14b3e5..95b00aafa50d0 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -45,7 +45,7 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage { if *testingStorageURI == "" { t.Skip("testingStorageURI is not set") } - s, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil) + s, err := storage.NewFromURL(context.Background(), *testingStorageURI) require.NoError(t, err) return s } diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 12cc400eee19a..ba7f70e66eb81 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -162,6 +162,7 @@ go_test( "//pkg/testkit/testsetup", "//pkg/types", "//pkg/util/codec", + "//pkg/util/intest", "//pkg/util/table-filter", "@com_github_fsouza_fake_gcs_server//fakestorage", "@com_github_golang_protobuf//proto", diff --git a/br/pkg/restore/stream_metas_test.go b/br/pkg/restore/stream_metas_test.go index d0c7d65e8a93d..f1ee34feb66e4 100644 --- a/br/pkg/restore/stream_metas_test.go +++ b/br/pkg/restore/stream_metas_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -302,6 +303,7 @@ func TestTruncateSafepoint(t *testing.T) { } func TestTruncateSafepointForGCS(t *testing.T) { + require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ NoListener: true, diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 902e95e5df3ab..3c5dd4d662705 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -92,6 +92,7 @@ go_test( shard_count = 50, deps = [ "//br/pkg/mock", + "//pkg/util/intest", "@com_github_aws_aws_sdk_go//aws", "@com_github_aws_aws_sdk_go//aws/awserr", "@com_github_aws_aws_sdk_go//aws/request", diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index 8637e73ddacb1..6d0865539d88f 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -16,6 +16,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/spf13/pflag" "go.uber.org/zap" "golang.org/x/oauth2/google" @@ -322,23 +323,20 @@ func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage clientOps = append(clientOps, option.WithEndpoint(gcs.Endpoint)) } - httpClient := opts.HTTPClient - if httpClient == nil { - // http2 will reuse the connection to read multiple files, which is - // very slow, the speed is about the same speed as reading a single file. - // So we disable keepalive here to use multiple connections to read files. - // open a new connection takes about 20~50ms, which is acceptable. - transport, _ := CloneDefaultHttpTransport() - transport.DisableKeepAlives = true - httpClient = &http.Client{Transport: transport} - // see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455 - var err error - httpClient.Transport, err = htransport.NewTransport(ctx, httpClient.Transport, clientOps...) - if err != nil { - return nil, errors.Trace(err) + if opts.HTTPClient != nil { + if !intest.InTest { + // see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455 + // https://www.googleapis.com/auth/cloud-platform must be set to use service_account + // type of credential-file. + newTransport, err := htransport.NewTransport(ctx, opts.HTTPClient.Transport, + append(clientOps, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))...) + if err != nil { + return nil, errors.Trace(err) + } + opts.HTTPClient.Transport = newTransport } + clientOps = append(clientOps, option.WithHTTPClient(opts.HTTPClient)) } - clientOps = append(clientOps, option.WithHTTPClient(httpClient)) client, err := storage.NewClient(ctx, clientOps...) if err != nil { @@ -480,3 +478,16 @@ func (r *gcsObjectReader) Seek(offset int64, whence int) (int64, error) { func (r *gcsObjectReader) GetFileSize() (int64, error) { return r.totalSize, nil } + +// gcsHttpClientForThroughput returns a base http client for GCS that is optimized +// for throughput. +func gcsHttpClientForThroughput() *http.Client { + // http2 will reuse the connection to read multiple files, which is + // very slow, the speed of reading multiple files concurrently is about the + // same speed as reading a single file. + // So we disable keepalive here to use multiple connections to read files. + // open a new connection takes about 20~50ms, which is acceptable. + transport, _ := CloneDefaultHttpTransport() + transport.DisableKeepAlives = true + return &http.Client{Transport: transport} +} diff --git a/br/pkg/storage/gcs_test.go b/br/pkg/storage/gcs_test.go index c223b0952a83e..6c67e4bc541b4 100644 --- a/br/pkg/storage/gcs_test.go +++ b/br/pkg/storage/gcs_test.go @@ -11,10 +11,12 @@ import ( "github.com/fsouza/fake-gcs-server/fakestorage" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/stretchr/testify/require" ) func TestGCS(t *testing.T) { + require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ @@ -255,6 +257,7 @@ func TestGCS(t *testing.T) { } func TestNewGCSStorage(t *testing.T) { + require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ @@ -416,6 +419,7 @@ func TestNewGCSStorage(t *testing.T) { } func TestReadRange(t *testing.T) { + require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 0f36091d54a74..3e5a2ec34d800 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -161,9 +161,9 @@ type ExternalStorageOptions struct { NoCredentials bool // HTTPClient to use. The created storage may ignore this field if it is not - // directly using HTTP (e.g. the local storage) or use self-design HTTP client - // with credential (e.g. the gcs). + // directly using HTTP (e.g. the local storage). // NOTICE: the HTTPClient is only used by s3/azure/gcs. + // For GCS, we will use this as base client to init a client with credentials. HTTPClient *http.Client // CheckPermissions check the given permission in New() function. @@ -195,11 +195,14 @@ func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (E if intest.InTest { opts.NoCredentials = true } + if backend.GetGcs() != nil { + opts.HTTPClient = gcsHttpClientForThroughput() + } return New(ctx, backend, &opts) } // NewFromURL creates an ExternalStorage from URL. -func NewFromURL(ctx context.Context, uri string, opts *ExternalStorageOptions) (ExternalStorage, error) { +func NewFromURL(ctx context.Context, uri string) (ExternalStorage, error) { if len(uri) == 0 { return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "empty store is not allowed") } @@ -214,7 +217,7 @@ func NewFromURL(ctx context.Context, uri string, opts *ExternalStorageOptions) ( if err != nil { return nil, errors.Trace(err) } - return New(ctx, b, opts) + return NewWithDefaultOpt(ctx, b) } // New creates an ExternalStorage with options. diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go index 6183e98c19de2..d52d8b6768d3e 100644 --- a/br/pkg/storage/storage_test.go +++ b/br/pkg/storage/storage_test.go @@ -28,7 +28,7 @@ func TestDefaultHttpClient(t *testing.T) { func TestNewMemStorage(t *testing.T) { url := "memstore://" - s, err := storage.NewFromURL(context.Background(), url, nil) + s, err := storage.NewFromURL(context.Background(), url) require.NoError(t, err) require.IsType(t, (*storage.MemStorage)(nil), s) } diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index cac5b33d9d0bc..7fabfbc379ed6 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -99,7 +99,7 @@ go_test( ], embed = [":task"], flaky = True, - shard_count = 21, + shard_count = 22, deps = [ "//br/pkg/conn", "//br/pkg/errors", diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 6fa3e243bc3ec..045e612e93eb4 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -131,11 +131,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora if err != nil { return nil, errors.Trace(err) } - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), - } + opts := getExternalStorageOptions(&cfg.Config, u) storage, err := storage.New(ctx, u, &opts) if err != nil { return nil, errors.Trace(err) @@ -1470,11 +1466,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m return nil, errors.Trace(err) } - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), - } + opts := getExternalStorageOptions(&cfg.Config, u) if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err) } @@ -1498,6 +1490,18 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m return client, nil } +func getExternalStorageOptions(cfg *Config, u *backuppb.StorageBackend) storage.ExternalStorageOptions { + var httpClient *http.Client + if u.GetGcs() == nil { + httpClient = storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize) + } + return storage.ExternalStorageOptions{ + NoCredentials: cfg.NoCreds, + SendCredentials: cfg.SendCreds, + HTTPClient: httpClient, + } +} + func checkLogRange(restoreFrom, restoreTo, logMinTS, logMaxTS uint64) error { // serveral ts constraint: // logMinTS <= restoreFrom <= restoreTo <= logMaxTS diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 8d2ecb35c9247..627924e8239cc 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -248,3 +248,16 @@ func TestGetLogRangeWithLogBackupDir(t *testing.T) { require.Nil(t, err) require.Equal(t, logInfo.logMinTS, startLogBackupTS) } + +func TestGetExternalStorageOptions(t *testing.T) { + cfg := Config{} + u, err := storage.ParseBackend("s3://bucket/path", nil) + require.NoError(t, err) + options := getExternalStorageOptions(&cfg, u) + require.NotNil(t, options.HTTPClient) + + u, err = storage.ParseBackend("gs://bucket/path", nil) + require.NoError(t, err) + options = getExternalStorageOptions(&cfg, u) + require.Nil(t, options.HTTPClient) +} diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 6de8ee0d7b756..c0fdbaffd67eb 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" - "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" @@ -88,11 +87,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if err != nil { return err } - opt := &storage.ExternalStorageOptions{} - if intest.InTest { - opt.NoCredentials = true - } - store, err := storage.New(ctx, storeBackend, opt) + store, err := storage.NewWithDefaultOpt(ctx, storeBackend) if err != nil { return err } diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 0ab47f80d96c8..5b12a874a3e61 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -57,7 +57,6 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/filter" - "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/stringutil" kvconfig "github.com/tikv/client-go/v2/config" @@ -942,11 +941,7 @@ func (*LoadDataController) initExternalStore(ctx context.Context, u *url.URL, ta return nil, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, GetMsgFromBRError(err2)) } - opt := &storage.ExternalStorageOptions{} - if intest.InTest { - opt.NoCredentials = true - } - s, err := storage.New(ctx, b, opt) + s, err := storage.NewWithDefaultOpt(ctx, b) if err != nil { return nil, exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(target, GetMsgFromBRError(err)) } diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go index 9c0274978df88..2dc503ab7eeb7 100644 --- a/tests/realtikvtest/addindextest2/global_sort_test.go +++ b/tests/realtikvtest/addindextest2/global_sort_test.go @@ -57,8 +57,7 @@ func genStorageURI(t *testing.T) (host string, port uint16, uri string) { func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) { storeBackend, err := storage.ParseBackend(sortStorageURI, nil) require.NoError(t, err) - opts := &storage.ExternalStorageOptions{NoCredentials: true} - extStore, err := storage.New(context.Background(), storeBackend, opts) + extStore, err := storage.NewWithDefaultOpt(context.Background(), storeBackend) require.NoError(t, err) prefix := strconv.Itoa(int(jobID)) dataFiles, statFiles, err := external.GetAllFileNames(context.Background(), extStore, prefix)