Skip to content

Commit

Permalink
infoschema: show query disk usage in information_schema.processlist (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Tjianke authored Feb 2, 2021
1 parent f05f4a8 commit f8b0d8f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
1 change: 1 addition & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ var tableProcesslistCols = []columnInfo{
{name: "INFO", tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
{name: "DIGEST", tp: mysql.TypeVarchar, size: 64, deflt: ""},
{name: "MEM", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag},
{name: "DISK", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag},
{name: "TxnStart", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, deflt: ""},
}

Expand Down
17 changes: 9 additions & 8 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (s *testTableSuite) TestInfoschemaFieldValue(c *C) {
" `INFO` longtext DEFAULT NULL,\n" +
" `DIGEST` varchar(64) DEFAULT '',\n" +
" `MEM` bigint(21) unsigned DEFAULT NULL,\n" +
" `DISK` bigint(21) unsigned DEFAULT NULL,\n" +
" `TxnStart` varchar(64) NOT NULL DEFAULT ''\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery("show create table information_schema.cluster_log").Check(
Expand Down Expand Up @@ -483,8 +484,8 @@ func (s *testTableSuite) TestSomeTables(c *C) {
tk.Se.SetSessionManager(sm)
tk.MustQuery("select * from information_schema.PROCESSLIST order by ID;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 ", "in transaction", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 %s %s abc2 0 ", "autocommit", strings.Repeat("x", 101)),
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 0 ", "in transaction", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 %s %s abc2 0 0 ", "autocommit", strings.Repeat("x", 101)),
))
tk.MustQuery("SHOW PROCESSLIST;").Sort().Check(
testkit.Rows(
Expand Down Expand Up @@ -523,8 +524,8 @@ func (s *testTableSuite) TestSomeTables(c *C) {
tk.Se.GetSessionVars().TimeZone = time.UTC
tk.MustQuery("select * from information_schema.PROCESSLIST order by ID;").Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 ", "in transaction", "<nil>"),
fmt.Sprintf("2 user-2 localhost <nil> Init DB 9223372036 %s %s abc2 0 07-29 03:26:05.158(410090409861578752)", "autocommit", strings.Repeat("x", 101)),
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 0 ", "in transaction", "<nil>"),
fmt.Sprintf("2 user-2 localhost <nil> Init DB 9223372036 %s %s abc2 0 0 07-29 03:26:05.158(410090409861578752)", "autocommit", strings.Repeat("x", 101)),
))
tk.MustQuery("SHOW PROCESSLIST;").Sort().Check(
testkit.Rows(
Expand All @@ -538,11 +539,11 @@ func (s *testTableSuite) TestSomeTables(c *C) {
))
tk.MustQuery("select * from information_schema.PROCESSLIST where db is null;").Check(
testkit.Rows(
fmt.Sprintf("2 user-2 localhost <nil> Init DB 9223372036 %s %s abc2 0 07-29 03:26:05.158(410090409861578752)", "autocommit", strings.Repeat("x", 101)),
fmt.Sprintf("2 user-2 localhost <nil> Init DB 9223372036 %s %s abc2 0 0 07-29 03:26:05.158(410090409861578752)", "autocommit", strings.Repeat("x", 101)),
))
tk.MustQuery("select * from information_schema.PROCESSLIST where Info is null;").Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 ", "in transaction", "<nil>"),
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 %s %s abc1 0 0 ", "in transaction", "<nil>"),
))
}

Expand Down Expand Up @@ -807,7 +808,7 @@ func (s *testClusterTableSuite) TestSelectClusterTable(c *C) {
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("1"))
tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testutil.RowsWithSep("|", "2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 ", "")))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1"))
tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1"))
Expand Down Expand Up @@ -858,7 +859,7 @@ select * from t3;
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("4"))
tk.MustQuery("select count(*) from `SLOW_QUERY`").Check(testkit.Rows("4"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 ", "")))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustExec("create user user1")
tk.MustExec("create user user2")
user1 := testkit.NewTestKit(c, s.store)
Expand Down
12 changes: 9 additions & 3 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,16 @@ func (pi *ProcessInfo) txnStartTs(tz *time.Location) (txnStart string) {
// "SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST".
func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} {
bytesConsumed := int64(0)
if pi.StmtCtx != nil && pi.StmtCtx.MemTracker != nil {
bytesConsumed = pi.StmtCtx.MemTracker.BytesConsumed()
diskConsumed := int64(0)
if pi.StmtCtx != nil {
if pi.StmtCtx.MemTracker != nil {
bytesConsumed = pi.StmtCtx.MemTracker.BytesConsumed()
}
if pi.StmtCtx.DiskTracker != nil {
diskConsumed = pi.StmtCtx.DiskTracker.BytesConsumed()
}
}
return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, pi.txnStartTs(tz))
return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz))
}

// ascServerStatus is a slice of all defined server status in ascending order.
Expand Down

0 comments on commit f8b0d8f

Please sign in to comment.