Skip to content

Commit

Permalink
br/storage: add scope to support service_account type of credential f…
Browse files Browse the repository at this point in the history
…ile & remove default GCS httpclient (#49363)

ref #49340
  • Loading branch information
D3Hunter authored Dec 12, 2023
1 parent 7c58082 commit 9322568
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 46 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage {
if *testingStorageURI == "" {
t.Skip("testingStorageURI is not set")
}
s, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil)
s, err := storage.NewFromURL(context.Background(), *testingStorageURI)
require.NoError(t, err)
return s
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_test(
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/intest",
"//pkg/util/table-filter",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -302,6 +303,7 @@ func TestTruncateSafepoint(t *testing.T) {
}

func TestTruncateSafepointForGCS(t *testing.T) {
require.True(t, intest.InTest)
ctx := context.Background()
opts := fakestorage.Options{
NoListener: true,
Expand Down
1 change: 1 addition & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ go_test(
shard_count = 50,
deps = [
"//br/pkg/mock",
"//pkg/util/intest",
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/awserr",
"@com_github_aws_aws_sdk_go//aws/request",
Expand Down
41 changes: 26 additions & 15 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/spf13/pflag"
"go.uber.org/zap"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -322,23 +323,20 @@ func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
clientOps = append(clientOps, option.WithEndpoint(gcs.Endpoint))
}

httpClient := opts.HTTPClient
if httpClient == nil {
// http2 will reuse the connection to read multiple files, which is
// very slow, the speed is about the same speed as reading a single file.
// So we disable keepalive here to use multiple connections to read files.
// open a new connection takes about 20~50ms, which is acceptable.
transport, _ := CloneDefaultHttpTransport()
transport.DisableKeepAlives = true
httpClient = &http.Client{Transport: transport}
// see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455
var err error
httpClient.Transport, err = htransport.NewTransport(ctx, httpClient.Transport, clientOps...)
if err != nil {
return nil, errors.Trace(err)
if opts.HTTPClient != nil {
if !intest.InTest {
// see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455
// https://www.googleapis.com/auth/cloud-platform must be set to use service_account
// type of credential-file.
newTransport, err := htransport.NewTransport(ctx, opts.HTTPClient.Transport,
append(clientOps, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))...)
if err != nil {
return nil, errors.Trace(err)
}
opts.HTTPClient.Transport = newTransport
}
clientOps = append(clientOps, option.WithHTTPClient(opts.HTTPClient))
}
clientOps = append(clientOps, option.WithHTTPClient(httpClient))

client, err := storage.NewClient(ctx, clientOps...)
if err != nil {
Expand Down Expand Up @@ -480,3 +478,16 @@ func (r *gcsObjectReader) Seek(offset int64, whence int) (int64, error) {
func (r *gcsObjectReader) GetFileSize() (int64, error) {
return r.totalSize, nil
}

// gcsHttpClientForThroughput returns a base http client for GCS that is optimized
// for throughput.
func gcsHttpClientForThroughput() *http.Client {
// http2 will reuse the connection to read multiple files, which is
// very slow, the speed of reading multiple files concurrently is about the
// same speed as reading a single file.
// So we disable keepalive here to use multiple connections to read files.
// open a new connection takes about 20~50ms, which is acceptable.
transport, _ := CloneDefaultHttpTransport()
transport.DisableKeepAlives = true
return &http.Client{Transport: transport}
}
4 changes: 4 additions & 0 deletions br/pkg/storage/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (

"github.com/fsouza/fake-gcs-server/fakestorage"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/stretchr/testify/require"
)

func TestGCS(t *testing.T) {
require.True(t, intest.InTest)
ctx := context.Background()

opts := fakestorage.Options{
Expand Down Expand Up @@ -255,6 +257,7 @@ func TestGCS(t *testing.T) {
}

func TestNewGCSStorage(t *testing.T) {
require.True(t, intest.InTest)
ctx := context.Background()

opts := fakestorage.Options{
Expand Down Expand Up @@ -416,6 +419,7 @@ func TestNewGCSStorage(t *testing.T) {
}

func TestReadRange(t *testing.T) {
require.True(t, intest.InTest)
ctx := context.Background()

opts := fakestorage.Options{
Expand Down
11 changes: 7 additions & 4 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ type ExternalStorageOptions struct {
NoCredentials bool

// HTTPClient to use. The created storage may ignore this field if it is not
// directly using HTTP (e.g. the local storage) or use self-design HTTP client
// with credential (e.g. the gcs).
// directly using HTTP (e.g. the local storage).
// NOTICE: the HTTPClient is only used by s3/azure/gcs.
// For GCS, we will use this as base client to init a client with credentials.
HTTPClient *http.Client

// CheckPermissions check the given permission in New() function.
Expand Down Expand Up @@ -195,11 +195,14 @@ func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (E
if intest.InTest {
opts.NoCredentials = true
}
if backend.GetGcs() != nil {
opts.HTTPClient = gcsHttpClientForThroughput()
}
return New(ctx, backend, &opts)
}

// NewFromURL creates an ExternalStorage from URL.
func NewFromURL(ctx context.Context, uri string, opts *ExternalStorageOptions) (ExternalStorage, error) {
func NewFromURL(ctx context.Context, uri string) (ExternalStorage, error) {
if len(uri) == 0 {
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "empty store is not allowed")
}
Expand All @@ -214,7 +217,7 @@ func NewFromURL(ctx context.Context, uri string, opts *ExternalStorageOptions) (
if err != nil {
return nil, errors.Trace(err)
}
return New(ctx, b, opts)
return NewWithDefaultOpt(ctx, b)
}

// New creates an ExternalStorage with options.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestDefaultHttpClient(t *testing.T) {

func TestNewMemStorage(t *testing.T) {
url := "memstore://"
s, err := storage.NewFromURL(context.Background(), url, nil)
s, err := storage.NewFromURL(context.Background(), url)
require.NoError(t, err)
require.IsType(t, (*storage.MemStorage)(nil), s)
}
2 changes: 1 addition & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 21,
shard_count = 22,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand Down
24 changes: 14 additions & 10 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora
if err != nil {
return nil, errors.Trace(err)
}
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
opts := getExternalStorageOptions(&cfg.Config, u)
storage, err := storage.New(ctx, u, &opts)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1470,11 +1466,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m
return nil, errors.Trace(err)
}

opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
opts := getExternalStorageOptions(&cfg.Config, u)
if err = client.SetStorage(ctx, u, &opts); err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1498,6 +1490,18 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m
return client, nil
}

func getExternalStorageOptions(cfg *Config, u *backuppb.StorageBackend) storage.ExternalStorageOptions {
var httpClient *http.Client
if u.GetGcs() == nil {
httpClient = storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize)
}
return storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: httpClient,
}
}

func checkLogRange(restoreFrom, restoreTo, logMinTS, logMaxTS uint64) error {
// serveral ts constraint:
// logMinTS <= restoreFrom <= restoreTo <= logMaxTS
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,16 @@ func TestGetLogRangeWithLogBackupDir(t *testing.T) {
require.Nil(t, err)
require.Equal(t, logInfo.logMinTS, startLogBackupTS)
}

func TestGetExternalStorageOptions(t *testing.T) {
cfg := Config{}
u, err := storage.ParseBackend("s3://bucket/path", nil)
require.NoError(t, err)
options := getExternalStorageOptions(&cfg, u)
require.NotNil(t, options.HTTPClient)

u, err = storage.ParseBackend("gs://bucket/path", nil)
require.NoError(t, err)
options = getExternalStorageOptions(&cfg, u)
require.Nil(t, options.HTTPClient)
}
7 changes: 1 addition & 6 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,11 +87,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
opt := &storage.ExternalStorageOptions{}
if intest.InTest {
opt.NoCredentials = true
}
store, err := storage.New(ctx, storeBackend, opt)
store, err := storage.NewWithDefaultOpt(ctx, storeBackend)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/stringutil"
kvconfig "github.com/tikv/client-go/v2/config"
Expand Down Expand Up @@ -942,11 +941,7 @@ func (*LoadDataController) initExternalStore(ctx context.Context, u *url.URL, ta
return nil, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, GetMsgFromBRError(err2))
}

opt := &storage.ExternalStorageOptions{}
if intest.InTest {
opt.NoCredentials = true
}
s, err := storage.New(ctx, b, opt)
s, err := storage.NewWithDefaultOpt(ctx, b)
if err != nil {
return nil, exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(target, GetMsgFromBRError(err))
}
Expand Down
3 changes: 1 addition & 2 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func genStorageURI(t *testing.T) (host string, port uint16, uri string) {
func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) {
storeBackend, err := storage.ParseBackend(sortStorageURI, nil)
require.NoError(t, err)
opts := &storage.ExternalStorageOptions{NoCredentials: true}
extStore, err := storage.New(context.Background(), storeBackend, opts)
extStore, err := storage.NewWithDefaultOpt(context.Background(), storeBackend)
require.NoError(t, err)
prefix := strconv.Itoa(int(jobID))
dataFiles, statFiles, err := external.GetAllFileNames(context.Background(), extStore, prefix)
Expand Down

0 comments on commit 9322568

Please sign in to comment.