From 70ff0a7aa9a3feabc88f15e4a2558c2f6951e915 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 17 Nov 2020 12:29:24 +0200 Subject: [PATCH] Nats module improvements (#22445) --- CHANGELOG.next.asciidoc | 1 + metricbeat/docs/fields.asciidoc | 239 ++++++++++++++++++ metricbeat/docs/modules/nats.asciidoc | 21 +- .../docs/modules/nats/connection.asciidoc | 21 ++ metricbeat/docs/modules/nats/route.asciidoc | 21 ++ metricbeat/docs/modules_list.asciidoc | 4 +- metricbeat/include/list_common.go | 2 + metricbeat/metricbeat.reference.yml | 10 +- metricbeat/module/nats/_meta/Dockerfile | 7 + .../module/nats/_meta/config.reference.yml | 10 +- metricbeat/module/nats/_meta/config.yml | 10 +- metricbeat/module/nats/_meta/docs.asciidoc | 3 +- metricbeat/module/nats/_meta/fields.yml | 1 + metricbeat/module/nats/_meta/run.sh | 14 +- .../module/nats/connection/_meta/data.json | 35 +++ .../nats/connection/_meta/docs.asciidoc | 1 + .../module/nats/connection/_meta/fields.yml | 57 +++++ .../_meta/test/connectionsmetrics.json | 29 +++ .../module/nats/connection/connection.go | 94 +++++++ .../connection/connection_integration_test.go | 56 ++++ .../module/nats/connection/connection_test.go | 63 +++++ metricbeat/module/nats/connection/data.go | 116 +++++++++ .../module/nats/connections/_meta/data.json | 6 +- .../nats/connections/_meta/docs.asciidoc | 2 +- .../_meta/test/connectionsmetrics.json | 30 ++- .../connections_integration_test.go | 7 +- .../nats/connections/connections_test.go | 2 +- metricbeat/module/nats/connections/data.go | 18 +- metricbeat/module/nats/docker-compose.yml | 7 +- metricbeat/module/nats/fields.go | 2 +- metricbeat/module/nats/route/_meta/data.json | 35 +++ .../module/nats/route/_meta/docs.asciidoc | 1 + metricbeat/module/nats/route/_meta/fields.yml | 54 ++++ .../nats/route/_meta/test/routesmetrics.json | 37 +++ metricbeat/module/nats/route/data.go | 105 ++++++++ metricbeat/module/nats/route/route.go | 95 +++++++ .../nats/route/route_integration_test.go | 56 ++++ metricbeat/module/nats/route/route_test.go | 63 +++++ metricbeat/module/nats/routes/_meta/data.json | 4 +- .../module/nats/routes/_meta/docs.asciidoc | 2 +- .../nats/routes/_meta/test/routesmetrics.json | 38 ++- metricbeat/module/nats/routes/data.go | 18 +- .../nats/routes/routes_integration_test.go | 7 +- metricbeat/module/nats/routes/routes_test.go | 2 +- .../module/nats/stats/_meta/docs.asciidoc | 2 +- metricbeat/module/nats/stats/data.go | 105 ++------ .../nats/stats/stats_integration_test.go | 7 +- .../nats/subscriptions/_meta/docs.asciidoc | 2 +- .../subscriptions_integration_test.go | 7 +- metricbeat/module/nats/test_nats.py | 55 +++- metricbeat/module/nats/util/util.go | 121 +++++++++ metricbeat/modules.d/nats.yml.disabled | 10 +- x-pack/metricbeat/metricbeat.reference.yml | 10 +- x-pack/packetbeat/packetbeat.reference.yml | 2 +- x-pack/packetbeat/packetbeat.yml | 2 +- 55 files changed, 1581 insertions(+), 148 deletions(-) create mode 100644 metricbeat/docs/modules/nats/connection.asciidoc create mode 100644 metricbeat/docs/modules/nats/route.asciidoc create mode 100644 metricbeat/module/nats/connection/_meta/data.json create mode 100644 metricbeat/module/nats/connection/_meta/docs.asciidoc create mode 100644 metricbeat/module/nats/connection/_meta/fields.yml create mode 100644 metricbeat/module/nats/connection/_meta/test/connectionsmetrics.json create mode 100644 metricbeat/module/nats/connection/connection.go create mode 100644 metricbeat/module/nats/connection/connection_integration_test.go create mode 100644 metricbeat/module/nats/connection/connection_test.go create mode 100644 metricbeat/module/nats/connection/data.go create mode 100644 metricbeat/module/nats/route/_meta/data.json create mode 100644 metricbeat/module/nats/route/_meta/docs.asciidoc create mode 100644 metricbeat/module/nats/route/_meta/fields.yml create mode 100644 metricbeat/module/nats/route/_meta/test/routesmetrics.json create mode 100644 metricbeat/module/nats/route/data.go create mode 100644 metricbeat/module/nats/route/route.go create mode 100644 metricbeat/module/nats/route/route_integration_test.go create mode 100644 metricbeat/module/nats/route/route_test.go create mode 100644 metricbeat/module/nats/util/util.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8c2ca73468bd..65e2dbe017d3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -839,6 +839,7 @@ same journal. {pull}18467[18467] - Map cloud data filed `cloud.account.id` to azure subscription. {pull}21483[21483] {issue}21381[21381] - Move s3_daily_storage and s3_request metricsets to use cloudwatch input. {pull}21703[21703] - Duplicate system.process.cmdline field with process.command_line ECS field name. {pull}22325[22325] +- Add connection and route metricsets for nats metricbeat module to collect metrics per connection/route. {pull}22445[22445] - Add unit file states to system/service {pull}22557[22557] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index cc4ced5fd782..5a706cea7829 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -35477,6 +35477,9 @@ type: keyword *`nats.server.time`*:: + -- + +deprecated:[8.0.0] + Server time of metric creation @@ -35484,6 +35487,127 @@ type: date -- +[float] +=== connection + +Contains nats connection related metrics + + + +*`nats.connection.name`*:: ++ +-- +The name of the connection + + +type: keyword + +-- + +*`nats.connection.subscriptions`*:: ++ +-- +The number of subscriptions in this connection + + +type: integer + +-- + +*`nats.connection.pending_bytes`*:: ++ +-- +The number of pending bytes of this connection + + +type: long + +format: bytes + +-- + +*`nats.connection.uptime`*:: ++ +-- +The period the connection is up (sec) + + +type: long + +format: duration + +-- + +*`nats.connection.idle_time`*:: ++ +-- +The period the connection is idle (sec) + + +type: long + +format: duration + +-- + +[float] +=== in + +The amount of incoming data + + + +*`nats.connection.in.messages`*:: ++ +-- +The amount of incoming messages + + +type: long + +-- + +*`nats.connection.in.bytes`*:: ++ +-- +The amount of incoming bytes + + +type: long + +format: bytes + +-- + +[float] +=== out + +The amount of outgoing data + + + +*`nats.connection.out.messages`*:: ++ +-- +The amount of outgoing messages + + +type: long + +-- + +*`nats.connection.out.bytes`*:: ++ +-- +The amount of outgoing bytes + + +type: long + +format: bytes + +-- + [float] === connections @@ -35501,6 +35625,121 @@ type: integer -- +[float] +=== route + +Contains nats route related metrics + + + +*`nats.route.subscriptions`*:: ++ +-- +The number of subscriptions in this connection + + +type: integer + +-- + +*`nats.route.remote_id`*:: ++ +-- +The remote id on which the route is connected to + + +type: keyword + +-- + +*`nats.route.pending_size`*:: ++ +-- +The number of pending routes + + +type: long + +-- + +*`nats.route.port`*:: ++ +-- +The port of the route + + +type: integer + +-- + +*`nats.route.ip`*:: ++ +-- +The ip of the route + + +type: ip + +-- + +[float] +=== in + +The amount of incoming data + + + +*`nats.route.in.messages`*:: ++ +-- +The amount of incoming messages + + +type: long + +-- + +*`nats.route.in.bytes`*:: ++ +-- +The amount of incoming bytes + + +type: long + +format: bytes + +-- + +[float] +=== out + +The amount of outgoing data + + + +*`nats.route.out.messages`*:: ++ +-- +The amount of outgoing messages + + +type: long + +-- + +*`nats.route.out.bytes`*:: ++ +-- +The amount of outgoing bytes + + +type: long + +format: bytes + +-- + [float] === routes diff --git a/metricbeat/docs/modules/nats.asciidoc b/metricbeat/docs/modules/nats.asciidoc index 8328da8dce7d..91c45b0c32f7 100644 --- a/metricbeat/docs/modules/nats.asciidoc +++ b/metricbeat/docs/modules/nats.asciidoc @@ -7,7 +7,8 @@ This file is generated! See scripts/mage/docs_collector.go This is the Nats module. The Nats module uses https://nats.io/documentation/managing_the_server/monitoring/[Nats monitoring server APIs] to collect metrics. -The default metricsets are `stats`, `connections`, `routes` and `subscriptions`. +The default metricsets are `stats`, `connections`, `routes` and `subscriptions` while `connection` and `route` +metricsets can be enabled to collect detailed metrics per connection/route. [float] === Compatibility @@ -33,13 +34,21 @@ in <>. Here is an example configuration: ---- metricbeat.modules: - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + - "connections" + - "routes" + - "stats" + - "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" ---- [float] @@ -47,16 +56,24 @@ metricbeat.modules: The following metricsets are available: +* <> + * <> +* <> + * <> * <> * <> +include::nats/connection.asciidoc[] + include::nats/connections.asciidoc[] +include::nats/route.asciidoc[] + include::nats/routes.asciidoc[] include::nats/stats.asciidoc[] diff --git a/metricbeat/docs/modules/nats/connection.asciidoc b/metricbeat/docs/modules/nats/connection.asciidoc new file mode 100644 index 000000000000..8ee655538a7f --- /dev/null +++ b/metricbeat/docs/modules/nats/connection.asciidoc @@ -0,0 +1,21 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-nats-connection]] +=== NATS connection metricset + +include::../../../module/nats/connection/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/nats/connection/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules/nats/route.asciidoc b/metricbeat/docs/modules/nats/route.asciidoc new file mode 100644 index 000000000000..af28e678281a --- /dev/null +++ b/metricbeat/docs/modules/nats/route.asciidoc @@ -0,0 +1,21 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-nats-route]] +=== NATS route metricset + +include::../../../module/nats/route/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/nats/route/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 777a317471b5..68476e1acaa5 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -205,7 +205,9 @@ This file is generated! See scripts/mage/docs_collector.go |<> beta[] |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | -.4+| .4+| |<> +.6+| .6+| |<> +|<> +|<> |<> |<> |<> diff --git a/metricbeat/include/list_common.go b/metricbeat/include/list_common.go index b77bb57f9a6f..a98e7d2ed878 100644 --- a/metricbeat/include/list_common.go +++ b/metricbeat/include/list_common.go @@ -117,7 +117,9 @@ import ( _ "github.com/elastic/beats/v7/metricbeat/module/mysql/query" _ "github.com/elastic/beats/v7/metricbeat/module/mysql/status" _ "github.com/elastic/beats/v7/metricbeat/module/nats" + _ "github.com/elastic/beats/v7/metricbeat/module/nats/connection" _ "github.com/elastic/beats/v7/metricbeat/module/nats/connections" + _ "github.com/elastic/beats/v7/metricbeat/module/nats/route" _ "github.com/elastic/beats/v7/metricbeat/module/nats/routes" _ "github.com/elastic/beats/v7/metricbeat/module/nats/stats" _ "github.com/elastic/beats/v7/metricbeat/module/nats/subscriptions" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 09d7406559aa..ca18dd7ed477 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -671,13 +671,21 @@ metricbeat.modules: #--------------------------------- NATS Module --------------------------------- - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + - "connections" + - "routes" + - "stats" + - "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" #-------------------------------- Nginx Module -------------------------------- - module: nginx diff --git a/metricbeat/module/nats/_meta/Dockerfile b/metricbeat/module/nats/_meta/Dockerfile index f846c9689732..24757de54005 100644 --- a/metricbeat/module/nats/_meta/Dockerfile +++ b/metricbeat/module/nats/_meta/Dockerfile @@ -1,10 +1,17 @@ ARG NATS_VERSION=2.0.4 FROM nats:$NATS_VERSION +# build stage +FROM golang:1.13-alpine3.11 AS build-env +RUN apk --no-cache add build-base git mercurial gcc +RUN cd src && go get -d github.com/nats-io/nats.go/ +RUN cd src/github.com/nats-io/nats.go/examples/nats-bench && git checkout tags/v1.10.0 && go build . + # create an enhanced container with nc command available since nats is based # on scratch image making healthcheck impossible FROM alpine:latest COPY --from=0 / /opt/nats +COPY --from=build-env /go/src/github.com/nats-io/nats.go/examples/nats-bench/nats-bench /nats-bench COPY run.sh /run.sh # Expose client, management, and cluster ports EXPOSE 4222 8222 6222 diff --git a/metricbeat/module/nats/_meta/config.reference.yml b/metricbeat/module/nats/_meta/config.reference.yml index d27d8f1ab2af..a9440c6de3f1 100644 --- a/metricbeat/module/nats/_meta/config.reference.yml +++ b/metricbeat/module/nats/_meta/config.reference.yml @@ -1,8 +1,16 @@ - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + - "connections" + - "routes" + - "stats" + - "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" diff --git a/metricbeat/module/nats/_meta/config.yml b/metricbeat/module/nats/_meta/config.yml index d27d8f1ab2af..540b262a298e 100644 --- a/metricbeat/module/nats/_meta/config.yml +++ b/metricbeat/module/nats/_meta/config.yml @@ -1,8 +1,16 @@ - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + #- "connections" + #- "routes" + #- "stats" + #- "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" diff --git a/metricbeat/module/nats/_meta/docs.asciidoc b/metricbeat/module/nats/_meta/docs.asciidoc index 900539ff40d7..01a30e14046c 100644 --- a/metricbeat/module/nats/_meta/docs.asciidoc +++ b/metricbeat/module/nats/_meta/docs.asciidoc @@ -1,6 +1,7 @@ This is the Nats module. The Nats module uses https://nats.io/documentation/managing_the_server/monitoring/[Nats monitoring server APIs] to collect metrics. -The default metricsets are `stats`, `connections`, `routes` and `subscriptions`. +The default metricsets are `stats`, `connections`, `routes` and `subscriptions` while `connection` and `route` +metricsets can be enabled to collect detailed metrics per connection/route. [float] === Compatibility diff --git a/metricbeat/module/nats/_meta/fields.yml b/metricbeat/module/nats/_meta/fields.yml index cfe8cfe1bc46..b70dc9ab0a16 100644 --- a/metricbeat/module/nats/_meta/fields.yml +++ b/metricbeat/module/nats/_meta/fields.yml @@ -15,5 +15,6 @@ The server ID - name: server.time type: date + deprecated: 8.0.0 description: > Server time of metric creation diff --git a/metricbeat/module/nats/_meta/run.sh b/metricbeat/module/nats/_meta/run.sh index e98cf3fb3fb0..1e25ec877222 100755 --- a/metricbeat/module/nats/_meta/run.sh +++ b/metricbeat/module/nats/_meta/run.sh @@ -6,12 +6,22 @@ # NATS 2.X if [ -x /opt/nats/nats-server ]; then - exec /opt/nats/nats-server -c /opt/nats/nats-server.conf + if [[ -z "${ROUTES}" ]]; then + (/opt/nats/nats-server --cluster nats://0.0.0.0:6222 --http_port 8222 --port 4222) & + else + (/opt/nats/nats-server --cluster nats://0.0.0.0:6222 --http_port 8222 --port 4222 --routes nats://nats:6222) & + fi + while true; do /nats-bench -np 1 -n 100000000 -ms 16 foo; done fi # NATS 1.X if [ -x /opt/nats/gnatsd ]; then - exec /opt/nats/gnatsd -c /opt/nats/gnatsd.conf + if [[ -z "${ROUTES}" ]]; then + (/opt/nats/gnatsd --cluster nats://0.0.0.0:6222 --http_port 8222 --port 4222) & + else + (/opt/nats/gnatsd --cluster nats://0.0.0.0:6222 --http_port 8222 --port 4222 --routes nats://nats:6222) & + fi + while true; do /nats-bench -np 1 -n 100000000 -ms 16 foo; done fi echo "Couldn't find the nats server binary" diff --git a/metricbeat/module/nats/connection/_meta/data.json b/metricbeat/module/nats/connection/_meta/data.json new file mode 100644 index 000000000000..7e8bb84b77d7 --- /dev/null +++ b/metricbeat/module/nats/connection/_meta/data.json @@ -0,0 +1,35 @@ +{ + "@timestamp":"2016-05-23T08:05:34.853Z", + "beat":{ + "hostname":"beathost", + "name":"beathost" + }, + "metricset":{ + "host":"localhost", + "module":"nats", + "name":"connections", + "rtt":44269 + }, + "nats": { + "server": { + "id": "bUAdpRFtMWddIBWw80Yd9D", + "time": "2018-12-28T12:33:53.026865597Z" + }, + "connection": { + "idle_time": 0, + "in": { + "bytes": 678867264, + "messages": 42429204 + }, + "name": "NATS Benchmark", + "out": { + "bytes": 0, + "messages": 0 + }, + "pending_bytes": 0, + "subscriptions": 0, + "uptime": 29 + } + }, + "type":"metricsets" +} diff --git a/metricbeat/module/nats/connection/_meta/docs.asciidoc b/metricbeat/module/nats/connection/_meta/docs.asciidoc new file mode 100644 index 000000000000..3945ea045a45 --- /dev/null +++ b/metricbeat/module/nats/connection/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the connections metricset of the module nats collecting metrics per connection. diff --git a/metricbeat/module/nats/connection/_meta/fields.yml b/metricbeat/module/nats/connection/_meta/fields.yml new file mode 100644 index 000000000000..6ccbab9ca0cb --- /dev/null +++ b/metricbeat/module/nats/connection/_meta/fields.yml @@ -0,0 +1,57 @@ +- name: connection + type: group + description: > + Contains nats connection related metrics + release: ga + fields: + - name: name + type: keyword + description: > + The name of the connection + - name: subscriptions + type: integer + description: > + The number of subscriptions in this connection + - name: pending_bytes + type: long + format: bytes + description: > + The number of pending bytes of this connection + - name: uptime + type: long + format: duration + description: > + The period the connection is up (sec) + - name: idle_time + type: long + format: duration + description: > + The period the connection is idle (sec) + - name: in + type: group + description: > + The amount of incoming data + fields: + - name: messages + type: long + description: > + The amount of incoming messages + - name: bytes + type: long + format: bytes + description: > + The amount of incoming bytes + - name: out + type: group + description: > + The amount of outgoing data + fields: + - name: messages + type: long + description: > + The amount of outgoing messages + - name: bytes + type: long + format: bytes + description: > + The amount of outgoing bytes diff --git a/metricbeat/module/nats/connection/_meta/test/connectionsmetrics.json b/metricbeat/module/nats/connection/_meta/test/connectionsmetrics.json new file mode 100644 index 000000000000..ba34ee5f66a9 --- /dev/null +++ b/metricbeat/module/nats/connection/_meta/test/connectionsmetrics.json @@ -0,0 +1,29 @@ +{ + "server_id": "NCA52DU3MRRJGSDZXWBKSAGCKICQBYFLTQDHHXWAWYYCXBZ33LF55P7H", + "now": "2020-11-05T10:55:51.553046034Z", + "num_connections": 1, + "total": 1, + "offset": 0, + "limit": 1024, + "connections": [ + { + "cid": 3, + "ip": "172.22.0.1", + "port": 33794, + "start": "2020-11-05T10:55:21.968199135Z", + "last_activity": "2020-11-05T10:55:51.552942651Z", + "rtt": "31ms", + "uptime": "29s", + "idle": "0s", + "pending_bytes": 0, + "in_msgs": 42429204, + "out_msgs": 0, + "in_bytes": 678867264, + "out_bytes": 0, + "subscriptions": 0, + "name": "NATS Benchmark", + "lang": "go", + "version": "1.11.0" + } + ] +} diff --git a/metricbeat/module/nats/connection/connection.go b/metricbeat/module/nats/connection/connection.go new file mode 100644 index 000000000000..c960496ece43 --- /dev/null +++ b/metricbeat/module/nats/connection/connection.go @@ -0,0 +1,94 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package connection + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" + defaultPath = "/connz" +) + +var ( + hostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + DefaultPath: defaultPath, + PathConfigKey: "connection.metrics_path", + }.Build() +) + +// init registers the MetricSet with the central registry as soon as the program +// starts. The New function will be called later to instantiate an instance of +// the MetricSet for each host defined in the module's configuration. After the +// MetricSet has been created then Fetch will begin to be called periodically. +func init() { + mb.Registry.MustAddMetricSet("nats", "connection", New, + mb.WithHostParser(hostParser), + ) +} + +// MetricSet holds any configuration or state information. It must implement +// the mb.MetricSet interface. And this is best achieved by embedding +// mb.BaseMetricSet because it implements all of the required mb.MetricSet +// interface methods except for Fetch. +type MetricSet struct { + mb.BaseMetricSet + http *helper.HTTP + Log *logp.Logger +} + +// New creates a new instance of the MetricSet. New is responsible for unpacking +// any MetricSet specific configuration options if there are any. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + config := struct{}{} + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + return &MetricSet{ + base, + http, + logp.NewLogger("nats"), + }, nil +} + +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(r mb.ReporterV2) error { + content, err := m.http.FetchContent() + if err != nil { + return errors.Wrap(err, "error in fetch") + } + err = eventsMapping(r, content) + if err != nil { + return errors.Wrap(err, "error in mapping") + } + return nil +} diff --git a/metricbeat/module/nats/connection/connection_integration_test.go b/metricbeat/module/nats/connection/connection_integration_test.go new file mode 100644 index 000000000000..f6acb058ffc4 --- /dev/null +++ b/metricbeat/module/nats/connection/connection_integration_test.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package connection + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/tests/compose" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "nats") + compose.EnsureUp(t, "nats-routes") + + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") +} + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "nats") + compose.EnsureUp(t, "nats-routes") + + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) + metricSet.Fetch(reporter) + + e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": "nats", + "metricsets": []string{"connection"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/nats/connection/connection_test.go b/metricbeat/module/nats/connection/connection_test.go new file mode 100644 index 000000000000..d6b95960aeff --- /dev/null +++ b/metricbeat/module/nats/connection/connection_test.go @@ -0,0 +1,63 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package connection + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestEventMapping(t *testing.T) { + content, err := ioutil.ReadFile("./_meta/test/connectionsmetrics.json") + assert.NoError(t, err) + reporter := &mbtest.CapturingReporterV2{} + err = eventsMapping(reporter, content) + assert.NoError(t, err) +} + +func TestFetchEventContent(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test") + + response, _ := ioutil.ReadFile(absPath + "/connectionsmetrics.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "nats", + "metricsets": []string{"connection"}, + "hosts": []string{server.URL}, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) +} diff --git a/metricbeat/module/nats/connection/data.go b/metricbeat/module/nats/connection/data.go new file mode 100644 index 000000000000..cc2b26dfbb5f --- /dev/null +++ b/metricbeat/module/nats/connection/data.go @@ -0,0 +1,116 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package connection + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + + s "github.com/elastic/beats/v7/libbeat/common/schema" + c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/nats/util" +) + +var ( + moduleSchema = s.Schema{ + "server": s.Object{ + "id": c.Str("server_id"), + }, + } + connectionsSchema = s.Schema{ + "name": c.Str("name"), + "subscriptions": c.Int("subscriptions"), + "in": s.Object{ + "messages": c.Int("in_msgs"), + "bytes": c.Int("in_bytes"), + }, + "out": s.Object{ + "messages": c.Int("out_msgs"), + "bytes": c.Int("out_bytes"), + }, + "pending_bytes": c.Int("pending_bytes"), + "uptime": c.Str("uptime"), + "idle_time": c.Str("idle"), + } +) + +// Connections stores connections related information +type Connections struct { + Now time.Time `json:"now"` + ServerID string `json:"server_id"` + Connections []map[string]interface{} `json:"connections,omitempty"` +} + +// eventMapping maps a connection to a Metricbeat event using connectionsSchema +func eventMapping(content map[string]interface{}, fieldsSchema s.Schema) (mb.Event, error) { + fields, err := fieldsSchema.Apply(content) + if err != nil { + return mb.Event{}, errors.Wrap(err, "error applying connection schema") + } + + err = util.UpdateDuration(fields, "uptime") + if err != nil { + return mb.Event{}, errors.Wrap(err, "failure updating uptime key") + } + + err = util.UpdateDuration(fields, "idle_time") + if err != nil { + return mb.Event{}, errors.Wrap(err, "failure updating idle_time key") + } + + moduleFields, err := moduleSchema.Apply(content) + if err != nil { + return mb.Event{}, errors.Wrap(err, "error applying module schema") + } + + if err != nil { + return mb.Event{}, errors.Wrap(err, "failure parsing server timestamp") + } + event := mb.Event{ + MetricSetFields: fields, + ModuleFields: moduleFields, + } + return event, nil +} + +// eventsMapping maps per-connection metrics +func eventsMapping(r mb.ReporterV2, content []byte) error { + var err error + connections := Connections{} + if err = json.Unmarshal(content, &connections); err != nil { + return errors.Wrap(err, "failure parsing NATS connections API response") + } + + for _, con := range connections.Connections { + var evt mb.Event + con["server_id"] = connections.ServerID + evt, err = eventMapping(con, connectionsSchema) + if err != nil { + r.Error(errors.Wrap(err, "error mapping connection event")) + continue + } + evt.Timestamp = connections.Now + if !r.Event(evt) { + return nil + } + } + return nil +} diff --git a/metricbeat/module/nats/connections/_meta/data.json b/metricbeat/module/nats/connections/_meta/data.json index 74374d49c356..932a40be6e40 100644 --- a/metricbeat/module/nats/connections/_meta/data.json +++ b/metricbeat/module/nats/connections/_meta/data.json @@ -10,13 +10,13 @@ "name":"connections", "rtt":44269 }, - "nats":{ + "nats": { "server": { "id": "bUAdpRFtMWddIBWw80Yd9D", "time": "2018-12-28T12:33:53.026865597Z" }, - "connections":{ - "total": 10 + "connections": { + "total": 1 } }, "type":"metricsets" diff --git a/metricbeat/module/nats/connections/_meta/docs.asciidoc b/metricbeat/module/nats/connections/_meta/docs.asciidoc index 1d48f9d5719d..03d16c0c5690 100644 --- a/metricbeat/module/nats/connections/_meta/docs.asciidoc +++ b/metricbeat/module/nats/connections/_meta/docs.asciidoc @@ -1 +1 @@ -This is the connections metricset of the module nats. +This is the connections metricset of the module nats collecting top level metrics about connections. diff --git a/metricbeat/module/nats/connections/_meta/test/connectionsmetrics.json b/metricbeat/module/nats/connections/_meta/test/connectionsmetrics.json index 4dec7c8eaacc..ba34ee5f66a9 100644 --- a/metricbeat/module/nats/connections/_meta/test/connectionsmetrics.json +++ b/metricbeat/module/nats/connections/_meta/test/connectionsmetrics.json @@ -1,5 +1,29 @@ { - "server_id": "bUAdpRFtMWddIBWw80Yd9D", - "now": "2018-12-28T12:33:53.026865597Z", - "total": 10 + "server_id": "NCA52DU3MRRJGSDZXWBKSAGCKICQBYFLTQDHHXWAWYYCXBZ33LF55P7H", + "now": "2020-11-05T10:55:51.553046034Z", + "num_connections": 1, + "total": 1, + "offset": 0, + "limit": 1024, + "connections": [ + { + "cid": 3, + "ip": "172.22.0.1", + "port": 33794, + "start": "2020-11-05T10:55:21.968199135Z", + "last_activity": "2020-11-05T10:55:51.552942651Z", + "rtt": "31ms", + "uptime": "29s", + "idle": "0s", + "pending_bytes": 0, + "in_msgs": 42429204, + "out_msgs": 0, + "in_bytes": 678867264, + "out_bytes": 0, + "subscriptions": 0, + "name": "NATS Benchmark", + "lang": "go", + "version": "1.11.0" + } + ] } diff --git a/metricbeat/module/nats/connections/connections_integration_test.go b/metricbeat/module/nats/connections/connections_integration_test.go index a9758e42e698..e9a51aa8deb7 100644 --- a/metricbeat/module/nats/connections/connections_integration_test.go +++ b/metricbeat/module/nats/connections/connections_integration_test.go @@ -29,11 +29,8 @@ import ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "nats") - metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) - err := mbtest.WriteEventsReporterV2Error(metricSet, t, "./test_data.json") - if err != nil { - t.Fatal("write", err) - } + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") } func TestFetch(t *testing.T) { diff --git a/metricbeat/module/nats/connections/connections_test.go b/metricbeat/module/nats/connections/connections_test.go index 8c32d6c92910..e55bf1730717 100644 --- a/metricbeat/module/nats/connections/connections_test.go +++ b/metricbeat/module/nats/connections/connections_test.go @@ -37,7 +37,7 @@ func TestEventMapping(t *testing.T) { assert.NoError(t, err) event := reporter.GetEvents()[0] d, _ := event.MetricSetFields.GetValue("total") - assert.Equal(t, d, int64(10)) + assert.Equal(t, d, int64(1)) } func TestFetchEventContent(t *testing.T) { diff --git a/metricbeat/module/nats/connections/data.go b/metricbeat/module/nats/connections/data.go index 1efe616cb766..241f46f77d20 100644 --- a/metricbeat/module/nats/connections/data.go +++ b/metricbeat/module/nats/connections/data.go @@ -20,12 +20,12 @@ package connections import ( "encoding/json" - "github.com/elastic/beats/v7/metricbeat/mb" - "github.com/pkg/errors" s "github.com/elastic/beats/v7/libbeat/common/schema" c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/nats/util" ) var ( @@ -41,23 +41,31 @@ var ( ) func eventMapping(r mb.ReporterV2, content []byte) error { - var event mb.Event var inInterface map[string]interface{} err := json.Unmarshal(content, &inInterface) if err != nil { return errors.Wrap(err, "failure parsing NATS connections API response") } - event.MetricSetFields, err = connectionsSchema.Apply(inInterface) + metricSetFields, err := connectionsSchema.Apply(inInterface) if err != nil { return errors.Wrap(err, "failure applying connections schema") } - event.ModuleFields, err = moduleSchema.Apply(inInterface) + moduleFields, err := moduleSchema.Apply(inInterface) if err != nil { return errors.Wrap(err, "failure applying module schema") } + timestamp, err := util.GetNatsTimestamp(moduleFields) + if err != nil { + return errors.Wrap(err, "failure parsing server timestamp") + } + event := mb.Event{ + MetricSetFields: metricSetFields, + ModuleFields: moduleFields, + Timestamp: timestamp, + } r.Event(event) return nil } diff --git a/metricbeat/module/nats/docker-compose.yml b/metricbeat/module/nats/docker-compose.yml index 36cbd026fc41..74f8075f9e5d 100644 --- a/metricbeat/module/nats/docker-compose.yml +++ b/metricbeat/module/nats/docker-compose.yml @@ -2,7 +2,7 @@ version: '2.3' services: nats: - image: docker.elastic.co/integrations-ci/beats-nats:${NATS_VERSION:-2.0.4}-1 + image: docker.elastic.co/integrations-ci/beats-nats:${NATS_VERSION:-2.0.4}-2 build: context: ./_meta dockerfile: Dockerfile @@ -10,3 +10,8 @@ services: NATS_VERSION: ${NATS_VERSION:-2.0.4} ports: - 8222 + nats-routes: + extends: + service: nats + environment: + - ROUTES=1 diff --git a/metricbeat/module/nats/fields.go b/metricbeat/module/nats/fields.go index ee76b5c6fab6..bb1be3f6dcd8 100644 --- a/metricbeat/module/nats/fields.go +++ b/metricbeat/module/nats/fields.go @@ -32,5 +32,5 @@ func init() { // AssetNats returns asset data. // This is the base64 encoded gzipped contents of module/nats. func AssetNats() string { - return "eJzUmM+S2zYMxu9+CkxO7SH7AD50ptNecmgO3fS8oSlY5kQkFAD0xnn6DiXLlmjJlhtlt6ujKeP76QP4B3wPX/CwhmBUVgDqtMI1vPv4+6fHdyuAAsWyq9VRWMNvKwBo3oS/qIgVrgAYKzSCayjNCmDrsCpk3bz3HoLxeIqcHj3U6U2mWB9/GYmfns/pT5/BUlDjgoCoUSfqrIDujMIzMgKjKWDL5OHjWaJP0KcQ5D3ygytOIx3OFzw8E/d/n4BKz6cdHkPBhz+nRNR5vJApjOI8jcc2fooCtAWPys6CZTTp7QtRSyGgTUNyIdq3+obqH53XTX7PQVOGjWJx5OhrDHPfPXkG+qxKaqrBSEfqgmKJnI1d4e2yEaLfICejbGTGoNUBjFW3R7CVw6By4RhTVFzQrCbe2/GJsXSiyFjkTpzqWI0u6M/e8KLuxDqbYWfGikKZDWyJvdE1FJGHE2i2dTWyowL0PPedQKzhF0H76yihR/+wOQyLbBbk2J9mEB5LP+kSHyCKKZu1Iy3jUDNZFBkFtcQTkAsUWkWls6ZqRRr7+jzAMQhk+Thx1XGUSqypsHjaVmR0wsIa2WLIR+8w0dZx4KBcdbCZqk/jizBcS/ldVjYq1aHdBbC4WNr6RIyepkpv4eVjRKmjcPlEG19EZsobTzFoknfBknehTBuqyUtgZNGAwbSUlNfcGriWphmAVyAnFTuksQl/k+faavFjvJcRO1CK+ZRaKKEUtaT/e0JPkG8koSfe6YRKRc9p3ZLokX/OqpUk4CTRO6BRaLaCUa6dal5PP1RpKV537Djtie0+fme1MX59yo9GtxFnYHaojF8jivb7nbFimcDso0Z2o+O3QGfCZsBQOFF2m9g0DBTAU3BKnArwn78/PE7EuPYhkJ/Xv0++dmNu3flZcFHFO6fpnHLE6H/btMvD9uz12RuKe9H3hl+fPEHcCy5xI69P3lDci85E+T778uQJYgq873GnsmCvOAj7dnrq443DlCn9Y7EgX+wgy2y3bWygGttOV8CFzNDKiU42Dfv5/eqdHUOK/d+4vFG7+0lc6jwKmFYj9fRbiqFIhzEwA77xFtXYHT6I+z5+E7FIpyWxUhDUxrHUQDeiV3B2Tp94eNMIL9I8J9EuWWAYYYNp5eA0cXGPx4vaW/hbEyjqgzfflvXUm2/ORw9t/Pb0V8DmMBfI7PNCO17qUtxUudlzup89silxiuffAAAA//+DHv/E" + return "eJzsmjuT2zgMx/v9FJhUd0U8KW9c3EzmrklxW9zu1Q5NwhYnIqmAoDfOp7+hHl49KEv22vvIrEpLBn4CSOIP2B/hG+6XYAX7GwDWnOMSPtx+vr/7cAOg0EvSBWtnl/DnDQCUT8I/ToUcbwAIcxQel7AVNwAbjbnyy/K5j2CFwYPlePG+iE+SC0X9ScJ+vL7GL30F6SwLbT14Fqw9a+mBM8HwgIRAKBRsyBm4fXTRJmhTeKQd0kKrw50G5xvuHxy1Px+Bitd9hrUp+PL3mBPWBgdulGDs+CgIpWBUS/hj8WnxaZ7/u8p39ABuAwaZtARJKOLTAyDprEXZuZXKwoTTv5o0lKl/tBmTH9+gxvCtL3WXRXP1k9NdJp2gHcvPBG6Tp2gxBokzTAeilbiwPpjzSQptGbdIZ1AEs0aKHB0noC1wpv0UWYFWabtdrfeMabLc2W3vxsaREbyE1JdOQq69V4aqWE4jh6K3AWaxqkBiYHIWboGkneqlGbSHUMBvHuXvSUqtcly9EtDIcgy17yu9g2dSCOOC5ZhMbaUzMb1KsOi/bWKrtpkMei+2g9V1NIIzAI9AjnpskFKrfZLn2FZ5Gu/QYgPqAl8noS7w1r32hB4g30hCD7xdi8MS6199jWXHIr9WeZOBCC3nexCS9Q5B5hotDyNGLvBQH50dq9LcRcP0etUAoXGMK91XQ09USpVZ0AqchYdMy6wsUlVkH6lQAbujKsXrn7Pr6VlCpERKn6qFo/SxenayosVGQPYXbas694/s2usZJ7kuZrh7FwNvp3a8i4GJAP1iYmBwPL262nZVCUC41Z6RUPUjcSitLPiC8dkJumh0XrB1radKU22rQbN4zllAreqiX0d7CHGTxlTffr6/g4KcRJ8+16SjEcgLLLTcbbUUeeWkDF+bByhYDyMSShYhSeWlyFGtNrkT/QO5CWGBJNH2754QRFmETgT90QiWW3WV7i/gYsqq9JLvqxkiqoFqh4H8vFpW28dHwtO7AnpXQO8K6ESe51JAbVCfu4d4bvlgkK5zakUXcHDRmj04W5aCJFfGnO7Vzlxp0V4jOw41sarjJ642wu+rvjSaRpyBCYcO/3tAz+1f0lKLZQSzjRpIJ+9Pgc6E7QGD0p5Jr0M5C3MWjLOaHcUF+N+/X+5GbBx7Eejr9Z+jj03srRNfCwarONMcdUqN0X638Sh3J48vz15SnIq+E/Ty5BHiVHAf1v7lyUuKU9HJuX6dfX7yCDEGPjV+fWKv2B24vpmeuh6mj8+kH2WxRxpUkMuU28o2uAKrTrccWHcDmmvPo03Dbn6/emLHEG2fx2UEy+xKXKwNehCVj9jTb1ywKooxEB2+dIsqZIaL0Vn6RTotH3IGj1z/9ICV0yM4meYVCU4jXbN5jk6bZIEghDWWvwTEjYs7rP8CNIW/EdYFXhjx47IxNeKHNsFAZb9SfwrW+7lAYtdfaPXfhVxY5/1gz+l+dkhii2M8/wcAAP//ssWVPg==" } diff --git a/metricbeat/module/nats/route/_meta/data.json b/metricbeat/module/nats/route/_meta/data.json new file mode 100644 index 000000000000..4858915bf0a5 --- /dev/null +++ b/metricbeat/module/nats/route/_meta/data.json @@ -0,0 +1,35 @@ +{ + "@timestamp":"2016-05-23T08:05:34.853Z", + "beat":{ + "hostname":"beathost", + "name":"beathost" + }, + "metricset":{ + "host":"localhost", + "module":"nats", + "name":"routes", + "rtt":44269 + }, + "nats":{ + "server": { + "id": "bUAdpRFtMWddIBWw80Yd9D", + "time": "2018-12-28T12:33:53.026865597Z" + }, + "routes": { + "in": { + "bytes": 0, + "messages": 0 + }, + "ip": "172.25.255.2", + "out": { + "bytes": 0, + "messages": 0 + }, + "pending_size": 0, + "port": 60736, + "remote_id": "NCYX4P3ZCXZT3UF4BBZUG46S7EU224XXXLUTUBUDOIAUAIR5WJV73BV5", + "subscriptions": 0, + } + }, + "type":"metricsets" +} diff --git a/metricbeat/module/nats/route/_meta/docs.asciidoc b/metricbeat/module/nats/route/_meta/docs.asciidoc new file mode 100644 index 000000000000..eec8b00ea7c6 --- /dev/null +++ b/metricbeat/module/nats/route/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the route metricset of the module nats collecting metrcis per connection. diff --git a/metricbeat/module/nats/route/_meta/fields.yml b/metricbeat/module/nats/route/_meta/fields.yml new file mode 100644 index 000000000000..b60e5d7c1999 --- /dev/null +++ b/metricbeat/module/nats/route/_meta/fields.yml @@ -0,0 +1,54 @@ +- name: route + type: group + description: > + Contains nats route related metrics + release: ga + fields: + - name: subscriptions + type: integer + description: > + The number of subscriptions in this connection + - name: remote_id + type: keyword + description: > + The remote id on which the route is connected to + - name: pending_size + type: long + description: > + The number of pending routes + - name: port + type: integer + description: > + The port of the route + - name: ip + type: ip + description: > + The ip of the route + - name: in + type: group + description: > + The amount of incoming data + fields: + - name: messages + type: long + description: > + The amount of incoming messages + - name: bytes + type: long + format: bytes + description: > + The amount of incoming bytes + - name: out + type: group + description: > + The amount of outgoing data + fields: + - name: messages + type: long + description: > + The amount of outgoing messages + - name: bytes + type: long + format: bytes + description: > + The amount of outgoing bytes diff --git a/metricbeat/module/nats/route/_meta/test/routesmetrics.json b/metricbeat/module/nats/route/_meta/test/routesmetrics.json new file mode 100644 index 000000000000..1d9c685696fc --- /dev/null +++ b/metricbeat/module/nats/route/_meta/test/routesmetrics.json @@ -0,0 +1,37 @@ +{ + "server_id": "NDIFJNKCKCL2I2ICEXF64LAOAFMW2TN667B4NOWEKF77U7WFQR7CMF7M", + "now": "2020-11-09T09:34:10.027121937Z", + "num_routes": 2, + "routes": [ + { + "rid": 1, + "remote_id": "NCYX4P3ZCXZT3UF4BBZUG46S7EU224XXXLUTUBUDOIAUAIR5WJV73BV5", + "did_solicit": false, + "is_configured": false, + "ip": "172.25.255.2", + "port": 60736, + "pending_size": 0, + "rtt": "1.646926ms", + "in_msgs": 0, + "out_msgs": 0, + "in_bytes": 0, + "out_bytes": 0, + "subscriptions": 0 + }, + { + "rid": 2, + "remote_id": "NAVXZD4IXIH5QYJM33LQYXO3DZGXCUZ4MU72XH5GSELJB3WJWYIGND2D", + "did_solicit": false, + "is_configured": false, + "ip": "172.25.255.3", + "port": 42898, + "pending_size": 0, + "rtt": "511µs", + "in_msgs": 0, + "out_msgs": 0, + "in_bytes": 0, + "out_bytes": 0, + "subscriptions": 0 + } + ] +} diff --git a/metricbeat/module/nats/route/data.go b/metricbeat/module/nats/route/data.go new file mode 100644 index 000000000000..932933f9d9d0 --- /dev/null +++ b/metricbeat/module/nats/route/data.go @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package route + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + + s "github.com/elastic/beats/v7/libbeat/common/schema" + c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/v7/metricbeat/mb" +) + +var ( + moduleSchema = s.Schema{ + "server": s.Object{ + "id": c.Str("server_id"), + }, + } + routesSchema = s.Schema{ + "remote_id": c.Str("remote_id"), + "subscriptions": c.Int("subscriptions"), + "in": s.Object{ + "messages": c.Int("in_msgs"), + "bytes": c.Int("in_bytes"), + }, + "out": s.Object{ + "messages": c.Int("out_msgs"), + "bytes": c.Int("out_bytes"), + }, + "pending_size": c.Int("pending_size"), + "port": c.Int("port"), + "ip": c.Str("ip"), + } +) + +// Routes stores routes related information +type Routes struct { + Now time.Time `json:"now"` + ServerID string `json:"server_id"` + Routes []map[string]interface{} `json:"routes,omitempty"` +} + +// eventMapping maps a route to a Metricbeat event using routesSchema +func eventMapping(content map[string]interface{}, fieldsSchema s.Schema) (mb.Event, error) { + fields, err := fieldsSchema.Apply(content) + if err != nil { + return mb.Event{}, errors.Wrap(err, "error applying routes schema") + } + + moduleFields, err := moduleSchema.Apply(content) + if err != nil { + return mb.Event{}, errors.Wrap(err, "error applying module schema") + } + + if err != nil { + return mb.Event{}, errors.Wrap(err, "failure parsing server timestamp") + } + event := mb.Event{ + MetricSetFields: fields, + ModuleFields: moduleFields, + } + return event, nil +} + +// eventsMapping maps per-route metrics +func eventsMapping(r mb.ReporterV2, content []byte) error { + var err error + connections := Routes{} + if err = json.Unmarshal(content, &connections); err != nil { + return errors.Wrap(err, "failure parsing NATS connections API response") + } + + for _, con := range connections.Routes { + var evt mb.Event + con["server_id"] = connections.ServerID + evt, err = eventMapping(con, routesSchema) + if err != nil { + r.Error(errors.Wrap(err, "error mapping connection event")) + continue + } + evt.Timestamp = connections.Now + if !r.Event(evt) { + return nil + } + } + return nil +} diff --git a/metricbeat/module/nats/route/route.go b/metricbeat/module/nats/route/route.go new file mode 100644 index 000000000000..fe2109c23323 --- /dev/null +++ b/metricbeat/module/nats/route/route.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package route + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" + defaultPath = "/routez" +) + +var ( + hostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + DefaultPath: defaultPath, + PathConfigKey: "route.metrics_path", + }.Build() +) + +// init registers the MetricSet with the central registry as soon as the program +// starts. The New function will be called later to instantiate an instance of +// the MetricSet for each host defined in the module's configuration. After the +// MetricSet has been created then Fetch will begin to be called periodically. +func init() { + mb.Registry.MustAddMetricSet("nats", "route", New, + mb.WithHostParser(hostParser), + ) +} + +// MetricSet holds any configuration or state information. It must implement +// the mb.MetricSet interface. And this is best achieved by embedding +// mb.BaseMetricSet because it implements all of the required mb.MetricSet +// interface methods except for Fetch. +type MetricSet struct { + mb.BaseMetricSet + http *helper.HTTP + Log *logp.Logger +} + +// New creates a new instance of the MetricSet. New is responsible for unpacking +// any MetricSet specific configuration options if there are any. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + config := struct{}{} + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + return &MetricSet{ + base, + http, + logp.NewLogger("nats"), + }, nil +} + +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(r mb.ReporterV2) error { + content, err := m.http.FetchContent() + if err != nil { + return errors.Wrap(err, "error in fetch") + } + err = eventsMapping(r, content) + if err != nil { + return errors.Wrap(err, "error in mapping") + } + + return nil +} diff --git a/metricbeat/module/nats/route/route_integration_test.go b/metricbeat/module/nats/route/route_integration_test.go new file mode 100644 index 000000000000..f54a0ff3f6ee --- /dev/null +++ b/metricbeat/module/nats/route/route_integration_test.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package route + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/tests/compose" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "nats") + compose.EnsureUp(t, "nats-routes") + + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") +} + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "nats") + compose.EnsureUp(t, "nats-routes") + + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) + metricSet.Fetch(reporter) + + e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": "nats", + "metricsets": []string{"route"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/nats/route/route_test.go b/metricbeat/module/nats/route/route_test.go new file mode 100644 index 000000000000..72f5f2f37944 --- /dev/null +++ b/metricbeat/module/nats/route/route_test.go @@ -0,0 +1,63 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package route + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestEventMapping(t *testing.T) { + content, err := ioutil.ReadFile("./_meta/test/routesmetrics.json") + assert.NoError(t, err) + reporter := &mbtest.CapturingReporterV2{} + err = eventsMapping(reporter, content) + assert.NoError(t, err) +} + +func TestFetchEventContent(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test") + + response, _ := ioutil.ReadFile(absPath + "/routesmetrics.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "nats", + "metricsets": []string{"route"}, + "hosts": []string{server.URL}, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + e := mbtest.StandardizeEvent(metricSet, reporter.GetEvents()[0]) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) +} diff --git a/metricbeat/module/nats/routes/_meta/data.json b/metricbeat/module/nats/routes/_meta/data.json index 7b3fcb83d6e8..8c1748713b41 100644 --- a/metricbeat/module/nats/routes/_meta/data.json +++ b/metricbeat/module/nats/routes/_meta/data.json @@ -15,8 +15,8 @@ "id": "bUAdpRFtMWddIBWw80Yd9D", "time": "2018-12-28T12:33:53.026865597Z" }, - "routes":{ - "total": 10 + "routes": { + "total": 2 } }, "type":"metricsets" diff --git a/metricbeat/module/nats/routes/_meta/docs.asciidoc b/metricbeat/module/nats/routes/_meta/docs.asciidoc index 85ddb92bb40f..907401be3a7f 100644 --- a/metricbeat/module/nats/routes/_meta/docs.asciidoc +++ b/metricbeat/module/nats/routes/_meta/docs.asciidoc @@ -1 +1 @@ -This is the routes metricset of the module nats. +This is the routes metricset of the module nats collecting top level metrics about routes. diff --git a/metricbeat/module/nats/routes/_meta/test/routesmetrics.json b/metricbeat/module/nats/routes/_meta/test/routesmetrics.json index f850791bc53c..1d9c685696fc 100644 --- a/metricbeat/module/nats/routes/_meta/test/routesmetrics.json +++ b/metricbeat/module/nats/routes/_meta/test/routesmetrics.json @@ -1,5 +1,37 @@ { - "server_id": "bUAdpRFtMWddIBWw80Yd9D", - "now": "2018-12-28T12:33:53.026865597Z", - "num_routes": 10 + "server_id": "NDIFJNKCKCL2I2ICEXF64LAOAFMW2TN667B4NOWEKF77U7WFQR7CMF7M", + "now": "2020-11-09T09:34:10.027121937Z", + "num_routes": 2, + "routes": [ + { + "rid": 1, + "remote_id": "NCYX4P3ZCXZT3UF4BBZUG46S7EU224XXXLUTUBUDOIAUAIR5WJV73BV5", + "did_solicit": false, + "is_configured": false, + "ip": "172.25.255.2", + "port": 60736, + "pending_size": 0, + "rtt": "1.646926ms", + "in_msgs": 0, + "out_msgs": 0, + "in_bytes": 0, + "out_bytes": 0, + "subscriptions": 0 + }, + { + "rid": 2, + "remote_id": "NAVXZD4IXIH5QYJM33LQYXO3DZGXCUZ4MU72XH5GSELJB3WJWYIGND2D", + "did_solicit": false, + "is_configured": false, + "ip": "172.25.255.3", + "port": 42898, + "pending_size": 0, + "rtt": "511µs", + "in_msgs": 0, + "out_msgs": 0, + "in_bytes": 0, + "out_bytes": 0, + "subscriptions": 0 + } + ] } diff --git a/metricbeat/module/nats/routes/data.go b/metricbeat/module/nats/routes/data.go index 83581a16a311..02eb686e709a 100644 --- a/metricbeat/module/nats/routes/data.go +++ b/metricbeat/module/nats/routes/data.go @@ -20,12 +20,12 @@ package routes import ( "encoding/json" - "github.com/elastic/beats/v7/metricbeat/mb" - "github.com/pkg/errors" s "github.com/elastic/beats/v7/libbeat/common/schema" c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/nats/util" ) var ( @@ -41,22 +41,30 @@ var ( ) func eventMapping(r mb.ReporterV2, content []byte) error { - var event mb.Event var inInterface map[string]interface{} err := json.Unmarshal(content, &inInterface) if err != nil { return errors.Wrap(err, "failure parsing Nats routes API response") } - event.MetricSetFields, err = routesSchema.Apply(inInterface) + metricSetFields, err := routesSchema.Apply(inInterface) if err != nil { return errors.Wrap(err, "failure applying routes schema") } - event.ModuleFields, err = moduleSchema.Apply(inInterface) + moduleFields, err := moduleSchema.Apply(inInterface) if err != nil { return errors.Wrap(err, "failure applying module schema") } + timestamp, err := util.GetNatsTimestamp(moduleFields) + if err != nil { + errors.Wrap(err, "failure parsing server timestamp") + } + event := mb.Event{ + MetricSetFields: metricSetFields, + ModuleFields: moduleFields, + Timestamp: timestamp, + } r.Event(event) return nil } diff --git a/metricbeat/module/nats/routes/routes_integration_test.go b/metricbeat/module/nats/routes/routes_integration_test.go index a10e4be938ee..2ffb1a64f2a6 100644 --- a/metricbeat/module/nats/routes/routes_integration_test.go +++ b/metricbeat/module/nats/routes/routes_integration_test.go @@ -29,11 +29,8 @@ import ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "nats") - metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) - err := mbtest.WriteEventsReporterV2Error(metricSet, t, "./test_data.json") - if err != nil { - t.Fatal("write", err) - } + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") } func TestFetch(t *testing.T) { diff --git a/metricbeat/module/nats/routes/routes_test.go b/metricbeat/module/nats/routes/routes_test.go index a76b35c5e1d5..bf9870198856 100644 --- a/metricbeat/module/nats/routes/routes_test.go +++ b/metricbeat/module/nats/routes/routes_test.go @@ -37,7 +37,7 @@ func TestEventMapping(t *testing.T) { assert.NoError(t, err) event := reporter.GetEvents()[0] d, _ := event.MetricSetFields.GetValue("total") - assert.Equal(t, d, int64(10)) + assert.Equal(t, d, int64(2)) } func TestFetchEventContent(t *testing.T) { diff --git a/metricbeat/module/nats/stats/_meta/docs.asciidoc b/metricbeat/module/nats/stats/_meta/docs.asciidoc index 1e468233bf10..3ce6026a5b65 100644 --- a/metricbeat/module/nats/stats/_meta/docs.asciidoc +++ b/metricbeat/module/nats/stats/_meta/docs.asciidoc @@ -1 +1 @@ -This is the stats metricset of the module nats. +This is the stats metricset of the module nats collecting generic stats. diff --git a/metricbeat/module/nats/stats/data.go b/metricbeat/module/nats/stats/data.go index 594276d35bec..fb014eea466c 100644 --- a/metricbeat/module/nats/stats/data.go +++ b/metricbeat/module/nats/stats/data.go @@ -20,16 +20,13 @@ package stats import ( "encoding/json" - "github.com/elastic/beats/v7/metricbeat/mb" - "github.com/pkg/errors" - "strconv" - "strings" - "github.com/elastic/beats/v7/libbeat/common" s "github.com/elastic/beats/v7/libbeat/common/schema" c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/nats/util" ) var ( @@ -68,96 +65,25 @@ var ( } ) -// Converts uptime from formatted string to seconds -// input: "1y20d22h3m30s", output: 33343410 -func convertUptime(uptime string) (seconds int64, err error) { - - var split []string - var years, days, hours, minutes, secs int64 - if strings.Contains(uptime, "y") { - split = strings.Split(uptime, "y") - uptime = split[1] - years, err = strconv.ParseInt(split[0], 10, 64) - if err != nil { - err = errors.Wrap(err, "invalid years format in json data") - return - } - seconds += years * 31536000 - } - - if strings.Contains(uptime, "d") { - split = strings.Split(uptime, "d") - uptime = split[1] - days, err = strconv.ParseInt(split[0], 10, 64) - if err != nil { - err = errors.Wrap(err, "invalid days format in json data") - return - } - seconds += days * 86400 - } - - if strings.Contains(uptime, "h") { - split = strings.Split(uptime, "h") - uptime = split[1] - hours, err = strconv.ParseInt(split[0], 10, 64) - if err != nil { - err = errors.Wrap(err, "invalid hours format in json data") - return - } - seconds += hours * 3600 - } - - if strings.Contains(uptime, "m") { - split = strings.Split(uptime, "m") - uptime = split[1] - minutes, err = strconv.ParseInt(split[0], 10, 64) - if err != nil { - err = errors.Wrap(err, "invalid minutes format in json data") - return - } - seconds += minutes * 60 - } - - if strings.Contains(uptime, "s") { - split = strings.Split(uptime, "s") - uptime = split[1] - secs, err = strconv.ParseInt(split[0], 10, 64) - if err != nil { - err = errors.Wrap(err, "invalid seconds format in json data") - return - } - seconds += secs - } - return -} - func eventMapping(r mb.ReporterV2, content []byte) error { - var event common.MapStr + var metricsetMetrics common.MapStr var inInterface map[string]interface{} err := json.Unmarshal(content, &inInterface) if err != nil { return errors.Wrap(err, "failure parsing Nats stats API response") } - event, err = statsSchema.Apply(inInterface) + metricsetMetrics, err = statsSchema.Apply(inInterface) if err != nil { return errors.Wrap(err, "failure applying stats schema") } - uptime, err := event.GetValue("uptime") - if err != nil { - return errors.Wrap(err, "failure retrieving uptime key") - } - uptime, err = convertUptime(uptime.(string)) - if err != nil { - return errors.Wrap(err, "failure converting uptime from string to integer") - } - _, err = event.Put("uptime", uptime) + err = util.UpdateDuration(metricsetMetrics, "uptime") if err != nil { return errors.Wrap(err, "failure updating uptime key") } - d, err := event.GetValue("http_req_stats") + d, err := metricsetMetrics.GetValue("http_req_stats") if err != nil { return errors.Wrap(err, "failure retrieving http_req_stats key") } @@ -166,12 +92,12 @@ func eventMapping(r mb.ReporterV2, content []byte) error { return errors.Wrap(err, "failure casting http_req_stats to common.Mapstr") } - err = event.Delete("http_req_stats") + err = metricsetMetrics.Delete("http_req_stats") if err != nil { return errors.Wrap(err, "failure deleting http_req_stats key") } - event["http"] = common.MapStr{ + metricsetMetrics["http"] = common.MapStr{ "req_stats": common.MapStr{ "uri": common.MapStr{ "root": httpStats["root_uri"], @@ -182,7 +108,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { }, }, } - cpu, err := event.GetValue("cpu") + cpu, err := metricsetMetrics.GetValue("cpu") if err != nil { return errors.Wrap(err, "failure retrieving cpu key") } @@ -190,7 +116,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error { if !ok { return errors.Wrap(err, "failure casting cpu to float64") } - _, err = event.Put("cpu", cpuUtil/100.0) + _, err = metricsetMetrics.Put("cpu", cpuUtil/100.0) if err != nil { return errors.Wrap(err, "failure updating cpu key") } @@ -198,6 +124,15 @@ func eventMapping(r mb.ReporterV2, content []byte) error { if err != nil { return errors.Wrap(err, "failure applying module schema") } - r.Event(mb.Event{MetricSetFields: event, ModuleFields: moduleMetrics}) + timestamp, err := util.GetNatsTimestamp(moduleMetrics) + if err != nil { + return errors.Wrap(err, "failure parsing server timestamp") + } + evt := mb.Event{ + MetricSetFields: metricsetMetrics, + ModuleFields: moduleMetrics, + Timestamp: timestamp, + } + r.Event(evt) return nil } diff --git a/metricbeat/module/nats/stats/stats_integration_test.go b/metricbeat/module/nats/stats/stats_integration_test.go index b95447e9c9d9..56b50fc01c37 100644 --- a/metricbeat/module/nats/stats/stats_integration_test.go +++ b/metricbeat/module/nats/stats/stats_integration_test.go @@ -29,11 +29,8 @@ import ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "nats") - metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) - err := mbtest.WriteEventsReporterV2Error(metricSet, t, "./test_data.json") - if err != nil { - t.Fatal("write", err) - } + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") } func TestFetch(t *testing.T) { diff --git a/metricbeat/module/nats/subscriptions/_meta/docs.asciidoc b/metricbeat/module/nats/subscriptions/_meta/docs.asciidoc index a0601ada457c..adaa097d75cd 100644 --- a/metricbeat/module/nats/subscriptions/_meta/docs.asciidoc +++ b/metricbeat/module/nats/subscriptions/_meta/docs.asciidoc @@ -1 +1 @@ -This is the subscriptions metricset of the module nats. +This is the subscriptions metricset of the module nats collecting metrics about subscriptions. diff --git a/metricbeat/module/nats/subscriptions/subscriptions_integration_test.go b/metricbeat/module/nats/subscriptions/subscriptions_integration_test.go index 909537b34754..0d0b03f71a49 100644 --- a/metricbeat/module/nats/subscriptions/subscriptions_integration_test.go +++ b/metricbeat/module/nats/subscriptions/subscriptions_integration_test.go @@ -29,11 +29,8 @@ import ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "nats") - metricSet := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) - err := mbtest.WriteEventsReporterV2Error(metricSet, t, "./test_data.json") - if err != nil { - t.Fatal("write", err) - } + m := mbtest.NewFetcher(t, getConfig(service.Host())) + m.WriteEvents(t, "") } func TestFetch(t *testing.T) { diff --git a/metricbeat/module/nats/test_nats.py b/metricbeat/module/nats/test_nats.py index 406ee61d66d7..810f4afdb88e 100644 --- a/metricbeat/module/nats/test_nats.py +++ b/metricbeat/module/nats/test_nats.py @@ -8,7 +8,7 @@ @metricbeat.parameterized_with_supported_versions class TestNats(metricbeat.BaseTest): - COMPOSE_SERVICES = ['nats'] + COMPOSE_SERVICES = ['nats', 'nats-routes'] @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_stats(self): @@ -60,6 +60,31 @@ def test_connections(self): self.assert_fields_are_documented(evt) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_connection(self): + """ + nats connection test + """ + self.render_config_template(modules=[{ + "name": "nats", + "metricsets": ["connection"], + "hosts": self.get_hosts(), + "period": "5s", + "connections.metrics_path": "/connz" + }]) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + output = self.read_output_json() + self.assertEqual(len(output), 1) + evt = output[0] + + self.assertCountEqual(self.de_dot(NATS_FIELDS), evt.keys(), evt) + + self.assert_fields_are_documented(evt) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_routes(self): """ @@ -85,6 +110,31 @@ def test_routes(self): self.assert_fields_are_documented(evt) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_route(self): + """ + nats route test + """ + self.render_config_template(modules=[{ + "name": "nats", + "metricsets": ["route"], + "hosts": self.get_hosts(), + "period": "5s", + "routes.metrics_path": "/routez" + }]) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + output = self.read_output_json() + self.assertEqual(len(output), 1) + evt = output[0] + + self.assertCountEqual(self.de_dot(NATS_FIELDS), evt.keys(), evt) + + self.assert_fields_are_documented(evt) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_subscriptions(self): """ @@ -109,3 +159,6 @@ def test_subscriptions(self): self.assertCountEqual(self.de_dot(NATS_FIELDS), evt.keys(), evt) self.assert_fields_are_documented(evt) + + def get_hosts(self): + return [self.compose_host("nats")] diff --git a/metricbeat/module/nats/util/util.go b/metricbeat/module/nats/util/util.go new file mode 100644 index 000000000000..da95c12391ab --- /dev/null +++ b/metricbeat/module/nats/util/util.go @@ -0,0 +1,121 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package util + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// convertUptimeToSeconds converts uptime from formatted string to seconds +// input: "1y20d22h3m30s", output: 33343410 +func convertUptimeToSeconds(uptime string) (seconds int64, err error) { + + var split []string + var years, days, hours, minutes, secs int64 + if strings.Contains(uptime, "y") { + split = strings.Split(uptime, "y") + uptime = split[1] + years, err = strconv.ParseInt(split[0], 10, 64) + if err != nil { + err = errors.Wrap(err, "invalid years format in json data") + return + } + seconds += years * 31536000 + } + + if strings.Contains(uptime, "d") { + split = strings.Split(uptime, "d") + uptime = split[1] + days, err = strconv.ParseInt(split[0], 10, 64) + if err != nil { + err = errors.Wrap(err, "invalid days format in json data") + return + } + seconds += days * 86400 + } + + if strings.Contains(uptime, "h") { + split = strings.Split(uptime, "h") + uptime = split[1] + hours, err = strconv.ParseInt(split[0], 10, 64) + if err != nil { + err = errors.Wrap(err, "invalid hours format in json data") + return + } + seconds += hours * 3600 + } + + if strings.Contains(uptime, "m") { + split = strings.Split(uptime, "m") + uptime = split[1] + minutes, err = strconv.ParseInt(split[0], 10, 64) + if err != nil { + err = errors.Wrap(err, "invalid minutes format in json data") + return + } + seconds += minutes * 60 + } + + if strings.Contains(uptime, "s") { + split = strings.Split(uptime, "s") + uptime = split[1] + secs, err = strconv.ParseInt(split[0], 10, 64) + if err != nil { + err = errors.Wrap(err, "invalid seconds format in json data") + return + } + seconds += secs + } + return +} + +// UpdateDuration updates a duration in a common.MapStr from formatted string to seconds +func UpdateDuration(event common.MapStr, key string) error { + item, err := event.GetValue(key) + if err != nil { + return nil + } + itemConverted, err := convertUptimeToSeconds(item.(string)) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failure converting %v key from string to integer", key)) + } + _, err = event.Put(key, itemConverted) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failure updating %v key", key)) + } + return nil +} + +// GetNatsTimestamp gets the timestamp of base level metrics NATS server returns +func GetNatsTimestamp(event common.MapStr) (time.Time, error) { + var timeStamp time.Time + timestamp, _ := event.GetValue("server.time") + timestampString := timestamp.(string) + timeStamp, err := time.Parse(time.RFC3339, timestampString) + if err != nil { + return timeStamp, err + } + return timeStamp, nil +} diff --git a/metricbeat/modules.d/nats.yml.disabled b/metricbeat/modules.d/nats.yml.disabled index abe095102f5a..d398ac0be432 100644 --- a/metricbeat/modules.d/nats.yml.disabled +++ b/metricbeat/modules.d/nats.yml.disabled @@ -2,10 +2,18 @@ # Docs: https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-module-nats.html - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + #- "connections" + #- "routes" + #- "stats" + #- "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 238a4fedb4eb..164dc564f2f8 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1027,13 +1027,21 @@ metricbeat.modules: #--------------------------------- NATS Module --------------------------------- - module: nats - metricsets: ["connections", "routes", "stats", "subscriptions"] + metricsets: + - "connections" + - "routes" + - "stats" + - "subscriptions" + #- "connection" + #- "route" period: 10s hosts: ["localhost:8222"] #stats.metrics_path: "/varz" #connections.metrics_path: "/connz" #routes.metrics_path: "/routez" #subscriptions.metrics_path: "/subsz" + #connection.metrics_path: "/connz" + #route.metrics_path: "/routez" #-------------------------------- Nginx Module -------------------------------- - module: nginx diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index e562c23bab0d..76ecadf4a2c3 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -1837,7 +1837,7 @@ setup.kibana: #logging.level: info # Enable debug output for selected components. To enable all selectors use ["*"] -# Other available selectors are "beat", "publish", "service" +# Other available selectors are "beat", "publisher", "service" # Multiple selectors can be chained. #logging.selectors: [ ] diff --git a/x-pack/packetbeat/packetbeat.yml b/x-pack/packetbeat/packetbeat.yml index 31c229b1ef76..f7e19b268b8d 100644 --- a/x-pack/packetbeat/packetbeat.yml +++ b/x-pack/packetbeat/packetbeat.yml @@ -223,7 +223,7 @@ processors: # At debug level, you can selectively enable logging only for some components. # To enable all selectors use ["*"]. Examples of other selectors are "beat", -# "publish", "service". +# "publisher", "service". #logging.selectors: ["*"] # ============================= X-Pack Monitoring ==============================