-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
util: support spill offset into disk when spilling #34212
Changes from 7 commits
bfd34ab
9118c71
95f9735
0bb0cc5
593d888
1d0b30b
a5f296b
1cedc90
b636337
e4564f7
9239cf1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ import ( | |
"io" | ||
"os" | ||
"strconv" | ||
"sync" | ||
|
||
errors2 "github.com/pingcap/errors" | ||
"github.com/pingcap/tidb/config" | ||
|
@@ -32,18 +31,22 @@ import ( | |
|
||
// ListInDisk represents a slice of chunks storing in temporary disk. | ||
type ListInDisk struct { | ||
fieldTypes []*types.FieldType | ||
// offsets stores the offsets in disk of all RowPtr, | ||
// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx]. | ||
offsets [][]int64 | ||
// offWrite is the current offset for writing. | ||
offWrite int64 | ||
fieldTypes []*types.FieldType | ||
numRowForEachChunk []int | ||
numRowPrefixSum []int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment for this |
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. useless empty line |
||
disk *os.File | ||
w io.WriteCloser | ||
bufFlushMutex sync.RWMutex | ||
diskTracker *disk.Tracker // track disk usage. | ||
numRowsInDisk int | ||
diskTracker *disk.Tracker // track disk usage. | ||
|
||
dataFile tempFileWithIOWrapper | ||
offsetFile tempFileWithIOWrapper | ||
} | ||
|
||
type tempFileWithIOWrapper struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. diskFileReaderWriter ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment for this definition |
||
disk *os.File | ||
w io.WriteCloser | ||
// offWrite is the current offset for writing. | ||
offWrite int64 | ||
|
||
checksumWriter *checksum.Writer | ||
cipherWriter *encrypt.Writer | ||
|
@@ -52,7 +55,49 @@ type ListInDisk struct { | |
ctrCipher *encrypt.CtrCipher | ||
} | ||
|
||
func (l *tempFileWithIOWrapper) initWithFileName(fileName string) (err error) { | ||
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName) | ||
if err != nil { | ||
return errors2.Trace(err) | ||
} | ||
var underlying io.WriteCloser = l.disk | ||
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { | ||
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" | ||
l.ctrCipher, err = encrypt.NewCtrCipher() | ||
if err != nil { | ||
return | ||
} | ||
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) | ||
underlying = l.cipherWriter | ||
} | ||
l.checksumWriter = checksum.NewWriter(underlying) | ||
l.w = l.checksumWriter | ||
return | ||
} | ||
|
||
func (l *tempFileWithIOWrapper) getReader() io.ReaderAt { | ||
var underlying io.ReaderAt = l.disk | ||
if l.ctrCipher != nil { | ||
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) | ||
} | ||
if l.checksumWriter != nil { | ||
underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) | ||
} | ||
return underlying | ||
} | ||
|
||
func (l *tempFileWithIOWrapper) getSelectionReader(off int64) *io.SectionReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
checksumReader := l.getReader() | ||
r := io.NewSectionReader(checksumReader, off, l.offWrite-off) | ||
return r | ||
} | ||
|
||
func (l *tempFileWithIOWrapper) getWriter() io.Writer { | ||
return l.w | ||
} | ||
|
||
var defaultChunkListInDiskPath = "chunk.ListInDisk" | ||
var defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset" | ||
|
||
// NewListInDisk creates a new ListInDisk with field types. | ||
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { | ||
|
@@ -69,23 +114,11 @@ func (l *ListInDisk) initDiskFile() (err error) { | |
if err != nil { | ||
return | ||
} | ||
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label())) | ||
err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label())) | ||
if err != nil { | ||
return errors2.Trace(err) | ||
} | ||
var underlying io.WriteCloser = l.disk | ||
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { | ||
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" | ||
l.ctrCipher, err = encrypt.NewCtrCipher() | ||
if err != nil { | ||
return | ||
} | ||
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) | ||
underlying = l.cipherWriter | ||
return | ||
} | ||
l.checksumWriter = checksum.NewWriter(underlying) | ||
l.w = l.checksumWriter | ||
l.bufFlushMutex = sync.RWMutex{} | ||
err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label())) | ||
return | ||
} | ||
|
||
|
@@ -101,34 +134,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker { | |
|
||
// Add adds a chunk to the ListInDisk. Caller must make sure the input chk | ||
// is not empty and not used any more and has the same field types. | ||
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently. | ||
// Warning: Do not use Add concurrently. | ||
func (l *ListInDisk) Add(chk *Chunk) (err error) { | ||
if chk.NumRows() == 0 { | ||
return errors2.New("chunk appended to List should have at least 1 row") | ||
} | ||
if l.disk == nil { | ||
if l.dataFile.disk == nil { | ||
err = l.initDiskFile() | ||
if err != nil { | ||
return | ||
} | ||
} | ||
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite} | ||
n, err := chk2.WriteTo(l.w) | ||
l.offWrite += n | ||
// Append data | ||
chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite} | ||
n, err := chkInDisk.WriteTo(l.dataFile.getWriter()) | ||
l.dataFile.offWrite += n | ||
if err != nil { | ||
return | ||
} | ||
|
||
// Append offsets | ||
offsetsOfRows := chkInDisk.getOffsetsOfRows() | ||
l.numRowForEachChunk = append(l.numRowForEachChunk, len(offsetsOfRows)) | ||
l.numRowPrefixSum = append(l.numRowPrefixSum, l.numRowsInDisk) | ||
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter()) | ||
l.offsetFile.offWrite += n2 | ||
if err != nil { | ||
return | ||
} | ||
l.offsets = append(l.offsets, chk2.getOffsetsOfRows()) | ||
l.diskTracker.Consume(n) | ||
|
||
l.diskTracker.Consume(n + n2) | ||
l.numRowsInDisk += chk.NumRows() | ||
return | ||
} | ||
|
||
// GetChunk gets a Chunk from the ListInDisk by chkIdx. | ||
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { | ||
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx)) | ||
offsets := l.offsets[chkIdx] | ||
for rowIdx := range offsets { | ||
chkSize := l.numRowForEachChunk[chkIdx] | ||
for rowIdx := 0; rowIdx < chkSize; rowIdx++ { | ||
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) | ||
if err != nil { | ||
return chk, err | ||
|
@@ -140,16 +184,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { | |
|
||
// GetRow gets a Row from the ListInDisk by RowPtr. | ||
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { | ||
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) | ||
if err != nil { | ||
return | ||
} | ||
off := l.offsets[ptr.ChkIdx][ptr.RowIdx] | ||
var underlying io.ReaderAt = l.disk | ||
if l.ctrCipher != nil { | ||
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) | ||
} | ||
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) | ||
r := io.NewSectionReader(checksumReader, off, l.offWrite-off) | ||
r := l.dataFile.getSelectionReader(off) | ||
format := rowInDisk{numCol: len(l.fieldTypes)} | ||
_, err = format.ReadFrom(r) | ||
if err != nil { | ||
|
@@ -159,22 +198,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { | |
return row, err | ||
} | ||
|
||
func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) { | ||
offsetInOffsetFile := l.numRowPrefixSum[chkIdx] + int(rowIdx) | ||
b := make([]byte, 8) | ||
reader := l.offsetFile.getSelectionReader(int64(offsetInOffsetFile) * 8) | ||
n, err := io.ReadFull(reader, b) | ||
if err != nil { | ||
return 0, err | ||
} | ||
if n != 8 { | ||
return 0, errors2.New("Can not get offset from disk") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The file spilled is broken, can not get data offset from the disk. |
||
} | ||
return bytesToI64Slice(b)[0], nil | ||
} | ||
|
||
// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk. | ||
func (l *ListInDisk) NumRowsOfChunk(chkID int) int { | ||
return len(l.offsets[chkID]) | ||
return l.numRowForEachChunk[chkID] | ||
} | ||
|
||
// NumChunks returns the number of chunks in the ListInDisk. | ||
func (l *ListInDisk) NumChunks() int { | ||
return len(l.offsets) | ||
return len(l.numRowForEachChunk) | ||
} | ||
|
||
// Close releases the disk resource. | ||
func (l *ListInDisk) Close() error { | ||
if l.disk != nil { | ||
if l.dataFile.disk != nil { | ||
l.diskTracker.Consume(-l.diskTracker.BytesConsumed()) | ||
terror.Call(l.disk.Close) | ||
terror.Log(os.Remove(l.disk.Name())) | ||
terror.Call(l.dataFile.disk.Close) | ||
terror.Log(os.Remove(l.dataFile.disk.Name())) | ||
} | ||
if l.offsetFile.disk != nil { | ||
terror.Call(l.offsetFile.disk.Close) | ||
terror.Log(os.Remove(l.offsetFile.disk.Name())) | ||
} | ||
return nil | ||
} | ||
|
@@ -198,9 +255,11 @@ type chunkInDisk struct { | |
// offWrite is the current offset for writing. | ||
offWrite int64 | ||
// offsetsOfRows stores the offset of each row. | ||
offsetsOfRows []int64 | ||
offsetsOfRows offsetsOfRows | ||
} | ||
|
||
type offsetsOfRows []int64 | ||
|
||
// WriteTo serializes the chunk into the format of chunkInDisk, and | ||
// writes to w. | ||
func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { | ||
|
@@ -222,7 +281,13 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { | |
} | ||
|
||
// getOffsetsOfRows gets the offset of each row. | ||
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } | ||
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows } | ||
|
||
// WriteTo serializes the offsetsOfRow, and writes to w. | ||
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put this function to line line262? |
||
n, err := w.Write(i64SliceToBytes(off)) | ||
return int64(n), err | ||
} | ||
|
||
// rowInDisk represents a Row in format of diskFormatRow. | ||
type rowInDisk struct { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
numRowsOfEachChunk
rowNumOfEachChunkFirstRow
totalNumRows