From c57a8a209e0950a35931fc6bea585d2d97adec40 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Wed, 22 Dec 2021 10:50:35 +0800 Subject: [PATCH] refactor --- executor/load_data.go | 113 ++++++++---------------------------------- 1 file changed, 22 insertions(+), 91 deletions(-) diff --git a/executor/load_data.go b/executor/load_data.go index 0fa02e7ed7bff..36e064469d1c1 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -366,43 +366,13 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { // getValidData returns prevData and curData that starts from starting symbol. // If the data doesn't have starting symbol, prevData is nil and curData is curData[len(curData)-startingLen+1:]. // If curData size less than startingLen, curData is returned directly. -func (e *LoadDataInfo) getValidData(prevData, curData []byte) ([]byte, []byte) { - startingLen := len(e.LinesInfo.Starting) - if startingLen == 0 { - return prevData, curData - } - - prevLen := len(prevData) - if prevLen > 0 { - // starting symbol in the prevData - idx := strings.Index(string(hack.String(prevData)), e.LinesInfo.Starting) - if idx != -1 { - return prevData[idx:], curData - } - - // starting symbol in the middle of prevData and curData - restStart := curData - if len(curData) >= startingLen { - restStart = curData[:startingLen-1] - } - prevData = append(prevData, restStart...) - idx = strings.Index(string(hack.String(prevData)), e.LinesInfo.Starting) - if idx != -1 { - return prevData[idx:prevLen], curData - } - } - - // starting symbol in the curData +func (e *LoadDataInfo) getValidData(curData []byte) ([]byte, bool) { idx := strings.Index(string(hack.String(curData)), e.LinesInfo.Starting) - if idx != -1 { - return nil, curData[idx:] + if idx == -1 { + return curData[len(curData)-len(e.LinesInfo.Starting)+1:], false } - // no starting symbol - if len(curData) >= startingLen { - curData = curData[len(curData)-startingLen+1:] - } - return nil, curData + return curData[idx:], true } func (e *LoadDataInfo) isInQuoter(bs []byte) bool { @@ -464,7 +434,7 @@ func (e *LoadDataInfo) IndexOfTerminator(bs []byte, inQuoter bool) int { atFieldStart := true loop: for i := 0; i < len(bs); i++ { - if atFieldStart && e.FieldsInfo.Enclosed != byte(0) &&bs[i] == e.FieldsInfo.Enclosed { + if atFieldStart && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed { inQuoter = !inQuoter atFieldStart = false continue @@ -508,66 +478,31 @@ loop: // getLine returns a line, curData, the next data start index and a bool value. // If it has starting symbol the bool is true, otherwise is false. func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, []byte, bool) { - startingLen := len(e.LinesInfo.Starting) - prevData, curData = e.getValidData(prevData, curData) - if prevData == nil && len(curData) < startingLen { - return nil, curData, false - } - inquotor := e.isInQuoter(prevData) - prevLen := len(prevData) - terminatedLen := len(e.LinesInfo.Terminated) - curStartIdx := 0 - if prevLen < startingLen { - curStartIdx = startingLen - prevLen - } - endIdx := -1 - if len(curData) >= curStartIdx { - if ignore { - endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated) - } else { - endIdx = e.IndexOfTerminator(curData[curStartIdx:], inquotor) - } - } - if endIdx == -1 { - // no terminated symbol - if len(prevData) == 0 { - return nil, curData, true - } - - // terminated symbol in the middle of prevData and curData + if prevData != nil { curData = append(prevData, curData...) - if ignore { - endIdx = strings.Index(string(hack.String(curData[startingLen:])), e.LinesInfo.Terminated) - } else { - endIdx = e.IndexOfTerminator(curData[startingLen:], false) + } + startLen := len(e.LinesInfo.Starting) + if startLen != 0 { + if len(curData) < startLen { + return nil, curData, false } - if endIdx != -1 { - nextDataIdx := startingLen + endIdx + terminatedLen - return curData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true + var ok bool + curData, ok = e.getValidData(curData) + if !ok { + return nil, curData, false } - // no terminated symbol - return nil, curData, true } - - // terminated symbol in the curData - nextDataIdx := curStartIdx + endIdx + terminatedLen - if len(prevData) == 0 { - return curData[curStartIdx : curStartIdx+endIdx], curData[nextDataIdx:], true - } - - // terminated symbol in the curData - prevData = append(prevData, curData[:nextDataIdx]...) + endIdx := -1 if ignore { - endIdx = strings.Index(string(hack.String(prevData[startingLen:])), e.LinesInfo.Terminated) + endIdx = strings.Index(string(hack.String(curData[startLen:])), e.LinesInfo.Terminated) } else { - endIdx = e.IndexOfTerminator(prevData[startingLen:], false) + endIdx = e.IndexOfTerminator(curData[startLen:], false) } - if endIdx >= prevLen { - return prevData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true + + if endIdx == -1 { + return nil, curData, true } - // terminated symbol in the middle of prevData and curData - lineLen := startingLen + endIdx + terminatedLen defer func() { r := recover() if r != nil { @@ -578,10 +513,6 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [ zap.String("Line Starting", e.LinesInfo.Starting), zap.String("Line Terminated", e.LinesInfo.Terminated), zap.Int("endIdx", endIdx), - zap.Int("prevLen", prevLen), - zap.Int("lineLen", lineLen), - zap.Int("nextDataIdx", nextDataIdx), - zap.Bool("inquotor", inquotor), zap.Bool("ignore", ignore), zap.Bool("prevData-contains-00bytes", bytes.Contains(prevData, []byte{e.FieldsInfo.Enclosed})), zap.Bool("curData-contains-00bytes", bytes.Contains(curData, []byte{e.FieldsInfo.Enclosed})), @@ -589,7 +520,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, [ panic(r) } }() - return prevData[startingLen : startingLen+endIdx], curData[lineLen-prevLen:], true + return curData[startLen : startLen+endIdx], curData[startLen+endIdx+len(e.LinesInfo.Terminated):], true } // InsertData inserts data into specified table according to the specified format.