topsql: support multiple datasinks #30808

Merged · 15 commits · Dec 17, 2021
8 changes: 7 additions & 1 deletion server/tidb_test.go
@@ -1536,7 +1536,13 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
s := reporter.NewSingleTargetDataSink(r)
defer func() {
r.Close()
s.Close()
}()

tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
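The hunk above shows the inversion this PR introduces: the reporter is now constructed without a sink, and NewSingleTargetDataSink takes the reporter instead, presumably so the sink can register itself through the DataSinkRegisterer interface added in reporter.go. For a custom sink the same wiring goes through Register/Deregister directly; a minimal, hedged sketch (noopSink and the surrounding main are illustrative assumptions, not code from this PR):

```go
package main

import (
	"time"

	"github.com/pingcap/tidb/util/plancodec"
	"github.com/pingcap/tidb/util/topsql/reporter"
)

// noopSink is a hypothetical DataSink that simply discards every report.
type noopSink struct{}

func (noopSink) TrySend(*reporter.ReportData, time.Time) error { return nil }
func (noopSink) OnReporterClosing()                            {}

func main() {
	// The reporter is now created without any sink...
	r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
	defer r.Close()

	// ...and sinks attach to it afterwards.
	s := noopSink{}
	if err := r.Register(s); err != nil {
		panic(err) // Register fails once the reporter is closed.
	}
	defer r.Deregister(s)
}
```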
12 changes: 2 additions & 10 deletions util/topsql/reporter/datasink.go
@@ -23,14 +23,6 @@ type DataSink interface {
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data *ReportData, deadline time.Time) error

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
IsPaused() bool

// IsDown indicates that the DataSink has been down and can be cleared.
// Note that: once a DataSink is down, it cannot go back to be up.
IsDown() bool

// Close cleans up resources owned by this DataSink
Close()
// OnReporterClosing notifies DataSink that the reporter is closing.
OnReporterClosing()
}
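The interface is thus reduced to two methods: TrySend delivers a report under a deadline, and OnReporterClosing replaces Close as a shutdown notification from the reporter. A hedged sketch of a conforming sink, assuming a channel-backed consumer (channelDataSink and all of its details are illustrative, not part of this PR):

```go
package mysink

import (
	"errors"
	"time"

	"github.com/pingcap/tidb/util/topsql/reporter"
)

// channelDataSink forwards each report into a buffered channel and gives up
// once the deadline passes, so a slow consumer cannot stall the reporter.
type channelDataSink struct {
	ch chan *reporter.ReportData
}

// Compile-time check that the sketch satisfies the new interface.
var _ reporter.DataSink = &channelDataSink{}

// TrySend hands the report to the consumer or fails by the deadline.
func (s *channelDataSink) TrySend(data *reporter.ReportData, deadline time.Time) error {
	select {
	case s.ch <- data:
		return nil
	case <-time.After(time.Until(deadline)):
		return errors.New("channelDataSink: deadline exceeded")
	}
}

// OnReporterClosing is the reporter's shutdown notification; a real sink
// would flush buffers or release resources here.
func (s *channelDataSink) OnReporterClosing() {}
```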
90 changes: 80 additions & 10 deletions util/topsql/reporter/reporter.go
@@ -17,6 +17,7 @@ package reporter
import (
"bytes"
"context"
"errors"
"sort"
"sync"
"sync/atomic"
@@ -53,6 +54,12 @@ type TopSQLReporter interface {
Close()
}

// DataSinkRegisterer is for registering DataSinks
type DataSinkRegisterer interface {
Register(dataSink DataSink) error
Deregister(dataSink DataSink)
}

type cpuData struct {
timestamp uint64
records []tracecpu.SQLCPUTimeRecord
@@ -118,9 +125,11 @@ type planBinaryDecodeFunc func(string) (string, error)
// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
// This should be called periodically to collect TopSQL resource usage metrics
type RemoteTopSQLReporter struct {
ctx context.Context
cancel context.CancelFunc
dataSink DataSink
ctx context.Context
cancel context.CancelFunc

dataSinkMu sync.Mutex
dataSinks map[DataSink]struct{}

// normalizedSQLMap is a map whose keys are SQL digest strings and values are SQLMeta.
normalizedSQLMap atomic.Value // sync.Map
@@ -148,12 +157,14 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum number of SQL statements and plans, which restricts the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
dataSink: dataSink,
ctx: ctx,
cancel: cancel,

dataSinks: make(map[DataSink]struct{}, 10),

collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
decodePlan: decodePlan,
@@ -222,6 +233,47 @@ func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinar
}
}

var _ DataSinkRegisterer = &RemoteTopSQLReporter{}

// Register implements DataSinkRegisterer interface.
func (tsr *RemoteTopSQLReporter) Register(dataSink DataSink) error {
tsr.dataSinkMu.Lock()
defer tsr.dataSinkMu.Unlock()

select {
case <-tsr.ctx.Done():
return errors.New("reporter is closed")
default:
if len(tsr.dataSinks) >= 10 {
return errors.New("too many datasinks")
}

tsr.dataSinks[dataSink] = struct{}{}

if len(tsr.dataSinks) > 0 {
variable.TopSQLVariable.Enable.Store(true)
[Inline review thread on the line above]

Member: Seems that TopSQLVariable.Enable is now actually internal state rather than a controllable variable, right?

Contributor (Author): Just a transition here. There will be no controllable variable after a few more PRs. The profiler is expected to run when len(tsr.dataSinks) > 0 and stop when len(tsr.dataSinks) == 0; for now the only way to control it is by modifying variable.TopSQLVariable.Enable.
}

return nil
}
}

// Deregister implements DataSinkRegisterer interface.
func (tsr *RemoteTopSQLReporter) Deregister(dataSink DataSink) {
tsr.dataSinkMu.Lock()
defer tsr.dataSinkMu.Unlock()

select {
case <-tsr.ctx.Done():
default:
delete(tsr.dataSinks, dataSink)

if len(tsr.dataSinks) == 0 {
variable.TopSQLVariable.Enable.Store(false)
}
}
}

// Collect receives CPU time records for processing. WARN: It will drop the records if processing cannot keep up.
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
@@ -242,7 +294,15 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
// Close closes the reporter and releases its resources.
func (tsr *RemoteTopSQLReporter) Close() {
tsr.cancel()
tsr.dataSink.Close()

var m map[DataSink]struct{}
tsr.dataSinkMu.Lock()
m, tsr.dataSinks = tsr.dataSinks, make(map[DataSink]struct{})
tsr.dataSinkMu.Unlock()

for d := range m {
d.OnReporterClosing()
}
}

func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
@@ -585,7 +645,17 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
}
})
deadline := time.Now().Add(timeout)
if err := tsr.dataSink.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))

tsr.dataSinkMu.Lock()
dataSinks := make([]DataSink, 0, len(tsr.dataSinks))
for ds := range tsr.dataSinks {
dataSinks = append(dataSinks, ds)
}
tsr.dataSinkMu.Unlock()

for _, ds := range dataSinks {
if err := ds.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
}
}
}
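doReport now snapshots the registered sinks while holding dataSinkMu and calls TrySend outside the lock, so a slow sink delays only its own send, a per-sink failure is merely logged, and neither blocks Register/Deregister. Fanning one reporter out to several sinks could then look like this sketch (reusing the hypothetical channelDataSink from the earlier sketch; the surrounding main is an assumption):

```go
package main

import (
	"github.com/pingcap/tidb/util/plancodec"
	"github.com/pingcap/tidb/util/topsql/reporter"
)

func main() {
	r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
	defer r.Close()

	for i := 0; i < 3; i++ {
		s := &channelDataSink{ch: make(chan *reporter.ReportData, 1)}
		// Register rejects new sinks once the reporter is closed or
		// when ten sinks are already attached.
		if err := r.Register(s); err != nil {
			panic(err)
		}
		defer r.Deregister(s)
	}
	// Each report is now delivered to all three sinks independently;
	// an error from one sink does not prevent delivery to the others.
}
```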
156 changes: 137 additions & 19 deletions util/topsql/reporter/reporter_test.go
@@ -64,32 +64,53 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) {
return plan, nil
}

func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
type mockDataSink struct {
ch chan *ReportData
}

func newMockDataSink(ch chan *ReportData) DataSink {
return &mockDataSink{ch: ch}
}

var _ DataSink = &mockDataSink{}

func (ds *mockDataSink) TrySend(data *ReportData, _ time.Time) error {
ds.ch <- data
return nil
}

func (ds *mockDataSink) OnReporterClosing() {
}

func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
variable.TopSQLVariable.MaxStatementCount.Store(int64(maxStatementsNum))
variable.TopSQLVariable.MaxCollect.Store(10000)
variable.TopSQLVariable.ReportIntervalSeconds.Store(int64(interval))
config.UpdateGlobal(func(conf *config.Config) {
conf.TopSQL.ReceiverAddress = addr
})

rc := NewSingleTargetDataSink()
ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
return ts
ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
ds := NewSingleTargetDataSink(ts)
return ts, ds
}

func initializeCache(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
ts := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
func initializeCache(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
ts, ds := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
populateCache(ts, 0, maxStatementsNum, 1)
return ts
return ts, ds
}

func TestCollectAndSendBatch(t *testing.T) {
agentServer, err := mock.StartMockAgentServer()
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()
populateCache(tsr, 0, maxSQLNum, 1)

agentServer.WaitCollectCnt(1, time.Second*5)
@@ -127,8 +148,11 @@ func TestCollectAndEvicted(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()
populateCache(tsr, 0, maxSQLNum*2, 2)

agentServer.WaitCollectCnt(1, time.Second*10)
@@ -192,8 +216,11 @@ func TestCollectAndTopN(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 1),
@@ -257,8 +284,11 @@ func TestCollectAndTopN(t *testing.T) {
}

func TestCollectCapacity(t *testing.T) {
tsr := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
defer func() {
ds.Close()
tsr.Close()
}()

registerSQL := func(n int) {
for i := 0; i < n; i++ {
@@ -398,8 +428,11 @@ func TestCollectInternal(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 1),
@@ -428,15 +461,100 @@ func TestCollectInternal(t *testing.T) {
}
}

func TestMultipleDataSinks(t *testing.T) {
variable.TopSQLVariable.ReportIntervalSeconds.Store(1)

tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
defer tsr.Close()

var chs []chan *ReportData
for i := 0; i < 7; i++ {
chs = append(chs, make(chan *ReportData, 1))
}
var dss []DataSink
for _, ch := range chs {
dss = append(dss, newMockDataSink(ch))
}
for _, ds := range dss {
require.NoError(t, tsr.Register(ds))
}

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 2),
}
tsr.Collect(3, records)

for _, ch := range chs {
d := <-ch
require.NotNil(t, d)
require.Equal(t, []tipb.CPUTimeRecord{{
SqlDigest: []byte("sqlDigest1"),
PlanDigest: []byte("planDigest1"),
RecordListTimestampSec: []uint64{3},
RecordListCpuTimeMs: []uint32{2},
}}, d.CPUTimeRecords)

require.Equal(t, []tipb.SQLMeta{{
SqlDigest: []byte("sqlDigest1"),
NormalizedSql: "sqlNormalized1",
}}, d.SQLMetas)

require.Equal(t, []tipb.PlanMeta{{
PlanDigest: []byte("planDigest1"),
NormalizedPlan: "planNormalized1",
}}, d.PlanMetas)
}

// deregister half of dataSinks
for i := 0; i < 7; i += 2 {
tsr.Deregister(dss[i])
}

records = []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 4, 5),
}
tsr.Collect(6, records)

for i := 1; i < 7; i += 2 {
d := <-chs[i]
require.NotNil(t, d)
require.Equal(t, []tipb.CPUTimeRecord{{
SqlDigest: []byte("sqlDigest4"),
PlanDigest: []byte("planDigest4"),
RecordListTimestampSec: []uint64{6},
RecordListCpuTimeMs: []uint32{5},
}}, d.CPUTimeRecords)

require.Equal(t, []tipb.SQLMeta{{
SqlDigest: []byte("sqlDigest4"),
NormalizedSql: "sqlNormalized4",
IsInternalSql: true,
}}, d.SQLMetas)

require.Equal(t, []tipb.PlanMeta{{
PlanDigest: []byte("planDigest4"),
NormalizedPlan: "planNormalized4",
}}, d.PlanMetas)
}

for i := 0; i < 7; i += 2 {
select {
case <-chs[i]:
require.Fail(t, "unexpected to receive messages")
default:
}
}
}

func BenchmarkTopSQL_CollectAndIncrementFrequency(b *testing.B) {
tsr := initializeCache(maxSQLNum, 120, ":23333")
tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
for i := 0; i < b.N; i++ {
populateCache(tsr, 0, maxSQLNum, uint64(i))
}
}

func BenchmarkTopSQL_CollectAndEvict(b *testing.B) {
tsr := initializeCache(maxSQLNum, 120, ":23333")
tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
begin := 0
end := maxSQLNum
for i := 0; i < b.N; i++ {
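As the inline review thread on Register observes, variable.TopSQLVariable.Enable becomes transitional state after this PR: it mirrors whether any sink is registered rather than acting as an independent switch. A hedged sketch of observing that behavior, again reusing the hypothetical channelDataSink (Enable.Load is assumed to be the atomic getter matching the Store calls in the diff):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/util/plancodec"
	"github.com/pingcap/tidb/util/topsql/reporter"
)

func main() {
	r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
	defer r.Close()

	s := &channelDataSink{ch: make(chan *reporter.ReportData, 1)}

	_ = r.Register(s)
	fmt.Println(variable.TopSQLVariable.Enable.Load()) // true: at least one sink is registered

	r.Deregister(s)
	fmt.Println(variable.TopSQLVariable.Enable.Load()) // false: the last sink is gone
}
```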