Skip to content

Commit

Permalink
fix: add commander stopping sync mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
rogercoll committed Feb 18, 2025
1 parent 4e4986f commit 372361f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
16 changes: 16 additions & 0 deletions internal/examples/supervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -24,6 +25,10 @@ type Commander struct {
doneCh chan struct{}
waitCh chan struct{}
running int64

// True when stopping is in progress.
isStoppingFlag bool
isStoppingMutex sync.RWMutex
}

func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) {
Expand All @@ -41,6 +46,13 @@ func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Comm
// Start the Agent and begin watching the process.
// Agent's stdout and stderr are written to a file.
func (c *Commander) Start(ctx context.Context) error {
c.isStoppingMutex.Lock()
defer c.isStoppingMutex.Unlock()

if c.isStoppingFlag {
return nil
}

c.logger.Debugf(ctx, "Starting agent %s", c.cfg.Executable)

logFilePath := "agent.log"
Expand Down Expand Up @@ -116,6 +128,10 @@ func (c *Commander) IsRunning() bool {
// and if the process does not finish kills it forcedly by sending SIGKILL.
// Returns after the process is terminated.
func (c *Commander) Stop(ctx context.Context) error {
c.isStoppingMutex.Lock()
c.isStoppingFlag = true
c.isStoppingMutex.Unlock()

if c.cmd == nil || c.cmd.Process == nil {
// Not started, nothing to do.
return nil
Expand Down
6 changes: 3 additions & 3 deletions internal/examples/supervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/internal"
Expand Down Expand Up @@ -51,9 +51,9 @@ func TestNewSupervisor(t *testing.T) {
server:
endpoint: ws://127.0.0.1:4320/v1/opamp
agent:
executable: %s/dummy_agent.sh`, tmpDir)), 0644)
executable: %s/dummy_agent.sh`, tmpDir)), 0o644)

os.WriteFile("dummy_agent.sh", []byte("#!/bin/sh\nsleep 9999\n"), 0755)
os.WriteFile(tmpDir+"dummy_agent.sh", []byte("#!/bin/sh\nsleep 9999\n"), 0o755)

startOpampServer(t)

Expand Down

0 comments on commit 372361f

Please sign in to comment.