Skip to content

Commit

Permalink
Merge pull request #260 from changweige/exit-daemon-after-ref0
Browse files Browse the repository at this point in the history
Shutdown nydusd if it serves no filesystem or snapshot
  • Loading branch information
changweige authored Nov 25, 2022
2 parents 84d9ba3 + cbe79da commit a6fc3af
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 92 deletions.
1 change: 1 addition & 0 deletions cmd/containerd-nydus-grpc/app/snapshotter/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2020. Ant Group. All rights reserved.
* Copyright (c) 2022. Nydus Developers. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down
32 changes: 21 additions & 11 deletions cmd/containerd-nydus-grpc/app/snapshotter/serve.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2020. Ant Group. All rights reserved.
* Copyright (c) 2022. Nydus Developers. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand All @@ -12,7 +13,7 @@ import (
"os"
"path/filepath"

snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
api "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/contrib/snapshotservice"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots"
Expand All @@ -24,26 +25,35 @@ type ServeOptions struct {
ListeningSocketPath string
}

func Serve(ctx context.Context, rs snapshots.Snapshotter, options ServeOptions, stop <-chan struct{}) error {
func Serve(ctx context.Context, sn snapshots.Snapshotter, options ServeOptions, stop <-chan struct{}) error {
err := ensureSocketNotExists(options.ListeningSocketPath)
if err != nil {
return err
}
rpc := grpc.NewServer()
snapshotsapi.RegisterSnapshotsServer(rpc, snapshotservice.FromSnapshotter(rs))
l, err := net.Listen("unix", options.ListeningSocketPath)
if rpc == nil {
return errors.New("start RPC server")
}
api.RegisterSnapshotsServer(rpc, snapshotservice.FromSnapshotter(sn))
listener, err := net.Listen("unix", options.ListeningSocketPath)
if err != nil {
return errors.Wrapf(err, "error on listen socket %q", options.ListeningSocketPath)
return errors.Wrapf(err, "listen socket %q", options.ListeningSocketPath)
}
go func() {
sig := <-stop
log.G(ctx).Infof("caught signal %s: shutting down", sig)
err := l.Close()
if err != nil {
log.G(ctx).Errorf("failed to close listener %s, err: %v", options.ListeningSocketPath, err)
<-stop

log.L.Infof("Shutting down nydus-snapshotter!")

if err := sn.Close(); err != nil {
log.L.WithError(err).Errorf("Closing snapshotter error")
}

if err := listener.Close(); err != nil {
log.L.Errorf("failed to close listener %s, err: %v", options.ListeningSocketPath, err)
}
}()
return rpc.Serve(l)

return rpc.Serve(listener)
}

func ensureSocketNotExists(listeningSocketPath string) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ func (m *Manager) CacheDir() string {
}

// Report each blob disk usage
// TODO: For fscache cache files, the cache files are managed by nydusd and Linux kernel
// We don't know how it manages cache files. A method to address this is to query nydusd.
// So we can't report cache usage in the case of fscache now
func (m *Manager) CacheUsage(ctx context.Context, blobID string) (snapshots.Usage, error) {
var usage snapshots.Usage

blobCachePath := path.Join(m.cacheDir, blobID)
// For backward compatibility
blobCacheSuffixedPath := path.Join(m.cacheDir, blobID+dataFileSuffix)
blobChunkMap := path.Join(m.cacheDir, blobID+chunkMapFileSuffix)
blobMeta := path.Join(m.cacheDir, blobID+metaFileSuffix)

stuffs := []string{blobCachePath, blobChunkMap, blobMeta}
stuffs := []string{blobCachePath, blobCacheSuffixedPath, blobChunkMap, blobMeta}

for _, f := range stuffs {
du, err := fs.DiskUsage(ctx, f)
Expand Down
9 changes: 9 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (d *Daemon) DecRef() int32 {
return atomic.AddInt32(&d.ref, -1)
}

func (d *Daemon) GetRef() int32 {
return atomic.LoadInt32(&d.ref)
}

func (d *Daemon) HostMountpoint() (mnt string) {
// Identify a shared nydusd for multiple rafs instances.
mnt = d.States.Mountpoint
Expand Down Expand Up @@ -136,6 +140,11 @@ func (d *Daemon) AddInstance(r *Rafs) {
r.DaemonID = d.ID()
}

func (d *Daemon) RemoveInstance(snapshotID string) {
d.Instances.Remove(snapshotID)
d.DecRef()
}

// Nydusd daemon current working state by requesting to nydusd:
// 1. INIT
// 2. READY: All needed resources are ready.
Expand Down
99 changes: 63 additions & 36 deletions pkg/filesystem/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"os"
"path"
"path/filepath"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots"
Expand Down Expand Up @@ -64,6 +63,14 @@ type Filesystem struct {
rootMountpoint string
}

func (fs *Filesystem) tryRetainSharedDaemon(d *daemon.Daemon) {
// FsDriver can be changed between two startups.
if d.HostMountpoint() == fs.rootMountpoint || fs.fsDriver == config.FsDriverFscache {
fs.sharedDaemon = d
d.IncRef()
}
}

// NewFileSystem initialize Filesystem instance
// TODO(chge): `Filesystem` abstraction is not suggestive. A snapshotter
// can mount many Rafs/Erofs file systems
Expand Down Expand Up @@ -92,27 +99,33 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
// a new nydusd for it.
// TODO: We still need to consider shared daemon the time sequence of initializing daemon,
// start daemon commit its state to DB and retrieving its state.
if d := fs.getSharedDaemon(); d == nil {
log.L.Infof("initializing the shared nydus daemon")
if err := fs.initSharedDaemon(); err != nil {
return nil, errors.Wrap(err, "start shared nydusd daemon")
}
log.L.Infof("initializing the shared nydus daemon")
if err := fs.initSharedDaemon(); err != nil {
return nil, errors.Wrap(err, "start shared nydusd daemon")
}
}

// Try to bring all persisted and stopped nydusd up and remount Rafs
if len(recoveringDaemons) != 0 {
for _, d := range recoveringDaemons {
if err := fs.Manager.StartDaemon(d); err != nil {
return nil, errors.Wrapf(err, "start daemon %s", d.ID())
}
if err := d.WaitUntilState(types.DaemonStateRunning); err != nil {
return nil, errors.Wrapf(err, "recover daemon %s", d.ID())
}
if err := d.RecoveredMountInstances(); err != nil {
return nil, errors.Wrapf(err, "recover daemons")
}
for _, d := range recoveringDaemons {
if err := fs.Manager.StartDaemon(d); err != nil {
return nil, errors.Wrapf(err, "start daemon %s", d.ID())
}
if err := d.WaitUntilState(types.DaemonStateRunning); err != nil {
return nil, errors.Wrapf(err, "wait for daemon %s", d.ID())
}
if err := d.RecoveredMountInstances(); err != nil {
return nil, errors.Wrapf(err, "recover mounts for daemon %s", d.ID())
}

// Found shared daemon
// Fscache userspace daemon has no host mountpoint.
fs.tryRetainSharedDaemon(d)

}

for _, d := range liveDaemons {
// Found shared daemon
fs.tryRetainSharedDaemon(d)
}

return &fs, nil
Expand Down Expand Up @@ -161,7 +174,13 @@ func (fs *Filesystem) WaitUntilReady(snapshotID string) error {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot id %s daemon id %s", snapshotID, instance.DaemonID)
}

return d.WaitUntilState(types.DaemonStateRunning)
if err := d.WaitUntilState(types.DaemonStateRunning); err != nil {
return err
}

log.L.Infof("Nydus remote snapshot %s is ready", snapshotID)

return nil
}

// Mount will be called when containerd snapshotter prepare remote snapshotter
Expand Down Expand Up @@ -197,8 +216,8 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er
}()

var d *daemon.Daemon
if fs.sharedDaemon != nil {
d = fs.sharedDaemon
if fs.getSharedDaemon() != nil {
d = fs.getSharedDaemon()
d.AddInstance(rafs)
} else {
mp, err := fs.decideDaemonMountpoint(rafs)
Expand Down Expand Up @@ -279,15 +298,10 @@ func (fs *Filesystem) Mount(snapshotID string, labels map[string]string) (err er
return nil
}

func (fs *Filesystem) Umount(ctx context.Context, mountpoint string) error {
if !fs.hasDaemon() {
return nil
}

snapshotID := filepath.Base(mountpoint)
func (fs *Filesystem) Umount(ctx context.Context, snapshotID string) error {
instance := daemon.RafsSet.Get(snapshotID)
if instance == nil {
log.L.Debugf("Not a instance. ID %s, mountpoint %s", snapshotID, mountpoint)
log.L.Debugf("Not a rafs instance. ID %s", snapshotID)
return nil
}

Expand All @@ -299,7 +313,7 @@ func (fs *Filesystem) Umount(ctx context.Context, mountpoint string) error {

log.L.Infof("umount snapshot %s, daemon ID %s", snapshotID, daemon.ID())

daemon.Instances.Remove(snapshotID)
daemon.RemoveInstance(snapshotID)
if err := daemon.UmountInstance(instance); err != nil {
return errors.Wrapf(err, "umount instance %s", snapshotID)
}
Expand All @@ -309,8 +323,7 @@ func (fs *Filesystem) Umount(ctx context.Context, mountpoint string) error {
}

// Once daemon's reference reaches 0, destroy the whole daemon
ref := daemon.DecRef()
if ref == 0 {
if daemon.GetRef() == 0 {
if err := fs.Manager.DestroyDaemon(daemon); err != nil {
return errors.Wrapf(err, "destroy daemon %s", daemon.ID())
}
Expand Down Expand Up @@ -339,13 +352,18 @@ func (fs *Filesystem) RemoveCache(blobDigest string) error {
return fs.cacheMgr.RemoveBlobCache(blobID)
}

func (fs *Filesystem) Cleanup(ctx context.Context) error {
// Try to stop all the running daemons if they are not referenced by any snapshots
// Clean up resources along with the daemons.
func (fs *Filesystem) Teardown(ctx context.Context) error {
for _, d := range fs.Manager.ListDaemons() {
err := fs.Umount(ctx, filepath.Dir(d.HostMountpoint()))
if err != nil {
log.G(ctx).Infof("failed to umount %s err %#v", d.HostMountpoint(), err)
for _, instance := range d.Instances.List() {
err := fs.Umount(ctx, instance.SnapshotID)
if err != nil {
log.L.Errorf("Failed to umount snapshot %s, %s", instance.SnapshotID, err)
}
}
}

return nil
}

Expand Down Expand Up @@ -424,8 +442,6 @@ func (fs *Filesystem) initSharedDaemon() (err error) {
return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile(""))
}

err = nil

if err := fs.Manager.StartDaemon(d); err != nil {
return errors.Wrap(err, "start shared daemon")
}
Expand All @@ -435,6 +451,17 @@ func (fs *Filesystem) initSharedDaemon() (err error) {
return
}

func (fs *Filesystem) TryStopSharedDaemon() {
sharedDaemon := fs.getSharedDaemon()
if sharedDaemon != nil {
if sharedDaemon.GetRef() == 1 {
if err := fs.Manager.DestroyDaemon(sharedDaemon); err != nil {
log.L.WithError(err).Errorf("Terminate shared daemon %s failed", sharedDaemon.ID())
}
}
}
}

// createDaemon create new nydus daemon by snapshotID and imageID
// For fscache driver, no need to provide mountpoint to nydusd daemon.
func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daemon, err error) {
Expand Down
11 changes: 2 additions & 9 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ func (s *DaemonStates) GetByDaemonID(id string, op func(d *daemon.Daemon)) *daem

if daemon != nil && op != nil {
op(daemon)
} else if daemon == nil {
log.L.Warnf("daemon daemon_id=%s is not found", id)
}

return daemon
Expand Down Expand Up @@ -475,6 +473,8 @@ func (m *Manager) CleanUpDaemonResources(d *daemon.Daemon) {
// FIXME: should handle the inconsistent status caused by any step
// in the function that returns an error.
func (m *Manager) DestroyDaemon(d *daemon.Daemon) error {
log.L.Infof("Destroy nydusd daemon %s. Host mountpoint %s", d.ID(), d.HostMountpoint())

// Delete daemon from DB in the first place in case any of below steps fails
// ending up with daemon is residual in DB.
if err := m.DeleteDaemon(d); err != nil {
Expand All @@ -497,8 +497,6 @@ func (m *Manager) DestroyDaemon(d *daemon.Daemon) error {
}
}

log.L.Infof("Destroy nydusd daemon %s. Host mountpoint %s", d.ID(), d.HostMountpoint())

// Graceful nydusd termination will umount itself.
if err := d.Terminate(); err != nil {
log.L.Warnf("Fails to terminate daemon, %v", err)
Expand Down Expand Up @@ -535,11 +533,6 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s
d.States.FsDriver, m.FsDriver)
}

if m.SupervisorSet != nil {
su := m.SupervisorSet.NewSupervisor(d.ID())
d.Supervisor = su
}

m.daemonStates.RecoverDaemonState(d)

if m.SupervisorSet != nil {
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,17 @@ func (db *Database) NextInstanceSeq() (uint64, error) {
}

defer func() {
if rerr := tx.Rollback(); rerr != nil {
log.L.WithError(rerr).Errorf("Rollback error when getting next sequence")
if err != nil {
if err := tx.Rollback(); err != nil {
log.L.WithError(err).Errorf("Rollback error when getting next sequence")
}
}
}()

bk := getInstancesBucket(tx)
if bk == nil {
return 0, errdefs.ErrNotFound
}

seq, err := bk.NextSequence()
if err != nil {
Expand Down
Loading

0 comments on commit a6fc3af

Please sign in to comment.