Skip to content

Commit 8a530ad

Browse files
committed
Prometheus metrics
1 parent e741213 commit 8a530ad

File tree

4 files changed

+167
-11
lines changed

4 files changed

+167
-11
lines changed

README.md

+11
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,17 @@ curl -w "\n" -d "{\"generic\": \"$RANDOM\"}" -X POST http://localhost:8080/rx/me
8787
time for i in {1..1000}; do curl -w "\n" -d "{\"generic\": \"$RANDOM\"}" -X POST http://localhost:8080/rx/me/generic_data/generic/test/data; done
8888
```
8989

90+
### Profile
91+
92+
```bash
93+
go build ./rxtx.go && time ./rxtx --path=./data/ --cpuprofile=rxtxcpu.prof --memprofile=rxtxmem.prof
94+
```
95+
96+
Browser-based profile viewer:
97+
```bash
98+
go tool pprof -http=:8081 rxtxcpu.prof
99+
```
100+
90101
### Building and Releasing
91102

92103
**rxtx** uses [GORELEASER] to build binaries and [Docker] containers.

rtq/routes.go

+11-1
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,16 @@ func RxRouteHandler(c *gin.Context) {
1616

1717
rawData, _ := c.GetRawData()
1818

19+
q := c.MustGet("Q").(*rtQ)
20+
1921
// all data is json
2022
payload := make(map[string]interface{})
2123
err := json.Unmarshal(rawData, &payload)
2224
if err != nil {
25+
26+
// increment metric msg_errors
27+
q.cfg.MsgError.Inc()
28+
2329
c.JSON(500, gin.H{
2430
"status": "FAIL",
2531
"message": fmt.Sprintf("could not unmarshal json: %s", rawData),
@@ -36,9 +42,13 @@ func RxRouteHandler(c *gin.Context) {
3642
}
3743

3844
// write the message
39-
q := c.MustGet("Q").(*rtQ)
45+
4046
err = q.QWrite(msg)
4147
if err != nil {
48+
49+
// increment metric msg_errors
50+
q.cfg.MsgError.Inc()
51+
4252
c.JSON(500, gin.H{
4353
"status": "FAIL",
4454
"message": fmt.Sprintf("failed to write message: %s", err.Error()),

rtq/rtq.go

+38-7
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,11 @@ import (
66
"fmt"
77
"net"
88
"net/http"
9+
"os"
910
"time"
1011

12+
"github.com/prometheus/client_golang/prometheus"
13+
1114
"errors"
1215

1316
"github.com/coreos/bbolt"
@@ -42,6 +45,12 @@ type Config struct {
4245
Logger *zap.Logger
4346
Receiver string
4447
Path string
48+
Processed prometheus.Counter
49+
Queued prometheus.Gauge
50+
TxBatches prometheus.Counter
51+
TxFail prometheus.Counter
52+
DbErr prometheus.Counter
53+
MsgError prometheus.Counter
4554
}
4655

4756
// rtQ private struct see NewQ
@@ -56,6 +65,10 @@ type rtQ struct {
5665
statusError func(msg string, fields ...zapcore.Field) // status error output
5766
}
5867

68+
// GetMessageCount returns the current value of the queue's mCount field.
func (rt *rtQ) GetMessageCount() int {
69+
return rt.mCount
70+
}
71+
5972
// NewQ returns a new rtQ
6073
func NewQ(name string, cfg Config) (*rtQ, error) {
6174
db, err := bolt.Open(cfg.Path+name+".db", 0600, &bolt.Options{Timeout: 1 * time.Second})
@@ -103,6 +116,9 @@ func (rt *rtQ) getMessageBatch() MessageBatch {
103116
// get bucket stats
104117
stats := bucket.Stats()
105118

119+
// metric: messages_in_queue
120+
rt.cfg.Queued.Set(float64(stats.KeyN))
121+
106122
rt.status("QueueState", zapcore.Field{
107123
Key: "TotalRecords",
108124
Type: zapcore.Int32Type,
@@ -144,6 +160,9 @@ func (rt *rtQ) getMessageBatch() MessageBatch {
144160
})
145161

146162
if err != nil {
163+
// increment metric db_errors
164+
rt.cfg.DbErr.Inc()
165+
147166
rt.cfg.Logger.Error("bbolt db View error: " + err.Error())
148167
}
149168

@@ -217,6 +236,10 @@ func (rt *rtQ) tx() {
217236
// try to send
218237
err := rt.transmit(mb)
219238
if err != nil {
239+
240+
// increment metric tx_fails
241+
rt.cfg.TxFail.Inc()
242+
220243
// transmission failed
221244
rt.status("Transmission", zapcore.Field{
222245
Key: "TransmissionError",
@@ -232,6 +255,9 @@ func (rt *rtQ) tx() {
232255
return
233256
}
234257

258+
// increment metric tx_batches
259+
rt.cfg.TxBatches.Inc()
260+
235261
rt.status("TransmissionComplete", zapcore.Field{
236262
Key: "RemovingMessages",
237263
Type: zapcore.Int32Type,
@@ -256,11 +282,8 @@ func (rt *rtQ) waitTx() {
256282
// Write to the queue
257283
func (rt *rtQ) QWrite(msg Message) error {
258284

259-
rt.status("ReceiverStatus", zapcore.Field{
260-
Key: "KeyValues",
261-
Type: zapcore.Int32Type,
262-
Integer: int64(len(msg.Payload)),
263-
})
285+
// increment metric
286+
rt.cfg.Processed.Inc()
264287

265288
rt.mq <- msg
266289

@@ -274,7 +297,7 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {
274297
for {
275298
select {
276299
case msg := <-mq:
277-
db.Update(func(tx *bolt.Tx) error {
300+
err := db.Update(func(tx *bolt.Tx) error {
278301
uuidV4, _ := uuid.NewV4()
279302

280303
msg.Time = time.Now()
@@ -295,8 +318,12 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {
295318

296319
return nil
297320
})
321+
if err != nil {
322+
fmt.Println("Error updating database. FATAL")
323+
os.Exit(1)
324+
}
298325
case rmi := <-remove:
299-
db.Update(func(tx *bolt.Tx) error {
326+
err := db.Update(func(tx *bolt.Tx) error {
300327
bucket := tx.Bucket([]byte("mq"))
301328

302329
c := bucket.Cursor()
@@ -313,6 +340,10 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {
313340

314341
return nil
315342
})
343+
if err != nil {
344+
fmt.Println("Error updating database. FATAL")
345+
os.Exit(1)
346+
}
316347

317348
}
318349
}

rxtx.go

+107-3
Original file line number | Diff line number | Diff line change
@@ -2,28 +2,89 @@ package main
22

33
import (
44
"flag"
5+
"fmt"
56
"io/ioutil"
67
"net/http"
8+
"os"
9+
"os/signal"
10+
"runtime"
11+
"runtime/pprof"
712
"time"
813

9-
"fmt"
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
1016

1117
"github.com/gin-contrib/zap"
18+
19+
"github.com/prometheus/client_golang/prometheus/promhttp"
20+
1221
"github.com/gin-gonic/gin"
1322
"github.com/txn2/rxtx/rtq"
1423
"go.uber.org/zap"
1524
)
1625

1726
func main() {
27+
1828
var port = flag.String("port", "8080", "Server port.")
1929
var path = flag.String("path", "./", "Directory to store database.")
2030
var interval = flag.Int("interval", 60, "Seconds between intervals.")
2131
var batch = flag.Int("batch", 100, "Batch size.")
2232
var maxq = flag.Int("maxq", 100000, "Max number of message in queue.")
2333
var ingest = flag.String("ingest", "http://localhost:8081/in", "Ingest server.")
34+
var verbose = flag.Bool("verbose", false, "Verbose")
35+
36+
// Instrumentation
37+
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
38+
var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
2439

2540
flag.Parse()
2641

42+
if *cpuprofile != "" {
43+
f, err := os.Create(*cpuprofile)
44+
if err != nil {
45+
fmt.Println(err.Error())
46+
os.Exit(1)
47+
}
48+
49+
err = pprof.StartCPUProfile(f)
50+
if err != nil {
51+
fmt.Println(err.Error())
52+
os.Exit(1)
53+
}
54+
}
55+
56+
// Instrumentation and Signal handling
57+
c := make(chan os.Signal, 1)
58+
signal.Notify(c, os.Interrupt)
59+
go func() {
60+
for sig := range c {
61+
if sig.String() == "interrupt" {
62+
fmt.Printf("Exiting on interrupt...")
63+
if *cpuprofile != "" {
64+
pprof.StopCPUProfile()
65+
}
66+
67+
if *memprofile != "" {
68+
f, err := os.Create(*memprofile)
69+
if err != nil {
70+
fmt.Println("could not create memory profile: " + err.Error())
71+
}
72+
runtime.GC() // get up-to-date statistics
73+
if err := pprof.WriteHeapProfile(f); err != nil {
74+
fmt.Println("could not write memory profile: " + err.Error())
75+
}
76+
err = f.Close()
77+
if err != nil {
78+
fmt.Printf("Can not close memory profile: %s\n", err.Error())
79+
return
80+
}
81+
}
82+
83+
os.Exit(0)
84+
}
85+
}
86+
}()
87+
2788
zapCfg := zap.NewProductionConfig()
2889
zapCfg.DisableCaller = true
2990
zapCfg.DisableStacktrace = true
@@ -41,6 +102,37 @@ func main() {
41102

42103
logger.Info("Starting rxtx...")
43104

105+
// Prometheus Metrics
106+
processed := promauto.NewCounter(prometheus.CounterOpts{
107+
Name: "total_messages_received",
108+
Help: "Total number of messages received.",
109+
})
110+
111+
queued := promauto.NewGauge(prometheus.GaugeOpts{
112+
Name: "messages_in_queue",
113+
Help: "Number os messages in the queue.",
114+
})
115+
116+
txBatches := promauto.NewCounter(prometheus.CounterOpts{
117+
Name: "tx_batches",
118+
Help: "Total number of batch transmissions.",
119+
})
120+
121+
txFail := promauto.NewCounter(prometheus.CounterOpts{
122+
Name: "tx_fails",
123+
Help: "Total number of transaction errors.",
124+
})
125+
126+
dbErr := promauto.NewCounter(prometheus.CounterOpts{
127+
Name: "db_errors",
128+
Help: "Total number database errors.",
129+
})
130+
131+
msgErr := promauto.NewCounter(prometheus.CounterOpts{
132+
Name: "msg_errors",
133+
Help: "Total number message errors.",
134+
})
135+
44136
// database
45137
q, err := rtq.NewQ("rxtx", rtq.Config{
46138
Interval: time.Duration(*interval) * time.Second,
@@ -49,6 +141,12 @@ func main() {
49141
Logger: logger,
50142
Receiver: *ingest,
51143
Path: *path,
144+
Processed: processed,
145+
Queued: queued,
146+
TxBatches: txBatches,
147+
TxFail: txFail,
148+
DbErr: dbErr,
149+
MsgError: msgErr,
52150
})
53151
if err != nil {
54152
panic(err)
@@ -70,14 +168,20 @@ func main() {
70168
c.Next()
71169
})
72170

73-
// use zap logger
74-
r.Use(ginzap.Ginzap(logger, time.RFC3339, true))
171+
// use zap logger on http
172+
if *verbose == true {
173+
r.Use(ginzap.Ginzap(logger, time.RFC3339, true))
174+
}
75175

76176
rxRoute := "/rx/:producer/:key/*label"
77177
r.POST(rxRoute, rtq.RxRouteHandler)
78178
r.OPTIONS(rxRoute, preflight)
79179

180+
// Prometheus Metrics
181+
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
182+
80183
logger.Info("Listening on port: " + *port)
184+
81185
// block on server run
82186
r.Run(":" + *port)
83187
}

0 commit comments

Comments
 (0)