Skip to content

Commit

Permalink
Send AgentInfo on component startup (#93)
Browse files Browse the repository at this point in the history
Now, AgentInfo can be sent during component startup along with the connection information. 
The components no longer report the version on CheckinObservedVersionInfo.
The V2 client constructors either take an AgentInfo, or it is read from the StartUpInfo.
  • Loading branch information
AndersonQ authored Jan 16, 2024
1 parent 608201a commit a38734b
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 343 deletions.
4 changes: 3 additions & 1 deletion dev-tools/v2tool/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/elastic/elastic-agent-client/v7/dev-tools/v2tool

go 1.18
go 1.21

toolchain go1.21.1

replace github.com/elastic/elastic-agent-client/v7 => ../../

Expand Down
6 changes: 3 additions & 3 deletions dev-tools/v2tool/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func newUnit(expected *proto.UnitExpectedConfig, rules RulesCfg) (Unit, error) {
return Unit{State: expectedStart, Rules: rules}, nil
}

// Checkin checkin is called every time the Mock V2 server performs a client checkin.
// Checkin is called every time the Mock V2 server performs a client checkin.
// It updates the units based on the current state, and returns a new expected config
func (in *InputManager) Checkin(observed *proto.CheckinObserved, started time.Time) *proto.CheckinExpected {
base := &proto.CheckinExpected{
AgentInfo: &proto.CheckinAgentInfo{
AgentInfo: &proto.AgentInfo{
Id: "test-agent",
Version: "8.4.0",
Snapshot: true,
Expand Down Expand Up @@ -230,7 +230,7 @@ func generateUnitExpected(cfg *proto.UnitExpectedConfig) *proto.UnitExpected {
// helper to form an entire CheckinExpected structure
func createUnitsWithState(state proto.State, input *proto.UnitExpectedConfig, inID string, stateIndex uint64) *proto.CheckinExpected {
return &proto.CheckinExpected{
AgentInfo: &proto.CheckinAgentInfo{
AgentInfo: &proto.AgentInfo{
Id: "test-agent",
Version: "8.4.0",
Snapshot: true,
Expand Down
31 changes: 20 additions & 11 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,16 @@ message UnitExpected {
UnitLogLevel log_level = 6;
}

// Only provided on first checkin expected response to the component.
// Agent information that the component might want to use for its events,
// including the package version, which components should report instead
// of their own version.
//
// Includes the agent information that the component might want to use for its events.
message CheckinAgentInfo {
// Sent on component start up as part of StartUpInfo and on the first checkin
// expected response to the component.
message AgentInfo {
// ID is the Elastic Agent's unique ID.
string id = 1;
// Version is the version of the running Elastic Agent.
// Version is the package version of the running Elastic Agent.
string version = 2;
// Snapshot is true when the running Elastic Agent is a snapshot version.
bool snapshot = 3;
Expand Down Expand Up @@ -269,7 +272,7 @@ message CheckinExpected {
// once their observed state has also been repoted as STOPPED to allow for graceful shutdown.
repeated UnitExpected units = 1;
// Agent info is provided only on first CheckinExpected response to the component.
CheckinAgentInfo agent_info = 2;
AgentInfo agent_info = 2;

// Features are the expected feature flags configurations. Can apply to either components or
// individual units depending on the flag and its implementation. Omitted if the client reports
Expand Down Expand Up @@ -323,10 +326,10 @@ message UnitObserved {
message CheckinObservedVersionInfo {
// Name of the binary.
string name = 1;
// Version of the binary.
string version = 2;
// Additional metadata about the binary.
map<string, string> meta = 3;
// VCS commit hash of the binary.
string build_hash = 4;
}

// Observed statuses and configuration for defined units.
Expand Down Expand Up @@ -455,10 +458,12 @@ enum ConnInfoServices {
Log = 4;
}

// Connection information sent to the application on startup so it knows how to connect back to the Elastic Agent.
// Information sent to component on startup containing the necessary information
// for the component to connect back to the Elastic Agent and the agent details.
//
// This is normally sent through stdin and should never be sent across a network un-encrypted.
message ConnInfo {
// This is normally sent through stdin and should never be sent across a network
// un-encrypted.
message StartUpInfo {
// GRPC connection address.
string addr = 1;
// Server name to use when connecting over TLS.
Expand All @@ -475,6 +480,10 @@ message ConnInfo {
repeated ConnInfoServices services = 7;
// Supports provides information to the client about extra features this server supports.
repeated ConnectionSupports supports = 8;
// Maximum message size that the client can use.
// Maximum message size that the client can use (in bytes).
uint32 max_message_size = 9;
// Agent information, including the agent package version, which should be
// presented in user-visible fields and messages instead of the build
// version of the running component.
AgentInfo agent_info = 10;
}
25 changes: 17 additions & 8 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
"sync"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client/chunk"
"github.com/elastic/elastic-agent-libs/api/npipe"

"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/client/chunk"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-client/v7/pkg/utils"
"github.com/elastic/elastic-agent-libs/api/npipe"
)

// DefaultMaxMessageSize is the maximum message size that is allowed to be sent.
Expand Down Expand Up @@ -150,10 +149,10 @@ type AgentInfo struct {
type VersionInfo struct {
// Name is the name of the program.
Name string
// Version is the current version of the program.
Version string
// Meta is any extra metadata information about the version.
Meta map[string]string
// BuildHash is the VCS commit hash the program was built from.
BuildHash string
}

// V2 manages the state and communication to the Elastic Agent over the V2 control protocol.
Expand Down Expand Up @@ -192,6 +191,7 @@ type v2options struct {
maxMessageSize int
chunkingAllowed bool
dialOptions []grpc.DialOption
agentInfo *AgentInfo
}

// DialOptions returns the dial options for the GRPC connection.
Expand Down Expand Up @@ -226,6 +226,14 @@ func WithGRPCDialOptions(opts ...grpc.DialOption) V2ClientOption {
}
}

// WithAgentInfo sets the AgentInfo and updates the client's VersionInfo.Version
// to match the given agentInfo.Version.
func WithAgentInfo(agentInfo AgentInfo) V2ClientOption {
return func(o *v2options) {
o.agentInfo = &agentInfo
}
}

// clientV2 manages the state and communication to the Elastic Agent over the V2 control protocol.
type clientV2 struct {
target string
Expand Down Expand Up @@ -292,6 +300,7 @@ func NewV2(target string, token string, versionInfo VersionInfo, opts ...V2Clien
}

c := &clientV2{
agentInfo: options.agentInfo,
target: target,
opts: options,
token: token,
Expand Down Expand Up @@ -540,9 +549,9 @@ func (c *clientV2) sendObserved(client proto.ElasticAgent_CheckinV2Client) error
}
if !c.versionInfoSent {
msg.VersionInfo = &proto.CheckinObservedVersionInfo{
Name: c.versionInfo.Name,
Version: c.versionInfo.Version,
Meta: c.versionInfo.Meta,
Name: c.versionInfo.Name,
Meta: c.versionInfo.Meta,
BuildHash: c.versionInfo.BuildHash,
}
// supports information is sent when version information is set,
// this ensures that its always sent once per connected loop
Expand Down
8 changes: 2 additions & 6 deletions pkg/client/client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ func testClientV2CheckinInitial(t *testing.T, localRPC string, serverCreds, clie
if observed.Token == token {
gotValid = true
reportedVersion.Name = observed.VersionInfo.Name
reportedVersion.Version = observed.VersionInfo.Version
reportedVersion.Meta = observed.VersionInfo.Meta
return &proto.CheckinExpected{
AgentInfo: &proto.CheckinAgentInfo{
AgentInfo: &proto.AgentInfo{
Id: "elastic-agent-id",
Version: "8.5.0",
Snapshot: true,
Expand Down Expand Up @@ -253,10 +252,8 @@ func testClientV2CheckinInitial(t *testing.T, localRPC string, serverCreds, clie
var errs2 []error
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

validClient := NewV2(srv.GetTarget(), token, VersionInfo{
Name: "program",
Version: "v1.0.0",
Name: "program",
Meta: map[string]string{
"key": "value",
},
Expand Down Expand Up @@ -325,7 +322,6 @@ func testClientV2CheckinInitial(t *testing.T, localRPC string, serverCreds, clie
assert.Equal(t, units[1].Type(), UnitTypeInput)

assert.Equal(t, reportedVersion.Name, "program")
assert.Equal(t, reportedVersion.Version, "v1.0.0")
assert.Equal(t, reportedVersion.Meta, map[string]string{
"key": "value",
})
Expand Down
40 changes: 25 additions & 15 deletions pkg/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/x509"
"errors"
"io"
"io/ioutil"

protobuf "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
Expand Down Expand Up @@ -38,8 +39,8 @@ const (

// NewFromReader creates a new client reading the connection information from the io.Reader.
func NewFromReader(reader io.Reader, impl StateInterface, actions ...Action) (Client, error) {
connInfo := &proto.ConnInfo{}
data, err := io.ReadAll(reader)
connInfo := &proto.StartUpInfo{}
data, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
Expand All @@ -63,46 +64,55 @@ func NewFromReader(reader io.Reader, impl StateInterface, actions ...Action) (Cl

// NewV2FromReader creates a new V2 client reading the connection information from the io.Reader.
func NewV2FromReader(reader io.Reader, ver VersionInfo, opts ...V2ClientOption) (V2, []Service, error) {
connInfo := &proto.ConnInfo{}
info := &proto.StartUpInfo{}
data, err := io.ReadAll(reader)
if err != nil {
return nil, nil, err
}
err = protobuf.Unmarshal(data, connInfo)
err = protobuf.Unmarshal(data, info)
if err != nil {
return nil, nil, err
}
if connInfo.Services == nil {

if info.AgentInfo != nil {
opts = append(opts, WithAgentInfo(AgentInfo{
ID: info.AgentInfo.Id,
Version: info.AgentInfo.Version,
Snapshot: info.AgentInfo.Snapshot,
}))
}

if info.Services == nil {
return nil, []Service{ServiceCheckin}, ErrV2Unavailable
}
cert, err := tls.X509KeyPair(connInfo.PeerCert, connInfo.PeerKey)
cert, err := tls.X509KeyPair(info.PeerCert, info.PeerKey)
if err != nil {
return nil, nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(connInfo.CaCert)
caCertPool.AppendCertsFromPEM(info.CaCert)
trans := credentials.NewTLS(&tls.Config{
ServerName: connInfo.ServerName,
ServerName: info.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
})
for _, s := range connInfo.Supports {
for _, s := range info.Supports {
if s == proto.ConnectionSupports_CheckinChunking {
opts = append(opts, WithChunking(true))
}
}
if connInfo.MaxMessageSize > 0 {
opts = append(opts, WithMaxMessageSize(int(connInfo.MaxMessageSize)))
if info.MaxMessageSize > 0 {
opts = append(opts, WithMaxMessageSize(int(info.MaxMessageSize)))
}
opts = append(opts, WithGRPCDialOptions(grpc.WithTransportCredentials(trans)))
client := NewV2(
connInfo.Addr,
connInfo.Token,
info.Addr,
info.Token,
ver,
opts...,
)
services := make([]Service, 0, len(connInfo.Services))
for _, srv := range connInfo.Services {
services := make([]Service, 0, len(info.Services))
for _, srv := range info.Services {
services = append(services, Service(srv))
}
return client, services, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestNewFromReader_Connects(t *testing.T) {
require.NoError(t, srv.Start(grpc.Creds(creds)))
defer srv.Stop()

connInfo := &proto.ConnInfo{
connInfo := &proto.StartUpInfo{
Addr: fmt.Sprintf(":%d", srv.Port),
ServerName: "localhost",
Token: token,
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestNewV2FromReader_Services(t *testing.T) {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(ca.caPEM)

connInfo := &proto.ConnInfo{
connInfo := &proto.StartUpInfo{
Addr: ":7777",
ServerName: "localhost",
Token: token,
Expand Down
2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-deprecated.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-future.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-future_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a38734b

Please sign in to comment.