Skip to content

Commit

Permalink
feat(cafs): MerkelizeHooks to modify DAG persistence mid-flight
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Oct 21, 2020
1 parent 8db0d6f commit 164aadc
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 29 deletions.
21 changes: 16 additions & 5 deletions cafs/mapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,27 @@ func (m *MapStore) AddConnection(other *MapStore) {
func (m MapStore) Print() (string, error) {
buf := &bytes.Buffer{}
for key, file := range m.Files {
data, err := ioutil.ReadAll(file.File())
if err != nil {
return "", err
f := file.File()
if !f.IsDirectory() {
data, err := ioutil.ReadAll(f)
if err != nil {
return "", err
}
fmt.Fprintf(buf, "%s:%s\n\t%s\n", key, f.FullPath(), string(data))
}
fmt.Fprintf(buf, "%s:%s\n\t%s\n", key, file.File().FileName(), string(data))
}

return buf.String(), nil
}

// ObjectCount returns the number of content-addressed objects in the store,
// which is the number of entries currently held in m.Files
func (m MapStore) ObjectCount() (objects int) {
	// len reports the entry count directly; no need to iterate
	return len(m.Files)
}

// PutFileAtKey puts the file at the given key
func (m *MapStore) PutFileAtKey(ctx context.Context, key string, file qfs.File) error {
if file.IsDirectory() {
Expand Down Expand Up @@ -280,7 +291,7 @@ func (a *adder) AddFile(ctx context.Context, f qfs.File) error {
}
a.out <- AddedFile{
Path: path,
Name: f.FileName(),
Name: f.FullPath(),
Bytes: 0,
Hash: path,
}
Expand Down
212 changes: 212 additions & 0 deletions cafs/merkle_hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package cafs

import (
"context"
"fmt"
"io"
"sync"

logger "github.com/ipfs/go-log"
"github.com/qri-io/qfs"
)

// log is the package-level logger, obtained from go-log under the name "cafs"
var log = logger.Logger("cafs")

// func init() {
// logger.SetLogLevel("cafs", "debug")
// }

// MerkelizeCallback is a function a MerkelizeHook runs in place of a plain
// write of the hooked file. It receives the file being visited and
// merkelizedPaths, a map from the names of files that have already been added
// to the content addressed filesystem to the paths they were stored at.
// The returned reader supplies the content that gets stored under the hook's
// input filename. Returning a non-nil error stops the hook before anything is
// added
type MerkelizeCallback func(ctx context.Context, f qfs.File, merkelizedPaths map[string]string) (io.Reader, error)

// MerkelizeHook configures a callback function to be executed on a saved
// file, at a specific point in the merkelization process. The callback is
// held back until every path in requiredPaths has been written
type MerkelizeHook struct {
// full path of the file to fire on, matched against File.FullPath() while
// walking the tree being written
inputFilename string
// file captured when the hook has to be delayed, handed to the callback once
// the hook's requirements are met
file qfs.File
// ensures the callback is executed at most once for this hook, additional
// calls are no-ops
once sync.Once
// slice of pre-merkelized paths that need to be saved before the hook
// can be called
requiredPaths []string
// function to call
callback MerkelizeCallback
}

// NewMerkelizeHook creates a hook that runs cb for the file at inputFilename.
// requiredPaths lists the names of files that must already be written before
// the callback is allowed to run; with no required paths the hook can fire as
// soon as its file is visited
func NewMerkelizeHook(inputFilename string, cb MerkelizeCallback, requiredPaths ...string) *MerkelizeHook {
	hook := new(MerkelizeHook)
	hook.inputFilename = inputFilename
	hook.callback = cb
	hook.requiredPaths = requiredPaths
	return hook
}

// hasRequiredPaths reports whether every path this hook depends on has an
// entry in merkelizedPaths. A hook without required paths is always ready
func (h *MerkelizeHook) hasRequiredPaths(merkelizedPaths map[string]string) bool {
	for i := range h.requiredPaths {
		_, written := merkelizedPaths[h.requiredPaths[i]]
		if !written {
			return false
		}
	}
	return true
}

// callAndAdd runs the hook's callback with f and merkelizedPaths, then hands
// the reader it returns to adder as a file named after the hook's input
// filename. The work is wrapped in the hook's sync.Once: only the first call
// does anything, later calls return nil without calling the callback or
// adding a file
func (h *MerkelizeHook) callAndAdd(ctx context.Context, adder Adder, f qfs.File, merkelizedPaths map[string]string) (err error) {
	fire := func() {
		log.Debugf("calling merkelizeHook path=%s merkelized=%#v", h.inputFilename, merkelizedPaths)
		replacement, cbErr := h.callback(ctx, f, merkelizedPaths)
		if cbErr != nil {
			// surface the callback failure, nothing is added
			err = cbErr
			return
		}
		err = adder.AddFile(ctx, qfs.NewMemfileReader(h.inputFilename, replacement))
	}

	h.once.Do(fire)
	return err
}

// WriteWithHooks writes a file or directory to a given filestore using
// merkelization hooks.
//
// The tree under root is walked with qfs.Walk. Files without a hook are added
// as-is. A file with a hook is replaced by the output of the hook's callback,
// which is only called once all of the hook's required paths have been
// written; until then the hook is kept on a waitlist that is re-checked at
// each visited file and again after the walk finishes.
//
// On success the returned string is the path of the last object the adder
// reported. On failure the returned path is empty and every object recorded
// as written so far is rolled back with a delete request. Errors come from
// hook processing, the adder, hook callbacks, a cancelled context, or a hook
// whose requirements are never met.
func WriteWithHooks(ctx context.Context, fs Filestore, root qfs.File, hooks ...*MerkelizeHook) (string, error) {
	hookMap, err := processHookPaths(hooks)
	if err != nil {
		return "", err
	}

	adder, err := fs.NewAdder(true, true)
	if err != nil {
		return "", err
	}

	var (
		finalPath       string
		waitingHooks    []*MerkelizeHook
		merkelizedPaths = map[string]string{}
		added           = adder.Added()
		addedCh         = make(chan AddedFile)
		stopCh          = make(chan struct{})
		succeeded       bool
	)

	// failed writes are rolled back with delete requests for all added files
	defer func() {
		if succeeded {
			return
		}
		log.Debug("rolling back failed write operation")
		for _, path := range merkelizedPaths {
			if err := fs.Delete(ctx, path); err != nil {
				log.Debugf("error removing path: %s: %s", path, err)
			}
		}
	}()
	// deferred calls run last-in-first-out, so the forwarding goroutine is
	// told to stop before any rollback happens
	defer close(stopCh)

	// Forward adder events to this goroutine. Receiving concurrently keeps
	// AddFile from blocking if the adder's channel has no buffer, while all
	// bookkeeping (merkelizedPaths, finalPath, waitingHooks) stays confined to
	// the calling goroutine, so no locking is needed.
	go func() {
		defer close(addedCh)
		for {
			select {
			case ao, ok := <-added:
				if !ok {
					return
				}
				select {
				case addedCh <- ao:
				case <-stopCh:
					return
				}
			case <-stopCh:
				return
			}
		}
	}()

	// awaitAdded blocks until the adder reports one stored object and
	// records it, or until the context is cancelled
	awaitAdded := func() error {
		select {
		case ao, ok := <-addedCh:
			if !ok {
				return fmt.Errorf("adder closed before reporting an added file")
			}
			log.Debugf("added name=%s hash=%s", ao.Name, ao.Path)
			merkelizedPaths[ao.Name] = ao.Path
			finalPath = ao.Path
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// runHook calls a hook and waits for the file it produces to be added
	runHook := func(hook *MerkelizeHook, file qfs.File) error {
		if err := hook.callAndAdd(ctx, adder, file, merkelizedPaths); err != nil {
			return err
		}
		return awaitAdded()
	}

	// runReadyHooks makes one pass over the waitlist, calling each hook whose
	// requirements are now met and keeping the rest. It reports how many
	// hooks were called
	runReadyHooks := func() (int, error) {
		var (
			ran          int
			stillWaiting []*MerkelizeHook
		)
		for _, hook := range waitingHooks {
			if !hook.hasRequiredPaths(merkelizedPaths) {
				stillWaiting = append(stillWaiting, hook)
				continue
			}
			log.Debugf("calling delayed hook: %s", hook.inputFilename)
			if err := runHook(hook, hook.file); err != nil {
				return ran, err
			}
			ran++
		}
		waitingHooks = stillWaiting
		return ran, nil
	}

	visit := func(file qfs.File) error {
		log.Debugf("visiting %s waitingHooks=%d", file.FullPath(), len(waitingHooks))
		if _, err := runReadyHooks(); err != nil {
			return err
		}

		hook, hookExists := hookMap[file.FullPath()]
		switch {
		case !hookExists:
			if err := adder.AddFile(ctx, file); err != nil {
				return err
			}
			// wait for one path to be added
			return awaitAdded()
		case hook.hasRequiredPaths(merkelizedPaths):
			log.Debugf("calling hook for path %s", file.FullPath())
			return runHook(hook, file)
		default:
			// requirements not met yet, keep the file so the hook can be
			// called later
			hook.file = file
			log.Debugf("adding hook to waitlist for path %s", file.FullPath())
			waitingHooks = append(waitingHooks, hook)
			return nil
		}
	}

	err = qfs.Walk(root, visit)

	// Hooks left waiting after the walk may depend on one another, so keep
	// making passes while progress is possible. A pass that calls nothing
	// means the remaining requirements can never be met.
	for err == nil && len(waitingHooks) > 0 {
		var ran int
		if ran, err = runReadyHooks(); err == nil && ran == 0 {
			err = fmt.Errorf("requirements for hook %q were never met", waitingHooks[0].inputFilename)
		}
	}

	if err != nil {
		log.Debugf("writing dataset: %q", err)
		// everything written so far is rolled back by the deferred func, so
		// there is no meaningful path to return
		return "", err
	}

	log.Debugf("dataset written to filesystem. path=%q", finalPath)
	// successful execution, disable rollback
	succeeded = true
	return finalPath, nil
}

// processHookPaths indexes hooks by the filename they fire on. It returns an
// error if a hook is nil or if more than one hook is registered for the same
// filename
func processHookPaths(hooks []*MerkelizeHook) (hookMap map[string]*MerkelizeHook, err error) {
	hookMap = make(map[string]*MerkelizeHook, len(hooks))

	for _, hook := range hooks {
		if hook == nil {
			// report a usable error instead of panicking on the field access
			return nil, fmt.Errorf("nil hook provided")
		}
		if _, exists := hookMap[hook.inputFilename]; exists {
			return nil, fmt.Errorf("multiple hooks provided for path %q", hook.inputFilename)
		}
		hookMap[hook.inputFilename] = hook
	}

	return hookMap, nil
}
88 changes: 88 additions & 0 deletions cafs/merkle_hooks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cafs

import (
"context"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/qri-io/qfs"
)

// TestWriteHooks checks that hooks are called once the paths they require
// have been written: b.txt is rewritten using the stored contents of d.txt,
// and the hook on c.txt observes the path b.txt ended up stored at
func TestWriteHooks(t *testing.T) {
root := qfs.NewMemdir("/a",
qfs.NewMemfileBytes("b.txt", []byte("foo")),
qfs.NewMemfileBytes("c.txt", []byte("bar")),
qfs.NewMemfileBytes("d.txt", []byte("baz")),
)

ctx := context.Background()
fs := NewMapstore()
// set by the getBHash hook to the path b.txt was stored at
bHash := ""

// rewriteB replaces the contents of b.txt with "APPLES" followed by the
// stored contents of d.txt, so it requires /a/d.txt to be written first
rewriteB := NewMerkelizeHook("/a/b.txt", func(ctx context.Context, f qfs.File, pathMap map[string]string) (io.Reader, error) {
hContents, err := fs.Get(ctx, pathMap["/a/d.txt"])
if err != nil {
return nil, err
}
hData, err := ioutil.ReadAll(hContents)
if err != nil {
return nil, err
}
return strings.NewReader("APPLES" + string(hData)), nil
}, "/a/d.txt")

// getBHash stores c.txt unchanged, it exists to capture the path of the
// rewritten b.txt, so it requires /a/b.txt to be written first
getBHash := NewMerkelizeHook("/a/c.txt", func(ctx context.Context, f qfs.File, pathMap map[string]string) (io.Reader, error) {
bHash = pathMap["/a/b.txt"]
return f, nil
}, "/a/b.txt")

_, err := WriteWithHooks(ctx, fs, root, rewriteB, getBHash)
if err != nil {
t.Fatal(err)
}

// read back what was stored for b.txt and confirm the hook output was saved
f, err := fs.Get(ctx, bHash)
if err != nil {
t.Fatalf("getting hooked file: %s", err)
}
gotData, err := ioutil.ReadAll(f)
if err != nil {
t.Fatal(err)
}

expect := "APPLESbaz"
if expect != string(gotData) {
t.Errorf("stored result mismatch. want: %q got: %q", expect, string(gotData))
}
}

// TestWriteHooksRollback checks that an error returned by a hook callback is
// returned from WriteWithHooks unchanged, and that files written before the
// failure are removed from the store
func TestWriteHooksRollback(t *testing.T) {
	root := qfs.NewMemdir("/a",
		qfs.NewMemfileBytes("b.txt", []byte("foo")),
		qfs.NewMemfileBytes("c.txt", []byte("bar")),
		qfs.NewMemfileBytes("d.txt", []byte("baz")),
	)

	ctx := context.Background()
	fs := NewMapstore()
	errMsg := "oh noes it broke"

	// the hook on b.txt always fails; it requires d.txt so other files are
	// written before the failure happens
	rewriteB := NewMerkelizeHook("/a/b.txt", func(ctx context.Context, f qfs.File, pathMap map[string]string) (io.Reader, error) {
		// errMsg is data, not a format string, so pass it as an argument
		return nil, fmt.Errorf("%s", errMsg)
	}, "/a/d.txt")

	_, err := WriteWithHooks(ctx, fs, root, rewriteB)
	if err == nil {
		t.Errorf("expected error, got nil")
	} else if err.Error() != errMsg {
		t.Errorf("error mismatch. want: %q, got: %q", errMsg, err.Error())
	}

	// a failed write must leave nothing behind
	expectCount := 0
	if count := fs.ObjectCount(); count != expectCount {
		t.Errorf("expected %d objects, got: %d", expectCount, count)
	}
}
2 changes: 1 addition & 1 deletion cafs/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func EnsureDirectoryBehavior(f cafs.Filestore) error {
}

paths := []string{}
qfs.Walk(outf, 0, func(f qfs.File, depth int) error {
qfs.Walk(outf, func(f qfs.File) error {
paths = append(paths, f.FullPath())
return nil
})
Expand Down
19 changes: 10 additions & 9 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,30 @@ type PathSetter interface {
SetPath(path string)
}

// Walk traverses a file tree calling visit on each node
func Walk(root File, depth int, visit func(f File, depth int) error) (err error) {
if err := visit(root, depth); err != nil {
return err
}

// Walk traverses a file tree from the bottom-up calling visit on each file
// and directory within the tree
func Walk(root File, visit func(f File) error) (err error) {
if root.IsDirectory() {
for {
f, err := root.NextFile()
if err != nil {
if err.Error() == "EOF" {
break
return visit(root)
} else {
return err
}
}

if err := Walk(f, depth+1, visit); err != nil {
if err := Walk(f, visit); err != nil {
return err
}
}
} else {
if err := visit(root); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -236,7 +238,6 @@ func (Memdir) IsDirectory() bool {
// Returning io.EOF when no files remain
func (m *Memdir) NextFile() (File, error) {
if m.fi >= len(m.links) {
m.fi = 0
return nil, io.EOF
}
defer func() { m.fi++ }()
Expand Down
Loading

0 comments on commit 164aadc

Please sign in to comment.