diff --git a/etc/telegraf.conf b/etc/telegraf.conf index a6057ecd22930..3deb7f895dcb4 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -123,6 +123,10 @@ [[inputs.mem]] # no configuration +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration + # Read metrics about swap memory usage [[inputs.swap]] # no configuration diff --git a/plugins/inputs/system/PROCESSES_README.md b/plugins/inputs/system/PROCESSES_README.md new file mode 100644 index 0000000000000..dc7b88682f396 --- /dev/null +++ b/plugins/inputs/system/PROCESSES_README.md @@ -0,0 +1,38 @@ +# Processes Input Plugin + +This plugin gathers info about the total number of processes and groups +them by status (zombie, sleeping, running, etc.) + +### Configuration: + +```toml +# Get the number of processes and group them by status +[[inputs.processes]] + # no configuration +``` + +### Measurements & Fields: + +- processes + - blocked + - running + - sleeping + - stopped + - total + - zombie + - wait (freebsd only) + - idle (bsd only) + - paging (linux only) + - total_threads (linux only) + +### Tags: + +### Example Output: + +Give an example `-test` output here + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test +* Plugin: processes, Collection 1 +> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042 +``` diff --git a/plugins/inputs/system/processes.go b/plugins/inputs/system/processes.go index c4b791e3c81ef..919972080b535 100644 --- a/plugins/inputs/system/processes.go +++ b/plugins/inputs/system/processes.go @@ -1,61 +1,212 @@ +// +build !windows + package system import ( + "bytes" "fmt" + "io/ioutil" "log" + "os" + "os/exec" + "path" + "runtime" + "strconv" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/shirou/gopsutil/process" ) type Processes struct { + execPS func() ([]byte, error) + readProcFile func(statFile string) ([]byte, error) + + forcePS bool + forceProc bool } -func (_ *Processes) Description() string { - return "Get the number of processes and group them by status (Linux only)" +func (p *Processes) Description() string { + return "Get the number of processes and group them by status" } -func (_ *Processes) SampleConfig() string { return "" } +func (p *Processes) SampleConfig() string { return "" } -func (s *Processes) Gather(acc telegraf.Accumulator) error { - pids, err := process.Pids() - if err != nil { - return fmt.Errorf("error getting pids list: %s", err) +func (p *Processes) Gather(acc telegraf.Accumulator) error { + // Get an empty map of metric fields + fields := getEmptyFields() + + // Decide if we will use 'ps' to get stats (use procfs otherwise) + usePS := true + if runtime.GOOS == "linux" { + usePS = false + } + if p.forcePS { + usePS = true + } else if p.forceProc { + usePS = false + } + + // Gather stats from 'ps' or procfs + if usePS { + if err := p.gatherFromPS(fields); err != nil { + return err + } + } else { + if err := p.gatherFromProc(fields); err != nil { + return err + } } - // TODO handle other OS (Windows/BSD/Solaris/OSX) + + acc.AddFields("processes", fields, nil) + return nil +} + +// Gets empty fields of metrics based on the OS +func getEmptyFields() map[string]interface{} { fields := map[string]interface{}{ - "paging": uint64(0), - "blocked": uint64(0), - "zombie": uint64(0), - "stopped": uint64(0), - "running": uint64(0), - "sleeping": uint64(0), - } - for _, pid := range pids { - process, err := process.NewProcess(pid) - if err != nil { - log.Printf("Can not get process %d status: %s", pid, err) + "blocked": int64(0), + "zombie": int64(0), + "stopped": int64(0), + "running": int64(0), + "sleeping": int64(0), + "total": int64(0), + } + switch runtime.GOOS { + case "freebsd": + fields["idle"] = int64(0) + fields["wait"] = int64(0) + case "darwin": + fields["idle"] = int64(0) + case "openbsd": + fields["idle"] = int64(0) + case "linux": + fields["paging"] = int64(0) + fields["total_threads"] = int64(0) + } + return fields +} + +// exec `ps` to get all process states +func (p *Processes) gatherFromPS(fields map[string]interface{}) error { + out, err := p.execPS() + if err != nil { + return err + } + + for _, status := range bytes.Fields(out) { + switch status[0] { + case 'W': + fields["wait"] = fields["wait"].(int64) + int64(1) + case 'U', 'D': + // Also known as uninterruptible sleep or disk sleep + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombie"] = fields["zombie"].(int64) + int64(1) + case 'T': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'I': + fields["idle"] = fields["idle"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] from ps", + string(status[0])) + } + fields["total"] = fields["total"].(int64) + int64(1) + } + return nil +} + +// get process states from /proc/(pid)/stat files +func (p *Processes) gatherFromProc(fields map[string]interface{}) error { + files, err := ioutil.ReadDir("/proc") + if err != nil { + return err + } + + for _, file := range files { + if !file.IsDir() { continue } - status, err := process.Status() + + statFile := path.Join("/proc", file.Name(), "stat") + data, err := p.readProcFile(statFile) if err != nil { - log.Printf("Can not get process %d status: %s\n", pid, err) + return err + } + if data == nil { continue } - _, exists := fields[status] - if !exists { - log.Printf("Status '%s' for process with pid: %d\n", status, pid) + + stats := bytes.Fields(data) + if len(stats) < 3 { + return fmt.Errorf("Something is terribly wrong with %s", statFile) + } + switch stats[2][0] { + case 'R': + fields["running"] = fields["running"].(int64) + int64(1) + case 'S': + fields["sleeping"] = fields["sleeping"].(int64) + int64(1) + case 'D': + fields["blocked"] = fields["blocked"].(int64) + int64(1) + case 'Z': + fields["zombies"] = fields["zombies"].(int64) + int64(1) + case 'T', 't': + fields["stopped"] = fields["stopped"].(int64) + int64(1) + case 'W': + fields["paging"] = fields["paging"].(int64) + int64(1) + default: + log.Printf("processes: Unknown state [ %s ] in file %s", + string(stats[2][0]), statFile) + } + fields["total"] = fields["total"].(int64) + int64(1) + + threads, err := strconv.Atoi(string(stats[19])) + if err != nil { + log.Printf("processes: Error parsing thread count: %s", err) continue } - fields[status] = fields[status].(uint64) + uint64(1) + fields["total_threads"] = fields["total_threads"].(int64) + int64(threads) } - - acc.AddFields("processes", fields, nil) return nil } + +func readProcFile(statFile string) ([]byte, error) { + if _, err := os.Stat(statFile); os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + data, err := ioutil.ReadFile(statFile) + if err != nil { + return nil, err + } + + return data, nil +} + +func execPS() ([]byte, error) { + bin, err := exec.LookPath("ps") + if err != nil { + return nil, err + } + + out, err := exec.Command(bin, "axo", "state").Output() + if err != nil { + return nil, err + } + + return out, err +} + func init() { inputs.Add("processes", func() telegraf.Input { - return &Processes{} + return &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } }) } diff --git a/plugins/inputs/system/processes_test.go b/plugins/inputs/system/processes_test.go index 246884711082a..adc74a3e9e351 100644 --- a/plugins/inputs/system/processes_test.go +++ b/plugins/inputs/system/processes_test.go @@ -1,6 +1,8 @@ package system import ( + "fmt" + "runtime" "testing" "github.com/influxdata/telegraf/testutil" @@ -9,13 +11,135 @@ import ( ) func TestProcesses(t *testing.T) { - processes := &Processes{} + processes := &Processes{ + execPS: execPS, + readProcFile: readProcFile, + } var acc testutil.Accumulator err := processes.Gather(&acc) require.NoError(t, err) - assert.True(t, acc.HasUIntField("processes", "running")) - assert.True(t, acc.HasUIntField("processes", "sleeping")) - assert.True(t, acc.HasUIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "running")) + assert.True(t, acc.HasIntField("processes", "sleeping")) + assert.True(t, acc.HasIntField("processes", "stopped")) + assert.True(t, acc.HasIntField("processes", "total")) + total, ok := acc.Get("processes") + require.True(t, ok) + assert.True(t, total.Fields["total"].(int64) > 0) } + +func TestFromPS(t *testing.T) { + processes := &Processes{ + execPS: testExecPS, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["blocked"] = int64(1) + fields["running"] = int64(4) + fields["sleeping"] = int64(34) + fields["total"] = int64(39) + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func TestFromPSError(t *testing.T) { + processes := &Processes{ + execPS: testExecPSError, + forcePS: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.Error(t, err) +} + +func TestFromProcFiles(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("This test only runs on linux") + } + tester := tester{} + processes := &Processes{ + readProcFile: tester.testProcFile, + forceProc: true, + } + + var acc testutil.Accumulator + err := processes.Gather(&acc) + require.NoError(t, err) + + fields := getEmptyFields() + fields["sleeping"] = tester.calls + fields["total_threads"] = tester.calls * 2 + fields["total"] = tester.calls + + acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{}) +} + +func testExecPS() ([]byte, error) { + return []byte(testPSOut), nil +} + +// struct for counting calls to testProcFile +type tester struct { + calls int64 +} + +func (t *tester) testProcFile(_ string) ([]byte, error) { + t.calls++ + return []byte(fmt.Sprintf(testProcStat, "S", "2")), nil +} + +func testExecPSError() ([]byte, error) { + return []byte(testPSOut), fmt.Errorf("ERROR!") +} + +const testPSOut = ` +S +S +S +S +R +R +S +S +Ss +Ss +S +SNs +Ss +Ss +S +R+ +S +U +S +S +S +S +Ss +S+ +Ss +S +S+ +S+ +Ss +S+ +Ss +S +R+ +Ss +S +S+ +S+ +Ss +S+ +` + +const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +` diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 9a3e0e6780235..f0288c73e60d7 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml exit_if_fail telegraf -config $tmpdir/config.toml \ -test -input-filter cpu:mem -mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS +cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz eval "git describe --exact-match HEAD" if [ $? -eq 0 ]; then