Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test Event streaming #4

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ jobs:
strategy:
matrix:
os: [ubuntu-22.04, actuated-arm64-4cpu-16gb, macos-12, windows-2019]
exclude:
- os: ${{ github.repository != 'containerd/containerd' && 'actuated-arm64-4cpu-16gb' }}


steps:
- uses: actions/checkout@v4
Expand All @@ -39,7 +42,6 @@ jobs:
#
project:
name: Project Checks
if: github.repository == 'containerd/containerd'
runs-on: ubuntu-22.04
timeout-minutes: 5

Expand All @@ -52,6 +54,7 @@ jobs:
- uses: ./src/github.com/containerd/containerd/.github/actions/install-go

- uses: containerd/project-checks@v1.1.0
if: github.repository == 'containerd/containerd'
with:
working-directory: src/github.com/containerd/containerd
repo-access-token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
2 changes: 2 additions & 0 deletions cmd/containerd-shim-runc-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/manager"
_ "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/task/plugin"
"github.com/containerd/containerd/v2/pkg/shim"
_ "github.com/containerd/containerd/v2/plugins/events"
_ "github.com/containerd/containerd/v2/plugins/services/events/ttrpc"
)

func main() {
Expand Down
6 changes: 3 additions & 3 deletions cmd/containerd-shim-runc-v2/task/plugin/plugin_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package plugin

import (
"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/task"
"github.com/containerd/containerd/v2/pkg/shim"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/shutdown"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/plugin"
Expand All @@ -34,15 +34,15 @@ func init() {
plugins.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
pp, err := ic.GetByID(plugins.EventPlugin, "publisher")
pp, err := ic.GetByID(plugins.EventPlugin, "exchange")
if err != nil {
return nil, err
}
ss, err := ic.GetByID(plugins.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return task.NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service))
return task.NewTaskService(ic.Context, pp.(events.Publisher), ss.(shutdown.Service))
},
})

Expand Down
19 changes: 3 additions & 16 deletions cmd/containerd-shim-runc-v2/task/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
"github.com/containerd/containerd/v2/api/types/task"
"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/process"
"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/runc"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/runtime/v2/runc/options"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/oom"
oomv1 "github.com/containerd/containerd/v2/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/v2/pkg/oom/v2"
Expand All @@ -58,7 +57,7 @@ var (
)

// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) {
func NewTaskService(ctx context.Context, publisher events.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) {
var (
ep oom.Watcher
err error
Expand Down Expand Up @@ -88,7 +87,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
if err := s.initPlatform(); err != nil {
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
}
go s.forward(ctx, publisher)

sd.RegisterCallback(func(context.Context) error {
close(s.events)
return nil
Expand Down Expand Up @@ -743,18 +742,6 @@ func (s *service) getContainerPids(ctx context.Context, container *runc.Containe
return pids, nil
}

func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
err := publisher.Publish(ctx, runtime.GetTopic(e), e)
if err != nil {
log.G(ctx).WithError(err).Error("post event")
}
}
publisher.Close()
}

func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock()
container := s.containers[id]
Expand Down
21 changes: 21 additions & 0 deletions core/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package events

import (
"context"
"fmt"
"time"

"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
)

Expand Down Expand Up @@ -78,3 +80,22 @@ type Forwarder interface {
type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
}

// ForwardAll forwards messages from a subscription until an error is received
func ForwardAll(ctx context.Context, f Forwarder, s Subscriber) error {
sub, errs := s.Subscribe(ctx)
for {
select {
case evt := <-sub:
if err := f.Forward(ctx, evt); err != nil {
log.G(ctx).WithError(err).Errorf("event forward error: %v", evt)
}
case err := <-errs:
if err != nil {
return fmt.Errorf("error from subscription: %w", err)
}
return nil
}

}
}
6 changes: 3 additions & 3 deletions core/runtime/v2/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
}

return &shim{
bundle: b.bundle,
client: conn,
version: params.Version,
bundle: b.bundle,
client: conn,
params: params,
}, nil
}

Expand Down
59 changes: 55 additions & 4 deletions core/runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (

apitypes "github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/events/exchange"
eventsproxy "github.com/containerd/containerd/v2/core/events/proxy"
"github.com/containerd/containerd/v2/core/metadata"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/core/sandbox"
Expand Down Expand Up @@ -101,7 +103,7 @@ func init() {
return nil, err
}

return NewTaskManager(shimManager), nil
return NewTaskManager(ic.Context, shimManager)
},
})

