Skip to content

Commit

Permalink
Merge pull request #4575 from ipfs/feat/coreapi/pin
Browse files Browse the repository at this point in the history
coreapi: Pin API
  • Loading branch information
whyrusleeping authored Feb 5, 2018
2 parents 52301ce + 242c98f commit 56509a7
Show file tree
Hide file tree
Showing 5 changed files with 557 additions and 0 deletions.
4 changes: 4 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
return &ObjectAPI{api, nil}
}

func (api *CoreAPI) Pin() coreiface.PinAPI {
return &PinAPI{api, nil}
}

// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) {
Expand Down
63 changes: 63 additions & 0 deletions core/coreapi/interface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,33 @@ type BlockStat interface {
Path() Path
}

// Pin holds information about pinned resource
type Pin interface {
// Path to the pinned object
Path() Path

// Type of the pin
Type() string
}

// PinStatus holds information about pin health
type PinStatus interface {
// Ok indicates whether the pin has been verified to be correct
Ok() bool

// BadNodes returns any bad (usually missing) nodes from the pin
BadNodes() []BadPinNode
}

// BadPinNode is a node that has been marked as bad by Pin.Verify
type BadPinNode interface {
// Path is the path of the node
Path() Path

// Err is the reason why the node has been marked as bad
Err() error
}

// CoreAPI defines an unified interface to IPFS for Go programs.
type CoreAPI interface {
// Unixfs returns an implementation of Unixfs API.
Expand All @@ -74,6 +101,7 @@ type CoreAPI interface {

// Key returns an implementation of Key API.
Key() KeyAPI
Pin() PinAPI

// ObjectAPI returns an implementation of Object API
Object() ObjectAPI
Expand Down Expand Up @@ -322,5 +350,40 @@ type ObjectStat struct {
CumulativeSize int
}

// PinAPI specifies the interface to pining
type PinAPI interface {
// Add creates new pin, be default recursive - pinning the whole referenced
// tree
Add(context.Context, Path, ...options.PinAddOption) error

// WithRecursive is an option for Add which specifies whether to pin an entire
// object tree or just one object. Default: true
WithRecursive(bool) options.PinAddOption

// Ls returns list of pinned objects on this node
Ls(context.Context, ...options.PinLsOption) ([]Pin, error)

// WithType is an option for Ls which allows to specify which pin types should
// be returned
//
// Supported values:
// * "direct" - directly pinned objects
// * "recursive" - roots of recursive pins
// * "indirect" - indirectly pinned objects (referenced by recursively pinned
// objects)
// * "all" - all pinned objects (default)
WithType(string) options.PinLsOption

// Rm removes pin for object specified by the path
Rm(context.Context, Path) error

// Update changes one pin to another, skipping checks for matching paths in
// the old tree
Update(ctx context.Context, from Path, to Path, opts ...options.PinUpdateOption) error

// Verify verifies the integrity of pinned objects
Verify(context.Context) (<-chan PinStatus, error)
}

var ErrIsDir = errors.New("object is a directory")
var ErrOffline = errors.New("can't resolve, ipfs node is offline")
85 changes: 85 additions & 0 deletions core/coreapi/interface/options/pin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package options

type PinAddSettings struct {
Recursive bool
}

type PinLsSettings struct {
Type string
}

type PinUpdateSettings struct {
Unpin bool
}

type PinAddOption func(*PinAddSettings) error
type PinLsOption func(settings *PinLsSettings) error
type PinUpdateOption func(*PinUpdateSettings) error

func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) {
options := &PinAddSettings{
Recursive: true,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}

return options, nil
}

func PinLsOptions(opts ...PinLsOption) (*PinLsSettings, error) {
options := &PinLsSettings{
Type: "all",
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}

return options, nil
}

func PinUpdateOptions(opts ...PinUpdateOption) (*PinUpdateSettings, error) {
options := &PinUpdateSettings{
Unpin: true,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}

return options, nil
}

type PinOptions struct{}

func (api *PinOptions) WithRecursive(recucsive bool) PinAddOption {
return func(settings *PinAddSettings) error {
settings.Recursive = recucsive
return nil
}
}

func (api *PinOptions) WithType(t string) PinLsOption {
return func(settings *PinLsSettings) error {
settings.Type = t
return nil
}
}

func (api *PinOptions) WithUnpin(unpin bool) PinUpdateOption {
return func(settings *PinUpdateSettings) error {
settings.Unpin = unpin
return nil
}
}
196 changes: 196 additions & 0 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package coreapi

import (
"context"
"fmt"

bserv "github.com/ipfs/go-ipfs/blockservice"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
offline "github.com/ipfs/go-ipfs/exchange/offline"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

type PinAPI struct {
*CoreAPI
*caopts.PinOptions
}

func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.PinAddOption) error {
settings, err := caopts.PinAddOptions(opts...)
if err != nil {
return err
}

defer api.node.Blockstore.PinLock().Unlock()

_, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.Recursive)
if err != nil {
return err
}

return nil
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
settings, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
}

