Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in filtering, cleanup #31595

Merged
merged 3 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions libbeat/metric/system/process/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package process

import (
"math"
"time"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -31,11 +32,11 @@ func unixTimeMsToTime(unixTimeMs uint64) string {
return common.Time(time.Unix(0, int64(unixTimeMs*1000000))).String()
}

func stripNullByte(buf []byte) string {
func stripNullByte(buf []byte) string { //nolint:deadcode,unused // used by platform-specific code
return string(buf[0 : len(buf)-1])
}

func stripNullByteRaw(buf []byte) []byte {
func stripNullByteRaw(buf []byte) []byte { //nolint:deadcode,unused // used by platform-specific code
return buf[0 : len(buf)-1]
}

Expand All @@ -52,7 +53,7 @@ func GetProcMemPercentage(proc ProcState, totalPhyMem uint64) opt.Float {

// isProcessInSlice looks up proc in the processes slice and returns if
// found or not
func isProcessInSlice(processes []ProcState, proc *ProcState) bool {
func isProcessInSlice(processes []ProcState, proc ProcState) bool {
for _, p := range processes {
if p.Pid == proc.Pid {
return true
Expand All @@ -79,10 +80,15 @@ func GetProcCPUPercentage(s0, s1 ProcState) ProcState {
}

timeDelta := s1.SampleTime.Sub(s0.SampleTime)
timeDeltaMillis := timeDelta / time.Millisecond
timeDeltaMillis := float64(timeDelta / time.Millisecond)
totalCPUDeltaMillis := int64(s1.CPU.Total.Ticks.ValueOr(0) - s0.CPU.Total.Ticks.ValueOr(0))

pct := float64(totalCPUDeltaMillis) / float64(timeDeltaMillis)
pct := float64(totalCPUDeltaMillis) / timeDeltaMillis
// In theory this can only happen if the time delta is 0, which is unlikely but possible.
// With all the type conversion and non-integer math, this is probably the safest way to check.
if math.IsNaN(pct) {
return s1
}
normalizedPct := pct / float64(numcpu.NumCPU())

s1.CPU.Total.Norm.Pct = opt.FloatWith(common.Round(normalizedPct, common.DefaultDecimalPlacesCount))
Expand Down
51 changes: 27 additions & 24 deletions libbeat/metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
package process

import (
"errors"
"fmt"
"os"
"sort"
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-sysinfo/types"

Expand Down Expand Up @@ -128,13 +127,13 @@ func ListStates(hostfs resolve.Resolver) ([]ProcState, error) {
}
err := init.Init()
if err != nil {
return nil, errors.Wrap(err, "error initializing process collectors")
return nil, fmt.Errorf("error initializing process collectors: %w", err)
}

// actually fetch the PIDs from the OS-specific code
_, plist, err := init.FetchPids()
_, plist, err := init.FetchPids() //nolint:typecheck // platform-specific code
if err != nil {
return nil, errors.Wrap(err, "error gathering PIDs")
return nil, fmt.Errorf("error gathering PIDs: %w", err)
}

return plist, nil
Expand Down Expand Up @@ -166,7 +165,7 @@ func (procStats *Stats) Init() error {
for _, pattern := range procStats.Procs {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("Failed to compile regexp [%s]: %v", pattern, err)
return fmt.Errorf("failed to compile regexp [%s]: %w", pattern, err)
}
procStats.procRegexps = append(procStats.procRegexps, reg)
}
Expand All @@ -175,17 +174,17 @@ func (procStats *Stats) Init() error {
for _, pattern := range procStats.EnvWhitelist {
reg, err := match.Compile(pattern)
if err != nil {
return fmt.Errorf("failed to compile env whitelist regexp [%v]: %v", pattern, err)
return fmt.Errorf("failed to compile env whitelist regexp [%v]: %w", pattern, err)
}
procStats.envRegexps = append(procStats.envRegexps, reg)
}

if procStats.EnableCgroups {
cgReader, err := cgroup.NewReaderOptions(procStats.CgroupOpts)
if err == cgroup.ErrCgroupsMissing {
if errors.Is(err, cgroup.ErrCgroupsMissing) {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else if err != nil {
return errors.Wrap(err, "error initializing cgroup reader")
return fmt.Errorf("error initializing cgroup reader: %w", err)
}
procStats.cgroups = cgReader
}
Expand All @@ -200,16 +199,16 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
}

// actually fetch the PIDs from the OS-specific code
pidMap, plist, err := procStats.FetchPids()
pidMap, plist, err := procStats.FetchPids() //nolint:typecheck // platform-specific code

if err != nil {
return nil, nil, errors.Wrap(err, "error gathering PIDs")
return nil, nil, fmt.Errorf("error gathering PIDs: %w", err)
}
// We use this to track processes over time.
procStats.ProcsMap = pidMap

// filter the process list that will be passed down to users
procStats.includeTopProcesses(plist)
plist = procStats.includeTopProcesses(plist)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the primary bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup.


// This is a holdover until we migrate this library to metricbeat/internal
// At which point we'll use the memory code there.
Expand All @@ -228,17 +227,21 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
procs := []mapstr.M{}
rootEvents := []mapstr.M{}

// bulk of data collection done, now convert to the various data formats expected by the output
for _, process := range plist {
// Add the RSS pct memory first
process.Memory.Rss.Pct = GetProcMemPercentage(process, totalPhyMem)
//Create the root event
root := process.FormatForRoot()
rootMap := mapstr.M{}
err := typeconv.Convert(&rootMap, root)
if err != nil {
return nil, nil, fmt.Errorf("error converting process for pid %d: %w", process.Pid.ValueOr(0), err)
}

proc, err := procStats.getProcessEvent(&process)
proc, err := procStats.getProcessEvent(process)
if err != nil {
return nil, nil, errors.Wrapf(err, "error converting process for pid %d", process.Pid.ValueOr(0))
return nil, nil, fmt.Errorf("error fetching process event for pid %d: %w", process.Pid.ValueOr(0), err)
}

procs = append(procs, proc)
Expand All @@ -252,13 +255,13 @@ func (procStats *Stats) Get() ([]mapstr.M, []mapstr.M, error) {
func (procStats *Stats) GetOne(pid int) (mapstr.M, error) {
pidStat, _, err := procStats.pidFill(pid, false)
if err != nil {
return nil, errors.Wrapf(err, "error fetching PID %d", pid)
return nil, fmt.Errorf("error fetching PID %d: %w", pid, err)
}
newMap := make(ProcsMap)
newMap[pid] = pidStat
procStats.ProcsMap = newMap

return procStats.getProcessEvent(&pidStat)
return procStats.getProcessEvent(pidStat)
}

// GetSelf gets process info for the beat itself
Expand All @@ -267,7 +270,7 @@ func (procStats *Stats) GetSelf() (ProcState, error) {

pidStat, _, err := procStats.pidFill(self, false)
if err != nil {
return ProcState{}, errors.Wrapf(err, "error fetching PID %d", self)
return ProcState{}, fmt.Errorf("error fetching PID %d: %w", self, err)
}

return pidStat, nil
Expand Down Expand Up @@ -299,9 +302,9 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
// Fetch proc state so we can get the name for filtering based on user's filter.

// OS-specific entrypoint, get basic info so we can at least run matchProcess
status, err := GetInfoForPid(procStats.Hostfs, pid)
status, err := GetInfoForPid(procStats.Hostfs, pid) //nolint:typecheck // platform-specific code
if err != nil {
return status, true, errors.Wrap(err, "GetInfoForPid")
return status, true, fmt.Errorf("GetInfoForPid: %w", err)
}
if procStats.skipExtended {
return status, true, nil
Expand All @@ -316,9 +319,9 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
}

//If we've passed the filter, continue to fill out the rest of the metrics
status, err = FillPidMetrics(procStats.Hostfs, pid, status, procStats.isWhitelistedEnvVar)
status, err = FillPidMetrics(procStats.Hostfs, pid, status, procStats.isWhitelistedEnvVar) //nolint:typecheck // platform-specific code
if err != nil {
return status, true, errors.Wrap(err, "FillPidMetrics")
return status, true, fmt.Errorf("FillPidMetrics: %w", err)
}
if len(status.Args) > 0 && status.Cmdline == "" {
status.Cmdline = strings.Join(status.Args, " ")
Expand All @@ -330,7 +333,7 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
if procStats.EnableCgroups {
cgStats, err := procStats.cgroups.GetStatsForPid(status.Pid.ValueOr(0))
if err != nil {
return status, true, errors.Wrap(err, "cgroups.GetStatsForPid")
return status, true, fmt.Errorf("cgroups.GetStatsForPid: %w", err)
}
status.Cgroup = cgStats
if ok {
Expand Down Expand Up @@ -359,7 +362,7 @@ func (procStats *Stats) cacheCmdLine(in ProcState) ProcState {
}

// return a formatted MapStr of the process metrics
func (procStats *Stats) getProcessEvent(process *ProcState) (mapstr.M, error) {
func (procStats *Stats) getProcessEvent(process ProcState) (mapstr.M, error) {

// Remove CPUTicks if needed
if !procStats.CPUTicks {
Expand Down Expand Up @@ -415,7 +418,7 @@ func (procStats *Stats) includeTopProcesses(processes []ProcState) []ProcState {
return processes[i].Memory.Rss.Bytes.ValueOr(0) > processes[j].Memory.Rss.Bytes.ValueOr(0)
})
for _, proc := range processes[:numProcs] {
if !isProcessInSlice(result, &proc) {
if !isProcessInSlice(result, proc) {
result = append(result, proc)
}
}
Expand Down
19 changes: 9 additions & 10 deletions libbeat/metric/system/process/process_aix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package process

import (
"bytes"
"fmt"
"io"
"os"
"os/user"
"strconv"
"syscall"
"unsafe"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
"github.com/elastic/beats/v7/libbeat/opt"
)
Expand All @@ -44,7 +43,7 @@ func (procStats *Stats) FetchPids() (ProcsMap, []ProcState, error) {
// getprocs first argument is a void*
num, err := C.getprocs(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, nil, 0, &pid, 1)
if err != nil {
return nil, nil, errors.Wrap(err, "error fetching PIDs")
return nil, nil, fmt.Errorf("error fetching PIDs: %w", err)
}
procMap, plist = procStats.pidIter(int(info.pi_pid), procMap, plist)

Expand All @@ -62,7 +61,7 @@ func GetInfoForPid(_ resolve.Resolver, pid int) (ProcState, error) {

num, err := C.getprocs(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, nil, 0, &cpid, 1)
if err != nil {
return ProcState{}, errors.Wrap(err, "error in getprocs")
return ProcState{}, fmt.Errorf("error in getprocs: %w", err)
}
if num != 1 {
return ProcState{}, syscall.ESRCH
Expand Down Expand Up @@ -110,7 +109,7 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st

num, err := C.getprocs(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, nil, 0, &cpid, 1)
if err != nil {
return state, errors.Wrap(err, "error in getprocs")
return state, fmt.Errorf("error in getprocs: %w", err)
}
if num != 1 {
return state, syscall.ESRCH
Expand All @@ -131,7 +130,7 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st
info.pi_pid = C.pid_t(pid)

if _, err := C.getargs(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, (*C.char)(&buf[0]), 8192); err != nil {
return state, errors.Wrap(err, "error in gitargs")
return state, fmt.Errorf("error in gitargs: %w", err)
}

bbuf := bytes.NewBuffer(buf)
Expand All @@ -143,7 +142,7 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st
break
}
if err != nil {
return state, errors.Wrap(err, "error reading args buffer")
return state, fmt.Errorf("error reading args buffer: %w", err)
}

args = append(args, stripNullByte(arg))
Expand All @@ -155,7 +154,7 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st
buf = make([]byte, 8192)

if _, err := C.getevars(unsafe.Pointer(&info), C.sizeof_struct_procsinfo64, (*C.char)(&buf[0]), 8192); err != nil {
return state, errors.Wrap(err, "error in getevars")
return state, fmt.Errorf("error in getevars: %w", err)
}

if state.Env != nil {
Expand All @@ -171,12 +170,12 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st
break
}
if err != nil {
return state, errors.Wrap(err, "error")
return state, fmt.Errorf("error: %w", err)
}

pair := bytes.SplitN(stripNullByteRaw(line), delim, 2)
if len(pair) != 2 {
return state, errors.Wrap(err, "error reading environment")
return state, fmt.Errorf("error reading environment: %w", err)
}
eKey := string(pair[0])
if filter == nil || filter(eKey) {
Expand Down
16 changes: 7 additions & 9 deletions libbeat/metric/system/process/process_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ import (
"time"
"unsafe"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -139,7 +137,7 @@ func FillPidMetrics(_ resolve.Resolver, pid int, state ProcState, filter func(st

args, exe, env, err := getProcArgs(pid, filter)
if err != nil {
return state, errors.Wrap(err, "error fetching string data from process")
return state, fmt.Errorf("error fetching string data from process: %w", err)
}

state.Args = args
Expand All @@ -160,7 +158,7 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,
buf := make([]byte, argmax)
err := sysctl(mib, &buf[0], &argmax, nil, 0)
if err != nil {
return nil, "", nil, errors.Wrap(err, "error in sysctl")
return nil, "", nil, fmt.Errorf("error in sysctl: %w", err)
}

bbuf := bytes.NewBuffer(buf)
Expand All @@ -171,7 +169,7 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,

path, err := bbuf.ReadBytes(0)
if err != nil {
return nil, "", nil, errors.Wrap(err, "Error reading the executable name")
return nil, "", nil, fmt.Errorf("error reading the executable name: %w", err)
}

exeName = stripNullByte(path)
Expand All @@ -180,7 +178,7 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,
for {
c, err := bbuf.ReadByte()
if err != nil {
return nil, "", nil, errors.Wrap(err, "Error skipping nul values in KERN_PROCARGS2 buffer")
return nil, "", nil, fmt.Errorf("error skipping nul values in KERN_PROCARGS2 buffer: %w", err)
}
if c != 0 {
bbuf.UnreadByte()
Expand All @@ -196,7 +194,7 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,
break
}
if err != nil {
return nil, exeName, nil, errors.Wrap(err, "Error reading args from KERN_PROCARGS2")
return nil, exeName, nil, fmt.Errorf("error reading args from KERN_PROCARGS2: %w", err)
}
argv = append(argv, stripNullByte(arg))
}
Expand All @@ -210,12 +208,12 @@ func getProcArgs(pid int, filter func(string) bool) ([]string, string, mapstr.M,
break
}
if err != nil {
return argv, exeName, nil, errors.Wrap(err, "Error reading args from KERN_PROCARGS2 buffer")
return argv, exeName, nil, fmt.Errorf("error reading args from KERN_PROCARGS2 buffer: %w", err)
}
pair := bytes.SplitN(stripNullByteRaw(line), delim, 2)

if len(pair) != 2 {
return argv, exeName, nil, errors.Wrap(err, "Error reading process information from KERN_PROCARGS2")
return argv, exeName, nil, fmt.Errorf("error reading process information from KERN_PROCARGS2: %w", err)
}
eKey := string(pair[0])
if filter == nil || filter(eKey) {
Expand Down
Loading