Skip to content

Commit a5c5072

Browse files
committed
timers and async call
1 parent 9d64bab commit a5c5072

File tree

3 files changed

+152
-79
lines changed

3 files changed

+152
-79
lines changed

rtq/routes.go

+73-26
Original file line numberDiff line numberDiff line change
@@ -2,62 +2,109 @@ package rtq
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
7+
"time"
68

79
"github.com/gin-gonic/gin"
10+
"go.uber.org/zap"
811
)
912

10-
// RxRouteHandler handles the http route for inbound data
11-
func RxRouteHandler(c *gin.Context) {
12-
//var json map[string]interface{}
13-
producer := c.Param("producer")
14-
key := c.Param("key")
15-
label := c.Param("label")
16-
17-
rawData, _ := c.GetRawData()
18-
19-
q := c.MustGet("Q").(*rtQ)
13+
func (rt *rtQ) processMessage(msg Message, rawData []byte) error {
14+
start := time.Now()
15+
defer rt.cfg.Pmx.ProcessingTime.Observe(float64(time.Since(start).Seconds()))
2016

2117
// all data is json
2218
payload := make(map[string]interface{})
2319
err := json.Unmarshal(rawData, &payload)
20+
if err != nil {
21+
// increment metric msg_errors
22+
rt.cfg.Pmx.MsgError.Inc()
23+
return errors.New(fmt.Sprintf("could not unmarshal json: %s", rawData))
24+
}
25+
26+
msg.Payload = payload
27+
28+
// write the message
29+
err = rt.QWrite(msg)
2430
if err != nil {
2531

2632
// increment metric msg_errors
27-
q.cfg.MsgError.Inc()
33+
rt.cfg.Pmx.MsgError.Inc()
34+
return errors.New(fmt.Sprintf("failed to write message: %s", err.Error()))
35+
}
2836

37+
return nil
38+
}
39+
40+
// RxRouteHandler handles the http route for inbound data
41+
func (rt *rtQ) RxRouteHandler(c *gin.Context) {
42+
start := time.Now()
43+
defer rt.cfg.Pmx.ResponseTime.Observe(float64(time.Since(start).Seconds()))
44+
45+
rawData, err := c.GetRawData()
46+
47+
if err != nil {
48+
rt.cfg.Logger.Error("Payload error", zap.Error(err))
2949
c.JSON(500, gin.H{
3050
"status": "FAIL",
31-
"message": fmt.Sprintf("could not unmarshal json: %s", rawData),
51+
"message": err.Error(),
3252
})
3353
return
3454
}
3555

36-
// build the message
37-
msg := Message{
38-
Producer: producer,
39-
Label: label,
40-
Key: key,
41-
Payload: payload,
56+
err = rt.processMessage(Message{
57+
Producer: c.Param("producer"),
58+
Key: c.Param("key"),
59+
Label: c.Param("label"),
60+
}, rawData)
61+
62+
if err != nil {
63+
rt.cfg.Logger.Error("Message processing er, or", zap.Error(err))
64+
c.JSON(500, gin.H{
65+
"status": "FAIL",
66+
"message": err.Error(),
67+
})
68+
return
4269
}
4370

44-
// write the message
71+
c.JSON(200, gin.H{
72+
"status": "OK",
73+
})
74+
}
4575

46-
err = q.QWrite(msg)
47-
if err != nil {
76+
func (rt *rtQ) RxRouteHandlerAsync(c *gin.Context) {
77+
start := time.Now()
4878

49-
// increment metric msg_errors
50-
q.cfg.MsgError.Inc()
79+
rawData, err := c.GetRawData()
5180

81+
if err != nil {
82+
rt.cfg.Logger.Error("Payload error", zap.Error(err))
5283
c.JSON(500, gin.H{
5384
"status": "FAIL",
54-
"message": fmt.Sprintf("failed to write message: %s", err.Error()),
85+
"message": err.Error(),
5586
})
56-
return
5787
}
5888

89+
msg := Message{
90+
Producer: c.Param("producer"),
91+
Key: c.Param("key"),
92+
Label: c.Param("label"),
93+
}
94+
95+
go func(msg Message, rawData []byte) {
96+
err := rt.processMessage(msg, rawData)
97+
if err != nil {
98+
rt.cfg.Logger.Error("Message processing error", zap.Error(err))
99+
c.JSON(500, gin.H{
100+
"status": "FAIL",
101+
"message": err.Error(),
102+
})
103+
}
104+
}(msg, rawData)
105+
106+
rt.cfg.Pmx.ResponseTimeAsync.Observe(float64(time.Since(start).Seconds()))
59107
c.JSON(200, gin.H{
60108
"status": "OK",
61109
})
62-
63110
}

rtq/rtq.go

+74-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/prometheus/client_golang/prometheus/promauto"
1314

1415
"errors"
1516

@@ -37,6 +38,20 @@ type MessageBatch struct {
3738
Messages []Message `json:"messages"`
3839
}
3940

41+
// Pmx
42+
type Pmx struct {
43+
Processed prometheus.Counter
44+
Queued prometheus.Gauge
45+
TxBatches prometheus.Counter
46+
TxFail prometheus.Counter
47+
DbErr prometheus.Counter
48+
MsgError prometheus.Counter
49+
ResponseTime prometheus.Summary
50+
ResponseTimeAsync prometheus.Summary
51+
ProcessingTime prometheus.Summary
52+
ProcessingErrors prometheus.Counter
53+
}
54+
4055
// Config options for rxtx
4156
type Config struct {
4257
Interval time.Duration
@@ -45,12 +60,7 @@ type Config struct {
4560
Logger *zap.Logger
4661
Receiver string
4762
Path string
48-
Processed prometheus.Counter
49-
Queued prometheus.Gauge
50-
TxBatches prometheus.Counter
51-
TxFail prometheus.Counter
52-
DbErr prometheus.Counter
53-
MsgError prometheus.Counter
63+
Pmx Pmx
5464
}
5565

5666
// rtQ private struct see NewQ
@@ -81,6 +91,57 @@ func NewQ(name string, cfg Config) (*rtQ, error) {
8191
return nil, err
8292
}
8393

94+
// Prometheus Metrics
95+
cfg.Pmx.Processed = promauto.NewCounter(prometheus.CounterOpts{
96+
Name: "rxtx_total_messages_received",
97+
Help: "Total number of messages received.",
98+
})
99+
100+
cfg.Pmx.Queued = promauto.NewGauge(prometheus.GaugeOpts{
101+
Name: "rxtx_messages_in_queue",
102+
Help: "Number os messages in the queue.",
103+
})
104+
105+
cfg.Pmx.TxBatches = promauto.NewCounter(prometheus.CounterOpts{
106+
Name: "rxtx_tx_batches",
107+
Help: "Total number of batch transmissions.",
108+
})
109+
110+
cfg.Pmx.TxFail = promauto.NewCounter(prometheus.CounterOpts{
111+
Name: "rxtx_tx_fails",
112+
Help: "Total number of transaction errors.",
113+
})
114+
115+
cfg.Pmx.DbErr = promauto.NewCounter(prometheus.CounterOpts{
116+
Name: "rxtx_db_errors",
117+
Help: "Total number database errors.",
118+
})
119+
120+
cfg.Pmx.MsgError = promauto.NewCounter(prometheus.CounterOpts{
121+
Name: "rxtx_msg_errors",
122+
Help: "Total number message errors.",
123+
})
124+
125+
cfg.Pmx.ResponseTime = promauto.NewSummary(prometheus.SummaryOpts{
126+
Name: "rxtx_response_time",
127+
Help: "Time it took to respond to a post.",
128+
})
129+
130+
cfg.Pmx.ResponseTimeAsync = promauto.NewSummary(prometheus.SummaryOpts{
131+
Name: "rxtx_response_time_async",
132+
Help: "Time it took to respond to a async post.",
133+
})
134+
135+
cfg.Pmx.ProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
136+
Name: "rxtx_processing_time",
137+
Help: "Time it took to process a post.",
138+
})
139+
140+
cfg.Pmx.ProcessingErrors = promauto.NewCounter(prometheus.CounterOpts{
141+
Name: "rxtx_processing_errors",
142+
Help: "Total number of processing errors.",
143+
})
144+
84145
mq := make(chan Message, 0)
85146
remove := make(chan int, 0)
86147

@@ -131,7 +192,7 @@ func (rt *rtQ) getMessageBatch() *MessageBatch {
131192
stats := bucket.Stats()
132193

133194
// metric: messages_in_queue
134-
rt.cfg.Queued.Set(float64(stats.KeyN))
195+
rt.cfg.Pmx.Queued.Set(float64(stats.KeyN))
135196

136197
rt.status("QueueState", zapcore.Field{
137198
Key: "TotalRecords",
@@ -175,7 +236,7 @@ func (rt *rtQ) getMessageBatch() *MessageBatch {
175236

176237
if err != nil {
177238
// increment metric db_errors
178-
rt.cfg.DbErr.Inc()
239+
rt.cfg.Pmx.DbErr.Inc()
179240

180241
rt.cfg.Logger.Error("bbolt db View error: " + err.Error())
181242
}
@@ -251,7 +312,7 @@ func (rt *rtQ) tx() {
251312
if err != nil {
252313

253314
// increment metric tx_fails
254-
rt.cfg.TxFail.Inc()
315+
rt.cfg.Pmx.TxFail.Inc()
255316

256317
// transmission failed
257318
rt.status("Transmission", zapcore.Field{
@@ -268,7 +329,7 @@ func (rt *rtQ) tx() {
268329
}
269330

270331
// increment metric tx_batches
271-
rt.cfg.TxBatches.Inc()
332+
rt.cfg.Pmx.TxBatches.Inc()
272333

273334
rt.status("TransmissionComplete", zapcore.Field{
274335
Key: "RemovingMessages",
@@ -282,7 +343,7 @@ func (rt *rtQ) tx() {
282343
func (rt *rtQ) QWrite(msg Message) error {
283344

284345
// increment metric
285-
rt.cfg.Processed.Inc()
346+
rt.cfg.Pmx.Processed.Inc()
286347

287348
rt.mq <- msg
288349

@@ -313,7 +374,7 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {
313374
if err != nil {
314375
return err
315376
}
316-
b.Put([]byte(msg.Seq), buf)
377+
_ = b.Put([]byte(msg.Seq), buf)
317378

318379
return nil
319380
})
@@ -330,7 +391,7 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {
330391
// get the first rt.cfg.Batch
331392
i := 1
332393
for k, _ := c.First(); k != nil; k, _ = c.Next() {
333-
c.Delete()
394+
_ = c.Delete()
334395
i++
335396
if i > rmi {
336397
break

rxtx.go

+5-40
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313

1414
"github.com/gin-contrib/zap"
1515
"github.com/gin-gonic/gin"
16-
"github.com/prometheus/client_golang/prometheus"
17-
"github.com/prometheus/client_golang/prometheus/promauto"
1816
"github.com/prometheus/client_golang/prometheus/promhttp"
1917
"github.com/txn2/rxtx/rtq"
2018
"go.uber.org/zap"
@@ -99,37 +97,6 @@ func main() {
9997

10098
logger.Info("Starting rxtx...")
10199

102-
// Prometheus Metrics
103-
processed := promauto.NewCounter(prometheus.CounterOpts{
104-
Name: "rxtx_total_messages_received",
105-
Help: "Total number of messages received.",
106-
})
107-
108-
queued := promauto.NewGauge(prometheus.GaugeOpts{
109-
Name: "rxtx_messages_in_queue",
110-
Help: "Number os messages in the queue.",
111-
})
112-
113-
txBatches := promauto.NewCounter(prometheus.CounterOpts{
114-
Name: "rxtx_tx_batches",
115-
Help: "Total number of batch transmissions.",
116-
})
117-
118-
txFail := promauto.NewCounter(prometheus.CounterOpts{
119-
Name: "rxtx_tx_fails",
120-
Help: "Total number of transaction errors.",
121-
})
122-
123-
dbErr := promauto.NewCounter(prometheus.CounterOpts{
124-
Name: "rxtx_db_errors",
125-
Help: "Total number database errors.",
126-
})
127-
128-
msgErr := promauto.NewCounter(prometheus.CounterOpts{
129-
Name: "rxtx_msg_errors",
130-
Help: "Total number message errors.",
131-
})
132-
133100
// database
134101
q, err := rtq.NewQ("rxtx", rtq.Config{
135102
Interval: time.Duration(*interval) * time.Second,
@@ -138,12 +105,6 @@ func main() {
138105
Logger: logger,
139106
Receiver: *ingest,
140107
Path: *path,
141-
Processed: processed,
142-
Queued: queued,
143-
TxBatches: txBatches,
144-
TxFail: txFail,
145-
DbErr: dbErr,
146-
MsgError: msgErr,
147108
})
148109
if err != nil {
149110
panic(err)
@@ -171,9 +132,13 @@ func main() {
171132
}
172133

173134
rxRoute := "/rx/:producer/:key/*label"
174-
r.POST(rxRoute, rtq.RxRouteHandler)
135+
r.POST(rxRoute, q.RxRouteHandler)
175136
r.OPTIONS(rxRoute, preflight)
176137

138+
rxRouteAsync := "/rxa/:producer/:key/*label"
139+
r.POST(rxRouteAsync, q.RxRouteHandlerAsync)
140+
r.OPTIONS(rxRouteAsync, preflight)
141+
177142
// Prometheus Metrics
178143
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
179144

0 commit comments

Comments
 (0)