switch settings.Type {
case "all", "direct", "indirect", "recursive":
default:
return nil, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
}

return pinLsAll(settings.Type, ctx, api.node.Pinning, api.node.DAG)
}

func (api *PinAPI) Rm(ctx context.Context, p coreiface.Path) error {
_, err := corerepo.Unpin(api.node, ctx, []string{p.String()}, true)
if err != nil {
return err
}

return nil
}

func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path, opts ...caopts.PinUpdateOption) error {
settings, err := caopts.PinUpdateOptions(opts...)
if err != nil {
return err
}

return api.node.Pinning.Update(ctx, from.Cid(), to.Cid(), settings.Unpin)
}

type pinStatus struct {
cid *cid.Cid
ok bool
badNodes []coreiface.BadPinNode
}

// BadNode is used in PinVerifyRes
type badNode struct {
cid *cid.Cid
err error
}

func (s *pinStatus) Ok() bool {
return s.ok
}

func (s *pinStatus) BadNodes() []coreiface.BadPinNode {
return s.badNodes
}

func (n *badNode) Path() coreiface.Path {
return ParseCid(n.cid)
}

func (n *badNode) Err() error {
return n.err
}

func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) {
visited := make(map[string]*pinStatus)
bs := api.node.Blocks.Blockstore()
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.node.Pinning.RecursiveKeys()

var checkPin func(root *cid.Cid) *pinStatus
checkPin = func(root *cid.Cid) *pinStatus {
key := root.String()
if status, ok := visited[key]; ok {
return status
}

links, err := getLinks(ctx, root)
if err != nil {
status := &pinStatus{ok: false, cid: root}
status.badNodes = []coreiface.BadPinNode{&badNode{cid: root, err: err}}
visited[key] = status
return status
}

status := &pinStatus{ok: true, cid: root}
for _, lnk := range links {
res := checkPin(lnk.Cid)
if !res.ok {
status.ok = false
status.badNodes = append(status.badNodes, res.badNodes...)
}
}

visited[key] = status
return status
}

out := make(chan coreiface.PinStatus)
go func() {
defer close(out)
for _, c := range recPins {
out <- checkPin(c)
}
}()

return out, nil
}

type pinInfo struct {
pinType string
object *cid.Cid
}

func (p *pinInfo) Path() coreiface.Path {
return ParseCid(p.object)
}

func (p *pinInfo) Type() string {
return p.pinType
}

func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld.DAGService) ([]coreiface.Pin, error) {

keys := make(map[string]*pinInfo)

AddToResultKeys := func(keyList []*cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c.String()] = &pinInfo{
pinType: typeStr,
object: c,
}
}
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range pinning.RecursiveKeys() {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), k, set.Visit)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(pinning.RecursiveKeys(), "recursive")
}

out := make([]coreiface.Pin, 0, len(keys))
for _, v := range keys {
out = append(out, v)
}

return out, nil
}
Loading

0 comments on commit 56509a7

Please sign in to comment.