Skip to content

Commit

Permalink
Add returning keyword plan and executor for supporting PG compliant D…
Browse files Browse the repository at this point in the history
…ML (#69)

* Support return result set after deletion

Signed-off-by: jk <51103574+pupillord@users.noreply.github.com>

* add returning plan and exectuor
modify adapter.go to execute returning separately

Signed-off-by: jk <51103574+pupillord@users.noreply.github.com>

Co-authored-by: RogueJin <21214103+RogueJin@users.noreply.github.com>
  • Loading branch information
pupillord and RogueJin authored Sep 16, 2021
1 parent f38ed9e commit 47c8b2e
Show file tree
Hide file tree
Showing 21 changed files with 577 additions and 12 deletions.
32 changes: 31 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type recordSet struct {
stmt *ExecStmt
lastErr error
txnStartTS uint64
rows []chunk.Row
}

func (a *recordSet) Rows() []chunk.Row {
return a.rows
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -363,8 +368,29 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return a.handlePessimisticSelectForUpdate(ctx, e)
}

// Do returning
var returningRS *recordSet
switch e.(type) {
case *DeleteExec:
if del, ok := e.(*DeleteExec); ok && del.returning != nil {
err = del.returning.Next(ctx, nil)
if err != nil {
return nil, err
}

if ret, ok := del.returning.(*ReturningExec); ok {
returningRS = ret.ResultSet
returningRS.stmt = a // to fix executor.(*recordSet).Fields panic issue
}
}
}

if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled {
return result, err
if returningRS != nil {
return returningRS, err
} else {
return result, err
}
}

var txnStartTS uint64
Expand Down Expand Up @@ -439,6 +465,10 @@ type chunkRowRecordSet struct {
execStmt *ExecStmt
}

func (c *chunkRowRecordSet) Rows() []chunk.Row {
return c.rows
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}
Expand Down
23 changes: 23 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildAdminShowTelemetry(v)
case *plannercore.AdminResetTelemetryID:
return b.buildAdminResetTelemetryID(v)
case *plannercore.PhysicalReturning:
return b.buildReturning(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -1721,6 +1723,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
}
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)

if b.err != nil {
return nil
}
Expand All @@ -1732,6 +1735,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
IsMultiTable: v.IsMultiTable,
tblColPosInfos: v.TblColPosInfos,
}

if v.ReturningPlan != nil {
deleteExec.returning = b.build(v.ReturningPlan)
}

return deleteExec
}

Expand Down Expand Up @@ -3144,3 +3152,18 @@ func (b *executorBuilder) buildAdminShowTelemetry(v *plannercore.AdminShowTeleme
func (b *executorBuilder) buildAdminResetTelemetryID(v *plannercore.AdminResetTelemetryID) Executor {
return &AdminResetTelemetryIDExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID())}
}

func (b *executorBuilder) buildReturning(v *plannercore.PhysicalReturning) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
return nil
}

returningExec := ReturningExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
schema: v.Schema(),
}
executorCounterSortExec.Inc()

return &returningExec
}
16 changes: 15 additions & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type DeleteExec struct {
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker

returning Executor
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -197,7 +199,19 @@ func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
err := e.children[0].Open(ctx)
if err != nil {
return err
}

if e.returning != nil {
err = e.returning.Open(ctx)
if err != nil {
return err
}
}

return nil
}

// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
Expand Down
98 changes: 98 additions & 0 deletions executor/returning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2021 Digital China Group Co.,Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"runtime/trace"
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
)

type ReturningExec struct {
baseExecutor

Idx int
fetched bool
schema *expression.Schema

ResultSet *recordSet
}

func (e *ReturningExec) Open(ctx context.Context) error {

return e.children[0].Open(ctx)
}

func (e *ReturningExec) Next(ctx context.Context, req *chunk.Chunk) error {
return e.fetchRowChunks(ctx)
}

func (e *ReturningExec) Close() error {
return e.children[0].Close()
}

func (e *ReturningExec) fetchRowChunks(ctx context.Context) error {
defer func() {
e.fetched = true
}()

var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
}

rs := &recordSet{
executor: e.base().children[0],
}

rs.rows = make([]chunk.Row, 0, 1024)

for {
req := rs.NewChunk()
// Here server.tidbResultSet implements Next method.
err := rs.Next(ctx, req)
if err != nil {
return err
}

rowCount := req.NumRows()
if rowCount == 0 {
break
}
start := time.Now()
reg := trace.StartRegion(ctx, "ProcessReturning")

for i := 0; i < rowCount; i++ {
row := req.GetRow(i)

row.IsNull(0)

rs.rows = append(rs.rows, row)
}

if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
reg.End()
}

e.ResultSet = rs

