diff --git a/server/tidb_test.go b/server/tidb_test.go
index 60bcee29c2532..08647d19ba1f8 100644
--- a/server/tidb_test.go
+++ b/server/tidb_test.go
@@ -1536,7 +1536,13 @@ func TestTopSQLAgent(t *testing.T) {
 	dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
 	dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")
 
-	r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
+	r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
+	s := reporter.NewSingleTargetDataSink(r)
+	defer func() {
+		r.Close()
+		s.Close()
+	}()
+
 	tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})
 
 	// TODO: change to ensure that the right sql statements are reported, not just counts
diff --git a/util/topsql/reporter/datasink.go b/util/topsql/reporter/datasink.go
index b82649095f08f..2196be54a0f93 100644
--- a/util/topsql/reporter/datasink.go
+++ b/util/topsql/reporter/datasink.go
@@ -23,14 +23,6 @@ type DataSink interface {
 	// the specified deadline, or the sink is closed, an error will be returned.
 	TrySend(data *ReportData, deadline time.Time) error
 
-	// IsPaused indicates that the DataSink is not expecting to receive records for now
-	// and may resume in the future.
-	IsPaused() bool
-
-	// IsDown indicates that the DataSink has been down and can be cleared.
-	// Note that: once a DataSink is down, it cannot go back to be up.
-	IsDown() bool
-
-	// Close cleans up resources owned by this DataSink
-	Close()
+	// OnReporterClosing notifies DataSink that the reporter is closing.
+	OnReporterClosing()
 }
diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go
index 574472da60536..879ac0d61e438 100644
--- a/util/topsql/reporter/reporter.go
+++ b/util/topsql/reporter/reporter.go
@@ -17,6 +17,7 @@ package reporter
 import (
 	"bytes"
 	"context"
+	"errors"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -53,6 +54,12 @@ type TopSQLReporter interface {
 	Close()
 }
 
+// DataSinkRegisterer is for registering DataSink
+type DataSinkRegisterer interface {
+	Register(dataSink DataSink) error
+	Deregister(dataSink DataSink)
+}
+
 type cpuData struct {
 	timestamp uint64
 	records   []tracecpu.SQLCPUTimeRecord
@@ -118,9 +125,11 @@ type planBinaryDecodeFunc func(string) (string, error)
 // RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
 // This should be called periodically to collect TopSQL resource usage metrics
 type RemoteTopSQLReporter struct {
-	ctx      context.Context
-	cancel   context.CancelFunc
-	dataSink DataSink
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	dataSinkMu sync.Mutex
+	dataSinks  map[DataSink]struct{}
 
 	// normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta.
 	normalizedSQLMap atomic.Value // sync.Map
@@ -148,12 +157,14 @@ type SQLMeta struct {
 //
 // planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
 // MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
-func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
+func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
 	ctx, cancel := context.WithCancel(context.Background())
 	tsr := &RemoteTopSQLReporter{
-		ctx:      ctx,
-		cancel:   cancel,
-		dataSink: dataSink,
+		ctx:    ctx,
+		cancel: cancel,
+
+		dataSinks: make(map[DataSink]struct{}, 10),
+
 		collectCPUDataChan:      make(chan cpuData, 1),
 		reportCollectedDataChan: make(chan collectedData, 1),
 		decodePlan:              decodePlan,
@@ -222,6 +233,47 @@ func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinar
 	}
 }
 
+var _ DataSinkRegisterer = &RemoteTopSQLReporter{}
+
+// Register implements DataSinkRegisterer interface.
+func (tsr *RemoteTopSQLReporter) Register(dataSink DataSink) error {
+	tsr.dataSinkMu.Lock()
+	defer tsr.dataSinkMu.Unlock()
+
+	select {
+	case <-tsr.ctx.Done():
+		return errors.New("reporter is closed")
+	default:
+		if len(tsr.dataSinks) >= 10 {
+			return errors.New("too many datasinks")
+		}
+
+		tsr.dataSinks[dataSink] = struct{}{}
+
+		if len(tsr.dataSinks) > 0 {
+			variable.TopSQLVariable.Enable.Store(true)
+		}
+
+		return nil
+	}
+}
+
+// Deregister implements DataSinkRegisterer interface.
+func (tsr *RemoteTopSQLReporter) Deregister(dataSink DataSink) {
+	tsr.dataSinkMu.Lock()
+	defer tsr.dataSinkMu.Unlock()
+
+	select {
+	case <-tsr.ctx.Done():
+	default:
+		delete(tsr.dataSinks, dataSink)
+
+		if len(tsr.dataSinks) == 0 {
+			variable.TopSQLVariable.Enable.Store(false)
+		}
+	}
+}
+
 // Collect receives CPU time records for processing. WARN: It will drop the records if the processing is not in time.
 // This function is thread-safe and efficient.
 func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
@@ -242,7 +294,15 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
 // Close uses to close and release the reporter resource.
 func (tsr *RemoteTopSQLReporter) Close() {
 	tsr.cancel()
-	tsr.dataSink.Close()
+
+	var m map[DataSink]struct{}
+	tsr.dataSinkMu.Lock()
+	m, tsr.dataSinks = tsr.dataSinks, make(map[DataSink]struct{})
+	tsr.dataSinkMu.Unlock()
+
+	for d := range m {
+		d.OnReporterClosing()
+	}
 }
 
 func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
@@ -585,7 +645,17 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
 		}
 	})
 	deadline := time.Now().Add(timeout)
-	if err := tsr.dataSink.TrySend(data, deadline); err != nil {
-		logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
+
+	tsr.dataSinkMu.Lock()
+	dataSinks := make([]DataSink, 0, len(tsr.dataSinks))
+	for ds := range tsr.dataSinks {
+		dataSinks = append(dataSinks, ds)
+	}
+	tsr.dataSinkMu.Unlock()
+
+	for _, ds := range dataSinks {
+		if err := ds.TrySend(data, deadline); err != nil {
+			logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
+		}
 	}
 }
diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go
index 7f6c9f9a25a32..df251165de02d 100644
--- a/util/topsql/reporter/reporter_test.go
+++ b/util/topsql/reporter/reporter_test.go
@@ -64,7 +64,25 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) {
 	return plan, nil
 }
 
-func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
+type mockDataSink struct {
+	ch chan *ReportData
+}
+
+func newMockDataSink(ch chan *ReportData) DataSink {
+	return &mockDataSink{ch: ch}
+}
+
+var _ DataSink = &mockDataSink{}
+
+func (ds *mockDataSink) TrySend(data *ReportData, _ time.Time) error {
+	ds.ch <- data
+	return nil
+}
+
+func (ds *mockDataSink) OnReporterClosing() {
+}
+
+func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
 	variable.TopSQLVariable.MaxStatementCount.Store(int64(maxStatementsNum))
 	variable.TopSQLVariable.MaxCollect.Store(10000)
 	variable.TopSQLVariable.ReportIntervalSeconds.Store(int64(interval))
@@ -72,15 +90,15 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
 		conf.TopSQL.ReceiverAddress = addr
 	})
 
-	rc := NewSingleTargetDataSink()
-	ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
-	return ts
+	ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
+	ds := NewSingleTargetDataSink(ts)
+	return ts, ds
 }
 
