Skip to content

Commit

Permalink
Report AvailableComponents via OpAMP supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
mrsillydog committed Jan 15, 2025
1 parent 57a5d0d commit 9cf3185
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/report-available-components-opamp-supervisor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support retrieval of available components via the OpAMP supervisor.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37247]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ For a list of open issues related to the Supervisor, see [these issues](https://
| AcceptsRestartCommand | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077> |
| ReportsHealth | ⚠️ |
| ReportsRemoteConfig | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079> |
| ReportsAvailableComponents | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37247> |

### Supervisor specification features

Expand Down
4 changes: 3 additions & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsuperv

go 1.22.0

replace github.com/open-telemetry/opamp-go => /Users/ian/git/opamp-go

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/google/uuid v1.6.0
Expand All @@ -10,7 +12,7 @@ require (
github.com/knadh/koanf/providers/file v1.1.2
github.com/knadh/koanf/providers/rawbytes v0.1.0
github.com/knadh/koanf/v2 v2.1.2
github.com/open-telemetry/opamp-go v0.18.0
github.com/open-telemetry/opamp-go v0.18.1-0.20250109233938-e6fac32dddf5
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/config/configopaque v1.23.1-0.20250114172347-71aae791d7f8
go.opentelemetry.io/collector/config/configtls v1.23.1-0.20250114172347-71aae791d7f8
Expand Down
2 changes: 2 additions & 0 deletions cmd/opampsupervisor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Capabilities struct {
ReportsOwnMetrics bool `mapstructure:"reports_own_metrics"`
ReportsHealth bool `mapstructure:"reports_health"`
ReportsRemoteConfig bool `mapstructure:"reports_remote_config"`
ReportsAvailableComponents bool `mapstructure:"reports_available_components"`
}

func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
Expand Down Expand Up @@ -128,6 +129,10 @@ func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}

if c.ReportsAvailableComponents {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
}

return supportedCapabilities
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
ReportsOwnMetrics: true,
ReportsHealth: true,
ReportsRemoteConfig: true,
ReportsAvailableComponents: true,
},
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
Expand All @@ -489,7 +490,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
},
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type flattenedSettings struct {
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent
onConnecting func(request *http.Request) (shouldConnect bool, rejectStatusCode int)
onConnectionClose func(conn serverTypes.Connection)
endpoint string
Expand Down Expand Up @@ -53,7 +53,7 @@ func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connect

func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessage != nil {
fs.onMessage(conn, message)
return fs.onMessage(conn, message)
}

return &protobufs.ServerToAgent{}
Expand Down
3 changes: 2 additions & 1 deletion cmd/opampsupervisor/supervisor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) {
func Test_flattenedSettings_OnMessage(t *testing.T) {
onMessageFuncCalled := false
fs := flattenedSettings{
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
onMessageFuncCalled = true
return &protobufs.ServerToAgent{}
},
}

Expand Down
54 changes: 46 additions & 8 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type Supervisor struct {
// Supervisor's own config.
config config.Supervisor

agentDescription *atomic.Value
agentDescription *atomic.Value
availableComponents *atomic.Value

// Supervisor's persistent state
persistentState *persistentState
Expand Down Expand Up @@ -174,6 +175,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
availableComponents: &atomic.Value{},
doneChan: make(chan struct{}),
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
agentConn: &atomic.Value{},
Expand Down Expand Up @@ -302,14 +304,19 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
var connected atomic.Bool

// Start a one-shot server to get the Collector's agent description
// using the Collector's OpAMP extension.
// and available components using the Collector's OpAMP extension.
err = srv.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnecting: func(_ *http.Request) (bool, int) {
connected.Store(true)
return true, http.StatusOK
},
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
response := &protobufs.ServerToAgent{}
if message.GetAvailableComponents() != nil {
s.setAvailableComponents(message.AvailableComponents)
}

if message.AgentDescription != nil {
instanceIDSeen := false
s.setAgentDescription(message.AgentDescription)
Expand All @@ -324,19 +331,39 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)",
attr.Value.GetStringValue(),
s.persistentState.InstanceID.String())
return
return response
}
instanceIDSeen = true
}
}

if !instanceIDSeen {
done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
return
return response
}
}

done <- nil
// agent description must be defined
_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
if !ok {
return response
}

availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
if availableComponentsOk {
// must have a full list of components if available components have been reported
if availableComponents.GetComponents() != nil {
done <- nil
} else {
// if we don't have a full component list, ask for it
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
}
return response
}

// if available components have not been reported, agent description is sufficient
done <- nil
return response
},
}.toServerSettings())
if err != nil {
Expand Down Expand Up @@ -456,6 +483,10 @@ func (s *Supervisor) startOpAMPClient() error {
return err
}

if ac, ok := s.availableComponents.Load().(*protobufs.AvailableComponents); ok {
settings.AvailableComponents = ac
}

s.logger.Debug("Starting OpAMP client...")
if err = s.opampClient.Start(context.Background(), settings); err != nil {
return err
Expand Down Expand Up @@ -505,7 +536,7 @@ func (s *Supervisor) startOpAMPServer() error {
return nil
}

func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) {
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
s.agentConn.Store(conn)

s.logger.Debug("Received OpAMP message from the agent")
Expand Down Expand Up @@ -551,6 +582,8 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
s.logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
s.lastHealthFromClient = message.Health
}

return &protobufs.ServerToAgent{}
}

func (s *Supervisor) forwardCustomMessagesToServerLoop() {
Expand Down Expand Up @@ -584,6 +617,11 @@ func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
s.agentDescription.Store(ad)
}

// setAvailableComponents sets the available components of the OpAMP agent
func (s *Supervisor) setAvailableComponents(ac *protobufs.AvailableComponents) {
s.availableComponents.Store(ac)
}

// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
Expand Down Expand Up @@ -1022,7 +1060,7 @@ func (s *Supervisor) startAgent() (agentStartStatus, error) {
err := s.commander.Start(context.Background())
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
startErr := fmt.Errorf("Cannot start the agent: %w", err)
startErr := fmt.Errorf("cannot start the agent: %w", err)
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: startErr.Error()})
if err != nil {
s.logger.Error("Failed to report OpAMP client health", zap.Error(err))
Expand Down

0 comments on commit 9cf3185

Please sign in to comment.