Skip to content

Commit

Permalink
util/rowcodec,tablecodec: remove stmtctx dependency from rowcodec
Browse files Browse the repository at this point in the history
… and `tablecodec` (#48816)

close #48751
  • Loading branch information
YangKeao authored Nov 23, 2023
1 parent 3543275 commit 707f860
Show file tree
Hide file tree
Showing 30 changed files with 180 additions and 146 deletions.
3 changes: 2 additions & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
}
checksums := w.calcChecksums()
sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd, checksums...)
newRowVal, err := tablecodec.EncodeRow(sctx.TimeZone(), newRow, newColumnIDs, nil, nil, rd, checksums...)
err = sctx.HandleError(err)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/errctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ go_test(
"//pkg/types",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_multierr//:multierr",
],
)
20 changes: 20 additions & 0 deletions pkg/errctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,27 @@ func (ctx *Context) appendWarning(err error) {
}

// HandleError handles the error according to the context. See the comment of `HandleErrorWithAlias` for detailed logic.
//
// It also allows using `errors.ErrorGroup`, in this case, it'll handle each error in order, and return the first error
// it founds.
func (ctx *Context) HandleError(err error) error {
// The function of handling `errors.ErrorGroup` is placed in `HandleError` but not in `HandleErrorWithAlias`, because
// it's hard to give a proper error and warn alias for an error group.
if errs, ok := err.(errors.ErrorGroup); ok {
for _, singleErr := range errs.Errors() {
singleErr = ctx.HandleError(singleErr)
// If the one error is found, just return it.
// TODO: consider whether it's more appropriate to continue to handle other errors. For example, other errors
// may need to append warnings. The current behavior is same with TiDB original behavior before using
// `errctx` to handle multiple errors.
if singleErr != nil {
return singleErr
}
}

return nil
}

return ctx.HandleErrorWithAlias(err, err, err)
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/errctx/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func TestContext(t *testing.T) {
Expand Down Expand Up @@ -50,4 +51,13 @@ func TestContext(t *testing.T) {
require.Equal(t, warn, testWarn)
// newCtx2 will return all errors
require.Equal(t, newCtx2.HandleErrorWithAlias(testInternalErr, testErr, testWarn), testErr)

// test `multierr`
testErrs := multierr.Append(testInternalErr, testErr)
require.Equal(t, ctx.HandleError(testErrs), testInternalErr)
require.Equal(t, newCtx.HandleError(testErrs), testErr)
require.Equal(t, warn, testInternalErr)

// test nil
require.Nil(t, ctx.HandleError(nil))
}
7 changes: 5 additions & 2 deletions pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(us.Ctx().GetSessionVars().StmtCtx, nil, d)
sctx := us.Ctx().GetSessionVars().StmtCtx
buf, err := tablecodec.EncodeValue(sctx.TimeZone(), nil, d)
return buf, sctx.HandleError(err)
}
cd := NewRowDecoder(us.Ctx(), us.Schema(), us.table.Meta())
rd := rowcodec.NewByteDecoder(colInfo, pkColIDs, defVal, us.Ctx().GetSessionVars().Location())
Expand Down Expand Up @@ -1164,7 +1166,8 @@ func getColIDAndPkColIDs(ctx sessionctx.Context, tbl table.Table, columns []*mod
if err != nil {
return nil, err
}
return tablecodec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, d)
buf, err := tablecodec.EncodeValue(ctx.GetSessionVars().StmtCtx.TimeZone(), nil, d)
return buf, ctx.GetSessionVars().StmtCtx.HandleError(err)
}
rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, defVal, ctx.GetSessionVars().Location())
return colIDs, pkColIDs, rd
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func setColValue(t *testing.T, txn kv.Transaction, key kv.Key, v types.Datum) {
colIDs := []int64{2, 3}
sc := stmtctx.NewStmtCtxWithTimeZone(time.Local)
rd := rowcodec.Encoder{Enable: true}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
value, err := tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, nil, nil, &rd)
require.NoError(t, err)
err = txn.Set(key, value)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,15 @@ func addUnchangedKeysForLockByRow(
return count, err
}
unchangedUniqueKey, _, err := tablecodec.GenIndexKey(
stmtCtx,
stmtCtx.TimeZone(),
idx.TableMeta(),
meta,
physicalID,
ukVals,
h,
nil,
)
err = stmtCtx.HandleError(err)
if err != nil {
return count, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func TestDecodeColumnValue(t *testing.T) {
}
rd := rowcodec.Encoder{Enable: true}
sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC)
bs, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
bs, err := tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, nil, nil, &rd)
require.NoError(t, err)
require.NotNil(t, bs)
bin := base64.StdEncoding.EncodeToString(bs)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ func (sc *StatementContext) HandleTruncate(err error) error {

// HandleError handles the error based on `ErrCtx()`
func (sc *StatementContext) HandleError(err error) error {
intest.AssertNotNil(sc)
if sc == nil {
return err
}
errCtx := sc.ErrCtx()
return errCtx.HandleError(err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"slices"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -259,10 +260,15 @@ func (c *CMSketch) SubValue(h1, h2 uint64, count uint64) {
// QueryValue is used to query the count of specified value.
func QueryValue(sctx sessionctx.Context, c *CMSketch, t *TopN, val types.Datum) (uint64, error) {
var sc *stmtctx.StatementContext
tz := time.UTC
if sctx != nil {
sc = sctx.GetSessionVars().StmtCtx
tz = sc.TimeZone()
}
rawData, err := tablecodec.EncodeValue(tz, nil, val)
if sc != nil {
err = sc.HandleError(err)
}
rawData, err := tablecodec.EncodeValue(sc, nil, val)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) {
return nil, err
}
decodedVal.SetBytesAsString(s.Collators[i].Key(decodedVal.GetString()), decodedVal.Collation(), uint32(decodedVal.Length()))
encodedKey, err := tablecodec.EncodeValue(s.Sc, nil, decodedVal)
encodedKey, err := tablecodec.EncodeValue(s.Sc.TimeZone(), nil, decodedVal)
err = s.Sc.HandleError(err)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (s SampleBuilder) CollectColumnStats() ([]*SampleCollector, *SortedBuilder,
return nil, nil, err
}
decodedVal.SetBytesAsString(s.Collators[i].Key(decodedVal.GetString()), decodedVal.Collation(), uint32(decodedVal.Length()))
encodedKey, err := tablecodec.EncodeValue(s.Sc, nil, decodedVal)
encodedKey, err := tablecodec.EncodeValue(s.Sc.TimeZone(), nil, decodedVal)
err = s.Sc.HandleError(err)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -306,7 +307,8 @@ func (c *SampleCollector) ExtractTopN(numTop uint32, sc *stmtctx.StatementContex
if err != nil {
return err
}
data, err := tablecodec.EncodeValue(sc, nil, d)
data, err := tablecodec.EncodeValue(sc.TimeZone(), nil, d)
err = sc.HandleError(err)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/mockstore/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestClusterSplit(t *testing.T) {
colValue := types.NewStringDatum(strconv.Itoa(int(handle)))
// TODO: Should use session's TimeZone instead of UTC.
rd := rowcodec.Encoder{Enable: true}
rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID}, nil, nil, &rd)
rowValue, err1 := tablecodec.EncodeRow(sc.TimeZone(), []types.Datum{colValue}, []int64{colID}, nil, nil, &rd)
require.NoError(t, err1)
txn.Set(rowKey, rowValue)

Expand Down
3 changes: 2 additions & 1 deletion pkg/store/mockstore/unistore/cophandler/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func (e *analyzeColumnsExec) Process(key, value []byte) error {
continue
}

value, err := tablecodec.EncodeValue(e.evalCtx.sc, nil, d)
value, err := tablecodec.EncodeValue(e.evalCtx.sc.TimeZone(), nil, d)
err = e.evalCtx.sc.HandleError(err)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func prepareTestTableData(keyNumber int, tableID int64) (*data, error) {
for i := 0; i < keyNumber; i++ {
datum := types.MakeDatums(i, "abc", 10.0)
rows[int64(i)] = datum
rowEncodedData, err := tablecodec.EncodeRow(stmtCtx, datum, colIds, nil, nil, encoder)
rowEncodedData, err := tablecodec.EncodeRow(stmtCtx.TimeZone(), datum, colIds, nil, nil, encoder)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/store/mockstore/unistore/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/sessionctx/stmtctx",
"//pkg/store/mockstore/unistore/client",
"//pkg/store/mockstore/unistore/config",
"//pkg/store/mockstore/unistore/cophandler",
Expand Down
3 changes: 1 addition & 2 deletions pkg/store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/config"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/lockstore"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/pd"
Expand Down Expand Up @@ -1001,7 +1000,7 @@ func encodeFromOldRow(oldRow, buf []byte) ([]byte, error) {
}
var encoder rowcodec.Encoder
buf = buf[:0]
return encoder.Encode(stmtctx.NewStmtCtx(), colIDs, datums, buf)
return encoder.Encode(time.UTC, colIDs, datums, buf)
}

func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutation, item *badger.Item,
Expand Down
11 changes: 8 additions & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.
if c.idxInfo.Global {
idxTblID = c.tblInfo.ID
}
return tablecodec.GenIndexKey(sc, c.tblInfo, c.idxInfo, idxTblID, indexedValues, h, buf)
key, distinct, err = tablecodec.GenIndexKey(sc.TimeZone(), c.tblInfo, c.idxInfo, idxTblID, indexedValues, h, buf)
err = sc.HandleError(err)
return
}

// GenIndexValue generates the index value.
Expand All @@ -102,7 +104,9 @@ func (c *index) GenIndexValue(sc *stmtctx.StatementContext, distinct bool, index
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
return tablecodec.GenIndexValuePortal(sc, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData, buf)
idx, err := tablecodec.GenIndexValuePortal(sc.TimeZone(), c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData, buf)
err = sc.HandleError(err)
return idx, err
}

// getIndexedValue will produce the result like:
Expand Down Expand Up @@ -233,8 +237,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo,
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx.TimeZone(), c.tblInfo, c.idxInfo,
c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil)
err = sctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/table/tables/mutation_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) {
// mocked data
mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233))
mockValue233, err := tablecodec.EncodeRow(
sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd,
sessVars.StmtCtx.TimeZone(), []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd,
)
require.Nil(t, err)
fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
// test checkHandleConsistency
rowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, handle)
corruptedRowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, corruptedHandle)
rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx, rowToInsert, []int64{1, 2}, nil, nil, &rd)
rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx.TimeZone(), rowToInsert, []int64{1, 2}, nil, nil, &rd)
require.Nil(t, err)
rowMutation := mutation{key: rowKey, value: rowValue}
corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue}
Expand All @@ -327,14 +327,14 @@ func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *
return nil, nil, err
}
key, distinct, err := tablecodec.GenIndexKey(
sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, handle, nil,
sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, 1, indexedValues, handle, nil,
)
if err != nil {
return nil, nil, err
}
rsData := TryGetHandleRestoredDataWrapper(table.meta, rowToInsert, nil, indexInfo)
value, err := tablecodec.GenIndexValuePortal(
sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
distinct, false, indexedValues, handle, 0, rsData, nil,
)
if err != nil {
Expand Down
21 changes: 14 additions & 7 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
key := t.RecordKey(h)
sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, h, checksumData, writeBufs.RowValBuf)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
err = sc.HandleError(err)
if err != nil {
return err
}
Expand Down Expand Up @@ -988,7 +989,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
zap.Stringer("key", key))
sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder
checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, recordID, checksumData, writeBufs.RowValBuf)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc.TimeZone(), row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
err = sc.HandleError(err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1395,7 +1397,8 @@ func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row [
if err != nil {
return err
}
value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), row, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand All @@ -1406,11 +1409,13 @@ func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row [
}

func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []types.Datum, colIDs []int64) error {
old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs, nil, nil)
old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), oldRow, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs, nil, nil)
newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), newRow, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand All @@ -1422,7 +1427,8 @@ func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []t
}

func (t *TableCommon) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, colIDs []int64) error {
data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx, r, colIDs, nil, nil)
data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), r, colIDs, nil, nil)
err = ctx.GetSessionVars().StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand Down Expand Up @@ -2316,7 +2322,8 @@ func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn
return err
}

pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx, nil, d)
pbColumns[i].DefaultVal, err = tablecodec.EncodeValue(sessVars.StmtCtx.TimeZone(), nil, d)
err = sessVars.StmtCtx.HandleError(err)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/tablecodec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/tablecodec",
visibility = ["//visibility:public"],
deps = [
"//pkg/errctx",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/charset",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/sessionctx/stmtctx",
"//pkg/structure",
"//pkg/types",
"//pkg/util/codec",
Expand Down
Loading

0 comments on commit 707f860

Please sign in to comment.