Skip to content

Commit

Permalink
Support for inotify in mounted directories
Browse files Browse the repository at this point in the history
Signed-off-by: Balaji Vijayakumar <kuttibalaji.v6@gmail.com>
  • Loading branch information
balajiv113 committed Nov 6, 2023
1 parent ce3b98a commit a36e2fd
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ jobs:
fetch-depth: 1
- uses: actions/setup-go@v4
with:
go-version: 1.20.x
go-version: 1.21.x
- uses: actions/cache@v3
with:
path: ~/.cache/lima/download
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
fetch-depth: 1
- uses: actions/setup-go@v4
with:
go-version: 1.20.x
go-version: 1.21.x
- uses: actions/cache@v3
with:
path: ~/.cache/lima/download
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/nxadm/tail v1.4.11
github.com/opencontainers/go-digest v1.0.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/rjeczalik/notify v0.9.3
github.com/sethvargo/go-password v0.2.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY=
github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -306,6 +308,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
5 changes: 5 additions & 0 deletions pkg/guestagent/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ type Event struct {
LocalPortsRemoved []IPPort `json:"localPortsRemoved,omitempty"`
Errors []string `json:"errors,omitempty"`
}

type InotifyEvent struct {
Location string `json:"location,omitempty"`
Time time.Time `json:"time,omitempty"`
}
19 changes: 19 additions & 0 deletions pkg/guestagent/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package client
// Apache License 2.0

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -19,6 +20,7 @@ type GuestAgentClient interface {
HTTPClient() *http.Client
Info(context.Context) (*api.Info, error)
Events(context.Context, func(api.Event)) error
Inotify(context.Context, api.InotifyEvent) error
}

type Proto = string
Expand Down Expand Up @@ -108,3 +110,20 @@ func (c *client) Events(ctx context.Context, onEvent func(api.Event)) error {
onEvent(ev)
}
}

func (c *client) Inotify(ctx context.Context, event api.InotifyEvent) error {
buffer := &bytes.Buffer{}
encoder := json.NewEncoder(buffer)
err := encoder.Encode(&event)
if err != nil {
return err
}

u := fmt.Sprintf("http://%s/%s/inotify", c.dummyHost, c.version)
resp, err := httpclientutil.Post(ctx, c.HTTPClient(), u, buffer)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}
25 changes: 25 additions & 0 deletions pkg/guestagent/api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,33 @@ func (b *Backend) GetEvents(w http.ResponseWriter, r *http.Request) {
}
}

// PostInotify is the handler for POST /v{N}/inotify.
func (b *Backend) PostInotify(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
_, cancel := context.WithCancel(ctx)
defer cancel()

inotifyEvent := api.InotifyEvent{}
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&inotifyEvent); err != nil {
logrus.Warn(err)
return
}
go b.Agent.HandleInotify(inotifyEvent)

flusher, ok := w.(http.Flusher)
if !ok {
panic("http.ResponseWriter has to implement http.Flusher")
}

w.Header().Set("Content-Type", "application/x-ndjson")
w.WriteHeader(http.StatusOK)
flusher.Flush()
}

func AddRoutes(r *mux.Router, b *Backend) {
v1 := r.PathPrefix("/v1").Subrouter()
v1.Path("/info").Methods("GET").HandlerFunc(b.GetInfo)
v1.Path("/events").Methods("GET").HandlerFunc(b.GetEvents)
v1.Path("/inotify").Methods("POST").HandlerFunc(b.PostInotify)
}
1 change: 1 addition & 0 deletions pkg/guestagent/guestagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type Agent interface {
Info(ctx context.Context) (*api.Info, error)
Events(ctx context.Context, ch chan api.Event)
LocalPorts(ctx context.Context) ([]api.IPPort, error)
HandleInotify(event api.InotifyEvent)
}
11 changes: 11 additions & 0 deletions pkg/guestagent/guestagent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package guestagent
import (
"context"
"errors"
"os"
"reflect"
"sync"
"syscall"
Expand Down Expand Up @@ -333,3 +334,13 @@ func (a *agent) fixSystemTimeSkew() {
ticker.Stop()
}
}

