From dfd4f94bf69d3e66587fbb82dd1a1d7b8a970fad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Aug 2018 18:06:57 +0200 Subject: [PATCH 1/3] coreapi: dag: Batching interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dag.go | 82 +++++++++++++++++++++++++++-------- core/coreapi/interface/dag.go | 21 +++++++-- 2 files changed, 81 insertions(+), 22 deletions(-) diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 75416a249d0..e2bd144ff07 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" gopath "path" @@ -17,34 +18,25 @@ import ( type DagAPI CoreAPI +type dagBatch struct { + api *DagAPI + toPut []ipld.Node + + lk sync.Mutex +} + // Put inserts data using specified format and input encoding. Unless used with // `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used. // Returns the path of the inserted data. func (api *DagAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) { - settings, err := caopts.DagPutOptions(opts...) - if err != nil { - return nil, err - } + nd, err := getNode(src, opts...) - codec, ok := cid.CodecToStr[settings.Codec] - if !ok { - return nil, fmt.Errorf("invalid codec %d", settings.Codec) - } - - nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength) + err = api.node.DAG.Add(ctx, nd) if err != nil { return nil, err } - if len(nds) == 0 { - return nil, fmt.Errorf("no node returned from ParseInputs") - } - err = api.node.DAG.Add(ctx, nds[0]) - if err != nil { - return nil, err - } - - return coreiface.IpldPath(nds[0].Cid()), nil + return coreiface.IpldPath(nd.Cid()), nil } // Get resolves `path` using Unixfs resolver, returns the resolved Node. @@ -75,6 +67,58 @@ func (api *DagAPI) Tree(ctx context.Context, p coreiface.Path, opts ...caopts.Da return out, nil } +func (api *DagAPI) Batch(ctx context.Context) coreiface.DagBatch { + return &dagBatch{api: api} +} + +func (b *dagBatch) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) { + nd, err := getNode(src, opts...) + if err != nil { + return nil, err + } + + b.lk.Lock() + b.toPut = append(b.toPut, nd) + b.lk.Unlock() + + return coreiface.IpldPath(nd.Cid()), nil +} + +func (b *dagBatch) Commit(ctx context.Context) error { + b.lk.Lock() + defer b.lk.Unlock() + defer func() { + b.toPut = nil + }() + + return b.api.node.DAG.AddMany(ctx, b.toPut) +} + +func getNode(src io.Reader, opts ...caopts.DagPutOption) (ipld.Node, error) { + settings, err := caopts.DagPutOptions(opts...) + if err != nil { + return nil, err + } + + codec, ok := cid.CodecToStr[settings.Codec] + if !ok { + return nil, fmt.Errorf("invalid codec %d", settings.Codec) + } + + nds, err := coredag.ParseInputs(settings.InputEnc, codec, src, settings.MhType, settings.MhLength) + if err != nil { + return nil, err + } + if len(nds) == 0 { + return nil, fmt.Errorf("no node returned from ParseInputs") + } + if len(nds) != 1 { + return nil, fmt.Errorf("got more that one node from ParseInputs") + } + + return nds[0], nil +} + func (api *DagAPI) core() coreiface.CoreAPI { return (*CoreAPI)(api) } diff --git a/core/coreapi/interface/dag.go b/core/coreapi/interface/dag.go index 3f92ebab34c..a128e97c5bc 100644 --- a/core/coreapi/interface/dag.go +++ b/core/coreapi/interface/dag.go @@ -4,21 +4,36 @@ import ( "context" "io" - options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format" ) -// DagAPI specifies the interface to IPLD -type DagAPI interface { +// DagOps groups operations that can be batched together +type DagOps interface { // Put inserts data using specified format and input encoding. // Unless used with WithCodec or WithHash, the defaults "dag-cbor" and // "sha256" are used. Put(ctx context.Context, src io.Reader, opts ...options.DagPutOption) (ResolvedPath, error) +} + +// DagBatch is the batching version of DagAPI. All implementations of DagBatch +// should be threadsafe +type DagBatch interface { + DagOps + + Commit(ctx context.Context) error +} + +// DagAPI specifies the interface to IPLD +type DagAPI interface { + DagOps // Get attempts to resolve and get the node specified by the path Get(ctx context.Context, path Path) (ipld.Node, error) // Tree returns list of paths within a node specified by the path. Tree(ctx context.Context, path Path, opts ...options.DagTreeOption) ([]Path, error) + + Batch(ctx context.Context) DagBatch } From 9f22058d95706b14425ce5aebee7dd79d87e0d68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Aug 2018 18:12:47 +0200 Subject: [PATCH 2/3] coreapi: dag: Batching interface tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dag_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/core/coreapi/dag_test.go b/core/coreapi/dag_test.go index 773ff0f1a4c..e75699f696f 100644 --- a/core/coreapi/dag_test.go +++ b/core/coreapi/dag_test.go @@ -116,3 +116,36 @@ func TestTree(t *testing.T) { } } } + +func TestBatch(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + batch := api.Dag().Batch(ctx) + + c, err := batch.Put(ctx, strings.NewReader(`"Hello"`)) + if err != nil { + t.Error(err) + } + + if c.Cid().String() != "zdpuAqckYF3ToF3gcJNxPZXmnmGuXd3gxHCXhq81HGxBejEvv" { + t.Errorf("got wrong cid: %s", c.Cid().String()) + } + + _, err = api.Dag().Get(ctx, c) + if err == nil || err.Error() != "merkledag: not found"{ + t.Error(err) + } + + if err := batch.Commit(ctx); err != nil { + t.Error(err) + } + + _, err = api.Dag().Get(ctx, c) + if err != nil { + t.Error(err) + } +} From d6ee9555d0e183dc574204fe5dc21bbbf4dbc017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Aug 2018 18:19:45 +0200 Subject: [PATCH 3/3] coreapi: dag: Missing batch docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/dag.go | 5 +++++ core/coreapi/dag_test.go | 2 +- core/coreapi/interface/dag.go | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index e2bd144ff07..7462c103a27 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -67,10 +67,14 @@ func (api *DagAPI) Tree(ctx context.Context, p coreiface.Path, opts ...caopts.Da return out, nil } +// Batch creates new DagBatch func (api *DagAPI) Batch(ctx context.Context) coreiface.DagBatch { return &dagBatch{api: api} } +// Put inserts data using specified format and input encoding. Unless used with +// `WithCodes` or `WithHash`, the defaults "dag-cbor" and "sha256" are used. +// Returns the path of the inserted data. func (b *dagBatch) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPutOption) (coreiface.ResolvedPath, error) { nd, err := getNode(src, opts...) if err != nil { @@ -84,6 +88,7 @@ func (b *dagBatch) Put(ctx context.Context, src io.Reader, opts ...caopts.DagPut return coreiface.IpldPath(nd.Cid()), nil } +// Commit commits nodes to the datastore and announces them to the network func (b *dagBatch) Commit(ctx context.Context) error { b.lk.Lock() defer b.lk.Unlock() diff --git a/core/coreapi/dag_test.go b/core/coreapi/dag_test.go index e75699f696f..16247e1f010 100644 --- a/core/coreapi/dag_test.go +++ b/core/coreapi/dag_test.go @@ -136,7 +136,7 @@ func TestBatch(t *testing.T) { } _, err = api.Dag().Get(ctx, c) - if err == nil || err.Error() != "merkledag: not found"{ + if err == nil || err.Error() != "merkledag: not found" { t.Error(err) } diff --git a/core/coreapi/interface/dag.go b/core/coreapi/interface/dag.go index a128e97c5bc..01d6112e74b 100644 --- a/core/coreapi/interface/dag.go +++ b/core/coreapi/interface/dag.go @@ -22,6 +22,7 @@ type DagOps interface { type DagBatch interface { DagOps + // Commit commits nodes to the datastore and announces them to the network Commit(ctx context.Context) error } @@ -35,5 +36,6 @@ type DagAPI interface { // Tree returns list of paths within a node specified by the path. Tree(ctx context.Context, path Path, opts ...options.DagTreeOption) ([]Path, error) + // Batch creates new DagBatch Batch(ctx context.Context) DagBatch }