Add more metrics to Filebeat harvesters (elastic#13395)
I have added a few new metrics to Filebeat harvesters:

- `"last_event_timestamp"`: `@timestamp` of the last published event
- `"last_event_publised_time"`:  the time when the last event was published and the offset was updated
- `"size"`: file size in bytes
- `"read_offset"`: offset of the file
- `"start_time"`: harvester start time 

By reporting these metrics it is possible to create more detailed checks in Kibana to help diagnose issues in harvesters, for example by alerting when `read_offset` falls far behind `size` (see the polling sketch after the example below).

```json
"harvester": {
    "files": {
        "0be8e828-e39f-42ba-8468-029a08451a37": {
            "last_event_timestamp": "2019-08-29T13:17:22.961Z",
            "last_event_published_time": "2019-08-29T13:17:22.961Z",
            "name": "/var/log/dpkg.log",
            "read_offset": 42417,
            "size": 42417,
            "start_time": "2019-08-29T13:17:19.908Z"
        },
        "6950906f-ef19-4d99-aff9-ca81b49e024f": {
            "last_event_timestamp": "",
            "last_event_published_time": "",
            "name": "/var/log/pm-powersave.log",
            "start_time": "2019-08-29T13:17:22.907Z"
        },
        "752bb2c3-2a61-4055-b3c6-5b8b87204f2b": {
            "last_event_timestamp": "2019-08-29T13:17:22.905Z",
            "last_event_published_time": "2019-08-29T13:17:22.905Z",
            "name": "/var/log/vbox-setup.log",
            "read_offset": 140,
            "size": 140,
            "start_time": "2019-08-29T13:17:19.908Z"
        },
        "c339d330-cfa6-41ae-8a77-83a019ca99ab": {
            "last_event_timestamp": "2019-08-29T13:17:22.924Z",
            "last_event_published_time": "2019-08-29T13:17:22.925Z",
            "name": "/var/log/fontconfig.log",
            "read_offset": 2269,
            "size": 2269,
            "start_time": "2019-08-29T13:17:22.915Z"
        },
        "edb8b270-904a-40dc-bb3b-e6ef278585a2": {
            "last_event_timestamp": "2019-08-29T13:17:22.915Z",
            "last_event_published_time": "2019-08-29T13:17:22.915Z",
            "name": "/var/log/alternatives.log",
            "read_offset": 5752,
            "size": 5752,
            "start_time": "2019-08-29T13:17:22.913Z"
        },
        "f63a4922-164b-47bb-9edd-1b446685090c": {
            "last_event_@timestamp": "",
            "last_event_published_time": "",
            "name": "/var/log/pm-suspend.log",
            "start_time": "2019-08-29T13:17:22.906Z"
        }
    },
    "open_files": 6,
    "running": 6,
    "started": 6
}
```
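
Not part of this change, but as a rough illustration of consuming these metrics: a minimal Go sketch that polls them, assuming Filebeat's HTTP monitoring endpoint is enabled (`http.enabled: true`, default port 5066) and exposes the structure above under `filebeat.harvester` in `/stats`:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Fetch the full stats document from the local monitoring endpoint.
	resp, err := http.Get("http://localhost:5066/stats")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var stats map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		panic(err)
	}

	// Walk down to filebeat.harvester.files and print one line per harvester.
	filebeat, _ := stats["filebeat"].(map[string]interface{})
	harvester, _ := filebeat["harvester"].(map[string]interface{})
	files, _ := harvester["files"].(map[string]interface{})
	for id, v := range files {
		m, _ := v.(map[string]interface{})
		fmt.Printf("%s %v: offset %v of %v bytes\n",
			id, m["name"], m["read_offset"], m["size"])
	}
}
```

Note that, as the example output shows, `read_offset` and `size` are absent until a harvester publishes its first event, so a real consumer should treat them as optional.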

Closes elastic#7743
kvch authored Jan 9, 2020
1 parent 92faab7 commit 33a638b
Showing 3 changed files with 129 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -473,6 +473,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add dashboards for the ActiveMQ Filebeat module. {pull}14880[14880]
- Add STAN Metricbeat module. {pull}14839[14839]
- Add new fileset googlecloud/audit for ingesting Google Cloud Audit logs. {pull}15200[15200]
- Expose more metrics of harvesters (e.g. `read_offset`, `start_time`). {pull}13395[13395]

*Heartbeat*
- Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
72 changes: 72 additions & 0 deletions filebeat/input/log/harvester.go
@@ -58,6 +58,7 @@ import (

var (
	harvesterMetrics = monitoring.Default.NewRegistry("filebeat.harvester")
	filesMetrics     = harvesterMetrics.NewRegistry("files")

	harvesterStarted = monitoring.NewInt(harvesterMetrics, "started")
	harvesterClosed  = monitoring.NewInt(harvesterMetrics, "closed")
@@ -100,9 +101,22 @@ type Harvester struct {
	outletFactory OutletFactory
	publishState  func(file.State) bool

	metrics *harvesterProgressMetrics

	onTerminate func()
}

// harvesterProgressMetrics stores the progress metrics of a single harvester.
type harvesterProgressMetrics struct {
	metricsRegistry             *monitoring.Registry
	filename                    *monitoring.String
	started                     *monitoring.String
	lastPublished               *monitoring.Timestamp
	lastPublishedEventTimestamp *monitoring.Timestamp
	currentSize                 *monitoring.Int
	readOffset                  *monitoring.Int
}

// NewHarvester creates a new harvester
func NewHarvester(
	config *common.Config,
@@ -179,11 +193,43 @@ func (h *Harvester) Setup() error {
		return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
	}

	h.metrics = newHarvesterProgressMetrics(h.id.String())
	h.metrics.filename.Set(h.source.Name())
	h.metrics.started.Set(common.Time(time.Now()).String())
	h.metrics.readOffset.Set(h.state.Offset)
	err = h.updateCurrentSize()
	if err != nil {
		return err
	}

	logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

	return nil
}

func newHarvesterProgressMetrics(id string) *harvesterProgressMetrics {
	r := filesMetrics.NewRegistry(id)
	return &harvesterProgressMetrics{
		metricsRegistry:             r,
		filename:                    monitoring.NewString(r, "name"),
		started:                     monitoring.NewString(r, "start_time"),
		lastPublished:               monitoring.NewTimestamp(r, "last_event_published_time"),
		lastPublishedEventTimestamp: monitoring.NewTimestamp(r, "last_event_timestamp"),
		currentSize:                 monitoring.NewInt(r, "size"),
		readOffset:                  monitoring.NewInt(r, "read_offset"),
	}
}

func (h *Harvester) updateCurrentSize() error {
	fInfo, err := h.source.Stat()
	if err != nil {
		return err
	}

	h.metrics.currentSize.Set(fInfo.Size())
	return nil
}

// Run starts the harvester, reading the file line by line and sending events to the defined output
func (h *Harvester) Run() error {
	// Allow for some cleanup on termination
@@ -250,6 +296,8 @@ func (h *Harvester) Run() error {

	logp.Info("Harvester started for file: %s", h.state.Source)

	go h.monitorFileSize()

	for {
		select {
		case <-h.done:
@@ -298,13 +346,37 @@

		// Update state of harvester as successfully sent
		h.state = state

		// Update metrics of harvester as the event was sent
		h.metrics.readOffset.Set(state.Offset)
		h.metrics.lastPublished.Set(time.Now())
		h.metrics.lastPublishedEventTimestamp.Set(message.Ts)
	}
}

// monitorFileSize periodically refreshes the reported file size until the
// harvester is done.
func (h *Harvester) monitorFileSize() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-h.done:
			return
		case <-ticker.C:
			err := h.updateCurrentSize()
			if err != nil {
				logp.Err("Error updating file size: %v; File: %v", err, h.state.Source)
			}
		}
	}
}

// stop is intended for internal use and closes the done channel to stop execution
func (h *Harvester) stop() {
	h.stopOnce.Do(func() {
		close(h.done)

		filesMetrics.Remove(h.id.String())
	})
}

56 changes: 56 additions & 0 deletions libbeat/monitoring/metrics.go
@@ -23,7 +23,9 @@ import (
"math"
"strconv"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
)

@@ -232,3 +234,57 @@ func fullName(r *Registry, name string) string {
	}
	return r.name + "." + name
}

// Timestamp is a timestamp variable satisfying the Var interface.
type Timestamp struct {
	mu     sync.RWMutex
	ts     time.Time
	cached string
}

// NewTimestamp creates and registers a new timestamp variable.
func NewTimestamp(r *Registry, name string, opts ...Option) *Timestamp {
	if r == nil {
		r = Default
	}

	v := &Timestamp{}
	addVar(r, name, opts, v, makeExpvar(func() string {
		return v.toString()
	}))
	return v
}

func (v *Timestamp) Set(t time.Time) {
	v.mu.Lock()
	defer v.mu.Unlock()

	v.ts = t
	v.cached = ""
}

func (v *Timestamp) Get() time.Time {
	v.mu.RLock()
	defer v.mu.RUnlock()

	return v.ts
}

func (v *Timestamp) Visit(_ Mode, vs Visitor) {
	vs.OnString(v.toString())
}

func (v *Timestamp) toString() string {
	// Take the write lock: this method may update the cached string.
	v.mu.Lock()
	defer v.mu.Unlock()

	if v.ts.IsZero() {
		return ""
	}

	if v.cached == "" {
		v.cached = v.ts.Format(common.TsLayout)
	}
	return v.cached
}
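
For illustration only, a minimal usage sketch of the new `Timestamp` variable on a standalone registry (the variable name here is hypothetical):

```go
package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/libbeat/monitoring"
)

func main() {
	// Register a timestamp variable on a fresh registry.
	reg := monitoring.NewRegistry()
	last := monitoring.NewTimestamp(reg, "last_event_published_time")

	// Set stores the time and clears the cached string form; the
	// registered expvar re-renders it with common.TsLayout on demand.
	last.Set(time.Now())
	fmt.Println(last.Get())
}
```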
