Skip to content

Commit

Permalink
topsql: remove do topN before report (#31192)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 31, 2021
1 parent 072561c commit 74ef32a
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 48 deletions.
20 changes: 3 additions & 17 deletions util/topsql/reporter/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,29 +496,15 @@ func (c *collecting) appendOthersStmtStatsItem(timestamp uint64, item stmtstats.
others.appendStmtStatsItem(timestamp, item)
}

// compactToTopNAndOthers returns the largest N records, other records will be packed and appended to the end.
func (c *collecting) compactToTopNAndOthers(n int) records {
// getReportRecords returns all records, others record will be packed and appended to the end.
func (c *collecting) getReportRecords() records {
others := c.records[keyOthers]
delete(c.records, keyOthers)
rs := make(records, 0, len(c.records))
for _, v := range c.records {
rs = append(rs, *v)
}
// Fetch TopN records.
var evicted records
rs, evicted = rs.topN(n)
if others != nil {
// Sort the records by timestamp to fix the affect of time jump backward.
sort.Sort(others)
} else {
others = newRecord(nil, nil)
}
for _, evict := range evicted {
e := evict // Avoid implicit memory aliasing in for loop.
others.merge(&e)
}
if others.totalCPUTimeMs > 0 {
// append others which summarize all evicted item's cpu-time.
if others != nil && others.totalCPUTimeMs > 0 {
rs = append(rs, *others)
}
return rs
Expand Down
15 changes: 6 additions & 9 deletions util/topsql/reporter/datamodel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,16 @@ func Test_collecting_appendOthers(t *testing.T) {
assert.Equal(t, uint64(2000), r.tsItems[1].stmtStats.SumDurationNs)
}

func Test_collecting_compactToTopNAndOthers(t *testing.T) {
func Test_collecting_getReportRecords(t *testing.T) {
c := newCollecting()
c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")).appendCPUTime(1, 1)
c.getOrCreateRecord([]byte("SQL-2"), []byte("PLAN-2")).appendCPUTime(1, 2)
c.getOrCreateRecord([]byte("SQL-3"), []byte("PLAN-3")).appendCPUTime(1, 3)
rs := c.compactToTopNAndOthers(1)
assert.Len(t, rs, 2)
assert.Equal(t, []byte("SQL-3"), rs[0].sqlDigest)
assert.Equal(t, []byte("PLAN-3"), rs[0].planDigest)
assert.Equal(t, uint64(3), rs[0].totalCPUTimeMs)
assert.Len(t, rs[0].tsItems, 1)
assert.Equal(t, uint32(3), rs[0].tsItems[0].cpuTimeMs)
assert.Equal(t, uint64(3), rs[1].totalCPUTimeMs) // 1 + 2 = 3
c.getOrCreateRecord([]byte(keyOthers), []byte(keyOthers)).appendCPUTime(1, 10)
rs := c.getReportRecords()
assert.Len(t, rs, 4)
assert.Equal(t, uint32(10), rs[3].tsItems[0].cpuTimeMs)
assert.Equal(t, uint64(10), rs[3].totalCPUTimeMs)
}

func Test_collecting_take(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DataSinkRegisterer interface {

// ReportData contains data that reporter sends to the agent.
type ReportData struct {
// DataRecords contains the compactToTopNAndOthers records []tipb.TopSQLRecord and the `others`
// DataRecords contains the topN records of each second and the `others`
// record which aggregation all []tipb.TopSQLRecord that is out of Top N.
DataRecords []tipb.TopSQLRecord
SQLMetas []tipb.SQLMeta
Expand Down
3 changes: 1 addition & 2 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
// that `data` contains. So we wait for a little while to ensure that writes
// are finished.
time.Sleep(time.Millisecond * 100)
// Get top N records from records.
rs := data.collected.compactToTopNAndOthers(int(topsqlstate.GlobalState.MaxStatementCount.Load()))
rs := data.collected.getReportRecords()
// Convert to protobuf data and do report.
tsr.doReport(&ReportData{
DataRecords: rs.toProto(),
Expand Down
70 changes: 52 additions & 18 deletions util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,40 +224,67 @@ func TestCollectAndTopN(t *testing.T) {
records := []collector.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 1),
newSQLCPUTimeRecord(tsr, 2, 2),
newSQLCPUTimeRecord(tsr, 3, 3),
}
// SQL-2: 2ms
// SQL-3: 3ms
// Others: 1ms
collectAndWait(tsr, 1, records)

records = []collector.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 3, 3),
newSQLCPUTimeRecord(tsr, 1, 1),
newSQLCPUTimeRecord(tsr, 3, 3),
}
// SQL-1: 1ms
// SQL-3: 3ms
collectAndWait(tsr, 2, records)

records = []collector.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 4, 1),
newSQLCPUTimeRecord(tsr, 1, 1),
newSQLCPUTimeRecord(tsr, 4, 4),
newSQLCPUTimeRecord(tsr, 1, 10),
newSQLCPUTimeRecord(tsr, 3, 1),
}
// SQL-1: 10ms
// SQL-4: 4ms
// Others: 1ms
collectAndWait(tsr, 3, records)

records = []collector.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 5, 1),
newSQLCPUTimeRecord(tsr, 1, 1),
newSQLCPUTimeRecord(tsr, 5, 5),
newSQLCPUTimeRecord(tsr, 4, 4),
newSQLCPUTimeRecord(tsr, 1, 10),
newSQLCPUTimeRecord(tsr, 2, 20),
}
// SQL-2: 20ms
// SQL-1: 1ms
// Others: 9ms
collectAndWait(tsr, 4, records)

// Test for time jump back.
records = []collector.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 6, 1),
newSQLCPUTimeRecord(tsr, 6, 6),
newSQLCPUTimeRecord(tsr, 1, 1),
newSQLCPUTimeRecord(tsr, 2, 2),
newSQLCPUTimeRecord(tsr, 3, 3),
}
// SQL-6: 6ms
// SQL-3: 3ms
// Others: 3ms
collectAndWait(tsr, 0, records)

// Wait agent server collect finish.
agentServer.WaitCollectCnt(1, time.Second*10)

// check for equality of server received batch and the original data
results := agentServer.GetLatestRecords()
require.Len(t, results, 3)
// Digest total
// "": 14ms (others)
// SQL-1: 21ms
// SQL-2: 22ms
// SQL-3: 9ms
// SQL-4: 4ms
// SQL-6: 6ms
require.Len(t, results, 6)
sort.Slice(results, func(i, j int) bool {
return string(results[i].SqlDigest) < string(results[j].SqlDigest)
})
Expand All @@ -269,19 +296,26 @@ func TestCollectAndTopN(t *testing.T) {
return int(total)
}
require.Nil(t, results[0].SqlDigest)
require.Equal(t, 5, getTotalCPUTime(results[0]))
require.Equal(t, uint64(0), results[0].Items[0].TimestampSec)
require.Equal(t, uint64(1), results[0].Items[1].TimestampSec)
require.Equal(t, uint64(3), results[0].Items[2].TimestampSec)
require.Equal(t, uint64(4), results[0].Items[3].TimestampSec)
require.Equal(t, []byte(nil), results[0].SqlDigest)
require.Equal(t, 14, getTotalCPUTime(results[0]))
require.Equal(t, uint64(1), results[0].Items[0].TimestampSec)
require.Equal(t, uint64(3), results[0].Items[1].TimestampSec)
require.Equal(t, uint64(4), results[0].Items[2].TimestampSec)
require.Equal(t, uint64(0), results[0].Items[3].TimestampSec)
require.Equal(t, uint32(1), results[0].Items[0].CpuTimeMs)
require.Equal(t, uint32(2), results[0].Items[1].CpuTimeMs)
require.Equal(t, uint32(1), results[0].Items[2].CpuTimeMs)
require.Equal(t, uint32(1), results[0].Items[3].CpuTimeMs)
require.Equal(t, uint32(1), results[0].Items[1].CpuTimeMs)
require.Equal(t, uint32(9), results[0].Items[2].CpuTimeMs)
require.Equal(t, uint32(3), results[0].Items[3].CpuTimeMs)
require.Equal(t, []byte("sqlDigest1"), results[1].SqlDigest)
require.Equal(t, 5, getTotalCPUTime(results[1]))
require.Equal(t, []byte("sqlDigest3"), results[2].SqlDigest)
require.Equal(t, 3, getTotalCPUTime(results[2]))
require.Equal(t, 21, getTotalCPUTime(results[1]))
require.Equal(t, []byte("sqlDigest2"), results[2].SqlDigest)
require.Equal(t, 22, getTotalCPUTime(results[2]))
require.Equal(t, []byte("sqlDigest3"), results[3].SqlDigest)
require.Equal(t, 9, getTotalCPUTime(results[3]))
require.Equal(t, []byte("sqlDigest4"), results[4].SqlDigest)
require.Equal(t, 4, getTotalCPUTime(results[4]))
require.Equal(t, []byte("sqlDigest6"), results[5].SqlDigest)
require.Equal(t, 6, getTotalCPUTime(results[5]))
// sleep to wait for all SQL meta received.
time.Sleep(50 * time.Millisecond)
totalMetas := agentServer.GetTotalSQLMetas()
Expand Down
2 changes: 1 addition & 1 deletion util/topsql/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import "go.uber.org/atomic"
const (
DefTiDBTopSQLEnable = false
DefTiDBTopSQLPrecisionSeconds = 1
DefTiDBTopSQLMaxStatementCount = 200
DefTiDBTopSQLMaxStatementCount = 100
DefTiDBTopSQLMaxCollect = 5000
DefTiDBTopSQLReportIntervalSeconds = 60
)
Expand Down

0 comments on commit 74ef32a

Please sign in to comment.