-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathparse.go
144 lines (125 loc) · 4.58 KB
/
parse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package main
import (
"bufio"
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/pingcap/tidb/pkg/parser"
)
func ParseLogs(slowLogPath, slowOutputPath string) {
if slowLogPath == "" || slowOutputPath == "" {
fmt.Println("Usage: ./sql-replay -mode parse -slow-in <path_to_slow_query_log> -slow-out <path_to_slow_output_file>")
return
}
file, err := os.Open(slowLogPath)
if err != nil {
fmt.Println("Error opening file:", err)
return
}
defer file.Close()
outputFile, err := os.Create(slowOutputPath)
if err != nil {
fmt.Println("Error creating output file:", err)
return
}
defer outputFile.Close()
scanner := bufio.NewScanner(file)
buf := make([]byte, 0, 512*1024*1024) // 512MB buffer
scanner.Buffer(buf, bufio.MaxScanTokenSize)
var currentEntry LogEntry
var sqlBuffer strings.Builder
var entryStarted bool = false
// Add support for MySQL 5.6 time format
reTime56 := regexp.MustCompile(`Time: (\d{6}) ?(\d{1,2}:\d{2}:\d{2})`)
reTime := regexp.MustCompile(`Time: ([\d-T:.Z]+)`)
reUser := regexp.MustCompile(`User@Host: (\w+)\[`)
reConnectionID := regexp.MustCompile(`Id:\s*(\d+)`)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# Time:") {
if entryStarted {
finalizeEntry(¤tEntry, &sqlBuffer, outputFile)
}
entryStarted = true
// MySQL 5.6 Time Format
if match := reTime56.FindStringSubmatch(line); len(match) > 1 {
timeStr := fmt.Sprintf("%s %s", match[1], match[2])
parsedTime, err := time.Parse("060102 15:04:05", timeStr)
if err != nil {
fmt.Println("Error parsing time:", err)
continue
}
currentEntry.Timestamp = float64(parsedTime.UnixNano()) / 1e9
continue
}
// MySQL 5.7/8.0 Time Format
if match := reTime.FindStringSubmatch(line); len(match) > 1 {
parsedTime, _ := time.Parse(time.RFC3339Nano, match[1])
currentEntry.Timestamp = float64(parsedTime.UnixNano()) / 1e9
continue
}
continue
}
if entryStarted {
if strings.HasPrefix(line, "# User@Host:") {
match := reUser.FindStringSubmatch(line)
if len(match) > 1 {
currentEntry.Username = match[1]
}
matchID := reConnectionID.FindStringSubmatch(line)
if len(matchID) > 1 {
currentEntry.ConnectionID = matchID[1]
}
} else if strings.HasPrefix(line, "# Query_time:") {
processQueryTimeAndRowsSent(line, ¤tEntry)
} else if !strings.HasPrefix(line, "#") {
if !(strings.HasPrefix(line, "SET timestamp=") || strings.HasPrefix(line, "-- ") || strings.HasPrefix(line, "use ")) {
sqlBuffer.WriteString(line + " ")
}
}
}
}
// Process the last entry if there is one
if entryStarted {
finalizeEntry(¤tEntry, &sqlBuffer, outputFile)
}
if err := scanner.Err(); err != nil {
fmt.Println("Error reading file:", err)
}
}
// slowLogQueryTimeRe and slowLogRowsSentRe extract fields from a slow-log
// "# Query_time: ... Rows_sent: ..." header line. They are compiled once at
// package initialisation instead of on every call.
var (
	slowLogQueryTimeRe = regexp.MustCompile(`Query_time: (\d+\.\d+)`)
	slowLogRowsSentRe  = regexp.MustCompile(`Rows_sent: (\d+)`)
)

// processQueryTimeAndRowsSent copies the Query_time and Rows_sent values of a
// slow-log "# Query_time:" line into entry.
//
// Query_time is given in seconds and stored in entry.QueryTime in
// microseconds, rounded to the nearest microsecond. A field that is absent
// from line, or whose value fails to parse, leaves the corresponding entry
// field unchanged.
func processQueryTimeAndRowsSent(line string, entry *LogEntry) {
	if m := slowLogQueryTimeRe.FindStringSubmatch(line); len(m) > 1 {
		if seconds, err := strconv.ParseFloat(m[1], 64); err == nil {
			// Round instead of truncating: seconds*1e6 can land just below the
			// intended integer because of binary floating point. The regex only
			// admits non-negative values, so adding 0.5 rounds correctly.
			entry.QueryTime = int64(seconds*1e6 + 0.5)
		}
	}
	if m := slowLogRowsSentRe.FindStringSubmatch(line); len(m) > 1 {
		if rows, err := strconv.Atoi(m[1]); err == nil {
			entry.RowsSent = rows
		}
	}
}
// finalizeEntry completes entry with the SQL accumulated in sqlBuffer and
// writes it to outputFile as a single JSON line.
//
// The SQL is whitespace-trimmed; entries with no SQL text are not written.
// Otherwise entry.Digest is set from parser.DigestNormalized of the normalized
// SQL, and entry.SQLType is set to the first whitespace-separated token of the
// normalized SQL ("other" if there is none). Encoding and write errors are
// printed to stdout.
//
// In every case entry and sqlBuffer are reset before returning, so header
// fields of this entry (user, connection id, query time, rows sent) cannot
// leak into the next one.
func finalizeEntry(entry *LogEntry, sqlBuffer *strings.Builder, outputFile *os.File) {
	defer func() {
		*entry = LogEntry{}
		sqlBuffer.Reset()
	}()

	entry.SQL = strings.TrimSpace(sqlBuffer.String())
	// Skip records that carry no SQL text.
	if entry.SQL == "" {
		return
	}

	normalizedSQL := parser.Normalize(entry.SQL)
	entry.Digest = parser.DigestNormalized(normalizedSQL).String()
	entry.SQLType = "other"
	if words := strings.Fields(normalizedSQL); len(words) > 0 {
		entry.SQLType = words[0]
	}

	jsonEntry, err := json.Marshal(entry)
	if err != nil {
		fmt.Println("Error encoding entry:", err)
		return
	}
	if _, err := outputFile.Write(append(jsonEntry, '\n')); err != nil {
		fmt.Println("Error writing output file:", err)
	}
}