forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add ActiveMQ input plugin (influxdata#2689)
- Loading branch information
1 parent
edc6230
commit 59b7f91
Showing
5 changed files
with
488 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Telegraf Input Plugin: ActiveMQ | ||
|
||
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Description | ||
[[inputs.activemq]] | ||
## Required ActiveMQ Endpoint | ||
# server = "192.168.50.10" | ||
|
||
## Required ActiveMQ port | ||
# port = 8161 | ||
|
||
## Credentials for basic HTTP authentication | ||
# username = "admin" | ||
# password = "admin" | ||
|
||
## Required ActiveMQ webadmin root path | ||
# webadmin = "admin" | ||
|
||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
|
||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. | ||
|
||
- activemq_queues: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- activemq_topics: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- subscribers_metrics: | ||
- pending_queue_size | ||
- dispatched_queue_size | ||
- dispatched_counter | ||
- enqueue_counter | ||
- dequeue_counter | ||
|
||
### Tags: | ||
|
||
- activemq_queues: | ||
- name | ||
- source | ||
- port | ||
- activemq_topics: | ||
- name | ||
- source | ||
- port | ||
- activemq_subscribers: | ||
- client_id | ||
- subscription_name | ||
- connection_id | ||
- destination_name | ||
- selector | ||
- active | ||
- source | ||
- port | ||
|
||
### Example Output: | ||
|
||
``` | ||
$ ./telegraf -config telegraf.conf -input-filter activemq -test | ||
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,261 @@ | ||
package activemq | ||
|
||
import ( | ||
"encoding/xml" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
|
||
"strings" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/internal/tls" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
type ActiveMQ struct { | ||
Server string `json:"server"` | ||
Port int `json:"port"` | ||
Username string `json:"username"` | ||
Password string `json:"password"` | ||
Webadmin string `json:"webadmin"` | ||
ResponseTimeout internal.Duration | ||
tls.ClientConfig | ||
|
||
client *http.Client | ||
} | ||
|
||
type Topics struct { | ||
XMLName xml.Name `xml:"topics"` | ||
TopicItems []Topic `xml:"topic"` | ||
} | ||
|
||
type Topic struct { | ||
XMLName xml.Name `xml:"topic"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Subscribers struct { | ||
XMLName xml.Name `xml:"subscribers"` | ||
SubscriberItems []Subscriber `xml:"subscriber"` | ||
} | ||
|
||
type Subscriber struct { | ||
XMLName xml.Name `xml:"subscriber"` | ||
ClientId string `xml:"clientId,attr"` | ||
SubscriptionName string `xml:"subscriptionName,attr"` | ||
ConnectionId string `xml:"connectionId,attr"` | ||
DestinationName string `xml:"destinationName,attr"` | ||
Selector string `xml:"selector,attr"` | ||
Active string `xml:"active,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Queues struct { | ||
XMLName xml.Name `xml:"queues"` | ||
QueueItems []Queue `xml:"queue"` | ||
} | ||
|
||
type Queue struct { | ||
XMLName xml.Name `xml:"queue"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Stats struct { | ||
XMLName xml.Name `xml:"stats"` | ||
Size int `xml:"size,attr"` | ||
ConsumerCount int `xml:"consumerCount,attr"` | ||
EnqueueCount int `xml:"enqueueCount,attr"` | ||
DequeueCount int `xml:"dequeueCount,attr"` | ||
PendingQueueSize int `xml:"pendingQueueSize,attr"` | ||
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"` | ||
DispatchedCounter int `xml:"dispatchedCounter,attr"` | ||
EnqueueCounter int `xml:"enqueueCounter,attr"` | ||
DequeueCounter int `xml:"dequeueCounter,attr"` | ||
} | ||
|
||
const ( | ||
QUEUES_STATS = "queues" | ||
TOPICS_STATS = "topics" | ||
SUBSCRIBERS_STATS = "subscribers" | ||
) | ||
|
||
var sampleConfig = ` | ||
## Required ActiveMQ Endpoint | ||
# server = "192.168.50.10" | ||
## Required ActiveMQ port | ||
# port = 8161 | ||
## Credentials for basic HTTP authentication | ||
# username = "admin" | ||
# password = "admin" | ||
## Required ActiveMQ webadmin root path | ||
# webadmin = "admin" | ||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
` | ||
|
||
func (a *ActiveMQ) Description() string { | ||
return "Gather ActiveMQ metrics" | ||
} | ||
|
||
func (a *ActiveMQ) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (a *ActiveMQ) createHttpClient() (*http.Client, error) { | ||
tlsCfg, err := a.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: a.ResponseTimeout.Duration, | ||
} | ||
|
||
return client, nil | ||
} | ||
|
||
func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { | ||
if a.ResponseTimeout.Duration < time.Second { | ||
a.ResponseTimeout.Duration = time.Second * 5 | ||
} | ||
|
||
if a.client == nil { | ||
client, err := a.createHttpClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
a.client = client | ||
} | ||
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) | ||
|
||
req, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.SetBasicAuth(a.Username, a.Password) | ||
resp, err := a.client.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
defer resp.Body.Close() | ||
return ioutil.ReadAll(resp.Body) | ||
} | ||
|
||
func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) { | ||
for _, queue := range queues.QueueItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = strings.TrimSpace(queue.Name) | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = queue.Stats.Size | ||
records["consumer_count"] = queue.Stats.ConsumerCount | ||
records["enqueue_count"] = queue.Stats.EnqueueCount | ||
records["dequeue_count"] = queue.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_queues", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) { | ||
for _, topic := range topics.TopicItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = topic.Name | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = topic.Stats.Size | ||
records["consumer_count"] = topic.Stats.ConsumerCount | ||
records["enqueue_count"] = topic.Stats.EnqueueCount | ||
records["dequeue_count"] = topic.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_topics", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) { | ||
for _, subscriber := range subscribers.SubscriberItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["client_id"] = subscriber.ClientId | ||
tags["subscription_name"] = subscriber.SubscriptionName | ||
tags["connection_id"] = subscriber.ConnectionId | ||
tags["destination_name"] = subscriber.DestinationName | ||
tags["selector"] = subscriber.Selector | ||
tags["active"] = subscriber.Active | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize | ||
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize | ||
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter | ||
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter | ||
records["dequeue_counter"] = subscriber.Stats.DequeueCounter | ||
|
||
acc.AddFields("activemq_subscribers", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { | ||
dataQueues, err := a.GetMetrics(QUEUES_STATS) | ||
queues := Queues{} | ||
err = xml.Unmarshal(dataQueues, &queues) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataTopics, err := a.GetMetrics(TOPICS_STATS) | ||
topics := Topics{} | ||
err = xml.Unmarshal(dataTopics, &topics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) | ||
subscribers := Subscribers{} | ||
err = xml.Unmarshal(dataSubscribers, &subscribers) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.GatherQueuesMetrics(acc, queues) | ||
a.GatherTopicsMetrics(acc, topics) | ||
a.GatherSubscribersMetrics(acc, subscribers) | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("activemq", func() telegraf.Input { | ||
return &ActiveMQ{ | ||
Server: "localhost", | ||
Port: 8161, | ||
} | ||
}) | ||
} |
Oops, something went wrong.