func (a *agent) HandleInotify(event api.InotifyEvent) {
location := event.Location
if _, err := os.Stat(location); err == nil {
err := os.Chtimes(location, event.Time.Local(), event.Time.Local())
if err != nil {
logrus.Errorf("error in inotify handle. Event: %s, Error: %s", event, err)
}
}
}
99 changes: 63 additions & 36 deletions pkg/hostagent/hostagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type HostAgent struct {
eventEncMu sync.Mutex

vSockPort int

clientMu sync.RWMutex
client guestagentclient.GuestAgentClient
}

type options struct {
Expand Down Expand Up @@ -542,39 +545,41 @@ func (a *HostAgent) watchGuestAgentEvents(ctx context.Context) {
}
}

localUnix := filepath.Join(a.instDir, filenames.GuestAgentSock)
remoteUnix := "/run/lima-guestagent.sock"
local, remote := a.localAndRemoteGuestAgentPaths()

a.onClose = append(a.onClose, func() error {
logrus.Debugf("Stop forwarding unix sockets")
var errs []error
for _, rule := range a.y.PortForwards {
if rule.GuestSocket != "" {
local := hostAddress(rule, guestagentapi.IPPort{})
// using ctx.Background() because ctx has already been cancelled
if err := forwardSSH(context.Background(), a.sshConfig, a.sshLocalPort, local, rule.GuestSocket, verbCancel, rule.Reverse); err != nil {
errs = append(errs, err)
if a.guestAgentProto != guestagentclient.VSOCK {
a.onClose = append(a.onClose, func() error {
logrus.Debugf("Stop forwarding unix sockets")
var errs []error
for _, rule := range a.y.PortForwards {
if rule.GuestSocket != "" {
local := hostAddress(rule, guestagentapi.IPPort{})
// using ctx.Background() because ctx has already been cancelled
if err := forwardSSH(context.Background(), a.sshConfig, a.sshLocalPort, local, rule.GuestSocket, verbCancel, rule.Reverse); err != nil {
errs = append(errs, err)
}
}
}
}
if err := forwardSSH(context.Background(), a.sshConfig, a.sshLocalPort, localUnix, remoteUnix, verbCancel, false); err != nil {
errs = append(errs, err)
}
return errors.Join(errs...)
})

guestSocketAddr := localUnix
if a.guestAgentProto == guestagentclient.VSOCK {
guestSocketAddr = fmt.Sprintf("0.0.0.0:%d", a.vSockPort)
if err := forwardSSH(context.Background(), a.sshConfig, a.sshLocalPort, local, remote, verbCancel, false); err != nil {
errs = append(errs, err)
}
return errors.Join(errs...)
})
}

go func() {
err := a.startInotify(ctx)
if err != nil {
logrus.WithError(err).Warn("failed to start inotify", err)
}
}()

for {
if !isGuestAgentSocketAccessible(ctx, guestSocketAddr, a.guestAgentProto, a.instName) {
if a.guestAgentProto != guestagentclient.VSOCK {
_ = forwardSSH(ctx, a.sshConfig, a.sshLocalPort, localUnix, remoteUnix, verbForward, false)
}
client, err := a.getOrCreateClient(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
logrus.WithError(err).Warn("connection to the guest agent was closed unexpectedly")
}
if err := a.processGuestAgentEvents(ctx, guestSocketAddr, a.guestAgentProto, a.instName); err != nil {
if err := a.processGuestAgentEvents(ctx, client); err != nil {
if !errors.Is(err, context.Canceled) {
logrus.WithError(err).Warn("connection to the guest agent was closed unexpectedly")
}
Expand All @@ -587,21 +592,43 @@ func (a *HostAgent) watchGuestAgentEvents(ctx context.Context) {
}
}

func isGuestAgentSocketAccessible(ctx context.Context, localUnix string, proto guestagentclient.Proto, instanceName string) bool {
client, err := guestagentclient.NewGuestAgentClient(localUnix, proto, instanceName)
if err != nil {
return false
func (a *HostAgent) getOrCreateClient(ctx context.Context) (guestagentclient.GuestAgentClient, error) {
a.clientMu.Lock()
defer a.clientMu.Unlock()
if a.client != nil && isGuestAgentSocketAccessible(ctx, a.client) {
return a.client, nil
}
_, err = client.Info(ctx)
return err == nil
var err error
a.client, err = a.createClient(ctx)
return a.client, err
}

func (a *HostAgent) processGuestAgentEvents(ctx context.Context, localUnix string, proto guestagentclient.Proto, instanceName string) error {
client, err := guestagentclient.NewGuestAgentClient(localUnix, proto, instanceName)
if err != nil {
return err
func (a *HostAgent) createClient(ctx context.Context) (guestagentclient.GuestAgentClient, error) {
local, remote := a.localAndRemoteGuestAgentPaths()
if a.guestAgentProto != guestagentclient.VSOCK {
_ = forwardSSH(ctx, a.sshConfig, a.sshLocalPort, local, remote, verbForward, false)
}

return guestagentclient.NewGuestAgentClient(local, a.guestAgentProto, a.instName)
}

func (a *HostAgent) localAndRemoteGuestAgentPaths() (string, string) {
localUnix := filepath.Join(a.instDir, filenames.GuestAgentSock)
remoteUnix := "/run/lima-guestagent.sock"

guestSocketAddr := localUnix
if a.guestAgentProto == guestagentclient.VSOCK {
guestSocketAddr = fmt.Sprintf("0.0.0.0:%d", a.vSockPort)
}
return guestSocketAddr, remoteUnix
}

func isGuestAgentSocketAccessible(ctx context.Context, client guestagentclient.GuestAgentClient) bool {
_, err := client.Info(ctx)
return err == nil
}

func (a *HostAgent) processGuestAgentEvents(ctx context.Context, client guestagentclient.GuestAgentClient) error {
info, err := client.Info(ctx)
if err != nil {
return err
Expand Down
82 changes: 82 additions & 0 deletions pkg/hostagent/inotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package hostagent

import (
"context"
"os"
"path"

guestagentapi "github.com/lima-vm/lima/pkg/guestagent/api"
"github.com/lima-vm/lima/pkg/localpathutil"
"github.com/rjeczalik/notify"
"github.com/sirupsen/logrus"
)

const CacheSize = 10000

var inotifyCache = make(map[string]string)

func (a *HostAgent) startInotify(ctx context.Context) error {
mountWatchCh := make(chan notify.EventInfo, 128)
err := a.setupWatchers(mountWatchCh)
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
return nil
case watchEvent := <-mountWatchCh:
client, err := a.getOrCreateClient(ctx)
if err != nil {
logrus.Error("failed to create client for inotify", err)
}
stat, err := os.Stat(watchEvent.Path())
if err != nil {
continue
}

if filterEvents(watchEvent) {
continue
}

event := guestagentapi.InotifyEvent{Location: watchEvent.Path(), Time: stat.ModTime().UTC()}
err = client.Inotify(ctx, event)
if err != nil {
logrus.WithError(err).Warn("failed to send inotify", err)
}
}
}
}

func (a *HostAgent) setupWatchers(events chan notify.EventInfo) error {
for _, m := range a.y.Mounts {
if *m.Writable {
location, err := localpathutil.Expand(m.Location)
if err != nil {
return err
}
err = notify.Watch(path.Join(location, "..."), events, notify.Create|notify.Write)
if err != nil {
return err
}
}
}
return nil
}

func filterEvents(event notify.EventInfo) bool {
eventPath := event.Path()
_, ok := inotifyCache[eventPath]
if ok {
// Ignore the duplicate inotify on mounted directories, so always remove a entry if already present
delete(inotifyCache, eventPath)
return true
}
inotifyCache[eventPath] = ""

if len(inotifyCache) >= CacheSize {
inotifyCache = make(map[string]string)
}
return false
}
16 changes: 16 additions & 0 deletions pkg/httpclientutil/httpclientutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ func Get(ctx context.Context, c *http.Client, url string) (*http.Response, error
return resp, nil
}

func Post(ctx context.Context, c *http.Client, url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "POST", url, body)
if err != nil {
return nil, err
}
resp, err := c.Do(req)
if err != nil {
return nil, err
}
if err := Successful(resp); err != nil {
resp.Body.Close()
return nil, err
}
return resp, nil
}

func readAtMost(r io.Reader, maxBytes int) ([]byte, error) {
lr := &io.LimitedReader{
R: r,
Expand Down

0 comments on commit a36e2fd

Please sign in to comment.