Expand Down Expand Up @@ -430,10 +432,36 @@ type TaskManager struct {
}

// NewTaskManager creates a new task manager instance.
func NewTaskManager(shims *ShimManager) *TaskManager {
return &TaskManager{
manager: shims,
func NewTaskManager(ctx context.Context, shims *ShimManager) (*TaskManager, error) {
manager := &TaskManager{manager: shims}
if err := manager.restoreTasks(ctx); err != nil {
return nil, err
}
return manager, nil
}

func (m *TaskManager) restoreTasks(ctx context.Context) error {
log.G(ctx).Info("restoring task event streamers")
instances, err := m.manager.shims.GetAll(ctx, true)
if err != nil {
return err
}

// See if any of restored shims supports event streaming
for _, instance := range instances {
shimTask, err := newShimTask(instance)
if err != nil {
log.G(ctx).WithError(err).Error("unable to get shim task")
continue
}

if err := m.tryStreamEvents(context.Background(), shimTask); err != nil {
log.G(ctx).WithError(err).Errorf("unable to resetore event stream for task %q: %v", instance.ID(), err)
continue
}
}

return nil
}

// ID of the task manager
Expand All @@ -455,6 +483,10 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
return nil, err
}

if err := m.tryStreamEvents(ctx, shimTask); err != nil {
return nil, err
}

t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
Expand All @@ -481,6 +513,25 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
return t, nil
}

// tryStreamEvents streams evetns from shim (if its supported by the shim implementation).
func (m *TaskManager) tryStreamEvents(ctx context.Context, shim *shimTask) error {
log.G(ctx).Debug("try using shim events streaming")

fwdCtx := log.WithLogger(context.Background(), log.G(ctx))
go func(client any) {
ep := eventsproxy.NewRemoteEvents(client)
if err := events.ForwardAll(fwdCtx, m.manager.events, ep); err != nil {
if errdefs.IsNotImplemented(err) {
log.G(ctx).WithError(err).Debug("shim does not support event streaming, relying on legacy callback")
} else {
log.G(ctx).WithError(err).Error("failed while forwarding event stream for shim")
}
}
}(shim.Client())

return nil
}

// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
shim, err := m.manager.shims.Get(ctx, id)
Expand Down
14 changes: 7 additions & 7 deletions core/runtime/v2/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
}()

shim := &shim{
bundle: bundle,
client: conn,
version: params.Version,
bundle: bundle,
client: conn,
params: params,
}

return shim, nil
Expand Down Expand Up @@ -359,9 +359,9 @@ func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error {
}

type shim struct {
bundle *Bundle
client any
version int
bundle *Bundle
client any
params client.BootstrapParams
}

var _ ShimInstance = (*shim)(nil)
Expand All @@ -372,7 +372,7 @@ func (s *shim) ID() string {
}

func (s *shim) Version() int {
return s.version
return s.params.Version
}

func (s *shim) Namespace() string {
Expand Down
6 changes: 3 additions & 3 deletions pkg/oom/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (

"github.com/containerd/cgroups/v3/cgroup1"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/oom"
"github.com/containerd/containerd/v2/pkg/shim"
"github.com/containerd/log"
"golang.org/x/sys/unix"
)

// New returns an epoll implementation that listens to OOM events
// from a container's cgroups.
func New(publisher shim.Publisher) (oom.Watcher, error) {
func New(publisher events.Publisher) (oom.Watcher, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
Expand All @@ -51,7 +51,7 @@ type epoller struct {
mu sync.Mutex

fd int
publisher shim.Publisher
publisher events.Publisher
set map[uintptr]*item
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/oom/v2/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import (

cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/pkg/oom"
"github.com/containerd/containerd/v2/pkg/shim"
"github.com/containerd/log"
)

// New returns an implementation that listens to OOM events
// from a container's cgroups.
func New(publisher shim.Publisher) (oom.Watcher, error) {
func New(publisher events.Publisher) (oom.Watcher, error) {
return &watcher{
itemCh: make(chan item),
publisher: publisher,
Expand All @@ -42,7 +42,7 @@ func New(publisher shim.Publisher) (oom.Watcher, error) {
// watcher implementation for handling OOM events from a container's cgroup
type watcher struct {
itemCh chan item
publisher shim.Publisher
publisher events.Publisher
}

type item struct {
Expand Down
Loading
Loading