diff --git a/CHANGELOG.md b/CHANGELOG.md index fc02fd7c1..29c49647a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +* 🛠 `pinning/pinner`: you can now give a custom label when pinning a CID. To reflect this, the `Pinner` has been adjusted. + ### Changed ### Removed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 5b0a1ef7f..18cf9ba43 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -174,20 +174,20 @@ func (p *pinner) SetAutosync(auto bool) bool { } // Pin the given node, optionally recursive -func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { +func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, label string) error { err := p.dserv.Add(ctx, node) if err != nil { return err } if recurse { - return p.doPinRecursive(ctx, node.Cid(), true) + return p.doPinRecursive(ctx, node.Cid(), true, label) } else { - return p.doPinDirect(ctx, node.Cid()) + return p.doPinDirect(ctx, node.Cid(), label) } } -func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error { +func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool, label string) error { cidKey := c.KeyString() p.lock.Lock() @@ -243,14 +243,14 @@ func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) erro } } - _, err = p.addPin(ctx, c, ipfspinner.Recursive, "") + _, err = p.addPin(ctx, c, ipfspinner.Recursive, label) if err != nil { return err } return p.flushPins(ctx, false) } -func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error { +func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid, label string) error { cidKey := c.KeyString() p.lock.Lock() @@ -264,7 +264,7 @@ func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error { return fmt.Errorf("%s already pinned recursively", c.String()) } - _, err = p.addPin(ctx, c, ipfspinner.Direct, "") + _, err = p.addPin(ctx, c, ipfspinner.Direct, label) if err != nil { return err } @@ -665,17 +665,17 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedPin { return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedPin { return p.streamIndex(ctx, p.cidRIndex) } -func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { - out := make(chan ipfspinner.StreamedCid) +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin) go func() { defer close(out) @@ -688,21 +688,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan err := index.ForEach(ctx, "", func(key, value string) bool { c, err := cid.Cast([]byte(key)) if err != nil { - out <- ipfspinner.StreamedCid{Err: err} + out <- ipfspinner.StreamedPin{Err: err} return false } + + pp, err := p.loadPin(ctx, value) + if err != nil { + out <- ipfspinner.StreamedPin{Err: err} + return false + } + if !cidSet.Has(c) { select { case <-ctx.Done(): return false - case out <- ipfspinner.StreamedCid{C: c}: + case out <- ipfspinner.StreamedPin{Pin: ipfspinner.Pinned{ + Key: pp.Cid, + Mode: pp.Mode, + Label: pp.Name, + }}: } cidSet.Add(c) } return true }) if err != nil { - out <- ipfspinner.StreamedCid{Err: err} + out <- ipfspinner.StreamedPin{Err: err} } }() @@ -711,8 +722,8 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { - c := make(chan ipfspinner.StreamedCid) +func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedPin { + c := make(chan ipfspinner.StreamedPin) close(c) return c } @@ -756,6 +767,8 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error return err } + // TODO: get old pin label + _, err = p.addPin(ctx, to, ipfspinner.Recursive, "") if err != nil { return err @@ -809,13 +822,13 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts -func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error { +func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, label string) error { // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: - return p.doPinRecursive(ctx, c, false) + return p.doPinRecursive(ctx, c, false, label) case ipfspinner.Direct: - return p.doPinDirect(ctx, c) + return p.doPinDirect(ctx, c, label) default: return errors.New("unrecognized pin mode") } diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index ad8284480..dd99ca745 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -120,7 +120,7 @@ func TestPinnerBasic(t *testing.T) { } // Pin A{} - err = p.Pin(ctx, a, false) + err = p.Pin(ctx, a, false, "") if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestPinnerBasic(t *testing.T) { bk := b.Cid() // recursively pin B{A,C} - err = p.Pin(ctx, b, true) + err = p.Pin(ctx, b, true, "") if err != nil { t.Fatal(err) } @@ -191,7 +191,8 @@ func TestPinnerBasic(t *testing.T) { } // Add D{A,C,E} - err = p.Pin(ctx, d, true) + label := "My Label" + err = p.Pin(ctx, d, true, label) if err != nil { t.Fatal(err) } @@ -199,26 +200,29 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) { + allPins := func(ch <-chan ipfspin.StreamedPin) (pins []ipfspin.Pinned) { for val := range ch { if val.Err != nil { t.Fatal(val.Err) } - cids = append(cids, val.C) + pins = append(pins, val.Pin) } - return cids + return pins } - cids := allCids(p.RecursiveKeys(ctx)) - if len(cids) != 2 { + pins := allPins(p.RecursiveKeys(ctx)) + if len(pins) != 2 { t.Error("expected 2 recursive pins") } - if !(bk == cids[0] || bk == cids[1]) { + if !(bk == pins[0].Key || bk == pins[1].Key) { t.Error("expected recursive pin of B") } - if !(dk == cids[0] || dk == cids[1]) { + if !(dk == pins[0].Key || dk == pins[1].Key) { t.Error("expected recursive pin of D") } + if !(label == pins[0].Label || label == pins[1].Label) { + t.Error("expected pin with label") + } pinned, err := p.CheckIfPinned(ctx, ak, bk, ck, dk) if err != nil { @@ -251,16 +255,16 @@ func TestPinnerBasic(t *testing.T) { } } - cids = allCids(p.DirectKeys(ctx)) - if len(cids) != 1 { + pins = allPins(p.DirectKeys(ctx)) + if len(pins) != 1 { t.Error("expected 1 direct pin") } - if cids[0] != ak { + if pins[0].Key != ak { t.Error("wrong direct pin") } - cids = allCids(p.InternalPins(ctx)) - if len(cids) != 0 { + pins = allPins(p.InternalPins(ctx)) + if len(pins) != 0 { t.Error("should not have internal keys") } @@ -323,7 +327,7 @@ func TestPinnerBasic(t *testing.T) { fakeLog := &fakeLogger{} fakeLog.StandardLogger = log log = fakeLog - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -457,19 +461,19 @@ func TestDuplicateSemantics(t *testing.T) { } // pin is recursively - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } // pinning directly should fail - err = p.Pin(ctx, a, false) + err = p.Pin(ctx, a, false, "") if err == nil { t.Fatal("expected direct pin to fail") } // pinning recursively again should succeed - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -489,7 +493,7 @@ func TestFlush(t *testing.T) { } _, k := randNode() - p.PinWithMode(ctx, k, ipfspin.Recursive) + p.PinWithMode(ctx, k, ipfspin.Recursive, "") if err = p.Flush(ctx); err != nil { t.Fatal(err) } @@ -520,7 +524,7 @@ func TestPinRecursiveFail(t *testing.T) { mctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() - err = p.Pin(mctx, a, true) + err = p.Pin(mctx, a, true, "") if err == nil { t.Fatal("should have failed to pin here") } @@ -538,7 +542,7 @@ func TestPinRecursiveFail(t *testing.T) { // this one is time based... but shouldnt cause any issues mctx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() - err = p.Pin(mctx, a, true) + err = p.Pin(mctx, a, true, "") if err != nil { t.Fatal(err) } @@ -568,7 +572,7 @@ func TestPinUpdate(t *testing.T) { t.Fatal(err) } - if err = p.Pin(ctx, n1, true); err != nil { + if err = p.Pin(ctx, n1, true, ""); err != nil { t.Fatal(err) } @@ -646,7 +650,7 @@ func TestLoadDirty(t *testing.T) { _, bk := randNode() - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -787,7 +791,7 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs } // Pin last A recursively - if err = p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil { + if err = p.Pin(ctx, aNodes[aBranchLen-1], true, ""); err != nil { return } @@ -820,12 +824,12 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs bk = b.Cid() // Pin C recursively - if err = p.Pin(ctx, c, true); err != nil { + if err = p.Pin(ctx, c, true, ""); err != nil { return } // Pin B recursively - if err = p.Pin(ctx, b, true); err != nil { + if err = p.Pin(ctx, b, true, ""); err != nil { return } @@ -857,7 +861,7 @@ func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) { var err error for i := range nodes { - err = p.Pin(ctx, nodes[i], recursive) + err = p.Pin(ctx, nodes[i], recursive, "") if err != nil { panic(err) } @@ -975,7 +979,7 @@ func benchmarkNthPin(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld. which := count - 1 for i := 0; i < b.N; i++ { // Pin the Nth node and Flush - err := pinner.Pin(ctx, nodes[which], true) + err := pinner.Pin(ctx, nodes[which], true, "") if err != nil { panic(err) } @@ -1021,7 +1025,7 @@ func benchmarkNPins(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.D for i := 0; i < b.N; i++ { // Pin all the nodes one at a time. for j := range nodes { - err := pinner.Pin(ctx, nodes[j], true) + err := pinner.Pin(ctx, nodes[j], true, "") if err != nil { panic(err) } diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index c5169bfee..64571342d 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -95,7 +95,7 @@ type Pinner interface { // Pin the given node, optionally recursively. // Pin will make sure that the given node and its children if recursive is set // are stored locally. - Pin(ctx context.Context, node ipld.Node, recursive bool) error + Pin(ctx context.Context, node ipld.Node, recursive bool, label string) error // Unpin the given cid. If recursive is true, removes either a recursive or // a direct pin. If recursive is false, only removes a direct pin. @@ -114,20 +114,20 @@ type Pinner interface { // PinWithMode is for manually editing the pin structure. Use with // care! If used improperly, garbage collection may not be // successful. - PinWithMode(context.Context, cid.Cid, Mode) error + PinWithMode(context.Context, cid.Cid, Mode, string) error // Flush writes the pin state to the backing datastore Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) <-chan StreamedCid + DirectKeys(ctx context.Context) <-chan StreamedPin // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) <-chan StreamedCid + RecursiveKeys(ctx context.Context) <-chan StreamedPin // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) <-chan StreamedCid + InternalPins(ctx context.Context) <-chan StreamedPin } // Pinned represents CID which has been pinned with a pinning strategy. @@ -135,9 +135,10 @@ type Pinner interface { // case that the item is not pinned directly (but rather pinned recursively // by some ascendant). type Pinned struct { - Key cid.Cid - Mode Mode - Via cid.Cid + Key cid.Cid + Mode Mode + Label string + Via cid.Cid } // Pinned returns whether or not the given cid is pinned @@ -158,8 +159,8 @@ func (p Pinned) String() string { } } -// StreamedCid encapsulate a Cid and an error for a function to return a channel of Cids. -type StreamedCid struct { - C cid.Cid +// StreamedPin encapsulate a [Pin] and an error for a function to return a channel of [Pin]s. +type StreamedPin struct { + Pin Pinned Err error } diff --git a/provider/provider.go b/provider/provider.go index 6fb021695..6035fd92d 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -85,7 +85,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory logR.Errorf("reprovide direct pins: %s", sc.Err) return } - set.Visitor(ctx)(sc.C) + set.Visitor(ctx)(sc.Pin.Key) } session := fetchConfig.NewSession(ctx) @@ -94,9 +94,9 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory logR.Errorf("reprovide recursive pins: %s", sc.Err) return } - set.Visitor(ctx)(sc.C) + set.Visitor(ctx)(sc.Pin.Key) if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.C}, func(res fetcher.FetchResult) error { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.Pin.Key}, func(res fetcher.FetchResult) error { clink, ok := res.LastBlockLink.(cidlink.Link) if ok { set.Visitor(ctx)(clink.Cid)