return nil
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/pingcap/tidb
require (
cloud.google.com/go v0.51.0 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible
github.com/Jeffail/gabs/v2 v2.5.1
github.com/aws/aws-sdk-go v1.30.24 // indirect
Expand All @@ -30,6 +30,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/jackc/pgio v1.0.0
github.com/jackc/pgproto3 v1.1.0
github.com/jackc/pgproto3/v2 v2.0.7
github.com/jackc/pgtype v1.6.2
github.com/klauspost/cpuid v1.2.1
Expand All @@ -53,6 +54,7 @@ require (
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5 h1:E8nid1A2Qc/S7paFlGgPw3YBliLVNI0PxTOffdShCk8=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210628023842-8b1d530491a5/go.mod h1:3n404SkCabBM0pYRgCyXEKttOLdrmMyBaWMGvaRs/ns=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0 h1:aEmZRULTRVMA8bsu0XiiJslCmuhVwef92nLK+2XZqcM=
github.com/DigitalChinaOpenSource/DCParser v0.0.0-20210825054110-e3314361a5f0/go.mod h1:3n404SkCabBM0pYRgCyXEKttOLdrmMyBaWMGvaRs/ns=
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible h1:ICQTYSnJC7i3Ns4xXGS5DMi1wTfg44woW4eWQi2ZNTQ=
github.com/DigitalChinaOpenSource/dcbr v4.0.11+incompatible/go.mod h1:sIm2Eayjb84BFeHrWkkA+4Zdt+4CRSs1HhrlosPwRNk=
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ type Delete struct {

SelectPlan PhysicalPlan

ReturningPlan PhysicalPlan

TblColPosInfos TblColPosInfoSlice
}

Expand Down
9 changes: 9 additions & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,3 +2109,12 @@ func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty)
mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})
return []PhysicalPlan{mor}, true
}

func (p *LogicalReturning) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
if !prop.IsEmpty() {
return nil, true
}

returning := PhysicalReturning{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})
return []PhysicalPlan{returning}, true
}
12 changes: 12 additions & 0 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ func (p PointGetPlan) Init(ctx sessionctx.Context, stats *property.StatsInfo, of
return &p
}

func (p LogicalReturning) Init(ctx sessionctx.Context, offset int) *LogicalReturning {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, plancodec.TypeReturning, &p, offset)
return &p
}

func (p PhysicalReturning) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalReturning {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeReturning, &p, offset)
p.childrenReqProps = props
p.stats = stats
return &p
}

func flattenTreePlan(plan PhysicalPlan, plans []PhysicalPlan) []PhysicalPlan {
plans = append(plans, plan)
for _, child := range plan.Children() {
Expand Down
34 changes: 33 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"
"unicode"

"github.com/DigitalChinaOpenSource/DCParser"
parser "github.com/DigitalChinaOpenSource/DCParser"
"github.com/DigitalChinaOpenSource/DCParser/ast"
"github.com/DigitalChinaOpenSource/DCParser/format"
"github.com/DigitalChinaOpenSource/DCParser/model"
Expand Down Expand Up @@ -4378,6 +4378,16 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (
tblID2table[id], _ = b.is.TableByID(id)
}
del.TblColPosInfos, err = buildColumns2Handle(del.names, tblID2Handle, tblID2table, false)

if delete.Returning != nil {
retPlan, err := b.buildReturning(ctx, delete, delete.Returning)
if err != nil {
return nil, err
}

del.ReturningPlan, _, err = DoOptimize(ctx, b.ctx, b.optFlag, retPlan)
}

return del, err
}

Expand Down Expand Up @@ -5164,3 +5174,25 @@ func containDifferentJoinTypes(preferJoinType uint) bool {
}
return cnt > 1
}

func (b *PlanBuilder) buildReturning(ctx context.Context, delete *ast.DeleteStmt, returning *ast.ReturningClause) (p LogicalPlan, err error) {

p, err = b.buildTableRefsWithCache(ctx, delete.TableRefs)
if err != nil {
return nil, err
}

returning.Fields.Fields, err = b.unfoldWildStar(p, returning.Fields.Fields)
if err != nil {
return nil, err
}

var totalMap map[*ast.AggregateFuncExpr]int

p, _, err = b.buildProjection(ctx, p, returning.Fields.Fields, totalMap, nil, false, false)

ret := LogicalReturning{}.Init(b.ctx, b.getSelectOffset())
ret.SetChildren(p)

return ret, err
}
12 changes: 12 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,3 +1256,15 @@ type LogicalShowDDLJobs struct {
func (p *LogicalShowDDLJobs) SetParamType(paramExprs *[]ast.ParamMarkerExpr) (err error) {
return err
}

// LogicalReturning represents returning plan.
type LogicalReturning struct {
baseLogicalPlan

Offset uint64
Count uint64
}

func (p *LogicalReturning) SetParamType(paramExprs *[]ast.ParamMarkerExpr) (err error) {
return err
}
Loading

0 comments on commit 47c8b2e

Please sign in to comment.