Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow globs in FPM unix socket paths #7089

Merged
merged 7 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/inputs/phpfpm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Get phpfpm stats using either HTTP status page or fpm socket.
## "/var/run/php5-fpm.sock"
## or using a custom fpm status path:
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
## glob patterns are also supported:
## "/var/run/php*.sock"
##
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
## "fcgi://10.0.0.12:9000/status"
Expand Down
89 changes: 78 additions & 11 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand Down Expand Up @@ -95,7 +96,12 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {

var wg sync.WaitGroup

for _, serv := range g.Urls {
urls, err := expandUrls(g.Urls)
if err != nil {
return err
}

for _, serv := range urls {
wg.Add(1)
go func(serv string) {
defer wg.Done()
Expand Down Expand Up @@ -153,18 +159,10 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
statusPath = "status"
}
} else {
socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
socketPath, statusPath = unixSocketPaths(addr)
if statusPath == "" {
statusPath = "status"
}

if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
}
fcgi, err = newFcgiClient("unix", socketPath)
}

Expand Down Expand Up @@ -277,6 +275,75 @@ func importMetric(r io.Reader, acc telegraf.Accumulator, addr string) (poolStat,
return stats, nil
}

func expandUrls(urls []string) ([]string, error) {
addrs := make([]string, 0, len(urls))
for _, url := range urls {
if isNetworkURL(url) {
addrs = append(addrs, url)
continue
}
paths, err := globUnixSocket(url)
if err != nil {
return nil, err
}
addrs = append(addrs, paths...)
}
return addrs, nil
}

func globUnixSocket(url string) ([]string, error) {
pattern, status := unixSocketPaths(url)
glob, err := globpath.Compile(pattern)
if err != nil {
return nil, fmt.Errorf("could not compile glob %q: %v", pattern, err)
}
paths := glob.Match()
if len(paths) == 0 {
return nil, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are planning to show an error if a glob doesn't have any matches, then I think we need to do it here.

Copy link
Contributor Author

@andrenth andrenth Feb 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ok, my thinking was that since globpath can diferentiate static paths from patterns, to allow non-matching globs, but to error on static paths. But it's probably better to be consistent.


// globpath.Match() returns the given argument when it's a static path,
// i.e., not a glob pattern. In that case, ensure it exists.
if len(paths) == 1 && paths[0] == pattern {
if _, err := os.Stat(paths[0]); err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err)
}
return nil, err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, as you mentioned, Match() will return the path if there are no special glob characters, I think we can remove this too. We will still display an error when we dial the socket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (this required a change to the unit tests due to the error now coming from Dial).


addrs := make([]string, 0, len(paths))

for _, path := range paths {
if status != "" {
status = fmt.Sprintf(":%s", status)
}
addrs = append(addrs, fmt.Sprintf("%s%s", path, status))
}

return addrs, nil
}

func unixSocketPaths(addr string) (string, string) {
var socketPath, statusPath string

socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
statusPath = ""
}

return socketPath, statusPath
}

func isNetworkURL(addr string) bool {
return strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") || strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://")
}

func init() {
inputs.Add("phpfpm", func() telegraf.Input {
return &phpfpm{}
Expand Down
65 changes: 65 additions & 0 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,71 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}

func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
// Create a socket in /tmp because we always have write permission and if the
// removing of socket fail when system restart /tmp is clear so
// we don't have junk files around
var randomNumber int64
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket1 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp1, err := net.Listen("unix", socket1)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp1.Close()

binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket2 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp2, err := net.Listen("unix", socket2)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp2.Close()

s := statServer{}
go fcgi.Serve(tcp1, s)
go fcgi.Serve(tcp2, s)

r := &phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
}

var acc1, acc2 testutil.Accumulator

err = acc1.GatherError(r.Gather)
require.NoError(t, err)

err = acc2.GatherError(r.Gather)
require.NoError(t, err)

tags1 := map[string]string{
"pool": "www",
"url": socket1,
}

tags2 := map[string]string{
"pool": "www",
"url": socket2,
}

fields := map[string]interface{}{
"start_since": int64(1991),
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}

acc1.AssertContainsTaggedFields(t, "phpfpm", fields, tags1)
acc2.AssertContainsTaggedFields(t, "phpfpm", fields, tags2)
}

func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// Create a socket in /tmp because we always have write permission. If the
// removing of socket fail we won't have junk files around. Cuz when system
Expand Down