From 2bf6eead13e07cd3a9a9ba52f8d2068300ef64b9 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 10 Apr 2023 16:33:00 +0800
Subject: [PATCH] *: LOAD DATA support compressed source file (#42813)

ref pingcap/tidb#40499
---
 br/pkg/lightning/importer/chunk_process.go |  2 +-
 br/pkg/lightning/importer/get_pre_info.go  |  4 +--
 br/pkg/lightning/mydump/parser.go          |  7 ++--
 br/pkg/lightning/mydump/router.go          | 15 ++++++++
 br/pkg/storage/compress.go                 | 11 +++---
 br/pkg/storage/storage.go                  |  3 +-
 executor/importer/import.go                | 14 +++++---
 executor/importer/table_import.go          |  2 +-
 executor/loadremotetest/multi_file_test.go | 40 ++++++++++++++++++++++
 server/BUILD.bazel                         |  2 ++
 server/conn.go                             | 14 ++++++--
 11 files changed, 94 insertions(+), 20 deletions(-)

diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go
index 9967f8b628ef1..f031a85c8fd2a 100644
--- a/br/pkg/lightning/importer/chunk_process.go
+++ b/br/pkg/lightning/importer/chunk_process.go
@@ -60,7 +60,7 @@ func newChunkProcessor(
 ) (*chunkProcessor, error) {
 	blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
 
-	reader, err := mydump.OpenReader(ctx, chunk.FileMeta, store)
+	reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go
index d8d1a9f743b8a..d454ecb7f5210 100644
--- a/br/pkg/lightning/importer/get_pre_info.go
+++ b/br/pkg/lightning/importer/get_pre_info.go
@@ -457,7 +457,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
 // ReadFirstNRowsByFileMeta reads the first N rows of a data file.
 // It implements the PreImportInfoGetter interface.
 func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-	reader, err := mydump.OpenReader(ctx, dataFileMeta, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -606,7 +606,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
 		return resultIndexRatio, isRowOrdered, nil
 	}
 	sampleFile := tableMeta.DataFiles[0].FileMeta
-	reader, err := mydump.OpenReader(ctx, sampleFile, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage)
 	if err != nil {
 		return 0.0, false, errors.Trace(err)
 	}
diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go
index a84eee0082d3d..2184bc06d99f7 100644
--- a/br/pkg/lightning/mydump/parser.go
+++ b/br/pkg/lightning/mydump/parser.go
@@ -652,8 +652,11 @@ func ReadUntil(parser Parser, pos int64) error {
 }
 
 // OpenReader opens a reader for the given file and storage.
-func OpenReader(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (
-	reader storage.ReadSeekCloser, err error) {
+func OpenReader(
+	ctx context.Context,
+	fileMeta *SourceFileMeta,
+	store storage.ExternalStorage,
+) (reader storage.ReadSeekCloser, err error) {
 	switch {
 	case fileMeta.Type == SourceTypeParquet:
 		reader, err = OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize)
diff --git a/br/pkg/lightning/mydump/router.go b/br/pkg/lightning/mydump/router.go
index bf0ccba834fe0..6a4222e236653 100644
--- a/br/pkg/lightning/mydump/router.go
+++ b/br/pkg/lightning/mydump/router.go
@@ -2,6 +2,7 @@ package mydump
 
 import (
 	"net/url"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -128,6 +129,20 @@ func (s SourceType) String() string {
 	}
 }
 
+// ParseCompressionOnFileExtension parses the compression type from the file extension.
+func ParseCompressionOnFileExtension(filename string) Compression {
+	fileExt := strings.ToLower(filepath.Ext(filename))
+	if len(fileExt) == 0 {
+		return CompressionNone
+	}
+	tp, err := parseCompressionType(fileExt[1:])
+	if err != nil {
+		// file extension is not a compression type, just ignore it
+		return CompressionNone
+	}
+	return tp
+}
+
 func parseCompressionType(t string) (Compression, error) {
 	switch strings.ToLower(strings.TrimSpace(t)) {
 	case "gz", "gzip":
diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go
index 36c07846f3271..5ab041cfe0bf2 100644
--- a/br/pkg/storage/compress.go
+++ b/br/pkg/storage/compress.go
@@ -46,7 +46,7 @@ func (w *withCompression) Open(ctx context.Context, path string) (ExternalFileRe
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	uncompressReader, err := newInterceptReader(fileReader, w.compressType)
+	uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -87,8 +87,11 @@ type compressReader struct {
 	io.Closer
 }
 
-// nolint:interfacer
-func newInterceptReader(fileReader ExternalFileReader, compressType CompressType) (ExternalFileReader, error) {
+// InterceptDecompressReader wraps the given io.ReadSeekCloser with a
+// decompressing reader. Note that when decompression is in use, the returned
+// io.ReadSeekCloser no longer guarantees that Seek(0, io.SeekCurrent) equals
+// the total number of bytes returned by Read().
+func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType) (io.ReadSeekCloser, error) {
 	if compressType == NoCompression {
 		return fileReader, nil
 	}
@@ -114,7 +117,7 @@ func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType Compr
 			Closer: fileReader,
 		}
 	}
-	return newInterceptReader(newFileReader, compressType)
+	return InterceptDecompressReader(newFileReader, compressType)
 }
 
 func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go
index 93fb56dcd606b..2ccc67d4eab20 100644
--- a/br/pkg/storage/storage.go
+++ b/br/pkg/storage/storage.go
@@ -107,8 +107,7 @@ type ExternalStorage interface {
 
 // ExternalFileReader represents the streaming external file reader.
 type ExternalFileReader interface {
-	io.ReadCloser
-	io.Seeker
+	io.ReadSeekCloser
 }
 
 // ExternalFileWriter represents the streaming external file writer.
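
To see how the pieces above fit together, here is a minimal sketch, not part of the patch: it opens a local file, detects the compression type from the extension via mydump.ParseCompressionOnFileExtension, maps it with mydump.ToStorageCompressType (which this patch calls later in server/conn.go), and wraps the file with storage.InterceptDecompressReader. The helper name openMaybeCompressed and the local-file setting are assumptions for illustration.

// Illustrative sketch only; not part of the patch.
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// openMaybeCompressed (hypothetical) opens path and transparently
// decompresses it based on its file extension.
func openMaybeCompressed(path string) (io.ReadSeekCloser, error) {
	f, err := os.Open(path) // *os.File satisfies io.ReadSeekCloser
	if err != nil {
		return nil, err
	}
	// "data.tsv.gz" yields the gzip compression type; an unknown or
	// missing extension yields CompressionNone.
	tp := mydump.ParseCompressionOnFileExtension(path)
	// Map the mydump enum to the storage-level CompressType.
	storageTp, err := mydump.ToStorageCompressType(tp)
	if err != nil {
		_ = f.Close()
		return nil, err
	}
	// For NoCompression the reader is returned unchanged; otherwise
	// reads pass through a decompressor.
	return storage.InterceptDecompressReader(f, storageTp)
}

func main() {
	r, err := openMaybeCompressed("data.tsv.gz")
	if err != nil {
		panic(err)
	}
	defer r.Close()
	n, _ := io.Copy(io.Discard, r)
	fmt.Printf("read %d decompressed bytes\n", n)
}
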
diff --git a/executor/importer/import.go b/executor/importer/import.go index af4f28565a2bd..754127fff2717 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -669,9 +669,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { if err3 != nil { return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek in LOAD DATA") } + compressTp := mydump.ParseCompressionOnFileExtension(path) dataFiles = append(dataFiles, &mydump.SourceFileMeta{ - Path: path, - FileSize: size, + Path: path, + FileSize: size, + Compression: compressTp, }) } else { commonPrefix := path[:idx] @@ -685,9 +687,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { if !match { return nil } + compressTp := mydump.ParseCompressionOnFileExtension(remotePath) dataFiles = append(dataFiles, &mydump.SourceFileMeta{ - Path: remotePath, - FileSize: size, + Path: remotePath, + FileSize: size, + Compression: compressTp, }) return nil }) @@ -708,7 +712,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo { f := e.dataFiles[i] result = append(result, LoadDataReaderInfo{ Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { - fileReader, err2 := e.dataStore.Open(ctx, f.Path) + fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore) if err2 != nil { return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct") } diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index b88fd028e0a63..1a4173e466012 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -198,7 +198,7 @@ var _ io.Closer = &tableImporter{} func (ti *tableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) { info := LoadDataReaderInfo{ Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { - reader, err := mydump.OpenReader(ctx, chunk.FileMeta, ti.dataStore) + reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/loadremotetest/multi_file_test.go b/executor/loadremotetest/multi_file_test.go index 1e963a16296c8..4ecd80f34c838 100644 --- a/executor/loadremotetest/multi_file_test.go +++ b/executor/loadremotetest/multi_file_test.go @@ -16,11 +16,13 @@ package loadremotetest import ( "bytes" + "compress/gzip" "fmt" "strconv" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" ) func (s *mockGCSSuite) TestFilenameAsterisk() { @@ -131,3 +133,41 @@ func (s *mockGCSSuite) TestMultiBatchWithIgnoreLines() { "13", "14", "15", "16", "17", "18", "19", "20", )) } + +func (s *mockGCSSuite) TestMixedCompression() { + s.tk.MustExec("DROP DATABASE IF EXISTS multi_load;") + s.tk.MustExec("CREATE DATABASE multi_load;") + s.tk.MustExec("CREATE TABLE multi_load.t (i INT PRIMARY KEY, s varchar(32));") + + // gzip content + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + _, err := w.Write([]byte("1\ttest1\n" + + "2\ttest2")) + require.NoError(s.T(), err) + err = w.Close() + require.NoError(s.T(), err) + + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-multi-load", + Name: "compress.001.tsv.gz", + }, + Content: buf.Bytes(), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-multi-load", + 
Name: "compress.002.tsv", + }, + Content: []byte("3\ttest3\n" + + "4\ttest4"), + }) + + sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-multi-load/compress.*?endpoint=%s' + INTO TABLE multi_load.t WITH thread=1;`, gcsEndpoint) + s.tk.MustExec(sql) + s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows( + "1 test1", "2 test2", "3 test3", "4 test4", + )) +} diff --git a/server/BUILD.bazel b/server/BUILD.bazel index df40b50f44dff..c4295bb94e374 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -28,6 +28,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//autoid_service", + "//br/pkg/lightning/mydump", + "//br/pkg/storage", "//config", "//ddl", "//domain", diff --git a/server/conn.go b/server/conn.go index a058b39b9c614..ddc7640dce5f3 100644 --- a/server/conn.go +++ b/server/conn.go @@ -56,6 +56,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" @@ -1580,8 +1582,13 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut if loadDataWorker == nil { return errors.New("load data info is empty") } - - err := cc.writeReq(ctx, loadDataWorker.GetInfilePath()) + infile := loadDataWorker.GetInfilePath() + compressTp := mydump.ParseCompressionOnFileExtension(infile) + compressTp2, err := mydump.ToStorageCompressType(compressTp) + if err != nil { + return err + } + err = cc.writeReq(ctx, infile) if err != nil { return err } @@ -1628,7 +1635,8 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData) _, err = loadDataWorker.Load(ctx, []importer.LoadDataReaderInfo{{ Opener: func(_ context.Context) (io.ReadSeekCloser, error) { - return executor.NewSimpleSeekerOnReadCloser(r), nil + addedSeekReader := executor.NewSimpleSeekerOnReadCloser(r) + return storage.InterceptDecompressReader(addedSeekReader, compressTp2) }}}) _ = r.Close() wg.Wait()