Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tablet query engine: introduce -enable-consolidator-replicas #5862

Merged
merged 1 commit into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
Expand Down Expand Up @@ -194,6 +195,79 @@ func TestDisableConsolidator(t *testing.T) {
}
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/inf"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected one consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}

framework.Server.SetConsolidatorEnabled(false)
defer framework.Server.SetConsolidatorEnabled(true)
framework.Server.SetConsolidatorReplicasEnabled(true)
defer framework.Server.SetConsolidatorReplicasEnabled(false)

// master should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if afterOne != noNewConsolidations {
t.Errorf("expected no new consolidations, but got: before consolidation count: %v; after consolidation count: %v", afterOne, noNewConsolidations)
}

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
if err != nil {
t.Fatal(err)
}
defer func() {
err = client.SetServingType(topodatapb.TabletType_MASTER)
if err != nil {
t.Fatal(err)
}
}()

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected another consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
}

func TestQueryPlanCache(t *testing.T) {
defer framework.Server.SetQueryPlanCacheCap(framework.Server.QueryPlanCacheCap())
framework.Server.SetQueryPlanCacheCap(1)
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ func NewClient() *QueryClient {
}
}

// NewClientWithTabletType creates a new client for Server with the provided tablet type.
func NewClientWithTabletType(tabletType topodatapb.TabletType) *QueryClient {
targetCopy := Target
targetCopy.TabletType = tabletType
return &QueryClient{
ctx: callerid.NewContext(
context.Background(),
&vtrpcpb.CallerID{},
&querypb.VTGateCallerID{Username: "dev"},
),
target: targetCopy,
server: Server,
}
}

// NewClientWithContext creates a new client for Server with the provided context.
func NewClientWithContext(ctx context.Context) *QueryClient {
return &QueryClient{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type QueryEngine struct {
strictTransTables bool

enableConsolidator bool
enableConsolidatorReplicas bool
enableQueryPlanFieldCaching bool

// Loggers
Expand Down Expand Up @@ -206,6 +207,7 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab
checker,
)
qe.enableConsolidator = config.EnableConsolidator
qe.enableConsolidatorReplicas = config.EnableConsolidatorReplicas
qe.enableQueryPlanFieldCaching = config.EnableQueryPlanFieldCaching
qe.consolidator = sync2.NewConsolidator()
qe.txSerializer = txserializer.New(config.EnableHotRowProtectionDryRun,
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

Expand All @@ -55,6 +56,7 @@ type QueryExecutor struct {
ctx context.Context
logStats *tabletenv.LogStats
tsv *TabletServer
tabletType topodata.TabletType
}

var sequenceFields = []*querypb.Field{
Expand Down Expand Up @@ -829,7 +831,8 @@ func (qre *QueryExecutor) qFetch(logStats *tabletenv.LogStats, parsedQuery *sqlp
if err != nil {
return nil, err
}
if qre.tsv.qe.enableConsolidator {
// Check tablet type.
if qre.tsv.qe.enableConsolidator || (qre.tsv.qe.enableConsolidatorReplicas && qre.tabletType != topodata.TabletType_MASTER) {
q, original := qre.tsv.qe.consolidator.Create(string(sqlWithoutComments))
if original {
defer q.Broadcast()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func init() {

flag.BoolVar(&Config.EnforceStrictTransTables, "enforce_strict_trans_tables", DefaultQsConfig.EnforceStrictTransTables, "If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database.")
flag.BoolVar(&Config.EnableConsolidator, "enable-consolidator", DefaultQsConfig.EnableConsolidator, "This option enables the query consolidator.")
flag.BoolVar(&Config.EnableConsolidatorReplicas, "enable-consolidator-replicas", DefaultQsConfig.EnableConsolidatorReplicas, "This option enables the query consolidator only on replicas.")
flag.BoolVar(&Config.EnableQueryPlanFieldCaching, "enable-query-plan-field-caching", DefaultQsConfig.EnableQueryPlanFieldCaching, "This option fetches & caches fields (columns) when storing query plans")
}

Expand Down Expand Up @@ -182,6 +183,7 @@ type TabletConfig struct {

EnforceStrictTransTables bool
EnableConsolidator bool
EnableConsolidatorReplicas bool
EnableQueryPlanFieldCaching bool
}

Expand Down Expand Up @@ -262,6 +264,7 @@ var DefaultQsConfig = TabletConfig{

EnforceStrictTransTables: true,
EnableConsolidator: true,
EnableConsolidatorReplicas: false,
EnableQueryPlanFieldCaching: true,
}

Expand Down
19 changes: 14 additions & 5 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -1008,9 +1009,9 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
return err
}
if plan.PlanID == planbuilder.PlanInsertTopic {
result, err = tsv.topicExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats)
result, err = tsv.topicExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats, target.GetTabletType())
} else {
result, err = tsv.qreExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats)
result, err = tsv.qreExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats, target.GetTabletType())
}

return err
Expand All @@ -1019,7 +1020,7 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
return result, err
}

func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) {
func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats, tabletType topodata.TabletType) (result *sqltypes.Result, err error) {
for _, subscriber := range plan.Table.TopicInfo.Subscribers {
// replace the topic name with the subscribed message table name
newQuery := strings.Replace(query, plan.Table.Name.String(), subscriber.Name.String(), -1)
Expand All @@ -1031,12 +1032,12 @@ func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comment

// because there isn't an option to return multiple results, only the last
// message table result is returned
result, err = tsv.qreExecute(ctx, newQuery, comments, bindVariables, transactionID, options, newPlan, logStats)
result, err = tsv.qreExecute(ctx, newQuery, comments, bindVariables, transactionID, options, newPlan, logStats, tabletType)
}
return result, err
}

func (tsv *TabletServer) qreExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) {
func (tsv *TabletServer) qreExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats, tabletType topodata.TabletType) (result *sqltypes.Result, err error) {
qre := &QueryExecutor{
query: query,
marginComments: comments,
Expand All @@ -1047,6 +1048,7 @@ func (tsv *TabletServer) qreExecute(ctx context.Context, query string, comments
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: tabletType,
}
extras := tsv.watcher.ComputeExtras(options)
result, err = qre.Execute()
Expand Down Expand Up @@ -2308,6 +2310,13 @@ func (tsv *TabletServer) SetConsolidatorEnabled(enabled bool) {
tsv.qe.enableConsolidator = enabled
}

// SetConsolidatorReplicasEnabled (true) will enable the query consolidator for replicas.
// SetConsolidatorReplicasEnabled (false) will disable the query consolidator for replicas.
// This function should only be used for testing.
func (tsv *TabletServer) SetConsolidatorReplicasEnabled(enabled bool) {
tsv.qe.enableConsolidatorReplicas = enabled
}

// queryAsString returns a readable version of query+bind variables.
func queryAsString(sql string, bindVariables map[string]*querypb.BindVariable) string {
buf := &bytes.Buffer{}
Expand Down