Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add returning keyword plan and executor for supporting PG compliant DML #68

Closed
wants to merge 10 commits into from
32 changes: 31 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type recordSet struct {
stmt *ExecStmt
lastErr error
txnStartTS uint64
rows []chunk.Row
}

func (a *recordSet) Rows() []chunk.Row {
return a.rows
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -363,8 +368,29 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return a.handlePessimisticSelectForUpdate(ctx, e)
}

// Do returning
var returningRS *recordSet
switch e.(type) {
case *DeleteExec:
if del, ok := e.(*DeleteExec); ok && del.returning != nil {
err = del.returning.Next(ctx, nil)
if err != nil {
return nil, err
}

if ret, ok := del.returning.(*ReturningExec); ok {
returningRS = ret.ResultSet
returningRS.stmt = a // to fix executor.(*recordSet).Fields panic issue
}
}
}

if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled {
return result, err
if returningRS != nil {
return returningRS, err
} else {
return result, err
}
}

var txnStartTS uint64
Expand Down Expand Up @@ -439,6 +465,10 @@ type chunkRowRecordSet struct {
execStmt *ExecStmt
}

func (c *chunkRowRecordSet) Rows() []chunk.Row {
return c.rows
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}
Expand Down
23 changes: 23 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildAdminShowTelemetry(v)
case *plannercore.AdminResetTelemetryID:
return b.buildAdminResetTelemetryID(v)
case *plannercore.PhysicalReturning:
return b.buildReturning(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -1721,6 +1723,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)

if b.err != nil {
return nil
}
Expand All @@ -1732,6 +1735,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
IsMultiTable: v.IsMultiTable,
tblColPosInfos: v.TblColPosInfos,
}

if v.ReturningPlan != nil {
deleteExec.returning = b.build(v.ReturningPlan)
}

return deleteExec
}

Expand Down Expand Up @@ -3144,3 +3152,18 @@ func (b *executorBuilder) buildAdminShowTelemetry(v *plannercore.AdminShowTeleme
func (b *executorBuilder) buildAdminResetTelemetryID(v *plannercore.AdminResetTelemetryID) Executor {
return &AdminResetTelemetryIDExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID())}
}

func (b *executorBuilder) buildReturning(v *plannercore.PhysicalReturning) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
return nil
}

returningExec := ReturningExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
schema: v.Schema(),
}
executorCounterSortExec.Inc()

return &returningExec
}
16 changes: 15 additions & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type DeleteExec struct {
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker

returning Executor
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -197,7 +199,19 @@ func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
err := e.children[0].Open(ctx)
if err != nil {
return err
}

if e.returning != nil {
err = e.returning.Open(ctx)
if err != nil {
return err
}
}

return nil
}

// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
Expand Down
98 changes: 98 additions & 0 deletions executor/returning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2021 Digital China Group Co.,Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"runtime/trace"
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
)

type ReturningExec struct {
baseExecutor

Idx int
fetched bool
schema *expression.Schema

ResultSet *recordSet
}

func (e *ReturningExec) Open(ctx context.Context) error {

return e.children[0].Open(ctx)
}

func (e *ReturningExec) Next(ctx context.Context, req *chunk.Chunk) error {
return e.fetchRowChunks(ctx)
}

func (e *ReturningExec) Close() error {
return e.children[0].Close()
}

func (e *ReturningExec) fetchRowChunks(ctx context.Context) error {
defer func() {
e.fetched = true
}()

var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
}

rs := &recordSet{
executor: e.base().children[0],
}

rs.rows = make([]chunk.Row, 0, 1024)

for {
req := rs.NewChunk()
// Here server.tidbResultSet implements Next method.
err := rs.Next(ctx, req)
if err != nil {
return err
}

rowCount := req.NumRows()
if rowCount == 0 {
break
}
start := time.Now()
reg := trace.StartRegion(ctx, "ProcessReturning")

for i := 0; i < rowCount; i++ {
row := req.GetRow(i)

row.IsNull(0)

rs.rows = append(rs.rows, row)
}

if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
reg.End()
}

e.ResultSet = rs