-func initializeCache(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
-	ts := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
+func initializeCache(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
+	ts, ds := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
 	populateCache(ts, 0, maxStatementsNum, 1)
-	return ts
+	return ts, ds
 }
 
 func TestCollectAndSendBatch(t *testing.T) {
@@ -88,8 +106,11 @@
 	require.NoError(t, err)
 	defer agentServer.Stop()
 
-	tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
-	defer tsr.Close()
+	tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
+	defer func() {
+		ds.Close()
+		tsr.Close()
+	}()
 	populateCache(tsr, 0, maxSQLNum, 1)
 
 	agentServer.WaitCollectCnt(1, time.Second*5)
@@ -127,8 +148,11 @@ func TestCollectAndEvicted(t *testing.T) {
 	require.NoError(t, err)
 	defer agentServer.Stop()
 
-	tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
-	defer tsr.Close()
+	tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
+	defer func() {
+		ds.Close()
+		tsr.Close()
+	}()
 	populateCache(tsr, 0, maxSQLNum*2, 2)
 
 	agentServer.WaitCollectCnt(1, time.Second*10)
@@ -192,8 +216,11 @@ func TestCollectAndTopN(t *testing.T) {
 	require.NoError(t, err)
 	defer agentServer.Stop()
 
-	tsr := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
-	defer tsr.Close()
+	tsr, ds := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
+	defer func() {
+		ds.Close()
+		tsr.Close()
+	}()
 
 	records := []tracecpu.SQLCPUTimeRecord{
 		newSQLCPUTimeRecord(tsr, 1, 1),
@@ -257,8 +284,11 @@
 }
 
 func TestCollectCapacity(t *testing.T) {
-	tsr := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
-	defer tsr.Close()
+	tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
+	defer func() {
+		ds.Close()
+		tsr.Close()
+	}()
 
 	registerSQL := func(n int) {
 		for i := 0; i < n; i++ {
@@ -398,8 +428,11 @@ func TestCollectInternal(t *testing.T) {
 	require.NoError(t, err)
 	defer agentServer.Stop()
 
-	tsr := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
-	defer tsr.Close()
+	tsr, ds := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
+	defer func() {
+		ds.Close()
+		tsr.Close()
+	}()
 
 	records := []tracecpu.SQLCPUTimeRecord{
 		newSQLCPUTimeRecord(tsr, 1, 1),
@@ -428,15 +461,100 @@
 	}
 }
 
+func TestMultipleDataSinks(t *testing.T) {
+	variable.TopSQLVariable.ReportIntervalSeconds.Store(1)
+
+	tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
+	defer tsr.Close()
+
+	var chs []chan *ReportData
+	for i := 0; i < 7; i++ {
+		chs = append(chs, make(chan *ReportData, 1))
+	}
+	var dss []DataSink
+	for _, ch := range chs {
+		dss = append(dss, newMockDataSink(ch))
+	}
+	for _, ds := range dss {
+		require.NoError(t, tsr.Register(ds))
+	}
+
+	records := []tracecpu.SQLCPUTimeRecord{
+		newSQLCPUTimeRecord(tsr, 1, 2),
+	}
+	tsr.Collect(3, records)
+
+	for _, ch := range chs {
+		d := <-ch
+		require.NotNil(t, d)
+		require.Equal(t, []tipb.CPUTimeRecord{{
+			SqlDigest:              []byte("sqlDigest1"),
+			PlanDigest:             []byte("planDigest1"),
+			RecordListTimestampSec: []uint64{3},
+			RecordListCpuTimeMs:    []uint32{2},
+		}}, d.CPUTimeRecords)
+
+		require.Equal(t, []tipb.SQLMeta{{
+			SqlDigest:     []byte("sqlDigest1"),
+			NormalizedSql: "sqlNormalized1",
+		}}, d.SQLMetas)
+
+		require.Equal(t, []tipb.PlanMeta{{
+			PlanDigest:     []byte("planDigest1"),
+			NormalizedPlan: "planNormalized1",
+		}}, d.PlanMetas)
+	}
+
+	// deregister half of dataSinks
+	for i := 0; i < 7; i += 2 {
+		tsr.Deregister(dss[i])
+	}
+
+	records = []tracecpu.SQLCPUTimeRecord{
+		newSQLCPUTimeRecord(tsr, 4, 5),
+	}
+	tsr.Collect(6, records)
+
+	for i := 1; i < 7; i += 2 {
+		d := <-chs[i]
+		require.NotNil(t, d)
+		require.Equal(t, []tipb.CPUTimeRecord{{
+			SqlDigest:              []byte("sqlDigest4"),
+			PlanDigest:             []byte("planDigest4"),
+			RecordListTimestampSec: []uint64{6},
+			RecordListCpuTimeMs:    []uint32{5},
+		}}, d.CPUTimeRecords)
+
+		require.Equal(t, []tipb.SQLMeta{{
+			SqlDigest:     []byte("sqlDigest4"),
+			NormalizedSql: "sqlNormalized4",
+			IsInternalSql: true,
+		}}, d.SQLMetas)
+
+		require.Equal(t, []tipb.PlanMeta{{
+			PlanDigest:     []byte("planDigest4"),
+			NormalizedPlan: "planNormalized4",
+		}}, d.PlanMetas)
+	}
+
+	for i := 0; i < 7; i += 2 {
+		select {
+		case <-chs[i]:
+			require.Fail(t, "unexpected to receive messages")
+		default:
+		}
+	}
+}
+
 func BenchmarkTopSQL_CollectAndIncrementFrequency(b *testing.B) {
-	tsr := initializeCache(maxSQLNum, 120, ":23333")
+	tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
 	for i := 0; i < b.N; i++ {
 		populateCache(tsr, 0, maxSQLNum, uint64(i))
 	}
 }
 
 func BenchmarkTopSQL_CollectAndEvict(b *testing.B) {
-	tsr := initializeCache(maxSQLNum, 120, ":23333")
+	tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
 	begin := 0
 	end := maxSQLNum
 	for i := 0; i < b.N; i++ {
diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go
index b7a34250357d8..3ea61d75f633a 100644
--- a/util/topsql/reporter/single_target.go
+++ b/util/topsql/reporter/single_target.go
@@ -37,10 +37,13 @@ type SingleTargetDataSink struct {
 	curRPCAddr string
 	conn       *grpc.ClientConn
 	sendTaskCh chan sendTask
+
+	registered bool
+	registerer DataSinkRegisterer
 }
 
 // NewSingleTargetDataSink returns a new SingleTargetDataSink
-func NewSingleTargetDataSink() *SingleTargetDataSink {
+func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSink {
 	ctx, cancel := context.WithCancel(context.Background())
 	dataSink := &SingleTargetDataSink{
 		ctx:        ctx,
@@ -49,13 +52,37 @@ func NewSingleTargetDataSink() *SingleTargetDataSink {
 		curRPCAddr: "",
 		conn:       nil,
 		sendTaskCh: make(chan sendTask, 1),
+
+		registered: false,
+		registerer: registerer,
+	}
+
+	addr := config.GetGlobalConfig().TopSQL.ReceiverAddress
+	if addr != "" {
+		dataSink.curRPCAddr = addr
+		if err := registerer.Register(dataSink); err != nil {
+			logutil.BgLogger().Warn("failed to register single target datasink", zap.Error(err))
+			return nil
+		}
 	}
+
 	go dataSink.recoverRun()
 	return dataSink
 }
 
 // recoverRun will run until SingleTargetDataSink is closed.
 func (ds *SingleTargetDataSink) recoverRun() {
+	defer func() {
+		if ds.conn == nil {
+			return
+		}
+		err := ds.conn.Close()
+		if err != nil {
+			logutil.BgLogger().Warn("[top-sql] single target dataSink close connection failed", zap.Error(err))
+		}
+		ds.conn = nil
+	}()
+
 	for ds.run() {
 	}
 }
@@ -71,30 +98,40 @@ func (ds *SingleTargetDataSink) run() (rerun bool) {
 		}
 	}()
 
+	ticker := time.NewTicker(time.Second)
 	for {
-		var task sendTask
+		var targetRPCAddr string
 		select {
 		case <-ds.ctx.Done():
			return false
-		case task = <-ds.sendTaskCh:
+		case task := <-ds.sendTaskCh:
+			targetRPCAddr = config.GetGlobalConfig().TopSQL.ReceiverAddress
+			ds.doSend(targetRPCAddr, task)
+		case <-ticker.C:
+			targetRPCAddr = config.GetGlobalConfig().TopSQL.ReceiverAddress
 		}
 
-		targetRPCAddr := config.GetGlobalConfig().TopSQL.ReceiverAddress
-		if targetRPCAddr == "" {
-			continue
+		if err := ds.tryRegister(targetRPCAddr); err != nil {
+			logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err))
+			return false
 		}
+	}
+}
 
-		ctx, cancel := context.WithDeadline(context.Background(), task.deadline)
-		start := time.Now()
-		err := ds.doSend(ctx, targetRPCAddr, task.data)
-		cancel()
-		if err != nil {
-			logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err))
-			reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
-		} else {
-			reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
+func (ds *SingleTargetDataSink) tryRegister(addr string) error {
+	if addr == "" && ds.registered {
+		ds.registerer.Deregister(ds)
+		ds.registered = false
+		return nil
+	}
+
+	if addr != "" && !ds.registered {
+		if err := ds.registerer.Register(ds); err != nil {
+			return err
 		}
+		ds.registered = true
 	}
+	return nil
 }
 
 var _ DataSink = &SingleTargetDataSink{}
@@ -114,38 +151,39 @@ func (ds *SingleTargetDataSink) TrySend(data *ReportData, deadline time.Time) er
 	}
 }
 
-// IsPaused implements the DataSink interface.
-func (ds *SingleTargetDataSink) IsPaused() bool {
-	return len(config.GetGlobalConfig().TopSQL.ReceiverAddress) == 0
-}
-
-// IsDown implements the DataSink interface.
-func (ds *SingleTargetDataSink) IsDown() bool {
-	select {
-	case <-ds.ctx.Done():
-		return true
-	default:
-		return false
-	}
+// OnReporterClosing implements the DataSink interface.
+func (ds *SingleTargetDataSink) OnReporterClosing() {
+	ds.cancel()
 }
 
 // Close uses to close grpc connection.
 func (ds *SingleTargetDataSink) Close() {
 	ds.cancel()
-	if ds.conn == nil {
+
+	if ds.registered {
+		ds.registerer.Deregister(ds)
+	}
+}
+
+func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) {
+	if addr == "" {
 		return
 	}
-	err := ds.conn.Close()
+
+	var err error
+	start := time.Now()
 	if err != nil {
-		logutil.BgLogger().Warn("[top-sql] single target dataSink close connection failed", zap.Error(err))
+		logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err))
+		reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
+	} else {
+		reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
 	}
-	ds.conn = nil
-}
 
-func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data *ReportData) error {
-	err := ds.tryEstablishConnection(ctx, addr)
-	if err != nil {
-		return err
+	ctx, cancel := context.WithDeadline(context.Background(), task.deadline)
+	defer cancel()
+
+	if err = ds.tryEstablishConnection(ctx, addr); err != nil {
+		return
 	}
 
 	var wg sync.WaitGroup
@@ -154,24 +192,23 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data *R
 
 	go func() {
 		defer wg.Done()
-		errCh <- ds.sendBatchSQLMeta(ctx, data.SQLMetas)
+		errCh <- ds.sendBatchSQLMeta(ctx, task.data.SQLMetas)
 	}()
 	go func() {
 		defer wg.Done()
-		errCh <- ds.sendBatchPlanMeta(ctx, data.PlanMetas)
+		errCh <- ds.sendBatchPlanMeta(ctx, task.data.PlanMetas)
 	}()
 	go func() {
 		defer wg.Done()
-		errCh <- ds.sendBatchCPUTimeRecord(ctx, data.CPUTimeRecords)
+		errCh <- ds.sendBatchCPUTimeRecord(ctx, task.data.CPUTimeRecords)
 	}()
 	wg.Wait()
 	close(errCh)
-	for err := range errCh {
+	for err = range errCh {
 		if err != nil {
-			return err
+			return
 		}
 	}
-	return nil
 }
 
 // sendBatchCPUTimeRecord sends a batch of TopSQL records by stream.
diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go
index 8167997b24263..8f2aac1566642 100644
--- a/util/topsql/topsql.go
+++ b/util/topsql/topsql.go
@@ -36,18 +36,27 @@ const (
 	MaxBinaryPlanSize = 2 * 1024
 )
 
-var globalTopSQLReport reporter.TopSQLReporter
+var (
+	globalTopSQLReport   reporter.TopSQLReporter
+	singleTargetDataSink *reporter.SingleTargetDataSink
+)
 
 // SetupTopSQL sets up the top-sql worker.
 func SetupTopSQL() {
-	ds := reporter.NewSingleTargetDataSink()
-	globalTopSQLReport = reporter.NewRemoteTopSQLReporter(ds, plancodec.DecodeNormalizedPlan)
-	tracecpu.GlobalSQLCPUProfiler.SetCollector(globalTopSQLReport)
+	remoteReporter := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
+	singleTargetDataSink = reporter.NewSingleTargetDataSink(remoteReporter)
+
+	globalTopSQLReport = remoteReporter
+
+	tracecpu.GlobalSQLCPUProfiler.SetCollector(remoteReporter)
 	tracecpu.GlobalSQLCPUProfiler.Run()
 }
 
 // Close uses to close and release the top sql resource.
 func Close() {
+	if singleTargetDataSink != nil {
+		singleTargetDataSink.Close()
+	}
 	if globalTopSQLReport != nil {
 		globalTopSQLReport.Close()
 	}
diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go
index d2e8bbf96b586..d4aabc746ce0e 100644
--- a/util/topsql/topsql_test.go
+++ b/util/topsql/topsql_test.go
@@ -115,9 +115,13 @@ func TestTopSQLReporter(t *testing.T) {
 		conf.TopSQL.ReceiverAddress = server.Address()
 	})
 
-	dataSink := reporter.NewSingleTargetDataSink()
-	report := reporter.NewRemoteTopSQLReporter(dataSink, mockPlanBinaryDecoderFunc)
-	defer report.Close()
+	report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
+	ds := reporter.NewSingleTargetDataSink(report)
+
+	defer func() {
+		ds.Close()
+		report.Close()
+	}()
 	tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report})
 
 	reqs := []struct {