Skip to content

Commit

Permalink
feat(pinning): add support for labels
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Dec 15, 2023
1 parent 483bc39 commit 5443589
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

* 🛠 `pinning/pinner`: you can now give a custom label when pinning a CID. To reflect this, the `Pinner` has been adjusted.

### Changed

### Removed
Expand Down
51 changes: 32 additions & 19 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,20 @@ func (p *pinner) SetAutosync(auto bool) bool {
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, label string) error {
err := p.dserv.Add(ctx, node)
if err != nil {
return err
}

if recurse {
return p.doPinRecursive(ctx, node.Cid(), true)
return p.doPinRecursive(ctx, node.Cid(), true, label)
} else {
return p.doPinDirect(ctx, node.Cid())
return p.doPinDirect(ctx, node.Cid(), label)
}
}

func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error {
func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool, label string) error {
cidKey := c.KeyString()

p.lock.Lock()
Expand Down Expand Up @@ -243,14 +243,14 @@ func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) erro
}
}

_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
_, err = p.addPin(ctx, c, ipfspinner.Recursive, label)
if err != nil {
return err
}
return p.flushPins(ctx, false)
}

func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid, label string) error {
cidKey := c.KeyString()

p.lock.Lock()
Expand All @@ -264,7 +264,7 @@ func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
return fmt.Errorf("%s already pinned recursively", c.String())
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
_, err = p.addPin(ctx, c, ipfspinner.Direct, label)
if err != nil {
return err
}
Expand Down Expand Up @@ -665,17 +665,17 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidRIndex)
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedPin {
out := make(chan ipfspinner.StreamedPin)

go func() {
defer close(out)
Expand All @@ -688,21 +688,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan
err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
out <- ipfspinner.StreamedPin{Err: err}

Check warning on line 691 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L691

Added line #L691 was not covered by tests
return false
}

pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
return false
}

Check warning on line 699 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L697-L699

Added lines #L697 - L699 were not covered by tests

if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedCid{C: c}:
case out <- ipfspinner.StreamedPin{Pin: ipfspinner.Pinned{
Key: pp.Cid,
Mode: pp.Mode,
Label: pp.Name,
}}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
out <- ipfspinner.StreamedPin{Err: err}

Check warning on line 716 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L716

Added line #L716 was not covered by tests
}
}()

Expand All @@ -711,8 +722,8 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
c := make(chan ipfspinner.StreamedCid)
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedPin {
c := make(chan ipfspinner.StreamedPin)
close(c)
return c
}
Expand Down Expand Up @@ -756,6 +767,8 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error
return err
}

// TODO: get old pin label

