From b74c7aaece2a98e8bd9775ee74baa32f066909a7 Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Fri, 21 Feb 2020 17:01:45 -0300 Subject: [PATCH 1/7] Allow globs in FPM unix socket paths --- plugins/inputs/phpfpm/README.md | 2 + plugins/inputs/phpfpm/phpfpm.go | 79 +++++++++++++++++++++++++--- plugins/inputs/phpfpm/phpfpm_test.go | 65 +++++++++++++++++++++++ 3 files changed, 138 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/phpfpm/README.md b/plugins/inputs/phpfpm/README.md index e2f4e0c2ff574..b31f4b7e427bd 100644 --- a/plugins/inputs/phpfpm/README.md +++ b/plugins/inputs/phpfpm/README.md @@ -19,6 +19,8 @@ Get phpfpm stats using either HTTP status page or fpm socket. ## "/var/run/php5-fpm.sock" ## or using a custom fpm status path: ## "/var/run/php5-fpm.sock:fpm-custom-status-path" + ## glob patterns are also supported: + ## "/var/run/php*.sock" ## ## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie: ## "fcgi://10.0.0.12:9000/status" diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index 2d280626176a6..b263e215ee29a 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strconv" "strings" "sync" @@ -95,7 +96,12 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - for _, serv := range g.Urls { + urls, err := expandUrls(g.Urls) + if err != nil { + return err + } + + for _, serv := range urls { wg.Add(1) go func(serv string) { defer wg.Done() @@ -153,15 +159,10 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { statusPath = "status" } } else { - socketAddr := strings.Split(addr, ":") - if len(socketAddr) >= 2 { - socketPath = socketAddr[0] - statusPath = socketAddr[1] - } else { - socketPath = socketAddr[0] + socketPath, statusPath = unixSocketPaths(addr) + if statusPath == "" { statusPath = "status" } - if _, err := os.Stat(socketPath); os.IsNotExist(err) { return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) } @@ -277,6 +278,68 @@ func importMetric(r io.Reader, acc telegraf.Accumulator, addr string) (poolStat, return stats, nil } +func expandUrls(urls []string) ([]string, error) { + addrs := make([]string, 0, len(urls)) + for _, url := range urls { + if isNetworkURL(url) { + addrs = append(addrs, url) + continue + } + paths, err := globUnixSocket(url) + if err != nil { + return nil, err + } + addrs = append(addrs, paths...) + } + return addrs, nil +} + +func globUnixSocket(url string) ([]string, error) { + pattern, status := unixSocketPaths(url) + paths, err := filepath.Glob(pattern) + if err != nil { + return nil, err + } + + addrs := make([]string, 0, len(paths)) + + if len(paths) == 0 { + _, err := os.Stat(pattern) + if os.IsNotExist(err) { + return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err) + } + return nil, err + } + + for _, path := range paths { + if status != "" { + status = fmt.Sprintf(":%s", status) + } + addrs = append(addrs, fmt.Sprintf("%s%s", path, status)) + } + + return addrs, nil +} + +func unixSocketPaths(addr string) (string, string) { + var socketPath, statusPath string + + socketAddr := strings.Split(addr, ":") + if len(socketAddr) >= 2 { + socketPath = socketAddr[0] + statusPath = socketAddr[1] + } else { + socketPath = socketAddr[0] + statusPath = "" + } + + return socketPath, statusPath +} + +func isNetworkURL(addr string) bool { + return strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") || strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") +} + func init() { inputs.Add("phpfpm", func() telegraf.Input { return &phpfpm{} diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index f449b46498d15..05494e15d76e4 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -148,6 +148,71 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) { acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags) } +func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) { + // Create a socket in /tmp because we always have write permission and if the + // removing of socket fail when system restart /tmp is clear so + // we don't have junk files around + var randomNumber int64 + binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) + socket1 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber) + tcp1, err := net.Listen("unix", socket1) + if err != nil { + t.Fatal("Cannot initialize server on port ") + } + defer tcp1.Close() + + binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) + socket2 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber) + tcp2, err := net.Listen("unix", socket2) + if err != nil { + t.Fatal("Cannot initialize server on port ") + } + defer tcp2.Close() + + s := statServer{} + go fcgi.Serve(tcp1, s) + go fcgi.Serve(tcp2, s) + + r := &phpfpm{ + Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"}, + } + + var acc1, acc2 testutil.Accumulator + + err = acc1.GatherError(r.Gather) + require.NoError(t, err) + + err = acc2.GatherError(r.Gather) + require.NoError(t, err) + + tags1 := map[string]string{ + "pool": "www", + "url": socket1, + } + + tags2 := map[string]string{ + "pool": "www", + "url": socket2, + } + + fields := map[string]interface{}{ + "start_since": int64(1991), + "accepted_conn": int64(3), + "listen_queue": int64(1), + "max_listen_queue": int64(0), + "listen_queue_len": int64(0), + "idle_processes": int64(1), + "active_processes": int64(1), + "total_processes": int64(2), + "max_active_processes": int64(1), + "max_children_reached": int64(2), + "slow_requests": int64(1), + } + + acc1.AssertContainsTaggedFields(t, "phpfpm", fields, tags1) + acc2.AssertContainsTaggedFields(t, "phpfpm", fields, tags2) +} + func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) { // Create a socket in /tmp because we always have write permission. If the // removing of socket fail we won't have junk files around. Cuz when system From bff2d92afdcbf2db99acdf5e20acaa3047caee4b Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Thu, 27 Feb 2020 14:18:19 -0300 Subject: [PATCH 2/7] Add error context when filepath.Glob fails. Co-Authored-By: Steven Soroka --- plugins/inputs/phpfpm/phpfpm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index b263e215ee29a..ce3604f40fbe4 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -298,7 +298,7 @@ func globUnixSocket(url string) ([]string, error) { pattern, status := unixSocketPaths(url) paths, err := filepath.Glob(pattern) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "couldn't read the file path pattern %q", pattern) } addrs := make([]string, 0, len(paths)) From 916f1d3433873232c120bb3d3b5aafa72642f92c Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Thu, 27 Feb 2020 14:26:31 -0300 Subject: [PATCH 3/7] Add missing import --- plugins/inputs/phpfpm/phpfpm.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index ce3604f40fbe4..550f5ec44ddd7 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/pkg/errors" ) const ( From a57b771378556191084c93360f3532f2e952f750 Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Thu, 27 Feb 2020 16:55:41 -0300 Subject: [PATCH 4/7] Use globpath.Globpath --- plugins/inputs/phpfpm/phpfpm.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index 550f5ec44ddd7..e704c6d04b044 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -8,16 +8,15 @@ import ( "net/http" "net/url" "os" - "path/filepath" "strconv" "strings" "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/pkg/errors" ) const ( @@ -297,11 +296,11 @@ func expandUrls(urls []string) ([]string, error) { func globUnixSocket(url string) ([]string, error) { pattern, status := unixSocketPaths(url) - paths, err := filepath.Glob(pattern) + glob, err := globpath.Compile(pattern) if err != nil { - return nil, errors.Wrapf(err, "couldn't read the file path pattern %q", pattern) + return nil, fmt.Errorf("couldn not compile glob %q: %v", pattern, err) } - + paths := glob.Match() addrs := make([]string, 0, len(paths)) if len(paths) == 0 { From 8455e2c03d63f68f22cd364aadbb47b10d8de3fb Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Thu, 27 Feb 2020 16:58:36 -0300 Subject: [PATCH 5/7] Fix typo --- plugins/inputs/phpfpm/phpfpm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index e704c6d04b044..c9856bd55737e 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -298,7 +298,7 @@ func globUnixSocket(url string) ([]string, error) { pattern, status := unixSocketPaths(url) glob, err := globpath.Compile(pattern) if err != nil { - return nil, fmt.Errorf("couldn not compile glob %q: %v", pattern, err) + return nil, fmt.Errorf("could not compile glob %q: %v", pattern, err) } paths := glob.Match() addrs := make([]string, 0, len(paths)) From b17709472fe5fbe4787e54b86637bad19d4672fc Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Fri, 28 Feb 2020 16:43:22 -0300 Subject: [PATCH 6/7] Handle static paths when using globpath.Match() --- plugins/inputs/phpfpm/phpfpm.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index c9856bd55737e..0ee7c72c0eab5 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -163,9 +163,6 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if statusPath == "" { statusPath = "status" } - if _, err := os.Stat(socketPath); os.IsNotExist(err) { - return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) - } fcgi, err = newFcgiClient("unix", socketPath) } @@ -301,16 +298,23 @@ func globUnixSocket(url string) ([]string, error) { return nil, fmt.Errorf("could not compile glob %q: %v", pattern, err) } paths := glob.Match() - addrs := make([]string, 0, len(paths)) - if len(paths) == 0 { - _, err := os.Stat(pattern) - if os.IsNotExist(err) { - return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err) + return nil, nil + } + + // globpath.Match() returns the given argument when it's a static path, + // i.e., not a glob pattern. In that case, ensure it exists. + if len(paths) == 1 && paths[0] == pattern { + if _, err := os.Stat(paths[0]); err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err) + } + return nil, err } - return nil, err } + addrs := make([]string, 0, len(paths)) + for _, path := range paths { if status != "" { status = fmt.Sprintf(":%s", status) From 2a7bf89548989a2be4de1e39873a9695060286a7 Mon Sep 17 00:00:00 2001 From: Andre Nathan Date: Fri, 28 Feb 2020 18:45:37 -0300 Subject: [PATCH 7/7] Error on empty glob matches --- plugins/inputs/phpfpm/phpfpm.go | 7 +------ plugins/inputs/phpfpm/phpfpm_test.go | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index 0ee7c72c0eab5..75a6aeb176498 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -299,18 +299,13 @@ func globUnixSocket(url string) ([]string, error) { } paths := glob.Match() if len(paths) == 0 { - return nil, nil - } - - // globpath.Match() returns the given argument when it's a static path, - // i.e., not a glob pattern. In that case, ensure it exists. - if len(paths) == 1 && paths[0] == pattern { if _, err := os.Stat(paths[0]); err != nil { if os.IsNotExist(err) { return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err) } return nil, err } + return nil, nil } addrs := make([]string, 0, len(paths)) diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index 05494e15d76e4..64e5fbfea155f 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -292,7 +292,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testi err := acc.GatherError(r.Gather) require.Error(t, err) - assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error()) + assert.Equal(t, `dial unix /tmp/invalid.sock: connect: no such file or directory`, err.Error()) }