Skip to content

Commit

Permalink
Subprocess manager and its tests for the prometheus_exec Receiver (#499)
Browse files Browse the repository at this point in the history
* config, subprocess manager and tests

* Changed the folder name, and made small changes according to James' comments

* Removed changes in global go.sum

* took out config from its own package
  • Loading branch information
Nicolas-MacBeth authored Aug 5, 2020
1 parent 157e5a6 commit ff7ab28
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 0 deletions.
1 change: 1 addition & 0 deletions receiver/prometheusexecreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
8 changes: 8 additions & 0 deletions receiver/prometheusexecreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusexecreceiver

go 1.14

require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
go.uber.org/zap v1.15.0
)
57 changes: 57 additions & 0 deletions receiver/prometheusexecreceiver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
33 changes: 33 additions & 0 deletions receiver/prometheusexecreceiver/subprocessmanager/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subprocessmanager

// SubprocessConfig is the config definition for the subprocess manager
type SubprocessConfig struct {
// Command is the command to be run (binary + flags, separated by commas)
Command string `mapstructure:"exec"`
// Port is the port assigned to the Receiver, and to the {{port}} template variables
Port int `mapstructure:"port"`
// Env is a list of env variables to pass to a specific command
Env []EnvConfig `mapstructure:"env"`
}

// EnvConfig is the config definition of each key-value pair for environment variables
type EnvConfig struct {
// Name is the name of the environment variable
Name string `mapstructure:"name"`
// Value is the value of the variable
Value string `mapstructure:"value"`
}
146 changes: 146 additions & 0 deletions receiver/prometheusexecreceiver/subprocessmanager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subprocessmanager

import (
"bufio"
"fmt"
"io"
"math"
"math/rand"
"os"
"os/exec"
"strings"
"time"

"github.com/kballard/go-shellquote"
"go.uber.org/zap"
)

// Process struct holds all the info needed to instantiate a subprocess
type Process struct {
Command string
Port int
Env []EnvConfig
}

const (
// HealthyProcessTime is the default time a process needs to stay alive to be considered healthy
HealthyProcessTime time.Duration = 30 * time.Minute
// HealthyCrashCount is the amount of times a process can crash (within the healthyProcessTime) before being considered unstable - it may be trying to find a port
HealthyCrashCount int = 3
// delayMutiplier is the factor by which the delay scales
delayMultiplier float64 = 2.0
// initialDelay is the initial delay before a process is restarted
initialDelay time.Duration = 1 * time.Second
)

// Run will start the process and keep track of running time
func (proc *Process) Run(logger *zap.Logger) (time.Duration, error) {

var argsSlice []string

// Parse the command line string into arguments
args, err := shellquote.Split(proc.Command)
if err != nil {
return 0, fmt.Errorf("could not parse command error: %w", err)
}
// Separate the executable from the flags for the Command object
if len(args) > 1 {
argsSlice = args[1:]
}

// Create the command object and attach current os environment + environment variables defined by user
childProcess := exec.Command(args[0], argsSlice...)
childProcess.Env = append(os.Environ(), formatEnvSlice(&proc.Env)...)

// Handle the subprocess standard and error outputs in goroutines
stdoutReader, stdoutErr := childProcess.StdoutPipe()
if stdoutErr != nil {
return 0, fmt.Errorf("could not get the command's stdout pipe, err: %w", stdoutErr)
}
go proc.pipeSubprocessOutput(bufio.NewReader(stdoutReader), logger, true)

stderrReader, stderrErr := childProcess.StderrPipe()
if stderrErr != nil {
return 0, fmt.Errorf("could not get the command's stderr pipe, err: %w", stderrErr)
}
go proc.pipeSubprocessOutput(bufio.NewReader(stderrReader), logger, false)

// Start and stop timer (elapsed) right before and after executing the command
start := time.Now()
errProcess := childProcess.Start()
if errProcess != nil {
return 0, fmt.Errorf("process could not start: %w", errProcess)
}

errProcess = childProcess.Wait()
elapsed := time.Since(start)
if errProcess != nil {
return elapsed, fmt.Errorf("process error: %w", errProcess)
}

return elapsed, nil
}

// Log every line of the subprocesse's output using zap, until pipe is closed (EOF)
func (proc *Process) pipeSubprocessOutput(reader *bufio.Reader, logger *zap.Logger, isStdout bool) {
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
logger.Info("subprocess logging failed", zap.String("error", err.Error()))
break
}

line = strings.TrimSpace(line)
if line != "" {
if isStdout {
logger.Info("subprocess output line", zap.String("output", line))
} else {
logger.Error("subprocess output line", zap.String("output", line))
}
}

// Leave this function when error is EOF (stderr/stdout pipe was closed)
if err == io.EOF {
break
}
}
}

// formatEnvSlice will loop over the key-value pairs and format the slice correctly for use by the Command object ("name=value")
func formatEnvSlice(envs *[]EnvConfig) []string {
if len(*envs) == 0 {
return nil
}

envSlice := make([]string, len(*envs))
for i, env := range *envs {
envSlice[i] = fmt.Sprintf("%v=%v", env.Name, env.Value)
}

return envSlice
}

// GetDelay will compute the delay for a given process according to its crash count and time alive using an exponential backoff algorithm
func GetDelay(elapsed time.Duration, crashCount int) time.Duration {
// Return initialDelay if the process is healthy (lasted longer than health duration) or has less or equal than 3 crashes - it could be trying to find a port
if elapsed > HealthyProcessTime || crashCount <= HealthyCrashCount {
return initialDelay
}

// Return initialDelay times 2 to the power of crashCount-3 (to offset for the 3 allowed crashes) added to a random number
return initialDelay * time.Duration(math.Pow(delayMultiplier, float64(crashCount-HealthyCrashCount)+rand.Float64()))
}
Loading

0 comments on commit ff7ab28

Please sign in to comment.