*: LOAD DATA support compressed source file (#42813)

ref #40499

lance6716 authored Apr 10, 2023
1 parent ad59092 commit 2bf6eea
Showing 11 changed files with 94 additions and 20 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/chunk_process.go
@@ -60,7 +60,7 @@ func newChunkProcessor(
) (*chunkProcessor, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

-    reader, err := mydump.OpenReader(ctx, chunk.FileMeta, store)
+    reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store)
if err != nil {
return nil, errors.Trace(err)
}
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/get_pre_info.go
@@ -457,7 +457,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
// ReadFirstNRowsByFileMeta reads the first N rows of a data file.
// It implements the PreImportInfoGetter interface.
func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-    reader, err := mydump.OpenReader(ctx, dataFileMeta, p.srcStorage)
+    reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage)
if err != nil {
return nil, nil, errors.Trace(err)
}
@@ -606,7 +606,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
return resultIndexRatio, isRowOrdered, nil
}
sampleFile := tableMeta.DataFiles[0].FileMeta
-    reader, err := mydump.OpenReader(ctx, sampleFile, p.srcStorage)
+    reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage)
if err != nil {
return 0.0, false, errors.Trace(err)
}
7 changes: 5 additions & 2 deletions br/pkg/lightning/mydump/parser.go
@@ -652,8 +652,11 @@ func ReadUntil(parser Parser, pos int64) error {
}

// OpenReader opens a reader for the given file and storage.
-func OpenReader(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (
-    reader storage.ReadSeekCloser, err error) {
+func OpenReader(
+    ctx context.Context,
+    fileMeta *SourceFileMeta,
+    store storage.ExternalStorage,
+) (reader storage.ReadSeekCloser, err error) {
switch {
case fileMeta.Type == SourceTypeParquet:
reader, err = OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize)
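For context, OpenReader is the single entry point all of the call sites above now go through. A minimal caller sketch of the new pointer-based signature (not part of the commit; it assumes a configured storage.ExternalStorage, and the SourceTypeCSV/CompressionGZ constants from this package):

import (
    "context"

    "github.com/pingcap/tidb/br/pkg/lightning/mydump"
    "github.com/pingcap/tidb/br/pkg/storage"
)

// openOne is an illustrative helper showing the new *SourceFileMeta signature.
func openOne(ctx context.Context, store storage.ExternalStorage) (storage.ReadSeekCloser, error) {
    fileMeta := mydump.SourceFileMeta{
        Path:        "db.tbl.001.csv.gz",
        Type:        mydump.SourceTypeCSV,
        Compression: mydump.CompressionGZ,
    }
    // OpenReader dispatches on the metadata: parquet, compressed, or plain file.
    return mydump.OpenReader(ctx, &fileMeta, store)
}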
15 changes: 15 additions & 0 deletions br/pkg/lightning/mydump/router.go
@@ -2,6 +2,7 @@ package mydump

import (
"net/url"
"path/filepath"
"regexp"
"strconv"
"strings"
@@ -128,6 +129,20 @@ func (s SourceType) String() string {
}
}

+// ParseCompressionOnFileExtension parses the compression type from the file extension.
+func ParseCompressionOnFileExtension(filename string) Compression {
+    fileExt := strings.ToLower(filepath.Ext(filename))
+    if len(fileExt) == 0 {
+        return CompressionNone
+    }
+    tp, err := parseCompressionType(fileExt[1:])
+    if err != nil {
+        // file extension is not a compression type, just ignore it
+        return CompressionNone
+    }
+    return tp
+}

func parseCompressionType(t string) (Compression, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case "gz", "gzip":
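Only the final extension is consulted, and matching is case-insensitive. An illustrative test of the expected behavior (not part of the commit; it assumes the gz/gzip case in parseCompressionType above returns CompressionGZ):

package mydump_test

import (
    "testing"

    "github.com/pingcap/tidb/br/pkg/lightning/mydump"
    "github.com/stretchr/testify/require"
)

func TestParseCompressionOnFileExtension(t *testing.T) {
    require.Equal(t, mydump.CompressionGZ, mydump.ParseCompressionOnFileExtension("a.csv.gz"))
    // Case-insensitive, and only the last extension counts.
    require.Equal(t, mydump.CompressionGZ, mydump.ParseCompressionOnFileExtension("A.CSV.GZIP"))
    // Unknown extensions and extension-less names fall back to CompressionNone.
    require.Equal(t, mydump.CompressionNone, mydump.ParseCompressionOnFileExtension("a.csv"))
    require.Equal(t, mydump.CompressionNone, mydump.ParseCompressionOnFileExtension("noext"))
}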
11 changes: 7 additions & 4 deletions br/pkg/storage/compress.go
@@ -46,7 +46,7 @@ func (w *withCompression) Open(ctx context.Context, path string) (ExternalFileRe
if err != nil {
return nil, errors.Trace(err)
}
-    uncompressReader, err := newInterceptReader(fileReader, w.compressType)
+    uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType)
if err != nil {
return nil, errors.Trace(err)
}
@@ -87,8 +87,11 @@ type compressReader struct {
io.Closer
}

-// nolint:interfacer
-func newInterceptReader(fileReader ExternalFileReader, compressType CompressType) (ExternalFileReader, error) {
+// InterceptDecompressReader intercepts the reader and wraps it with a decompress
+// reader on the given io.ReadSeekCloser. Note that the returned
+// io.ReadSeekCloser does not have the property that Seek(0, io.SeekCurrent)
+// equals total bytes Read() if the decompress reader is used.
+func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType) (io.ReadSeekCloser, error) {
if compressType == NoCompression {
return fileReader, nil
}
@@ -114,7 +117,7 @@ func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType Compr
Closer: fileReader,
}
}
-    return newInterceptReader(newFileReader, compressType)
+    return InterceptDecompressReader(newFileReader, compressType)
}

func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
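The Seek caveat in the new doc comment can be reproduced with the standard library alone: once a decompressor sits between the caller and the file, the underlying offset tracks compressed bytes, not the bytes handed back by Read. A standalone sketch:

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

func main() {
    // Compress 5000 highly redundant bytes; the gzip output is much smaller.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    _, _ = zw.Write(bytes.Repeat([]byte("tidb "), 1000))
    _ = zw.Close()

    compressed := bytes.NewReader(buf.Bytes())
    zr, _ := gzip.NewReader(compressed)
    decompressed, _ := io.Copy(io.Discard, zr)

    // A Seek(0, io.SeekCurrent) forwarded to the underlying file observes
    // the compressed offset, which does not equal total bytes Read().
    offset, _ := compressed.Seek(0, io.SeekCurrent)
    fmt.Printf("decompressed bytes: %d, underlying offset: %d\n", decompressed, offset)
}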
3 changes: 1 addition & 2 deletions br/pkg/storage/storage.go
@@ -107,8 +107,7 @@ type ExternalStorage interface {

// ExternalFileReader represents the streaming external file reader.
type ExternalFileReader interface {
-    io.ReadCloser
-    io.Seeker
+    io.ReadSeekCloser
}

// ExternalFileWriter represents the streaming external file writer.
14 changes: 9 additions & 5 deletions executor/importer/import.go
@@ -669,9 +669,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
if err3 != nil {
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek in LOAD DATA")
}
+    compressTp := mydump.ParseCompressionOnFileExtension(path)
dataFiles = append(dataFiles, &mydump.SourceFileMeta{
-        Path:     path,
-        FileSize: size,
+        Path:        path,
+        FileSize:    size,
+        Compression: compressTp,
})
} else {
commonPrefix := path[:idx]
@@ -685,9 +687,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
if !match {
return nil
}
+    compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
dataFiles = append(dataFiles, &mydump.SourceFileMeta{
-        Path:     remotePath,
-        FileSize: size,
+        Path:        remotePath,
+        FileSize:    size,
+        Compression: compressTp,
})
return nil
})
@@ -708,7 +712,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
f := e.dataFiles[i]
result = append(result, LoadDataReaderInfo{
Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-    fileReader, err2 := e.dataStore.Open(ctx, f.Path)
+    fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore)
if err2 != nil {
return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
}
2 changes: 1 addition & 1 deletion executor/importer/table_import.go
@@ -198,7 +198,7 @@ var _ io.Closer = &tableImporter{}
func (ti *tableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
info := LoadDataReaderInfo{
Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-    reader, err := mydump.OpenReader(ctx, chunk.FileMeta, ti.dataStore)
+    reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore)
if err != nil {
return nil, errors.Trace(err)
}
40 changes: 40 additions & 0 deletions executor/loadremotetest/multi_file_test.go
@@ -16,11 +16,13 @@ package loadremotetest

import (
"bytes"
"compress/gzip"
"fmt"
"strconv"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func (s *mockGCSSuite) TestFilenameAsterisk() {
@@ -131,3 +133,41 @@ func (s *mockGCSSuite) TestMultiBatchWithIgnoreLines() {
"13", "14", "15", "16", "17", "18", "19", "20",
))
}

+func (s *mockGCSSuite) TestMixedCompression() {
+    s.tk.MustExec("DROP DATABASE IF EXISTS multi_load;")
+    s.tk.MustExec("CREATE DATABASE multi_load;")
+    s.tk.MustExec("CREATE TABLE multi_load.t (i INT PRIMARY KEY, s varchar(32));")
+
+    // gzip content
+    var buf bytes.Buffer
+    w := gzip.NewWriter(&buf)
+    _, err := w.Write([]byte("1\ttest1\n" +
+        "2\ttest2"))
+    require.NoError(s.T(), err)
+    err = w.Close()
+    require.NoError(s.T(), err)
+
+    s.server.CreateObject(fakestorage.Object{
+        ObjectAttrs: fakestorage.ObjectAttrs{
+            BucketName: "test-multi-load",
+            Name:       "compress.001.tsv.gz",
+        },
+        Content: buf.Bytes(),
+    })
+    s.server.CreateObject(fakestorage.Object{
+        ObjectAttrs: fakestorage.ObjectAttrs{
+            BucketName: "test-multi-load",
+            Name:       "compress.002.tsv",
+        },
+        Content: []byte("3\ttest3\n" +
+            "4\ttest4"),
+    })
+
+    sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-multi-load/compress.*?endpoint=%s'
+INTO TABLE multi_load.t WITH thread=1;`, gcsEndpoint)
+    s.tk.MustExec(sql)
+    s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
+        "1 test1", "2 test2", "3 test3", "4 test4",
+    ))
+}
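Note the deliberate mix in this test: a gzip-compressed object and a plain one behind the same compress.* wildcard. Because InitDataFiles records a Compression value per matched file, a single LOAD DATA statement consumes both transparently.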
2 changes: 2 additions & 0 deletions server/BUILD.bazel
@@ -28,6 +28,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//autoid_service",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//config",
"//ddl",
"//domain",
14 changes: 11 additions & 3 deletions server/conn.go
@@ -56,6 +56,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
@@ -1580,8 +1582,13 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
if loadDataWorker == nil {
return errors.New("load data info is empty")
}

-    err := cc.writeReq(ctx, loadDataWorker.GetInfilePath())
+    infile := loadDataWorker.GetInfilePath()
+    compressTp := mydump.ParseCompressionOnFileExtension(infile)
+    compressTp2, err := mydump.ToStorageCompressType(compressTp)
+    if err != nil {
+        return err
+    }
+    err = cc.writeReq(ctx, infile)
if err != nil {
return err
}
@@ -1628,7 +1635,8 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData)
_, err = loadDataWorker.Load(ctx, []importer.LoadDataReaderInfo{{
Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
-        return executor.NewSimpleSeekerOnReadCloser(r), nil
+        addedSeekReader := executor.NewSimpleSeekerOnReadCloser(r)
+        return storage.InterceptDecompressReader(addedSeekReader, compressTp2)
}}})
_ = r.Close()
wg.Wait()
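The LOCAL INFILE payload arrives as a forward-only packet stream, so it is first wrapped in a seeker shim and only then in the decompressor. A minimal standalone sketch of that layering (the simpleSeeker type below is a hypothetical stand-in for executor.NewSimpleSeekerOnReadCloser, and gzip is hardcoded for the example):

package main

import (
    "compress/gzip"
    "errors"
    "io"
)

// simpleSeeker makes a forward-only stream satisfy io.ReadSeekCloser by
// tracking bytes read; it supports only Seek(0, io.SeekCurrent).
type simpleSeeker struct {
    r   io.ReadCloser
    pos int64
}

func (s *simpleSeeker) Read(p []byte) (int, error) {
    n, err := s.r.Read(p)
    s.pos += int64(n)
    return n, err
}

func (s *simpleSeeker) Seek(offset int64, whence int) (int64, error) {
    if offset == 0 && whence == io.SeekCurrent {
        return s.pos, nil
    }
    return 0, errors.New("unsupported seek on a packet stream")
}

func (s *simpleSeeker) Close() error { return s.r.Close() }

// openLocalInfile mirrors the layering in handleLoadData above: the seeker
// shim goes on first, then the decompressor on top.
func openLocalInfile(packets io.ReadCloser) (io.Reader, error) {
    return gzip.NewReader(&simpleSeeker{r: packets})
}

func main() {}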