_, err = p.addPin(ctx, to, ipfspinner.Recursive, "")
if err != nil {
return err
Expand Down Expand Up @@ -809,13 +822,13 @@ func (p *pinner) Flush(ctx context.Context) error {

// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error {
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, label string) error {
// TODO: remove his to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
return p.doPinRecursive(ctx, c, false)
return p.doPinRecursive(ctx, c, false, label)
case ipfspinner.Direct:
return p.doPinDirect(ctx, c)
return p.doPinDirect(ctx, c, label)

Check warning on line 831 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L831

Added line #L831 was not covered by tests
default:
return errors.New("unrecognized pin mode")
}
Expand Down
64 changes: 34 additions & 30 deletions pinning/pinner/dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestPinnerBasic(t *testing.T) {
}

// Pin A{}
err = p.Pin(ctx, a, false)
err = p.Pin(ctx, a, false, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestPinnerBasic(t *testing.T) {
bk := b.Cid()

// recursively pin B{A,C}
err = p.Pin(ctx, b, true)
err = p.Pin(ctx, b, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -191,34 +191,38 @@ func TestPinnerBasic(t *testing.T) {
}

// Add D{A,C,E}
err = p.Pin(ctx, d, true)
label := "My Label"
err = p.Pin(ctx, d, true, label)
if err != nil {
t.Fatal(err)
}

dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
allPins := func(ch <-chan ipfspin.StreamedPin) (pins []ipfspin.Pinned) {
for val := range ch {
if val.Err != nil {
t.Fatal(val.Err)
}
cids = append(cids, val.C)
pins = append(pins, val.Pin)
}
return cids
return pins
}

cids := allCids(p.RecursiveKeys(ctx))
if len(cids) != 2 {
pins := allPins(p.RecursiveKeys(ctx))
if len(pins) != 2 {
t.Error("expected 2 recursive pins")
}
if !(bk == cids[0] || bk == cids[1]) {
if !(bk == pins[0].Key || bk == pins[1].Key) {
t.Error("expected recursive pin of B")
}
if !(dk == cids[0] || dk == cids[1]) {
if !(dk == pins[0].Key || dk == pins[1].Key) {
t.Error("expected recursive pin of D")
}
if !(label == pins[0].Label || label == pins[1].Label) {
t.Error("expected pin with label")
}

pinned, err := p.CheckIfPinned(ctx, ak, bk, ck, dk)
if err != nil {
Expand Down Expand Up @@ -251,16 +255,16 @@ func TestPinnerBasic(t *testing.T) {
}
}

cids = allCids(p.DirectKeys(ctx))
if len(cids) != 1 {
pins = allPins(p.DirectKeys(ctx))
if len(pins) != 1 {
t.Error("expected 1 direct pin")
}
if cids[0] != ak {
if pins[0].Key != ak {
t.Error("wrong direct pin")
}

cids = allCids(p.InternalPins(ctx))
if len(cids) != 0 {
pins = allPins(p.InternalPins(ctx))
if len(pins) != 0 {
t.Error("should not have internal keys")
}

Expand Down Expand Up @@ -323,7 +327,7 @@ func TestPinnerBasic(t *testing.T) {
fakeLog := &fakeLogger{}
fakeLog.StandardLogger = log
log = fakeLog
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -457,19 +461,19 @@ func TestDuplicateSemantics(t *testing.T) {
}

// pin is recursively
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}

// pinning directly should fail
err = p.Pin(ctx, a, false)
err = p.Pin(ctx, a, false, "")
if err == nil {
t.Fatal("expected direct pin to fail")
}

// pinning recursively again should succeed
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -489,7 +493,7 @@ func TestFlush(t *testing.T) {
}
_, k := randNode()

p.PinWithMode(ctx, k, ipfspin.Recursive)
p.PinWithMode(ctx, k, ipfspin.Recursive, "")
if err = p.Flush(ctx); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -520,7 +524,7 @@ func TestPinRecursiveFail(t *testing.T) {
mctx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()

err = p.Pin(mctx, a, true)
err = p.Pin(mctx, a, true, "")
if err == nil {
t.Fatal("should have failed to pin here")
}
Expand All @@ -538,7 +542,7 @@ func TestPinRecursiveFail(t *testing.T) {
// this one is time based... but shouldnt cause any issues
mctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()
err = p.Pin(mctx, a, true)
err = p.Pin(mctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -568,7 +572,7 @@ func TestPinUpdate(t *testing.T) {
t.Fatal(err)
}

if err = p.Pin(ctx, n1, true); err != nil {
if err = p.Pin(ctx, n1, true, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -646,7 +650,7 @@ func TestLoadDirty(t *testing.T) {

_, bk := randNode()

err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -787,7 +791,7 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs
}

// Pin last A recursively
if err = p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil {
if err = p.Pin(ctx, aNodes[aBranchLen-1], true, ""); err != nil {
return
}

Expand Down Expand Up @@ -820,12 +824,12 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs
bk = b.Cid()

// Pin C recursively
if err = p.Pin(ctx, c, true); err != nil {
if err = p.Pin(ctx, c, true, ""); err != nil {
return
}

// Pin B recursively
if err = p.Pin(ctx, b, true); err != nil {
if err = p.Pin(ctx, b, true, ""); err != nil {
return
}

Expand Down Expand Up @@ -857,7 +861,7 @@ func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) {
var err error

for i := range nodes {
err = p.Pin(ctx, nodes[i], recursive)
err = p.Pin(ctx, nodes[i], recursive, "")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -975,7 +979,7 @@ func benchmarkNthPin(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.
which := count - 1
for i := 0; i < b.N; i++ {
// Pin the Nth node and Flush
err := pinner.Pin(ctx, nodes[which], true)
err := pinner.Pin(ctx, nodes[which], true, "")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1021,7 +1025,7 @@ func benchmarkNPins(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.D
for i := 0; i < b.N; i++ {
// Pin all the nodes one at a time.
for j := range nodes {
err := pinner.Pin(ctx, nodes[j], true)
err := pinner.Pin(ctx, nodes[j], true, "")
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 5443589

Please sign in to comment.