diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86d3d7e4462f..c7325ce2a778 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,9 @@ jobs: strategy: matrix: os: [ubuntu-22.04, actuated-arm64-4cpu-16gb, macos-12, windows-2019] + exclude: + - os: ${{ github.repository != 'containerd/containerd' && 'actuated-arm64-4cpu-16gb' }} + steps: - uses: actions/checkout@v4 @@ -39,7 +42,6 @@ jobs: # project: name: Project Checks - if: github.repository == 'containerd/containerd' runs-on: ubuntu-22.04 timeout-minutes: 5 @@ -52,6 +54,7 @@ jobs: - uses: ./src/github.com/containerd/containerd/.github/actions/install-go - uses: containerd/project-checks@v1.1.0 + if: github.repository == 'containerd/containerd' with: working-directory: src/github.com/containerd/containerd repo-access-token: ${{ secrets.GITHUB_TOKEN }} diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go index d6f11990c211..54b883ccaba7 100644 --- a/cmd/containerd-shim-runc-v2/main.go +++ b/cmd/containerd-shim-runc-v2/main.go @@ -24,6 +24,8 @@ import ( "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/manager" _ "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/task/plugin" "github.com/containerd/containerd/v2/pkg/shim" + _ "github.com/containerd/containerd/v2/plugins/events" + _ "github.com/containerd/containerd/v2/plugins/services/events/ttrpc" ) func main() { diff --git a/cmd/containerd-shim-runc-v2/task/plugin/plugin_linux.go b/cmd/containerd-shim-runc-v2/task/plugin/plugin_linux.go index 31edccd443d5..dcc0cf366be2 100644 --- a/cmd/containerd-shim-runc-v2/task/plugin/plugin_linux.go +++ b/cmd/containerd-shim-runc-v2/task/plugin/plugin_linux.go @@ -18,7 +18,7 @@ package plugin import ( "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/task" - "github.com/containerd/containerd/v2/pkg/shim" + "github.com/containerd/containerd/v2/core/events" 
"github.com/containerd/containerd/v2/pkg/shutdown" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/plugin" @@ -34,7 +34,7 @@ func init() { plugins.InternalPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - pp, err := ic.GetByID(plugins.EventPlugin, "publisher") + pp, err := ic.GetByID(plugins.EventPlugin, "exchange") if err != nil { return nil, err } @@ -42,7 +42,7 @@ func init() { if err != nil { return nil, err } - return task.NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service)) + return task.NewTaskService(ic.Context, pp.(events.Publisher), ss.(shutdown.Service)) }, }) diff --git a/cmd/containerd-shim-runc-v2/task/service.go b/cmd/containerd-shim-runc-v2/task/service.go index 666e03603b5c..b62ad3637d7b 100644 --- a/cmd/containerd-shim-runc-v2/task/service.go +++ b/cmd/containerd-shim-runc-v2/task/service.go @@ -32,9 +32,8 @@ import ( "github.com/containerd/containerd/v2/api/types/task" "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/process" "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/runc" - "github.com/containerd/containerd/v2/core/runtime" + "github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/core/runtime/v2/runc/options" - "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/oom" oomv1 "github.com/containerd/containerd/v2/pkg/oom/v1" oomv2 "github.com/containerd/containerd/v2/pkg/oom/v2" @@ -58,7 +57,7 @@ var ( ) // NewTaskService creates a new instance of a task service -func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) { +func NewTaskService(ctx context.Context, publisher events.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) { var ( ep oom.Watcher err error @@ -88,7 +87,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S if err := s.initPlatform(); err != nil { 
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) } - go s.forward(ctx, publisher) + sd.RegisterCallback(func(context.Context) error { close(s.events) return nil @@ -743,18 +742,6 @@ func (s *service) getContainerPids(ctx context.Context, container *runc.Containe return pids, nil } -func (s *service) forward(ctx context.Context, publisher shim.Publisher) { - ns, _ := namespaces.Namespace(ctx) - ctx = namespaces.WithNamespace(context.Background(), ns) - for e := range s.events { - err := publisher.Publish(ctx, runtime.GetTopic(e), e) - if err != nil { - log.G(ctx).WithError(err).Error("post event") - } - } - publisher.Close() -} - func (s *service) getContainer(id string) (*runc.Container, error) { s.mu.Lock() container := s.containers[id] diff --git a/core/events/events.go b/core/events/events.go index ff5642301f40..83acec42b823 100644 --- a/core/events/events.go +++ b/core/events/events.go @@ -18,8 +18,10 @@ package events import ( "context" + "fmt" "time" + "github.com/containerd/log" "github.com/containerd/typeurl/v2" ) @@ -78,3 +80,22 @@ type Forwarder interface { type Subscriber interface { Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error) } + +// ForwardAll forwards messages from a subscription until an error is received +func ForwardAll(ctx context.Context, f Forwarder, s Subscriber) error { + sub, errs := s.Subscribe(ctx) + for { + select { + case evt := <-sub: + if err := f.Forward(ctx, evt); err != nil { + log.G(ctx).WithError(err).Errorf("event forward error: %v", evt) + } + case err := <-errs: + if err != nil { + return fmt.Errorf("error from subscription: %w", err) + } + return nil + } + + } +} diff --git a/core/runtime/v2/binary.go b/core/runtime/v2/binary.go index e6e8eca4962d..7ecdbb313567 100644 --- a/core/runtime/v2/binary.go +++ b/core/runtime/v2/binary.go @@ -145,9 +145,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ } return &shim{ - bundle: 
b.bundle, - client: conn, - version: params.Version, + bundle: b.bundle, + client: conn, + params: params, }, nil } diff --git a/core/runtime/v2/manager.go b/core/runtime/v2/manager.go index 3e12f37999c2..dfb858c497e3 100644 --- a/core/runtime/v2/manager.go +++ b/core/runtime/v2/manager.go @@ -29,7 +29,9 @@ import ( apitypes "github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/core/containers" + "github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/core/events/exchange" + eventsproxy "github.com/containerd/containerd/v2/core/events/proxy" "github.com/containerd/containerd/v2/core/metadata" "github.com/containerd/containerd/v2/core/runtime" "github.com/containerd/containerd/v2/core/sandbox" @@ -101,7 +103,7 @@ func init() { return nil, err } - return NewTaskManager(shimManager), nil + return NewTaskManager(ic.Context, shimManager) }, }) @@ -430,10 +432,36 @@ type TaskManager struct { } // NewTaskManager creates a new task manager instance. 
-func NewTaskManager(shims *ShimManager) *TaskManager { - return &TaskManager{ - manager: shims, +func NewTaskManager(ctx context.Context, shims *ShimManager) (*TaskManager, error) { + manager := &TaskManager{manager: shims} + if err := manager.restoreTasks(ctx); err != nil { + return nil, err } + return manager, nil +} + +func (m *TaskManager) restoreTasks(ctx context.Context) error { + log.G(ctx).Info("restoring task event streamers") + instances, err := m.manager.shims.GetAll(ctx, true) + if err != nil { + return err + } + + // See if any of the restored shims support event streaming + for _, instance := range instances { + shimTask, err := newShimTask(instance) + if err != nil { + log.G(ctx).WithError(err).Error("unable to get shim task") + continue + } + + if err := m.tryStreamEvents(context.Background(), shimTask); err != nil { + log.G(ctx).WithError(err).Errorf("unable to resetore event stream for task %q: %v", instance.ID(), err) + continue + } + } + + return nil } // ID of the task manager @@ -455,6 +483,10 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr return nil, err } + if err := m.tryStreamEvents(ctx, shimTask); err != nil { + return nil, err + } + t, err := shimTask.Create(ctx, opts) if err != nil { // NOTE: ctx contains required namespace information. @@ -481,6 +513,25 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr return t, nil } +// tryStreamEvents streams events from the shim (if it is supported by the shim implementation). 
+func (m *TaskManager) tryStreamEvents(ctx context.Context, shim *shimTask) error { + log.G(ctx).Debug("try using shim events streaming") + + fwdCtx := log.WithLogger(context.Background(), log.G(ctx)) + go func(client any) { + ep := eventsproxy.NewRemoteEvents(client) + if err := events.ForwardAll(fwdCtx, m.manager.events, ep); err != nil { + if errdefs.IsNotImplemented(err) { + log.G(ctx).WithError(err).Debug("shim does not support event streaming, relying on legacy callback") + } else { + log.G(ctx).WithError(err).Error("failed while forwarding event stream for shim") + } + } + }(shim.Client()) + + return nil +} + // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { shim, err := m.manager.shims.Get(ctx, id) diff --git a/core/runtime/v2/shim.go b/core/runtime/v2/shim.go index 0cdd18b27d2b..2067b4dc7e5a 100644 --- a/core/runtime/v2/shim.go +++ b/core/runtime/v2/shim.go @@ -114,9 +114,9 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan }() shim := &shim{ - bundle: bundle, - client: conn, - version: params.Version, + bundle: bundle, + client: conn, + params: params, } return shim, nil @@ -359,9 +359,9 @@ func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error { } type shim struct { - bundle *Bundle - client any - version int + bundle *Bundle + client any + params client.BootstrapParams } var _ ShimInstance = (*shim)(nil) @@ -372,7 +372,7 @@ func (s *shim) ID() string { } func (s *shim) Version() int { - return s.version + return s.params.Version } func (s *shim) Namespace() string { diff --git a/pkg/oom/v1/v1.go b/pkg/oom/v1/v1.go index c5423314c785..8407f22b5d5c 100644 --- a/pkg/oom/v1/v1.go +++ b/pkg/oom/v1/v1.go @@ -25,16 +25,16 @@ import ( "github.com/containerd/cgroups/v3/cgroup1" eventstypes "github.com/containerd/containerd/v2/api/events" + "github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/core/runtime" 
"github.com/containerd/containerd/v2/pkg/oom" - "github.com/containerd/containerd/v2/pkg/shim" "github.com/containerd/log" "golang.org/x/sys/unix" ) // New returns an epoll implementation that listens to OOM events // from a container's cgroups. -func New(publisher shim.Publisher) (oom.Watcher, error) { +func New(publisher events.Publisher) (oom.Watcher, error) { fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, err @@ -51,7 +51,7 @@ type epoller struct { mu sync.Mutex fd int - publisher shim.Publisher + publisher events.Publisher set map[uintptr]*item } diff --git a/pkg/oom/v2/v2.go b/pkg/oom/v2/v2.go index f3dbc5ea52c0..ed7e27f074ae 100644 --- a/pkg/oom/v2/v2.go +++ b/pkg/oom/v2/v2.go @@ -24,15 +24,15 @@ import ( cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2" eventstypes "github.com/containerd/containerd/v2/api/events" + "github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/core/runtime" "github.com/containerd/containerd/v2/pkg/oom" - "github.com/containerd/containerd/v2/pkg/shim" "github.com/containerd/log" ) // New returns an implementation that listens to OOM events // from a container's cgroups. -func New(publisher shim.Publisher) (oom.Watcher, error) { +func New(publisher events.Publisher) (oom.Watcher, error) { return &watcher{ itemCh: make(chan item), publisher: publisher, @@ -42,7 +42,7 @@ func New(publisher shim.Publisher) (oom.Watcher, error) { // watcher implementation for handling OOM events from a container's cgroup type watcher struct { itemCh chan item - publisher shim.Publisher + publisher events.Publisher } type item struct { diff --git a/pkg/shim/publisher.go b/pkg/shim/publisher.go deleted file mode 100644 index 882a476f303d..000000000000 --- a/pkg/shim/publisher.go +++ /dev/null @@ -1,170 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. 
- You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "context" - "sync" - "time" - - v1 "github.com/containerd/containerd/v2/api/services/ttrpc/events/v1" - "github.com/containerd/containerd/v2/api/types" - "github.com/containerd/containerd/v2/core/events" - "github.com/containerd/containerd/v2/pkg/namespaces" - "github.com/containerd/containerd/v2/pkg/ttrpcutil" - "github.com/containerd/containerd/v2/protobuf" - "github.com/containerd/log" - "github.com/containerd/ttrpc" -) - -const ( - queueSize = 2048 - maxRequeue = 5 -) - -type item struct { - ev *types.Envelope - ctx context.Context - count int -} - -// NewPublisher creates a new remote events publisher -func NewPublisher(address string) (*RemoteEventsPublisher, error) { - client, err := ttrpcutil.NewClient(address) - if err != nil { - return nil, err - } - - l := &RemoteEventsPublisher{ - client: client, - closed: make(chan struct{}), - requeue: make(chan *item, queueSize), - } - - go l.processQueue() - return l, nil -} - -// RemoteEventsPublisher forwards events to a ttrpc server -type RemoteEventsPublisher struct { - client *ttrpcutil.Client - closed chan struct{} - closer sync.Once - requeue chan *item -} - -// Done returns a channel which closes when done -func (l *RemoteEventsPublisher) Done() <-chan struct{} { - return l.closed -} - -// Close closes the remote connection and closes the done channel -func (l *RemoteEventsPublisher) Close() (err error) { - err = l.client.Close() - l.closer.Do(func() { - close(l.closed) - }) - return err -} - -func (l *RemoteEventsPublisher) processQueue() { - for i := range l.requeue { - if 
i.count > maxRequeue { - log.L.Errorf("evicting %s from queue because of retry count", i.ev.Topic) - // drop the event - continue - } - - if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil { - log.L.WithError(err).Error("forward event") - l.queue(i) - } - } -} - -func (l *RemoteEventsPublisher) queue(i *item) { - go func() { - i.count++ - // re-queue after a short delay - time.Sleep(time.Duration(1*i.count) * time.Second) - l.requeue <- i - }() -} - -// Publish publishes the event by forwarding it to the configured ttrpc server -func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - evt, err := protobuf.MarshalAnyToProto(event) - if err != nil { - return err - } - i := &item{ - ev: &types.Envelope{ - Timestamp: protobuf.ToTimestamp(time.Now()), - Namespace: ns, - Topic: topic, - Event: evt, - }, - ctx: ctx, - } - - if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil { - l.queue(i) - return err - } - - return nil -} - -func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error { - service, err := l.client.EventsService() - if err == nil { - fCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - _, err = service.Forward(fCtx, req) - cancel() - if err == nil { - return nil - } - } - - if err != ttrpc.ErrClosed { - return err - } - - // Reconnect and retry request - if err = l.client.Reconnect(); err != nil { - return err - } - - service, err = l.client.EventsService() - if err != nil { - return err - } - - // try again with a fresh context, otherwise we may get a context timeout unexpectedly. 
- fCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - _, err = service.Forward(fCtx, req) - cancel() - if err != nil { - return err - } - - return nil -} diff --git a/pkg/shim/shim.go b/pkg/shim/shim.go index d8f77e2598b0..0336d0041018 100644 --- a/pkg/shim/shim.go +++ b/pkg/shim/shim.go @@ -247,12 +247,6 @@ func run(ctx context.Context, manager Manager, config Config) error { } ttrpcAddress := os.Getenv(ttrpcAddressEnv) - publisher, err := NewPublisher(ttrpcAddress) - if err != nil { - return err - } - defer publisher.Close() - ctx = namespaces.WithNamespace(ctx, namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx, sd := shutdown.WithShutdown(ctx) @@ -324,15 +318,6 @@ func run(ctx context.Context, manager Manager, config Config) error { }, }) - // Register event plugin - registry.Register(&plugin.Registration{ - Type: plugins.EventPlugin, - ID: "publisher", - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return publisher, nil - }, - }) - var ( initialized = plugin.NewPluginSet() ttrpcServices = []TTRPCService{} diff --git a/plugins/services/events/ttrpc/service.go b/plugins/services/events/ttrpc/service.go new file mode 100644 index 000000000000..885b2869f5c1 --- /dev/null +++ b/plugins/services/events/ttrpc/service.go @@ -0,0 +1,122 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package eventsttrpc + +import ( + "context" + "fmt" + + api "github.com/containerd/containerd/v2/api/services/events/v1" + "github.com/containerd/containerd/v2/api/types" + "github.com/containerd/containerd/v2/core/events" + "github.com/containerd/containerd/v2/core/events/exchange" + "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/protobuf" + ptypes "github.com/containerd/containerd/v2/protobuf/types" + "github.com/containerd/errdefs" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + "github.com/containerd/ttrpc" +) + +func init() { + registry.Register(&plugin.Registration{ + Type: plugins.TTRPCPlugin, + ID: "events", + Requires: []plugin.Type{ + plugins.EventPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + ep, err := ic.GetByID(plugins.EventPlugin, "exchange") + if err != nil { + return nil, err + } + return NewService(ep.(*exchange.Exchange)), nil + }, + }) +} + +type service struct { + events *exchange.Exchange +} + +// NewService returns the TTRPC events server +func NewService(events *exchange.Exchange) interface{} { + return &service{ + events: events, + } +} + +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + api.RegisterTTRPCEventsService(server, s) + return nil +} + +func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { + if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { + if err := s.events.Forward(ctx, fromProto(r.Envelope)); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func (s *service) Subscribe(ctx context.Context, req *api.SubscribeRequest, srv api.TTRPCEvents_SubscribeServer) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + eventq, errq := 
s.events.Subscribe(ctx, req.Filters...) + for { + select { + case ev := <-eventq: + if err := srv.Send(toProto(ev)); err != nil { + return fmt.Errorf("failed sending event to subscriber: %w", err) + } + case err := <-errq: + if err != nil { + return fmt.Errorf("subscription error: %w", err) + } + + return nil + } + } +} + +func toProto(env *events.Envelope) *types.Envelope { + return &types.Envelope{ + Timestamp: protobuf.ToTimestamp(env.Timestamp), + Namespace: env.Namespace, + Topic: env.Topic, + Event: protobuf.FromAny(env.Event), + } +} + +func fromProto(env *types.Envelope) *events.Envelope { + return &events.Envelope{ + Timestamp: protobuf.FromTimestamp(env.Timestamp), + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +}