Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Fix windows support #34573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .chloggen/fix_supervisor-windows-signalling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix supervisor support for Windows."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34570]
82 changes: 82 additions & 0 deletions .github/workflows/e2e-tests-windows.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# End-to-end tests that have to run on a Windows runner.
#
# The Collector is built once in `collector-build` and uploaded as an artifact;
# `supervisor-test` downloads that binary and runs the opampsupervisor e2e suite
# against it.
name: e2e-tests-windows

on:
  push:
    branches:
      - main
    tags:
      - "v[0-9]+.[0-9]+.[0-9]+*"
    paths-ignore:
      - "**/README.md"
  pull_request:
    paths-ignore:
      - "**/README.md"
  merge_group:

env:
  # Make sure to exit early if cache segment download times out after 2 minutes.
  # The "Cache Go" steps below cap the cache step as a whole at 25 minutes.
  SEGMENT_DOWNLOAD_TIMEOUT_MINS: 2

jobs:
  collector-build:
    runs-on: windows-latest
    # Never run for dependabot. Otherwise run on push and merge_group events,
    # or when the pull request carries the 'Run Windows' label.
    if: ${{ github.actor != 'dependabot[bot]' && (contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: "1.21.12"
          # Caching is handled explicitly by the "Cache Go" step.
          cache: false
      - name: Cache Go
        id: go-mod-cache
        timeout-minutes: 25
        uses: actions/cache@v4
        with:
          path: |
            ~\go\pkg\mod
            ~\AppData\Local\go-build
          # No matrix is defined for this job, so the key is built only from the
          # runner OS and the go.sum hash.
          key: go-build-cache-${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
      - name: Install dependencies
        if: steps.go-mod-cache.outputs.cache-hit != 'true'
        run: make -j2 gomoddownload
      - name: Build Collector
        run: make otelcontribcol
      - name: Upload Collector Binary
        uses: actions/upload-artifact@v4
        with:
          name: collector-binary
          path: ./bin/*

  supervisor-test:
    runs-on: windows-latest
    # Same gating as collector-build, whose artifact this job depends on.
    if: ${{ github.actor != 'dependabot[bot]' && (contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') }}
    needs: [collector-build]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: "1.21.12"
          # Caching is handled explicitly by the "Cache Go" step.
          cache: false
      - name: Cache Go
        id: go-mod-cache
        timeout-minutes: 25
        uses: actions/cache@v4
        with:
          path: |
            ~\go\pkg\mod
            ~\AppData\Local\go-build
          # No matrix is defined for this job, so the key is built only from the
          # runner OS and the go.sum hash.
          key: go-build-cache-${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
      - name: Install dependencies
        if: steps.go-mod-cache.outputs.cache-hit != 'true'
        run: make -j2 gomoddownload
      - name: Download Collector Binary
        uses: actions/download-artifact@v4
        with:
          name: collector-binary
          path: bin/
      - name: Run opampsupervisor e2e tests
        run: |
          cd cmd/opampsupervisor
          go test -v --tags=e2e
12 changes: 10 additions & 2 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[st
if runtime.GOOS == "windows" {
extension = ".exe"
}

configData := map[string]string{
"goos": runtime.GOOS,
"goarch": runtime.GOARCH,
Expand All @@ -184,7 +185,10 @@ func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[st
}
err = templ.Execute(&buf, configData)
require.NoError(t, err)
cfgFile, _ := os.CreateTemp(t.TempDir(), "config_*.yaml")
cfgFile, err := os.CreateTemp(t.TempDir(), "config_*.yaml")
require.NoError(t, err)
t.Cleanup(func() { cfgFile.Close() })

_, err = cfgFile.Write(buf.Bytes())
require.NoError(t, err)

Expand Down Expand Up @@ -617,11 +621,13 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {
tempDir := t.TempDir()
testKeyFile, err := os.CreateTemp(tempDir, "confKey")
require.NoError(t, err)
t.Cleanup(func() { testKeyFile.Close() })

n, err := testKeyFile.Write([]byte(testKeyFile.Name()))
require.NoError(t, err)
require.NotZero(t, n)

colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "split_config.yaml"))
colCfgTpl, err := os.ReadFile(filepath.Join("testdata", "collector", "split_config.yaml"))
require.NoError(t, err)

templ, err := template.New("").Parse(string(colCfgTpl))
Expand Down Expand Up @@ -770,9 +776,11 @@ func createSimplePipelineCollectorConf(t *testing.T) (*bytes.Buffer, []byte, *os
tempDir := t.TempDir()
inputFile, err := os.CreateTemp(tempDir, "input_*.yaml")
require.NoError(t, err)
t.Cleanup(func() { inputFile.Close() })

outputFile, err := os.CreateTemp(tempDir, "output_*.yaml")
require.NoError(t, err)
t.Cleanup(func() { outputFile.Close() })

colCfgTpl, err := os.ReadFile(path.Join(wd, "testdata", "collector", "simple_pipeline.yaml"))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.opentelemetry.io/collector/semconv v0.107.1-0.20240816132030-9fd84668bb02
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/sys v0.21.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -32,5 +33,4 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.21.0 // indirect
)
19 changes: 11 additions & 8 deletions cmd/opampsupervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os/exec"
"path/filepath"
"sync/atomic"
"syscall"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -73,26 +72,28 @@ func (c *Commander) Start(ctx context.Context) error {
c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable))

logFilePath := filepath.Join(c.logsDir, "agent.log")
logFile, err := os.Create(logFilePath)
stdoutFile, err := os.Create(logFilePath)
if err != nil {
return fmt.Errorf("cannot create %s: %w", logFilePath, err)
}

c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204
c.cmd.SysProcAttr = sysProcAttrs()

// Capture standard output and standard error.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21072
c.cmd.Stdout = logFile
c.cmd.Stderr = logFile
c.cmd.Stdout = stdoutFile
c.cmd.Stderr = stdoutFile

if err := c.cmd.Start(); err != nil {
stdoutFile.Close()
return err
}

c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid))
c.running.Store(1)

go c.watch()
go c.watch(stdoutFile)

return nil
}
Expand All @@ -106,7 +107,9 @@ func (c *Commander) Restart(ctx context.Context) error {
return c.Start(ctx)
}

func (c *Commander) watch() {
func (c *Commander) watch(stdoutFile *os.File) {
defer stdoutFile.Close()

err := c.cmd.Wait()

// cmd.Wait returns an exec.ExitError when the Collector exits unsuccessfully or stops
Expand Down Expand Up @@ -160,7 +163,7 @@ func (c *Commander) Stop(ctx context.Context) error {
c.logger.Debug("Stopping agent process", zap.Int("pid", pid))

// Gracefully signal process to stop.
if err := c.cmd.Process.Signal(syscall.SIGTERM); err != nil {
if err := sendShutdownSignal(c.cmd.Process); err != nil {
return err
}

Expand All @@ -181,7 +184,7 @@ func (c *Commander) Stop(ctx context.Context) error {
c.logger.Debug(
"Agent process is not responding to SIGTERM. Sending SIGKILL to kill forcibly.",
zap.Int("pid", pid))
if innerErr = c.cmd.Process.Signal(syscall.SIGKILL); innerErr != nil {
if innerErr = c.cmd.Process.Signal(os.Kill); innerErr != nil {
return
}
}()
Expand Down
20 changes: 20 additions & 0 deletions cmd/opampsupervisor/supervisor/commander/commander_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

package commander

import (
"os"
"syscall"
)

// sendShutdownSignal asks the given process to shut down gracefully by
// delivering os.Interrupt to it. Whatever error Signal reports is returned
// unchanged.
func sendShutdownSignal(process *os.Process) error {
	err := process.Signal(os.Interrupt)
	return err
}

// sysProcAttrs returns the platform-specific process attributes to set on the
// agent command. Non-Windows builds need no extra attributes, so the result is
// always nil.
func sysProcAttrs() *syscall.SysProcAttr {
	var noAttrs *syscall.SysProcAttr // nil: nothing extra is required here
	return noAttrs
}
40 changes: 40 additions & 0 deletions cmd/opampsupervisor/supervisor/commander/commander_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

package commander

import (
"os"
"syscall"

"golang.org/x/sys/windows"
)

// kernel32API is a lazily loaded handle to kernel32.dll.
var kernel32API = windows.NewLazySystemDLL("kernel32.dll")

// ctrlEventProc is the GenerateConsoleCtrlEvent procedure looked up in
// kernel32.dll.
var ctrlEventProc = kernel32API.NewProc("GenerateConsoleCtrlEvent")

// sendShutdownSignal requests a graceful shutdown of the given process by
// raising a CTRL_BREAK console control event addressed to its PID.
//
// Signaling with os.Interrupt is not supported on Windows systems, so the
// event is raised through the Windows API instead.
// See: https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent
//
// A zero result from the call means it failed, and the error reported with it
// is returned. Any other result yields nil.
func sendShutdownSignal(process *os.Process) error {
	targetPID := uintptr(process.Pid)

	ret, _, callErr := ctrlEventProc.Call(syscall.CTRL_BREAK_EVENT, targetPID)
	if ret != 0 {
		return nil
	}

	return callErr
}

func sysProcAttrs() *syscall.SysProcAttr {
// By default, a ctrl-break event applies to a whole process group, which ends up
// shutting down the supervisor. Instead, we spawn the collector in its own process
// group, so that sending a ctrl-break event shuts down just the collector.
return &syscall.SysProcAttr{
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
}
}
8 changes: 1 addition & 7 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,17 +459,11 @@ func (s *Supervisor) startOpAMPClient() error {
func (s *Supervisor) startOpAMPServer() error {
s.opampServer = server.New(newLoggerFromZap(s.logger))

var err error
s.opampServerPort, err = s.findRandomPort()
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

s.logger.Debug("Starting OpAMP server...")

connected := &atomic.Bool{}

err = s.opampServer.Start(flattenedSettings{
err := s.opampServer.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnectingFunc: func(_ *http.Request) (bool, int) {
// Only allow one agent to be connected to this server at a time.
Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/testdata/collector/split_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ service:
exporters: [nop]
telemetry:
resource:
test_key: "${file:{{.TestKeyFile}}}"
test_key: '${file:{{.TestKeyFile}}}'
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ capabilities:
accepts_opamp_connection_settings: true

storage:
directory: "{{.storage_dir}}"
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ capabilities:
accepts_restart_command: true

storage:
directory: "{{.storage_dir}}"
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ capabilities:
accepts_restart_command: true

storage:
directory: "{{.storage_dir}}"
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ capabilities:
reports_remote_config: false

storage:
directory: "{{.storage_dir}}"
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ capabilities:
reports_remote_config: true

storage:
directory: {{.storage_dir}}
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ capabilities:
reports_remote_config: true

storage:
directory: "{{.storage_dir}}"
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_darwin_arm64
Loading