From b16b5b9d03da7e009e00c6d781868b9195a93905 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 30 Oct 2014 01:04:10 -0700 Subject: [PATCH] dagwriter-parallel --- unixfs/io/dagwriter.go | 73 ++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go index 6575b1edff6..cb9d3a2a885 100644 --- a/unixfs/io/dagwriter.go +++ b/unixfs/io/dagwriter.go @@ -1,6 +1,8 @@ package io import ( + "sync" + "github.com/jbenet/go-ipfs/importer/chunk" dag "github.com/jbenet/go-ipfs/merkledag" "github.com/jbenet/go-ipfs/pin" @@ -46,51 +48,66 @@ func (dw *DagWriter) startSplitter() { mbf := new(ft.MultiBlock) root := new(dag.Node) + // concurrent writing to disk + indirectNodes := make(chan *dag.Node) + var wg sync.WaitGroup + + // function to consume the nodesToWrite channel + writeNodes := func() { + defer wg.Done() + for { + node, more := <-indirectNodes + if !more { + return + } + log.Info("dagwriter worker writing") + dw.writeNode(node, pin.Indirect) + log.Info("dagwriter worker writing done") + } + } + + // spin off 10 worker goroutines. + for i := 0; i < 100; i++ { + wg.Add(1) + go writeNodes() + } + for blkData := range blkchan { + if dw.seterr != nil { + return + } + // Store the block size in the root node mbf.AddBlockSize(uint64(len(blkData))) node := &dag.Node{Data: ft.WrapData(blkData)} - nk, err := dw.dagserv.Add(node) - if dw.Pinner != nil { - dw.Pinner.PinWithMode(nk, pin.Indirect) - } - if err != nil { - dw.seterr = err - log.Critical("Got error adding created node to dagservice: %s", err) - return - } + indirectNodes <- node // Add a link to this node without storing a reference to the memory - err = root.AddNodeLinkClean("", node) + err := root.AddNodeLinkClean("", node) if err != nil { dw.seterr = err - log.Critical("Got error adding created node to root node: %s", err) + log.Criticalf("Got error adding created node to root node: %s", err) return } } + close(indirectNodes) // Generate the root node data mbf.Data = first data, err := mbf.GetBytes() if err != nil { dw.seterr = err - log.Critical("Failed generating bytes for multiblock file: %s", err) + log.Criticalf("Failed generating bytes for multiblock file: %s", err) return } root.Data = data // Add root node to the dagservice - rootk, err := dw.dagserv.Add(root) - if err != nil { - dw.seterr = err - log.Critical("Got error adding created node to dagservice: %s", err) - return - } - if dw.Pinner != nil { - dw.Pinner.PinWithMode(rootk, pin.Recursive) - } + dw.writeNode(root, pin.Recursive) dw.node = root + wg.Wait() dw.done <- struct{}{} + log.Info("dagwriter done") } func (dw *DagWriter) Write(b []byte) (int, error) { @@ -101,6 +118,20 @@ func (dw *DagWriter) Write(b []byte) (int, error) { return len(b), nil } +func (dw *DagWriter) writeNode(nd *dag.Node, pinMode pin.PinMode) { + nk, err := dw.dagserv.Add(nd) + if err != nil { + dw.seterr = err + log.Criticalf("Got error adding created node to dagservice: %s", err) + return + } + + if dw.Pinner != nil { + dw.Pinner.PinWithMode(nk, pinMode) + } + return +} + // Close the splitters input channel and wait for it to finish // Must be called to finish up splitting, otherwise split method // will never halt