From aae384e9263be642ff33656f9e8c6598a1572e5b Mon Sep 17 00:00:00 2001 From: Deshi Xiao Date: Fri, 22 Nov 2024 11:54:18 +0800 Subject: [PATCH 1/5] Fix issues with defragment and alarm clear on etcd startup * Use clientv3.NewCtxClient instead of New to avoid automatic retry of all RPCs * Only timeout status requests; allow defrag and alarm clear requests to run to completion. * Only clear alarms on the local cluster member, not ALL cluster members Signed-off-by: Brad Davidson (cherry picked from commit 095e34d) Signed-off-by: Brad Davidson porting by Signed-off-by: Deshi Xiao --- go.mod | 4 +- pkg/etcd/etcd.go | 173 ++++++++++++++++++++++++++----------------- pkg/etcd/resolver.go | 80 ++++++++++++++++++++ pkg/etcd/snapshot.go | 4 +- 4 files changed, 190 insertions(+), 71 deletions(-) create mode 100644 pkg/etcd/resolver.go diff --git a/go.mod b/go.mod index 22038d37..73d903dd 100644 --- a/go.mod +++ b/go.mod @@ -129,9 +129,11 @@ require ( github.com/urfave/cli v1.22.15 github.com/yl2chen/cidranger v1.0.2 go.etcd.io/etcd/api/v3 v3.5.16 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 go.etcd.io/etcd/client/v3 v3.5.16 go.etcd.io/etcd/etcdutl/v3 v3.5.13 go.etcd.io/etcd/server/v3 v3.5.16 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.27.0 golang.org/x/net v0.29.0 golang.org/x/sys v0.25.0 @@ -413,7 +415,6 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/client/v2 v2.305.16 // indirect go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/raft/v3 v3.5.16 // indirect @@ -435,7 +436,6 @@ require ( go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index ec0fec08..95cb99d5 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -41,8 +41,15 @@ import ( "github.com/xiaods/k8e/pkg/version" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/credentials" snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -55,7 +62,7 @@ import ( ) const ( - testTimeout = time.Second * 30 + statusTimeout = time.Second * 30 manageTickerTime = time.Second * 15 learnerMaxStallTime = time.Minute * 5 memberRemovalTimeout = time.Minute * 1 @@ -206,35 +213,40 @@ func (e *ETCD) Test(ctx context.Context) error { return errors.New("etcd datastore is not started") } - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - endpoints := getEndpoints(e.config) - status, err := e.client.Status(ctx, endpoints[0]) + status, err := e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") + } else if status.IsLearner { + return errors.New("this server has not yet been promoted from learner to voting member") + } else if status.Leader == 0 { + return etcdserver.ErrNoLeader } - if status.IsLearner { - return errors.New("this server has not yet been promoted from learner to voting member") + logrus.Infof("Connected to etcd v%s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize) + if len(status.Errors) > 0 { + logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ",")) } + // defrag this node to reclaim freed space from compacted revisions if err := e.defragment(ctx); err != nil { return errors.Wrap(err, "failed to defragment etcd database") } - if err := e.clearAlarms(ctx); err != nil { - return errors.Wrap(err, "failed to report and disarm etcd alarms") + // clear alarms on this node + if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil { + return errors.Wrap(err, "failed to disarm etcd alarms") } - // refresh status to see if any errors remain after clearing alarms - status, err = e.client.Status(ctx, endpoints[0]) + // refresh status - note that errors may remain on other nodes, but this + // should not prevent us from continuing with startup. + status, err = e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") } + logrus.Infof("Datastore using %d of %d bytes after defragment", status.DbSizeInUse, status.DbSize) if len(status.Errors) > 0 { - return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", ")) + logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ",")) } members, err := e.client.MemberList(ctx) @@ -242,6 +254,7 @@ func (e *ETCD) Test(ctx context.Context) error { return err } + // Ensure that there is a cluster member with our peerURL and name var memberNameUrls []string for _, member := range members.Members { for _, peerURL := range member.PeerURLs { @@ -253,6 +266,8 @@ func (e *ETCD) Test(ctx context.Context) error { memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0]) } } + + // no matching PeerURL on any Member, return an error that indicates what was expected vs what we found. return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()} } @@ -523,7 +538,7 @@ func (e *ETCD) startClient(ctx context.Context) error { e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey - client, err := getClient(ctx, e.config, endpoints...) + client, conn, err := getClient(ctx, e.config, endpoints...) if err != nil { return err } @@ -531,9 +546,8 @@ func (e *ETCD) startClient(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() return nil @@ -554,11 +568,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er return err } - client, err := getClient(clientCtx, e.config, clientURLs...) + client, conn, err := getClient(clientCtx, e.config, clientURLs...) if err != nil { return err } - defer client.Close() + defer conn.Close() for _, member := range memberList.Members { for _, peer := range member.PeerURLs { @@ -725,13 +739,49 @@ func (e *ETCD) infoHandler() http.Handler { // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) { +func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) { + logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel) + if err != nil { + return nil, nil, err + } + cfg, err := getClientConfig(ctx, control, endpoints...) if err != nil { - return nil, err + return nil, nil, err + } + + // Set up dialer and resolver options. + // This is normally handled by clientv3.New() but that wraps all the GRPC + // service with retry handlers and uses deprecated grpc.DialContext() which + // tries to establish a connection even when one isn't wanted. + if cfg.DialKeepAliveTime > 0 { + params := keepalive.ClientParameters{ + Time: cfg.DialKeepAliveTime, + Timeout: cfg.DialKeepAliveTimeout, + PermitWithoutStream: cfg.PermitWithoutStream, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params)) + } + if cfg.TLS != nil { + creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials() + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds)) + } else { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) } - return clientv3.New(*cfg) + cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0]))) + target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0])) + conn, err := grpc.NewClient(target, cfg.DialOptions...) + if err != nil { + return nil, nil, err + } + // Create a new client and wire up the GRPC service interfaces. + // Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87 + client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client"))) + client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client) + client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client) + client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client) + return client, conn, nil } // getClientConfig generates an etcd client config connected to the specified endpoints. @@ -851,11 +901,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error { } defer sqliteClient.Close() - etcdClient, err := getClient(ctx, e.config) + etcdClient, conn, err := getClient(ctx, e.config) if err != nil { return err } - defer etcdClient.Close() + defer conn.Close() values, err := sqliteClient.List(ctx, "/registry/", 0) if err != nil { @@ -984,7 +1034,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { return errors.New("etcd datastore already started") } - client, err := getClient(ctx, e.config) + client, conn, err := getClient(ctx, e.config) if err != nil { return err } @@ -992,9 +1042,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil { @@ -1251,8 +1300,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre } func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) { - ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() resp, err := e.client.Status(ctx, url) if err != nil { return resp, errors.Wrap(err, "failed to check etcd member status") @@ -1363,11 +1410,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) return err } -// clearAlarms checks for any alarms on the local etcd member. If found, they are -// reported and the alarm state is cleared. -func (e *ETCD) clearAlarms(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() +// clearAlarms checks for any NOSPACE alarms on the local etcd member. +// If found, they are reported and the alarm state is cleared. +// Other alarm types are not handled. +func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error { if e.client == nil { return errors.New("etcd client was nil") @@ -1379,22 +1425,36 @@ func (e *ETCD) clearAlarms(ctx context.Context) error { } for _, alarm := range alarmList.Alarms { - logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm) - } - - if len(alarmList.Alarms) > 0 { - if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil { - return fmt.Errorf("etcd alarm disarm failed: %v", err) + if alarm.MemberID != memberID { + // ignore alarms on other cluster members, they should manage their own problems + continue + } + if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE { + if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil { + return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err) + } + logrus.Infof("%s disarmed successfully", alarm.Alarm) + } else { + return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm) } - logrus.Infof("Alarms disarmed on etcd server") } return nil } -func (e *ETCD) defragment(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) +// status returns status using the first etcd endpoint. +func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) { + if e.client == nil { + return nil, errors.New("etcd client was nil") + } + ctx, cancel := context.WithTimeout(ctx, statusTimeout) defer cancel() + endpoints := getEndpoints(e.config) + return e.client.Status(ctx, endpoints[0]) +} + +// defragment defragments the etcd datastore using the first etcd endpoint +func (e *ETCD) defragment(ctx context.Context) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1550,11 +1610,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) // GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd // and unmarshal it to a list of apiserver endpoints. func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { - cl, err := getClient(ctx, cfg) + cl, conn, err := getClient(ctx, cfg) if err != nil { return nil, err } - defer cl.Close() + defer conn.Close() etcdResp, err := cl.KV.Get(ctx, AddressKey) if err != nil { @@ -1576,9 +1636,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin // GetMembersClientURLs will list through the member lists in etcd and return // back a combined list of client urls for each member in the cluster func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - members, err := e.client.MemberList(ctx) if err != nil { return nil, err @@ -1593,24 +1650,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { return clientURLs, nil } -// GetMembersNames will list through the member lists in etcd and return -// back a combined list of member names -func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - members, err := e.client.MemberList(ctx) - if err != nil { - return nil, err - } - - var memberNames []string - for _, member := range members.Members { - memberNames = append(memberNames, member.Name) - } - return memberNames, nil -} - // RemoveSelf will remove the member if it exists in the cluster. This should // only be called on a node that may have previously run etcd, but will not // currently run etcd, to ensure that it is not a member of the cluster. diff --git a/pkg/etcd/resolver.go b/pkg/etcd/resolver.go new file mode 100644 index 00000000..b95242cb --- /dev/null +++ b/pkg/etcd/resolver.go @@ -0,0 +1,80 @@ +package etcd + +import ( + "net/url" + "path" + "strings" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" +) + +const scheme = "etcd-endpoint" + +type EtcdSimpleResolver struct { + *manual.Resolver + endpoint string +} + +// Cribbed from https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/internal/resolver/resolver.go +// but only supports a single fixed endpoint. We use this instead of the internal etcd client resolver +// because the agent loadbalancer handles failover and we don't want etcd or grpc's special behavior. +func NewSimpleResolver(endpoint string) *EtcdSimpleResolver { + r := manual.NewBuilderWithScheme(scheme) + return &EtcdSimpleResolver{Resolver: r, endpoint: endpoint} +} + +func (r *EtcdSimpleResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + + if r.CC != nil { + addr, serverName := interpret(r.endpoint) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr, ServerName: serverName}}, + }) + } + + return res, nil +} + +func interpret(ep string) (string, string) { + if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") { + if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") { + _, absolutePath, _ := strings.Cut(ep, "://") + return "unix://" + absolutePath, path.Base(absolutePath) + } + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") { + _, localPath, _ := strings.Cut(ep, "://") + return "unix:" + localPath, path.Base(localPath) + } + _, localPath, _ := strings.Cut(ep, ":") + return "unix:" + localPath, path.Base(localPath) + } + if strings.Contains(ep, "://") { + url, err := url.Parse(ep) + if err != nil { + return ep, ep + } + if url.Scheme == "http" || url.Scheme == "https" { + return url.Host, url.Host + } + return ep, url.Host + } + return ep, ep +} + +func authority(ep string) string { + if _, authority, ok := strings.Cut(ep, "://"); ok { + return authority + } + if suff, ok := strings.CutPrefix(ep, "unix:"); ok { + return suff + } + if suff, ok := strings.CutPrefix(ep, "unixs:"); ok { + return suff + } + return ep +} diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index fb83bdaf..5d5b7da5 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -27,7 +27,7 @@ import ( "github.com/xiaods/k8e/pkg/etcd/snapshot" "github.com/xiaods/k8e/pkg/util" "github.com/xiaods/k8e/pkg/version" - snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + snapshotv3 "go.etcd.io/etcd/client/v3/snapshot" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -243,7 +243,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { var sf *snapshot.File - if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil { + if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil { sf = &snapshot.File{ Name: snapshotName, Location: "", From c0bd81899f684c6d977bf55b45e3921f305e465b Mon Sep 17 00:00:00 2001 From: Deshi Xiao Date: Fri, 22 Nov 2024 12:12:04 +0800 Subject: [PATCH 2/5] Add tests for ETCD.Test() Signed-off-by: Brad Davidson (cherry picked from commit a39e191) Signed-off-by: Brad Davidson porting by Signed-off-by: Deshi Xiao --- pkg/etcd/etcd_test.go | 523 ++++++++++++++++++++++++++++++++++++++++-- tests/unit.go | 4 + 2 files changed, 507 insertions(+), 20 deletions(-) diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index a888e117..eabaeaa8 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" @@ -15,11 +16,23 @@ import ( "github.com/xiaods/k8e/pkg/daemons/config" "github.com/xiaods/k8e/pkg/etcd/s3" testutil "github.com/xiaods/k8e/tests" + "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" utilnet "k8s.io/apimachinery/pkg/util/net" ) +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + func mustGetAddress() string { ipAddr, err := utilnet.ChooseHostInterface() if err != nil { @@ -75,7 +88,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { wantErr bool }{ { - name: "Directory exists", + name: "directory exists", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -94,7 +107,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { want: true, }, { - name: "Directory does not exist", + name: "directory does not exist", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -116,9 +129,6 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { }, } - // enable logging - logrus.SetLevel(logrus.DebugLevel) - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := NewETCD() @@ -158,7 +168,7 @@ func Test_UnitETCD_Register(t *testing.T) { wantErr bool }{ { - name: "Call Register with standard config", + name: "standard config", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -173,7 +183,7 @@ func Test_UnitETCD_Register(t *testing.T) { }, }, { - name: "Call Register with a tombstone file created", + name: "with a tombstone file created", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -248,7 +258,7 @@ func Test_UnitETCD_Start(t *testing.T) { wantErr bool }{ { - name: "Start etcd without clientAccessInfo and without snapshots", + name: "nil clientAccessInfo and nil cron", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -265,17 +275,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd without clientAccessInfo on", + name: "nil clientAccessInfo", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -292,17 +303,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd with an existing cluster", + name: "existing cluster", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -321,13 +333,14 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) os.Remove(walDir(e.config)) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, @@ -352,8 +365,478 @@ func Test_UnitETCD_Start(t *testing.T) { } if err := tt.teardown(e, &tt.fields.context); err != nil { t.Errorf("Teardown for ETCD.Start() failed = %v", err) + } + }) + } +} + +func Test_UnitETCD_Test(t *testing.T) { + type contextInfo struct { + ctx context.Context + cancel context.CancelFunc + } + type fields struct { + context contextInfo + client *clientv3.Client + config *config.Control + name string + address string + } + type args struct { + clientAccessInfo *clientaccess.Info + } + tests := []struct { + name string + fields fields + setup func(e *ETCD, ctxInfo *contextInfo) error + teardown func(e *ETCD, ctxInfo *contextInfo) error + wantErr bool + }{ + { + name: "no server running", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "unreachable server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + e.config.Runtime.EtcdConfig.Endpoints = []string{"https://192.0.2.0:2379"} // RFC5737 + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "learner server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, true, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "corrupt server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, true, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "leaderless server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, true, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "normal server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "alarm on other server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + extraAlarm := &etcdserverpb.AlarmMember{MemberID: 2, Alarm: etcdserverpb.AlarmType_NOSPACE} + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second, extraAlarm); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "slow defrag", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, 40*time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ETCD{ + client: tt.fields.client, + config: tt.fields.config, + name: tt.fields.name, + address: tt.fields.address, + } + + if err := tt.setup(e, &tt.fields.context); err != nil { + t.Errorf("Setup for ETCD.Test() failed = %v", err) return } + start := time.Now() + err := e.Test(tt.fields.context.ctx) + duration := time.Now().Sub(start) + t.Logf("ETCD.Test() completed in %v with err=%v", duration, err) + if (err != nil) != tt.wantErr { + t.Errorf("ETCD.Test() error = %v, wantErr %v", err, tt.wantErr) + } + if err := tt.teardown(e, &tt.fields.context); err != nil { + t.Errorf("Teardown for ETCD.Test() failed = %v", err) + } }) } } + +// startMock starts up a mock etcd grpc service with canned responses +// that can be used to test specific scenarios. +func startMock(ctx context.Context, e *ETCD, isLearner, isCorrupt, noLeader bool, defragDelay time.Duration, extraAlarms ...*etcdserverpb.AlarmMember) error { + address := authority(getEndpoints(e.config)[0]) + // listen on endpoint and close listener on context cancel + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + + // set up tls if enabled + gopts := []grpc.ServerOption{} + if e.config.Datastore.ServerTLSConfig.CertFile != "" && e.config.Datastore.ServerTLSConfig.KeyFile != "" { + creds, err := credentials.NewServerTLSFromFile(e.config.Datastore.ServerTLSConfig.CertFile, e.config.Datastore.ServerTLSConfig.KeyFile) + if err != nil { + return err + } + gopts = append(gopts, grpc.Creds(creds)) + } + server := grpc.NewServer(gopts...) + + mock := &mockEtcd{ + e: e, + mu: &sync.RWMutex{}, + isLearner: isLearner, + isCorrupt: isCorrupt, + noLeader: noLeader, + defragDelay: defragDelay, + extraAlarms: extraAlarms, + } + + // register grpc services + etcdserverpb.RegisterKVServer(server, mock) + etcdserverpb.RegisterClusterServer(server, mock) + etcdserverpb.RegisterMaintenanceServer(server, mock) + + hsrv := health.NewServer() + hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, hsrv) + + reflection.Register(server) + + // shutdown on context cancel + go func() { + <-ctx.Done() + server.GracefulStop() + listener.Close() + }() + + // start serving + go func() { + logrus.Infof("Mock etcd server starting on %s", listener.Addr()) + logrus.Infof("Mock etcd server exited: %v", server.Serve(listener)) + }() + + return nil +} + +type mockEtcd struct { + e *ETCD + mu *sync.RWMutex + calls map[string]int + isLearner bool + isCorrupt bool + noLeader bool + defragDelay time.Duration + extraAlarms []*etcdserverpb.AlarmMember +} + +// increment call counter for this function +func (m *mockEtcd) inc(call string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.calls == nil { + m.calls = map[string]int{} + } + m.calls[call] = m.calls[call] + 1 +} + +// get call counter for this function +func (m *mockEtcd) get(call string) int { + m.mu.RLock() + defer m.mu.RUnlock() + return m.calls[call] +} + +// get alarm list +func (m *mockEtcd) alarms() []*etcdserverpb.AlarmMember { + alarms := m.extraAlarms + if m.get("alarm") < 2 { + // on the first check, return NOSPACE so that we can clear it after defragging + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_NOSPACE, + MemberID: 1, + }) + } + if m.isCorrupt { + // return CORRUPT if so requested + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_CORRUPT, + MemberID: 1, + }) + } + return alarms +} + +// KV mocks +func (m *mockEtcd) Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { + m.inc("range") + return nil, unsupported("range") +} +func (m *mockEtcd) Put(context.Context, *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + m.inc("put") + return nil, unsupported("put") +} +func (m *mockEtcd) DeleteRange(context.Context, *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + m.inc("deleterange") + return nil, unsupported("deleterange") +} +func (m *mockEtcd) Txn(context.Context, *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { + m.inc("txn") + return nil, unsupported("txn") +} +func (m *mockEtcd) Compact(context.Context, *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + m.inc("compact") + return nil, unsupported("compact") +} + +// Maintenance mocks +func (m *mockEtcd) Alarm(ctx context.Context, r *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { + m.inc("alarm") + res := &etcdserverpb.AlarmResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + } + if r.Action == etcdserverpb.AlarmRequest_GET { + res.Alarms = m.alarms() + } + return res, nil +} +func (m *mockEtcd) Status(context.Context, *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) { + m.inc("status") + res := &etcdserverpb.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Leader: 1, + Version: "v3.5.0-mock0", + DbSize: 1024, + DbSizeInUse: 512, + IsLearner: m.isLearner, + } + if m.noLeader { + res.Leader = 0 + res.Errors = append(res.Errors, etcdserver.ErrNoLeader.Error()) + } + for _, a := range m.alarms() { + res.Errors = append(res.Errors, a.String()) + } + return res, nil +} +func (m *mockEtcd) Defragment(ctx context.Context, r *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) { + m.inc("defragment") + // delay defrag response by configured time, or until the request is cancelled + select { + case <-ctx.Done(): + case <-time.After(m.defragDelay): + } + return &etcdserverpb.DefragmentResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + }, nil +} +func (m *mockEtcd) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) { + m.inc("hash") + return nil, unsupported("hash") +} +func (m *mockEtcd) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) { + m.inc("hashkv") + return nil, unsupported("hashkv") +} +func (m *mockEtcd) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error { + m.inc("snapshot") + return unsupported("snapshot") +} +func (m *mockEtcd) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) { + m.inc("moveleader") + return nil, unsupported("moveleader") +} +func (m *mockEtcd) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) { + m.inc("downgrade") + return nil, unsupported("downgrade") +} + +// Cluster mocks +func (m *mockEtcd) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) { + m.inc("memberadd") + return nil, unsupported("memberadd") +} +func (m *mockEtcd) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) { + m.inc("memberremove") + return nil, etcdserver.ErrNotEnoughStartedMembers +} +func (m *mockEtcd) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) { + m.inc("memberupdate") + return nil, unsupported("memberupdate") +} +func (m *mockEtcd) MemberList(context.Context, *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) { + m.inc("memberlist") + scheme := "http" + if m.e.config.Datastore.ServerTLSConfig.CertFile != "" { + scheme = "https" + } + + return &etcdserverpb.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Members: []*etcdserverpb.Member{ + { + ID: 1, + Name: m.e.name, + IsLearner: m.isLearner, + ClientURLs: []string{scheme + "://127.0.0.1:2379"}, + PeerURLs: []string{scheme + "://" + m.e.address + ":2380"}, + }, + }, + }, nil +} + +func (m *mockEtcd) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) { + m.inc("memberpromote") + return nil, unsupported("memberpromote") +} + +func unsupported(field string) error { + return status.New(codes.Unimplemented, field+" is not implemented").Err() +} diff --git a/tests/unit.go b/tests/unit.go index c6aa3f50..458a3e6e 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -54,6 +54,10 @@ func GenerateRuntime(cnf *config.Control) error { deps.CreateRuntimeCertFiles(cnf) + cnf.Datastore.ServerTLSConfig.CAFile = cnf.Runtime.ETCDServerCA + cnf.Datastore.ServerTLSConfig.CertFile = cnf.Runtime.ServerETCDCert + cnf.Datastore.ServerTLSConfig.KeyFile = cnf.Runtime.ServerETCDKey + return deps.GenServerDeps(cnf) } From a7c343d2ed49e2387504274d587efef469267b0f Mon Sep 17 00:00:00 2001 From: Deshi Xiao Date: Fri, 22 Nov 2024 12:15:59 +0800 Subject: [PATCH 3/5] Set kine EmulatedETCDVersion from embedded etcd version Signed-off-by: Brad Davidson (cherry picked from commit bc60ff7) Signed-off-by: Brad Davidson porting by Signed-off-by: Deshi Xiao --- pkg/cli/server/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 81bb0b44..2826153e 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/xiaods/k8e/pkg/util" "github.com/xiaods/k8e/pkg/version" "github.com/xiaods/k8e/pkg/vpn" + etcdversion "go.etcd.io/etcd/api/v3/version" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" kubeapiserverflag "k8s.io/component-base/cli/flag" @@ -146,6 +147,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain serverConfig.ControlConfig.Datastore.NotifyInterval = 5 * time.Second + serverConfig.ControlConfig.Datastore.EmulatedETCDVersion = etcdversion.Version serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint serverConfig.ControlConfig.Datastore.BackendTLSConfig.CAFile = cfg.DatastoreCAFile serverConfig.ControlConfig.Datastore.BackendTLSConfig.CertFile = cfg.DatastoreCertFile From f210c48232b77ce1efd9e4079913d6dcca2b1aa8 Mon Sep 17 00:00:00 2001 From: Deshi Xiao Date: Fri, 22 Nov 2024 12:23:47 +0800 Subject: [PATCH 4/5] Add nonroot-devices flag to agent CLI Add new flag that is passed through to the device_ownership_from_security_context parameter in the containerd CRI config. This is not possible to change without providing a complete custom containerd.toml template so we should add a flag for it. Signed-off-by: Brad Davidson (cherry picked from commit 56fb3b0) Signed-off-by: Brad Davidson porting by Signed-off-by: Deshi Xiao --- pkg/agent/config/config.go | 1 + pkg/agent/containerd/config_linux.go | 1 + pkg/cli/cmds/agent.go | 7 +++++ pkg/cli/cmds/server.go | 1 + pkg/daemons/config/types.go | 43 ++++++++++++++-------------- 5 files changed, 32 insertions(+), 21 deletions(-) diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 01e415fe..1aedac64 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -552,6 +552,7 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N nodeConfig.Containerd.Log = filepath.Join(envInfo.DataDir, "agent", "containerd", "containerd.log") nodeConfig.Containerd.Registry = filepath.Join(envInfo.DataDir, "agent", "etc", "containerd", "certs.d") nodeConfig.Containerd.NoDefault = envInfo.ContainerdNoDefault + nodeConfig.Containerd.NonrootDevices = envInfo.ContainerdNonrootDevices nodeConfig.Containerd.Debug = envInfo.Debug applyContainerdStateAndAddress(nodeConfig) applyCRIDockerdAddress(nodeConfig) diff --git a/pkg/agent/containerd/config_linux.go b/pkg/agent/containerd/config_linux.go index 063634bc..a80efa05 100644 --- a/pkg/agent/containerd/config_linux.go +++ b/pkg/agent/containerd/config_linux.go @@ -73,6 +73,7 @@ func SetupContainerdConfig(cfg *config.Node) error { SystemdCgroup: cfg.AgentConfig.Systemd, IsRunningInUserNS: isRunningInUserNS, EnableUnprivileged: kernel.CheckKernelVersion(4, 11, 0), + NonrootDevices: cfg.Containerd.NonrootDevices, PrivateRegistryConfig: cfg.AgentConfig.Registry, ExtraRuntimes: extraRuntimes, Program: version.Program, diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index 5edce355..dc0a08ea 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -29,6 +29,7 @@ type Agent struct { Snapshotter string Docker bool ContainerdNoDefault bool + ContainerdNonrootDevices bool ContainerRuntimeEndpoint string DefaultRuntime string ImageServiceEndpoint string @@ -215,6 +216,11 @@ var ( Usage: "(agent/containerd) Disables containerd's fallback default registry endpoint when a mirror is configured for that registry", Destination: &AgentConfig.ContainerdNoDefault, } + NonrootDevicesFlag = &cli.BoolFlag{ + Name: "nonroot-devices", + Usage: "(agent/containerd) Allows non-root pods to access devices by setting device_ownership_from_security_context=true in the containerd CRI config", + Destination: &AgentConfig.ContainerdNonrootDevices, + } EnablePProfFlag = &cli.BoolFlag{ Name: "enable-pprof", Usage: "(experimental) Enable pprof endpoint on supervisor port", @@ -278,6 +284,7 @@ func NewAgentCommand(action func(ctx *cli.Context) error) cli.Command { SnapshotterFlag, PrivateRegistryFlag, DisableDefaultRegistryEndpointFlag, + NonrootDevicesFlag, AirgapExtraRegistryFlag, NodeIPFlag, BindAddressFlag, diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 1e0a4c4b..475d412b 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -490,6 +490,7 @@ var ServerFlags = []cli.Flag{ DefaultRuntimeFlag, ImageServiceEndpointFlag, DisableDefaultRegistryEndpointFlag, + NonrootDevicesFlag, PauseImageFlag, SnapshotterFlag, PrivateRegistryFlag, diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 6691a069..d8033bbf 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -9,11 +9,11 @@ import ( "strings" "sync" - "github.com/xiaods/k8e/pkg/generated/controllers/k8e.cattle.io" "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/wharfie/pkg/registries" "github.com/rancher/wrangler/v3/pkg/generated/controllers/core" "github.com/rancher/wrangler/v3/pkg/leader" + "github.com/xiaods/k8e/pkg/generated/controllers/k8e.cattle.io" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -22,12 +22,12 @@ import ( ) const ( - EgressSelectorModeAgent = "agent" - EgressSelectorModeCluster = "cluster" - EgressSelectorModeDisabled = "disabled" - EgressSelectorModePod = "pod" - CertificateRenewDays = 90 - StreamServerPort = "10010" + EgressSelectorModeAgent = "agent" + EgressSelectorModeCluster = "cluster" + EgressSelectorModeDisabled = "disabled" + EgressSelectorModePod = "pod" + CertificateRenewDays = 90 + StreamServerPort = "10010" ) type Node struct { @@ -66,19 +66,20 @@ type EtcdS3 struct { } type Containerd struct { - Address string - Log string - Root string - State string - Config string - Opt string - Template string - BlockIOConfig string - RDTConfig string - Registry string - NoDefault bool - SELinux bool - Debug bool + Address string + Log string + Root string + State string + Config string + Opt string + Template string + BlockIOConfig string + RDTConfig string + Registry string + NoDefault bool + NonrootDevices bool + SELinux bool + Debug bool } type CRIDockerd struct { @@ -450,4 +451,4 @@ func GetArgs(initialArgs map[string]string, extraArgs []string) []string { } return args -} \ No newline at end of file +} From 94d2940429788c84cd0523030ecfccfde533071a Mon Sep 17 00:00:00 2001 From: Deshi Xiao Date: Fri, 22 Nov 2024 12:28:16 +0800 Subject: [PATCH 5/5] Fix MustFindString returning override flags on external CLI commands External CLI actions cannot short-circuit on --help or --version, so we cannot skip loading the config file if these flags are present when running these wrapped commands. The behavior of just returning the override flag name instead of the requested flag value was breaking data-dir lookup when running wrapped commands. Signed-off-by: Brad Davidson (cherry picked from commit ff5c633) Signed-off-by: Brad Davidson porting by Signed-off-by: Deshi Xiao --- cmd/k8e/main.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/k8e/main.go b/cmd/k8e/main.go index 4cb2ee88..c5f8cf93 100644 --- a/cmd/k8e/main.go +++ b/cmd/k8e/main.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "slices" "strconv" "strings" @@ -27,6 +28,7 @@ import ( ) var criDefaultConfigPath = "/etc/crictl.yaml" +var externalCLIActions = []string{"crictl", "ctr", "kubectl"} // main entrypoint for the k8e multicall binary func main() { @@ -106,7 +108,7 @@ func findDebug(args []string) bool { if debug { return debug } - debug, _ = strconv.ParseBool(configfilearg.MustFindString(args, "debug")) + debug, _ = strconv.ParseBool(configfilearg.MustFindString(args, "debug", externalCLIActions...)) return debug } @@ -126,7 +128,7 @@ func findDataDir(args []string) string { if dataDir != "" { return dataDir } - dataDir = configfilearg.MustFindString(args, "data-dir") + dataDir = configfilearg.MustFindString(args, "data-dir", externalCLIActions...) if d, err := datadir.Resolve(dataDir); err == nil { dataDir = d } else { @@ -144,7 +146,7 @@ func findPreferBundledBin(args []string) bool { fs.SetOutput(io.Discard) fs.BoolVar(&preferBundledBin, "prefer-bundled-bin", false, "Prefer bundled binaries") - preferRes := configfilearg.MustFindString(args, "prefer-bundled-bin") + preferRes := configfilearg.MustFindString(args, "prefer-bundled-bin", externalCLIActions...) if preferRes != "" { preferBundledBin, _ = strconv.ParseBool(preferRes) } @@ -159,8 +161,7 @@ func findPreferBundledBin(args []string) bool { // it returns false so that standard CLI wrapping can occur. func runCLIs(dataDir string) bool { progName := filepath.Base(os.Args[0]) - switch progName { - case "crictl", "ctr", "kubectl": + if slices.Contains(externalCLIActions, progName) { if err := externalCLI(progName, dataDir, os.Args[1:]); err != nil && !errors.Is(err, context.Canceled) { logrus.Fatal(err) }