return nil
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/pingcap/tidb
require (
cloud.google.com/go v0.51.0 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible
github.com/Jeffail/gabs/v2 v2.5.1
github.com/aws/aws-sdk-go v1.30.24 // indirect
Expand All @@ -30,6 +30,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/jackc/pgio v1.0.0
github.com/jackc/pgproto3 v1.1.0
github.com/jackc/pgproto3/v2 v2.0.7
github.com/jackc/pgtype v1.6.2
github.com/klauspost/cpuid v1.2.1
Expand All @@ -53,6 +54,7 @@ require (
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5 h1:E8nid1A2Qc/S7paFlGgPw3YBliLVNI0PxTOffdShCk8=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5/go.mod h1:3n404SkCabBM0pYRgCyXEKttOLdrmMyBaWMGvaRs/ns=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0 h1:aEmZRULTRVMA8bsu0XiiJslCmuhVwef92nLK+2XZqcM=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0/go.mod h1:3n404SkCabBM0pYRgCyXEKttOLdrmMyBaWMGvaRs/ns=
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible h1:ICQTYSnJC7i3Ns4xXGS5DMi1wTfg44woW4eWQi2ZNTQ=
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible/go.mod h1:sIm2Eayjb84BFeHrWkkA+4Zdt+4CRSs1HhrlosPwRNk=
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ type Delete struct {

SelectPlan PhysicalPlan

ReturningPlan PhysicalPlan

TblColPosInfos TblColPosInfoSlice
}

Expand Down
9 changes: 9 additions & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,3 +2109,12 @@ func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty)
mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})
return []PhysicalPlan{mor}, true
}

func (p *LogicalReturning) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
if !prop.IsEmpty() {
return nil, true
}

returning := PhysicalReturning{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})
return []PhysicalPlan{returning}, true
}
12 changes: 12 additions & 0 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ func (p PointGetPlan) Init(ctx sessionctx.Context, stats *property.StatsInfo, of
return &p
}

func (p LogicalReturning) Init(ctx sessionctx.Context, offset int) *LogicalReturning {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, plancodec.TypeReturning, &p, offset)
return &p
}

func (p PhysicalReturning) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalReturning {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeReturning, &p, offset)
p.childrenReqProps = props
p.stats = stats
return &p
}

func flattenTreePlan(plan PhysicalPlan, plans []PhysicalPlan) []PhysicalPlan {
plans = append(plans, plan)
for _, child := range plan.Children() {
Expand Down
34 changes: 33 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"
"unicode"

"github.com/DigitalChinaOpenSource/DCParser"
parser "github.com/DigitalChinaOpenSource/DCParser"
"github.com/DigitalChinaOpenSource/DCParser/ast"
"github.com/DigitalChinaOpenSource/DCParser/format"
"github.com/DigitalChinaOpenSource/DCParser/model"
Expand Down Expand Up @@ -4378,6 +4378,16 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (
tblID2table[id], _ = b.is.TableByID(id)
}
del.TblColPosInfos, err = buildColumns2Handle(del.names, tblID2Handle, tblID2table, false)

if delete.Returning != nil {
retPlan, err := b.buildReturning(ctx, delete, delete.Returning)
if err != nil {
return nil, err
}

del.ReturningPlan, _, err = DoOptimize(ctx, b.ctx, b.optFlag, retPlan)
}

return del, err
}

Expand Down Expand Up @@ -5164,3 +5174,25 @@ func containDifferentJoinTypes(preferJoinType uint) bool {
}
return cnt > 1
}

func (b *PlanBuilder) buildReturning(ctx context.Context, delete *ast.DeleteStmt, returning *ast.ReturningClause) (p LogicalPlan, err error) {

p, err = b.buildTableRefsWithCache(ctx, delete.TableRefs)
if err != nil {
return nil, err
}

returning.Fields.Fields, err = b.unfoldWildStar(p, returning.Fields.Fields)
if err != nil {
return nil, err
}

var totalMap map[*ast.AggregateFuncExpr]int

p, _, err = b.buildProjection(ctx, p, returning.Fields.Fields, totalMap, nil, false, false)

ret := LogicalReturning{}.Init(b.ctx, b.getSelectOffset())
ret.SetChildren(p)

return ret, err
}
12 changes: 12 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,3 +1256,15 @@ type LogicalShowDDLJobs struct {
func (p *LogicalShowDDLJobs) SetParamType(paramExprs *[]ast.ParamMarkerExpr) (err error) {
return err
}

// LogicalReturning represents returning plan.
type LogicalReturning struct {
baseLogicalPlan

Offset uint64
Count uint64
}

func (p *LogicalReturning) SetParamType(paramExprs *[]ast.ParamMarkerExpr) (err error) {
return err
}
Loading