Skip to content

Commit

Permalink
[Heartbeat] Improve browser heap memory usage (#32317)
Browse files Browse the repository at this point in the history
This patch  does not fix a potential memory leak as mentioned on the forum, but does dramatically increases heartbeat's efficiency WRT parsing the JSON output of the synthetics agent.

It does appear that we don't close all the readers used by scanToSynthEvents which this patch fixes. I did observe heap growth w/o this patch over a short timescale, but that's almost certainly not a resource leak, those readers should have been auto closed when exec ended anyway.

We do however waste a lot of memory / allocations buffering lines of JSON, which can be quite large. The json decoder can actually parse ndjson very efficiently itself. This also lets us use a smaller buffer for stdout/stderr buffering. We had allocated a larger one prior to handle the base64 image data passed by the agent.
  • Loading branch information
andrewvc authored Jul 18, 2022
1 parent 04491be commit e6db9a5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Heartbeat*

- Send targetted error message for unexpected synthetics exits. {pull}31936[31936]
- Reduced memory usage slightly for browser monitors. {pull}32317[32317]

*Metricbeat*

Expand Down
29 changes: 23 additions & 6 deletions x-pack/heartbeat/monitors/browser/synthexec/synthexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -165,6 +166,7 @@ func runCmd(
if err != nil {
logp.Warn("could not scan stdout events from synthetics: %s", err)
}

wg.Done()
}()

Expand All @@ -184,10 +186,25 @@ func runCmd(
// Send the test results into the output
wg.Add(1)
go func() {
err := scanToSynthEvents(jsonReader, jsonToSynthEvent, mpx.writeSynthEvent)
if err != nil {
logp.Warn("could not scan JSON events from synthetics: %s", err)
defer jsonReader.Close()

// We don't use scanToSynthEvents here because all lines here will be JSON
// It's more efficient to let the json decoder handle the ndjson than
// using the scanner
decoder := json.NewDecoder(jsonReader)
for {
var se SynthEvent
err := decoder.Decode(&se)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
logp.L().Warn("error decoding json for test json results: %w", err)
}

mpx.writeSynthEvent(&se)
}

wg.Done()
}()
err = cmd.Start()
Expand All @@ -209,7 +226,6 @@ func runCmd(
go func() {
err := cmd.Wait()
jsonWriter.Close()
jsonReader.Close()
logp.Info("Command has completed(%d): %s", cmd.ProcessState.ExitCode(), loggableCmd.String())

var cmdError *SynthError = nil
Expand All @@ -234,9 +250,10 @@ func runCmd(
// scanToSynthEvents takes a reader, a transform function, and a callback, and processes
// each scanned line via the reader before invoking it with the callback.
func scanToSynthEvents(rdr io.ReadCloser, transform func(bytes []byte, text string) (*SynthEvent, error), cb func(*SynthEvent)) error {
defer rdr.Close()
scanner := bufio.NewScanner(rdr)
buf := make([]byte, 1024*1024*2) // 2MiB initial buffer (images can be big!)
scanner.Buffer(buf, 1024*1024*40) // Max 50MiB Buffer
buf := make([]byte, 1024*10) // 10KiB initial buffer
scanner.Buffer(buf, 1024*1024*10) // Max 10MiB Buffer

for scanner.Scan() {
se, err := transform(scanner.Bytes(), scanner.Text())
Expand Down

0 comments on commit e6db9a5

Please sign in to comment.