Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor、server、planner: Increase the support of the returning keyword #83

Merged
merged 10 commits into from
Nov 30, 2021
Merged
Binary file added docs/delete returning.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
62 changes: 61 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type recordSet struct {
stmt *ExecStmt
lastErr error
txnStartTS uint64
rows []chunk.Row
}

func (a *recordSet) Rows() []chunk.Row {
return a.rows
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -364,6 +369,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}

if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled {
//if returningRS != nil {
// return returningRS, err
//}
return result, err
}

Expand Down Expand Up @@ -439,6 +447,10 @@ type chunkRowRecordSet struct {
execStmt *ExecStmt
}

func (c *chunkRowRecordSet) Rows() []chunk.Row {
return c.rows
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}
Expand Down Expand Up @@ -507,7 +519,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var returningRS *recordSet
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -532,9 +544,57 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
if err != nil {
return nil, err
}

// Do returning
returningRS = getReturningRecordSet(ctx, e, a)

if returningRS != nil {
return returningRS, err
}
return nil, err
}

func getReturningRecordSet(ctx context.Context, e Executor, a *ExecStmt) *recordSet {
var rs *recordSet
if del, ok := e.(*DeleteExec); ok && del.returning != nil {
err := del.returning.Next(ctx, del.AffectedRows())
if err != nil {
return rs
}

if ret, ok := del.returning.(*ReturningExec); ok {
rs = ret.ResultSet
rs.stmt = a
}
}

if updt, ok := e.(*UpdateExec); ok && updt.returning != nil {
err := updt.returning.Next(ctx, updt.AffectedRows())
if err != nil {
return rs
}

if ret, ok := updt.returning.(*ReturningExec); ok {
rs = ret.ResultSet
rs.stmt = a
}
}

if insert, ok := e.(*InsertExec); ok && insert.returning != nil {
err := insert.returning.Next(ctx, insert.evalBuffer4Return.ToRow().Chunk())
if err != nil {
return rs
}

if ret, ok := insert.returning.(*ReturningExec); ok {
rs = ret.ResultSet
rs.stmt = a
rs.stmt.OutputNames = ret.ReturningFields
}
}
return rs
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
sctx := a.Ctx
// Do not active the transaction here.
Expand Down
34 changes: 34 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildAdminShowTelemetry(v)
case *plannercore.AdminResetTelemetryID:
return b.buildAdminResetTelemetryID(v)
case *plannercore.PhysicalReturning:
return b.buildReturning(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -723,6 +725,12 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
}

if v.ReturningPlan != nil {
insert.returning = b.build(v.ReturningPlan)
insert.returning.(*ReturningExec).ReturningFields = v.OutputNames()
}

return insert
}

Expand Down Expand Up @@ -1708,6 +1716,11 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
tblID2table: tblID2table,
tblColPosInfos: v.TblColPosInfos,
}

if v.ReturningPlan != nil {
updateExec.returning = b.build(v.ReturningPlan)
}

return updateExec
}

Expand All @@ -1721,6 +1734,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)

if b.err != nil {
return nil
}
Expand All @@ -1732,6 +1746,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
IsMultiTable: v.IsMultiTable,
tblColPosInfos: v.TblColPosInfos,
}

if v.ReturningPlan != nil {
deleteExec.returning = b.build(v.ReturningPlan)
}

return deleteExec
}

Expand Down Expand Up @@ -3144,3 +3163,18 @@ func (b *executorBuilder) buildAdminShowTelemetry(v *plannercore.AdminShowTeleme
func (b *executorBuilder) buildAdminResetTelemetryID(v *plannercore.AdminResetTelemetryID) Executor {
return &AdminResetTelemetryIDExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID())}
}

func (b *executorBuilder) buildReturning(v *plannercore.PhysicalReturning) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
return nil
}

returningExec := ReturningExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
schema: v.Schema(),
}
executorCounterSortExec.Inc()

