Skip to content

Commit

Permalink
Improvement of RabbitMQ plugin influxdata#3025 influxdata#3252
Browse files Browse the repository at this point in the history
* new metrics:
  * unroutable messages
  * node uptime
  * gc metrics
  * mnesia metrics
  * node healthcheck
  * IO metrics
* refactoring tests:
  * moved the json examples to a separate files
  * check metric values

Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
  • Loading branch information
mcdoker18 committed Jun 19, 2018
1 parent b66eb2f commit 1d9131a
Show file tree
Hide file tree
Showing 8 changed files with 589 additions and 590 deletions.
25 changes: 25 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,42 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)
- return_unroutable (int, number of unroutable messages)
- return_unroutable_rate (float, number of unroutable messages per second)

- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- disk_free_alarm (int, disk alarm)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- mem_alarm (int, memory a)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)
- uptime (int, milliseconds)
- health_check_status (int, 1 or 0)
- mnesia_disk_tx_count (int, number of disk transaction)
- mnesia_ram_tx_count (int, number of ram transaction)
- mnesia_disk_tx_count_rate (float, number of disk transaction per second)
- mnesia_ram_tx_count_rate (float, number of ram transaction per second)
- gc_num (int, number of garbage collection)
- gc_bytes_reclaimed (int, bytes)
- gc_num_rate (float, number of garbage collection per second)
- gc_bytes_reclaimed_rate (float, bytes per second)
- io_read_avg_time (float, number of read operations)
- io_read_avg_time_rate (int, number of read operations per second)
- io_read_bytes (int, bytes)
- io_read_bytes_rate (float, bytes per second)
- io_write_avg_time (int, milliseconds)
- io_write_avg_time_rate (float, milliseconds per second)
- io_write_bytes (int, bytes)
- io_write_bytes_rate (float, bytes per second)

- rabbitmq_queue
- consumer_utilisation (float, percent)
Expand Down Expand Up @@ -109,7 +131,9 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd

- rabbitmq_exchange
- messages_publish_in (int, count)
- messages_publish_in_rate (int, messages per second)
- messages_publish_out (int, count)
- messages_publish_out_rate (int, messages per second)

### Tags:

Expand All @@ -121,6 +145,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd

- rabbitmq_node
- node
- url

- rabbitmq_queue
- url
Expand Down
227 changes: 162 additions & 65 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,27 @@ type Listeners struct {

// Details ...
type Details struct {
Rate float64
Rate float64 `json:"rate"`
}

// MessageStats ...
type MessageStats struct {
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishInDetails Details `json:"publish_in_details"`
PublishOut int64 `json:"publish_out"`
PublishOutDetails Details `json:"publish_out_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}

// ObjectTotals ...
Expand All @@ -114,45 +118,68 @@ type QueueTotals struct {

// Queue ...
type Queue struct {
QueueTotals // just to not repeat the same code
MessageStats `json:"message_stats"`
QueueTotals // just to not repeat the same code
MessageStats `json:"message_stats"`
Memory int64
Consumers int64
ConsumerUtilisation float64 `json:"consumer_utilisation"`
Name string
Node string
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
IdleSince string `json:"idle_since"`
AutoDelete bool `json:"auto_delete"`
IdleSince string `json:"idle_since"`
}

// Node ...
type Node struct {
Name string

DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
MemAlarm bool `json:"mem_alarm"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
Uptime int64 `json:"uptime"`
MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
GcNum int64 `json:"gc_num"`
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
}

type Exchange struct {
Name string
MessageStats `json:"message_stats"`
Type string
Internal bool
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
Name string
MessageStats `json:"message_stats"`
Type string
Internal bool
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
}

type HealthCheck struct {
Status string `json:"status"`
}

// gatherFunc ...
Expand Down Expand Up @@ -204,6 +231,13 @@ var sampleConfig = `
queue_name_exclude = []
`

func boolToInt(b bool) int64 {
if b {
return 1
}
return 0
}

// SampleConfig ...
func (r *RabbitMQ) SampleConfig() string {
return sampleConfig
Expand Down Expand Up @@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}

var clustering_listeners, amqp_listeners int64 = 0, 0
var clusteringListeners, amqpListeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
clusteringListeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
amqpListeners++
}
}

Expand All @@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
"clustering_listeners": clusteringListeners,
"amqp_listeners": amqpListeners,
"return_unroutable": overview.MessageStats.ReturnUnroutable,
"return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}

func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
nodes := make([]Node, 0)
allNodes := make([]Node, 0)
// Gather information about nodes
err := r.requestJSON("/api/nodes", &nodes)
err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}
now := time.Now()

nodes := make(map[string]Node)
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
}
}

numberNodes := len(nodes)
if numberNodes == 0 {
return
}

type NodeHealthCheck struct {
NodeName string
HealthCheck HealthCheck
Error error
}

healthChecksChannel := make(chan NodeHealthCheck, numberNodes)

for _, node := range nodes {
if !r.shouldGatherNode(node) {
continue
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
var healthCheck HealthCheck

err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeHealthCheck := NodeHealthCheck{
NodeName: nodeName,
Error: err,
HealthCheck: healthCheck,
}

healthChecksChannel <- nodeHealthCheck
}(node.Name, healthChecksChannel)
}

now := time.Now()

for i := 0; i < len(nodes); i++ {
nodeHealthCheck := <-healthChecksChannel

var healthCheckStatus int64 = 0

if nodeHealthCheck.Error != nil {
acc.AddError(nodeHealthCheck.Error)
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}

node := nodes[nodeHealthCheck.NodeName]

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

var running int64 = 0
if node.Running {
running = 1
}

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
Expand Down Expand Up @@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
},
tags,
)
Expand Down
Loading

0 comments on commit 1d9131a

Please sign in to comment.