return &returningExec
}
28 changes: 27 additions & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type DeleteExec struct {
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker
affectedRow *chunk.Chunk

returning Executor
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -112,6 +115,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
rowCount++
}
e.AddAffectedRows(chk)
chk = chunk.Renew(chk, e.maxChunkSize)
}

Expand Down Expand Up @@ -197,10 +201,32 @@ func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
err := e.children[0].Open(ctx)
if err != nil {
return err
}

if e.returning != nil {
err = e.returning.Open(ctx)
if err != nil {
return err
}
}

return nil
}

// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
// the key in map[int64]Row is the joined table handle, which represent a unique reference row.
// the value in map[int64]Row is the deleting row.
type tableRowMapType map[int64]map[int64][]types.Datum

// AffectedRows get affected rows.
func (e *DeleteExec) AffectedRows() *chunk.Chunk {
return e.affectedRow
}

// AddAffectedRows adds affected row.
func (e *DeleteExec) AddAffectedRows(chk *chunk.Chunk) {
e.affectedRow = chk
}
35 changes: 31 additions & 4 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
// InsertExec represents an insert executor.
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
OnDuplicate []*expression.Assignment
evalBuffer4Dup chunk.MutRow
evalBuffer4Return chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum
returning Executor

Priority mysql.PriorityEnum
}
Expand Down Expand Up @@ -312,6 +314,10 @@ func (e *InsertExec) Open(ctx context.Context) error {
if !e.allAssignmentsAreConstant {
e.initEvalBuffer()
}
if e.returning != nil {
e.initEvalBuffer4Return()
}

return nil
}

Expand All @@ -338,6 +344,27 @@ func (e *InsertExec) initEvalBuffer4Dup() {
e.row4Update = make([]types.Datum, 0, len(evalBufferTypes))
}

func (e *InsertExec) initEvalBuffer4Return() {
// Use public columns for new row.
numCols := len(e.Table.Cols())
// Use writable columns for old row for update.
numWritableCols := len(e.Table.WritableCols())

evalBufferTypes := make([]*types.FieldType, 0, numCols+numWritableCols)

// Append the old row before the new row, to be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
for _, col := range e.Table.WritableCols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
if e.hasExtraHandle {
evalBufferTypes = append(evalBufferTypes, types.NewFieldType(mysql.TypeLonglong))
}
e.evalBuffer4Return = chunk.MutRowFromTypes(evalBufferTypes[numWritableCols:])
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
Expand Down
92 changes: 92 additions & 0 deletions executor/returning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2021 Digital China Group Co.,Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"github.com/pingcap/tidb/types"
"runtime/trace"
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
)

// ReturningExec represents Returning Executor
type ReturningExec struct {
baseExecutor

Idx int
fetched bool
schema *expression.Schema
ReturningFields types.NameSlice
ResultSet *recordSet
}

// Open Returning Executor
func (e *ReturningExec) Open(ctx context.Context) error {

return e.children[0].Open(ctx)
}

// Next Returning Executor
func (e *ReturningExec) Next(ctx context.Context, req *chunk.Chunk) error {
return e.fetchRowChunks(ctx, req)
}

// Close Returning Executor
func (e *ReturningExec) Close() error {
return e.children[0].Close()
}

// Returning Executor fetchRowChunks
func (e *ReturningExec) fetchRowChunks(ctx context.Context, req *chunk.Chunk) error {
defer func() {
e.fetched = true
}()

var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
}

rs := &recordSet{
executor: e.base().children[0],
}
e.ResultSet = rs
rs.NewChunk()
if req == nil {
return nil
}

rowCount := req.NumRows()
if rowCount == 0 {
return nil
}
start := time.Now()
reg := trace.StartRegion(ctx, "ProcessReturning")
iter := chunk.NewIterator4Chunk(req)
for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
rs.rows = append(rs.rows, chunkRow)
}
e.ResultSet = rs
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
reg.End()

return nil
}
Loading