diff --git a/venus-devtool/api-gen/example.go b/venus-devtool/api-gen/example.go index a2ba55f919..e5491d3fa3 100644 --- a/venus-devtool/api-gen/example.go +++ b/venus-devtool/api-gen/example.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/venus/venus-shared/types/market" + auuid "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-graphsync" @@ -262,6 +263,9 @@ func init() { addExample(&clientDataSelector) addExample(client.ImportID(1234)) + + uuidTmp := auuid.MustParse("102334ec-35a3-4b36-be9f-02883844503a") + addExample(&uuidTmp) } func ExampleValue(method string, t, parent reflect.Type) interface{} { diff --git a/venus-devtool/cborgen/main.go b/venus-devtool/cborgen/main.go index c5ab2b204e..eb07e69851 100644 --- a/venus-devtool/cborgen/main.go +++ b/venus-devtool/cborgen/main.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/venus/pkg/state/tree" "github.com/filecoin-project/venus/pkg/vm/dispatch" "github.com/filecoin-project/venus/venus-devtool/util" + "github.com/filecoin-project/venus/venus-shared/blockstore" "github.com/filecoin-project/venus/venus-shared/libp2p/exchange" "github.com/filecoin-project/venus/venus-shared/libp2p/hello" "github.com/filecoin-project/venus/venus-shared/types" @@ -121,6 +122,14 @@ func main() { fvm.FvmGasCharge{}, }, }, + { + dir: "../venus-shared/blockstore", + types: []interface{}{ + blockstore.NetRPCReq{}, + blockstore.NetRPCResp{}, + blockstore.NetRPCErr{}, + }, + }, } for _, target := range targets { diff --git a/venus-shared/api/market/client/method.md b/venus-shared/api/market/client/method.md index 51cd20a553..df16d9c3b5 100644 --- a/venus-shared/api/market/client/method.md +++ b/venus-shared/api/market/client/method.md @@ -928,7 +928,8 @@ Inputs: "Address": "f01234", "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "PieceCID": null - } + }, + "RemoteStore": "102334ec-35a3-4b36-be9f-02883844503a" } ] ``` diff --git a/venus-shared/blockstore/cbor_gen.go b/venus-shared/blockstore/cbor_gen.go new file mode 100644 index 0000000000..49997aedc1 --- /dev/null +++ b/venus-shared/blockstore/cbor_gen.go @@ -0,0 +1,441 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package blockstore + +import ( + "fmt" + "io" + "math" + "sort" + + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +var lengthBufNetRPCReq = []byte{132} + +func (t *NetRPCReq) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufNetRPCReq); err != nil { + return err + } + + // t.Type (blockstore.NetRPCReqType) (uint8) + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil { + return err + } + + // t.ID (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil { + return err + } + + // t.Cid ([]cid.Cid) (slice) + if len(t.Cid) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Cid was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Cid))); err != nil { + return err + } + for _, v := range t.Cid { + if err := cbg.WriteCid(w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.Cid: %w", err) + } + } + + // t.Data ([][]uint8) (slice) + if len(t.Data) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Data was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Data))); err != nil { + return err + } + for _, v := range t.Data { + if len(v) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field v was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(v))); err != nil { + return err + } + + if _, err := cw.Write(v[:]); err != nil { + return err + } + } + return nil +} + +func (t *NetRPCReq) UnmarshalCBOR(r io.Reader) (err error) { + *t = NetRPCReq{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Type (blockstore.NetRPCReqType) (uint8) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint8 field") + } + if extra > math.MaxUint8 { + return fmt.Errorf("integer in input was too large for uint8 field") + } + t.Type = NetRPCReqType(extra) + // t.ID (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.ID = uint64(extra) + + } + // t.Cid ([]cid.Cid) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Cid: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Cid = make([]cid.Cid, extra) + } + + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("reading cid field t.Cid failed: %w", err) + } + t.Cid[i] = c + } + + // t.Data ([][]uint8) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Data: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Data = make([][]uint8, extra) + } + + for i := 0; i < int(extra); i++ { + { + var maj byte + var extra uint64 + var err error + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Data[i]: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Data[i] = make([]uint8, extra) + } + + if _, err := io.ReadFull(cr, t.Data[i][:]); err != nil { + return err + } + } + } + + return nil +} + +var lengthBufNetRPCResp = []byte{131} + +func (t *NetRPCResp) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufNetRPCResp); err != nil { + return err + } + + // t.Type (blockstore.NetRPCRespType) (uint8) + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil { + return err + } + + // t.ID (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil { + return err + } + + // t.Data ([]uint8) (slice) + if len(t.Data) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Data was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Data))); err != nil { + return err + } + + if _, err := cw.Write(t.Data[:]); err != nil { + return err + } + return nil +} + +func (t *NetRPCResp) UnmarshalCBOR(r io.Reader) (err error) { + *t = NetRPCResp{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Type (blockstore.NetRPCRespType) (uint8) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint8 field") + } + if extra > math.MaxUint8 { + return fmt.Errorf("integer in input was too large for uint8 field") + } + t.Type = NetRPCRespType(extra) + // t.ID (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.ID = uint64(extra) + + } + // t.Data ([]uint8) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Data: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Data = make([]uint8, extra) + } + + if _, err := io.ReadFull(cr, t.Data[:]); err != nil { + return err + } + return nil +} + +var lengthBufNetRPCErr = []byte{131} + +func (t *NetRPCErr) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufNetRPCErr); err != nil { + return err + } + + // t.Type (blockstore.NetRPCErrType) (uint8) + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil { + return err + } + + // t.Msg (string) (string) + if len(t.Msg) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Msg was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Msg))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Msg)); err != nil { + return err + } + + // t.Cid (cid.Cid) (struct) + + if t.Cid == nil { + if _, err := cw.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(cw, *t.Cid); err != nil { + return xerrors.Errorf("failed to write cid field t.Cid: %w", err) + } + } + + return nil +} + +func (t *NetRPCErr) UnmarshalCBOR(r io.Reader) (err error) { + *t = NetRPCErr{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Type (blockstore.NetRPCErrType) (uint8) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint8 field") + } + if extra > math.MaxUint8 { + return fmt.Errorf("integer in input was too large for uint8 field") + } + t.Type = NetRPCErrType(extra) + // t.Msg (string) (string) + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.Msg = string(sval) + } + // t.Cid (cid.Cid) (struct) + + { + + b, err := cr.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := cr.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Cid: %w", err) + } + + t.Cid = &c + } + + } + return nil +} diff --git a/venus-shared/blockstore/net.go b/venus-shared/blockstore/net.go new file mode 100644 index 0000000000..9eee66c3a1 --- /dev/null +++ b/venus-shared/blockstore/net.go @@ -0,0 +1,423 @@ +package blockstore + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "sync" + "sync/atomic" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-msgio" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" +) + +type NetRPCReqType byte + +const ( + NRpcHas NetRPCReqType = iota + NRpcGet + NRpcGetSize + NRpcPut + NRpcDelete + + // todo cancel req +) + +type NetRPCRespType byte + +const ( + NRpcOK NetRPCRespType = iota + NRpcErr + NRpcMore +) + +type NetRPCErrType byte + +const ( + NRpcErrGeneric NetRPCErrType = iota + NRpcErrNotFound +) + +type NetRPCReq struct { + Type NetRPCReqType + ID uint64 + + Cid []cid.Cid // todo maxsize? + Data [][]byte // todo maxsize? +} + +type NetRPCResp struct { + Type NetRPCRespType + ID uint64 + + // error or cids in allkeys + Data []byte // todo maxsize? + + next <-chan NetRPCResp +} + +type NetRPCErr struct { + Type NetRPCErrType + + Msg string + + // in case of NRpcErrNotFound + Cid *cid.Cid +} + +type NetworkStore struct { + // note: writer is thread-safe + msgStream msgio.ReadWriteCloser + + // atomic + reqCount uint64 + + respLk sync.Mutex + + // respMap is nil after store closes + respMap map[uint64]chan<- NetRPCResp + + closing chan struct{} + closed chan struct{} + + closeLk sync.Mutex + onClose []func() +} + +func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore { + ns := &NetworkStore{ + msgStream: mss, + + respMap: map[uint64]chan<- NetRPCResp{}, + + closing: make(chan struct{}), + closed: make(chan struct{}), + } + + go ns.receive() + + return ns +} + +func (n *NetworkStore) shutdown(msg string) { + if err := n.msgStream.Close(); err != nil { + log.Errorw("closing netstore msg stream", "error", err) + } + + nerr := NetRPCErr{ + Type: NRpcErrGeneric, + Msg: msg, + Cid: nil, + } + + var errb bytes.Buffer + if err := nerr.MarshalCBOR(&errb); err != nil { + log.Errorw("netstore shutdown: error marshaling error", "err", err) + } + + n.respLk.Lock() + for id, resps := range n.respMap { + resps <- NetRPCResp{ + Type: NRpcErr, + ID: id, + Data: errb.Bytes(), + } + } + + n.respMap = nil + + n.respLk.Unlock() +} + +func (n *NetworkStore) OnClose(cb func()) { + n.closeLk.Lock() + defer n.closeLk.Unlock() + + select { + case <-n.closed: + cb() + default: + n.onClose = append(n.onClose, cb) + } +} + +func (n *NetworkStore) receive() { + defer func() { + n.closeLk.Lock() + defer n.closeLk.Unlock() + + close(n.closed) + if n.onClose != nil { + for _, f := range n.onClose { + f() + } + } + }() + + for { + select { + case <-n.closing: + n.shutdown("netstore stopping") + return + default: + } + + msg, err := n.msgStream.ReadMsg() + if err != nil { + n.shutdown(fmt.Sprintf("netstore ReadMsg: %s", err)) + return + } + + var resp NetRPCResp + if err := resp.UnmarshalCBOR(bytes.NewReader(msg)); err != nil { + n.shutdown(fmt.Sprintf("unmarshaling netstore response: %s", err)) + return + } + + n.msgStream.ReleaseMsg(msg) + + n.respLk.Lock() + if ch, ok := n.respMap[resp.ID]; ok { + if resp.Type == NRpcMore { + nch := make(chan NetRPCResp, 1) + resp.next = nch + n.respMap[resp.ID] = nch + } else { + delete(n.respMap, resp.ID) + } + + ch <- resp + } + n.respLk.Unlock() + } +} + +func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) (uint64, <-chan NetRPCResp, error) { + rid := atomic.AddUint64(&n.reqCount, 1) + + respCh := make(chan NetRPCResp, 1) // todo pool? + + n.respLk.Lock() + if n.respMap == nil { + n.respLk.Unlock() + return 0, nil, xerrors.Errorf("netstore closed") + } + n.respMap[rid] = respCh + n.respLk.Unlock() + + req := NetRPCReq{ + Type: rt, + ID: rid, + Cid: cids, + Data: data, + } + + var rbuf bytes.Buffer // todo buffer pool + if err := req.MarshalCBOR(&rbuf); err != nil { + n.respLk.Lock() + defer n.respLk.Unlock() + + if n.respMap == nil { + return 0, nil, xerrors.Errorf("netstore closed") + } + delete(n.respMap, rid) + + return 0, nil, err + } + + if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil { + n.respLk.Lock() + defer n.respLk.Unlock() + + if n.respMap == nil { + return 0, nil, xerrors.Errorf("netstore closed") + } + delete(n.respMap, rid) + + return 0, nil, err + } + + return rid, respCh, nil +} + +func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRPCResp, rid uint64) (NetRPCResp, error) { + select { + case resp := <-rch: + if resp.Type == NRpcErr { + var e NetRPCErr + if err := e.UnmarshalCBOR(bytes.NewReader(resp.Data)); err != nil { + return NetRPCResp{}, xerrors.Errorf("unmarshaling error data: %w", err) + } + + var err error + switch e.Type { + case NRpcErrNotFound: + if e.Cid != nil { + err = ipld.ErrNotFound{ + Cid: *e.Cid, + } + } else { + err = xerrors.Errorf("block not found, but cid was null") + } + case NRpcErrGeneric: + err = xerrors.Errorf("generic error") + default: + err = xerrors.Errorf("unknown error type") + } + + return NetRPCResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err) + } + + return resp, nil + case <-ctx.Done(): + // todo send cancel req + + n.respLk.Lock() + if n.respMap != nil { + delete(n.respMap, rid) + } + n.respLk.Unlock() + + return NetRPCResp{}, ctx.Err() + } +} + +func (n *NetworkStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + req, rch, err := n.sendRpc(NRpcHas, []cid.Cid{c}, nil) + if err != nil { + return false, err + } + + resp, err := n.waitResp(ctx, rch, req) + if err != nil { + return false, err + } + + if len(resp.Data) != 1 { + return false, xerrors.Errorf("expected reposnse length to be 1 byte") + } + switch resp.Data[0] { + case cbg.CborBoolTrue[0]: + return true, nil + case cbg.CborBoolFalse[0]: + return false, nil + default: + return false, xerrors.Errorf("has: bad response: %x", resp.Data[0]) + } +} + +func (n *NetworkStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil) + if err != nil { + return nil, err + } + + resp, err := n.waitResp(ctx, rch, req) + if err != nil { + return nil, err + } + + return blocks.NewBlockWithCid(resp.Data, c) +} + +func (n *NetworkStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { + req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil) + if err != nil { + return err + } + + resp, err := n.waitResp(ctx, rch, req) + if err != nil { + return err + } + + return callback(resp.Data) // todo return buf to pool +} + +func (n *NetworkStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + req, rch, err := n.sendRpc(NRpcGetSize, []cid.Cid{c}, nil) + if err != nil { + return 0, err + } + + resp, err := n.waitResp(ctx, rch, req) + if err != nil { + return 0, err + } + + if len(resp.Data) != 4 { + return 0, xerrors.Errorf("expected getsize response to be 4 bytes, was %d", resp.Data) + } + + return int(binary.LittleEndian.Uint32(resp.Data)), nil +} + +func (n *NetworkStore) Put(ctx context.Context, block blocks.Block) error { + return n.PutMany(ctx, []blocks.Block{block}) +} + +func (n *NetworkStore) PutMany(ctx context.Context, blocks []blocks.Block) error { + // todo pool + cids := make([]cid.Cid, len(blocks)) + blkDatas := make([][]byte, len(blocks)) + for i, block := range blocks { + cids[i] = block.Cid() + blkDatas[i] = block.RawData() + } + + req, rch, err := n.sendRpc(NRpcPut, cids, blkDatas) + if err != nil { + return err + } + + _, err = n.waitResp(ctx, rch, req) + if err != nil { + return err + } + + return nil +} + +func (n *NetworkStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return n.DeleteMany(ctx, []cid.Cid{c}) +} + +func (n *NetworkStore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + req, rch, err := n.sendRpc(NRpcDelete, cids, nil) + if err != nil { + return err + } + + _, err = n.waitResp(ctx, rch, req) + if err != nil { + return err + } + + return nil +} + +func (n *NetworkStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, xerrors.Errorf("not supported") +} + +func (n *NetworkStore) HashOnRead(enabled bool) { + // todo +} + +func (n *NetworkStore) Stop(ctx context.Context) error { + close(n.closing) + + select { + case <-n.closed: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +var _ Blockstore = &NetworkStore{} diff --git a/venus-shared/blockstore/net_serve.go b/venus-shared/blockstore/net_serve.go new file mode 100644 index 0000000000..0a2310743f --- /dev/null +++ b/venus-shared/blockstore/net_serve.go @@ -0,0 +1,237 @@ +package blockstore + +import ( + "bytes" + "context" + "encoding/binary" + + block "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-msgio" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" +) + +type NetworkStoreHandler struct { + msgStream msgio.ReadWriteCloser + + bs Blockstore +} + +// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go +func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler { + ns := &NetworkStoreHandler{ + msgStream: mss, + bs: bs, + } + + go ns.handle(ctx) + + return ns +} + +func (h *NetworkStoreHandler) handle(ctx context.Context) { + defer func() { + if err := h.msgStream.Close(); err != nil { + log.Errorw("error closing blockstore stream", "error", err) + } + }() + + for { + var req NetRPCReq + + ms, err := h.msgStream.ReadMsg() + if err != nil { + log.Warnw("bstore stream err", "error", err) + return + } + + if err := req.UnmarshalCBOR(bytes.NewReader(ms)); err != nil { + return + } + + h.msgStream.ReleaseMsg(ms) + + switch req.Type { + case NRpcHas: + if len(req.Cid) != 1 { + if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + res, err := h.bs.Has(ctx, req.Cid[0]) + if err != nil { + if err := h.respondError(req.ID, err, req.Cid[0]); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + var resData [1]byte + if res { + resData[0] = cbg.CborBoolTrue[0] + } else { + resData[0] = cbg.CborBoolFalse[0] + } + + if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil { + log.Warnw("writing response", "error", err) + return + } + + case NRpcGet: + if len(req.Cid) != 1 { + if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + err := h.bs.View(ctx, req.Cid[0], func(bdata []byte) error { + return h.respond(req.ID, NRpcOK, bdata) + }) + if err != nil { + if err := h.respondError(req.ID, err, req.Cid[0]); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + case NRpcGetSize: + if len(req.Cid) != 1 { + if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + sz, err := h.bs.GetSize(ctx, req.Cid[0]) + if err != nil { + if err := h.respondError(req.ID, err, req.Cid[0]); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + var resData [4]byte + binary.LittleEndian.PutUint32(resData[:], uint32(sz)) + + if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil { + log.Warnw("writing response", "error", err) + return + } + + case NRpcPut: + blocks := make([]block.Block, len(req.Cid)) + + if len(req.Cid) != len(req.Data) { + if err := h.respondError(req.ID, xerrors.New("cid count didn't match data count"), cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + } + return + } + + for i := range req.Cid { + blocks[i], err = block.NewBlockWithCid(req.Data[i], req.Cid[i]) + if err != nil { + log.Warnw("make block", "error", err) + return + } + } + + err := h.bs.PutMany(ctx, blocks) + if err != nil { + if err := h.respondError(req.ID, err, cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil { + log.Warnw("writing response", "error", err) + return + } + case NRpcDelete: + err := h.bs.DeleteMany(ctx, req.Cid) + if err != nil { + if err := h.respondError(req.ID, err, cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + + if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil { + log.Warnw("writing response", "error", err) + return + } + default: + if err := h.respondError(req.ID, xerrors.New("unsupported request type"), cid.Undef); err != nil { + log.Warnw("writing error response", "error", err) + return + } + continue + } + } +} + +func (h *NetworkStoreHandler) respondError(req uint64, uerr error, c cid.Cid) error { + var resp NetRPCResp + resp.ID = req + resp.Type = NRpcErr + + nerr := NetRPCErr{ + Type: NRpcErrGeneric, + Msg: uerr.Error(), + } + if ipld.IsNotFound(uerr) { + nerr.Type = NRpcErrNotFound + nerr.Cid = &c + } + + var edata bytes.Buffer + if err := nerr.MarshalCBOR(&edata); err != nil { + return xerrors.Errorf("marshaling error data: %w", err) + } + + resp.Data = edata.Bytes() + + var msg bytes.Buffer + if err := resp.MarshalCBOR(&msg); err != nil { + return xerrors.Errorf("marshaling error response: %w", err) + } + + if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil { + return xerrors.Errorf("write error response: %w", err) + } + + return nil +} + +func (h *NetworkStoreHandler) respond(req uint64, rt NetRPCRespType, data []byte) error { + var resp NetRPCResp + resp.ID = req + resp.Type = rt + resp.Data = data + + var msg bytes.Buffer + if err := resp.MarshalCBOR(&msg); err != nil { + return xerrors.Errorf("marshaling response: %w", err) + } + + if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil { + return xerrors.Errorf("write response: %w", err) + } + + return nil +} diff --git a/venus-shared/blockstore/net_test.go b/venus-shared/blockstore/net_test.go new file mode 100644 index 0000000000..9b91514631 --- /dev/null +++ b/venus-shared/blockstore/net_test.go @@ -0,0 +1,63 @@ +package blockstore + +import ( + "context" + "io" + "testing" + + tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" + block "github.com/ipfs/go-block-format" + ipld "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-msgio" + "github.com/stretchr/testify/require" +) + +func TestNetBstore(t *testing.T) { + tf.UnitTest(t) + ctx := context.Background() + + cr, sw := io.Pipe() + sr, cw := io.Pipe() + + cm := msgio.Combine(msgio.NewWriter(cw), msgio.NewReader(cr)) + sm := msgio.Combine(msgio.NewWriter(sw), msgio.NewReader(sr)) + + bbs := NewTemporarySync() + _ = HandleNetBstoreStream(ctx, bbs, sm) + + nbs := NewNetworkStore(cm) + + tb1 := block.NewBlock([]byte("aoeu")) + + h, err := nbs.Has(ctx, tb1.Cid()) + require.NoError(t, err) + require.False(t, h) + + err = nbs.Put(ctx, tb1) + require.NoError(t, err) + + h, err = nbs.Has(ctx, tb1.Cid()) + require.NoError(t, err) + require.True(t, h) + + sz, err := nbs.GetSize(ctx, tb1.Cid()) + require.NoError(t, err) + require.Equal(t, 4, sz) + + err = nbs.DeleteBlock(ctx, tb1.Cid()) + require.NoError(t, err) + + h, err = nbs.Has(ctx, tb1.Cid()) + require.NoError(t, err) + require.False(t, h) + + _, err = nbs.Get(ctx, tb1.Cid()) + require.True(t, ipld.IsNotFound(err)) + + err = nbs.Put(ctx, tb1) + require.NoError(t, err) + + b, err := nbs.Get(ctx, tb1.Cid()) + require.NoError(t, err) + require.Equal(t, "aoeu", string(b.RawData())) +} diff --git a/venus-shared/blockstore/net_ws.go b/venus-shared/blockstore/net_ws.go new file mode 100644 index 0000000000..5c9a70d843 --- /dev/null +++ b/venus-shared/blockstore/net_ws.go @@ -0,0 +1,100 @@ +package blockstore + +import ( + "bytes" + "context" + + "github.com/gorilla/websocket" + "github.com/libp2p/go-msgio" + "golang.org/x/xerrors" +) + +type wsWrapper struct { + wc *websocket.Conn + + nextMsg []byte +} + +func (w *wsWrapper) Read(b []byte) (int, error) { + return 0, xerrors.New("read unsupported") +} + +func (w *wsWrapper) ReadMsg() ([]byte, error) { + if w.nextMsg != nil { + nm := w.nextMsg + w.nextMsg = nil + return nm, nil + } + + mt, r, err := w.wc.NextReader() + if err != nil { + return nil, err + } + + switch mt { + case websocket.BinaryMessage, websocket.TextMessage: + default: + return nil, xerrors.Errorf("unexpected message type") + } + + // todo pool + // todo limit sizes + var mbuf bytes.Buffer + if _, err := mbuf.ReadFrom(r); err != nil { + return nil, err + } + + return mbuf.Bytes(), nil +} + +func (w *wsWrapper) ReleaseMsg(bytes []byte) { + // todo use a pool +} + +func (w *wsWrapper) NextMsgLen() (int, error) { + if w.nextMsg != nil { + return len(w.nextMsg), nil + } + + mt, msg, err := w.wc.ReadMessage() + if err != nil { + return 0, err + } + + switch mt { + case websocket.BinaryMessage, websocket.TextMessage: + default: + return 0, xerrors.Errorf("unexpected message type") + } + + w.nextMsg = msg + return len(w.nextMsg), nil +} + +func (w *wsWrapper) Write(bytes []byte) (int, error) { + return 0, xerrors.New("write unsupported") +} + +func (w *wsWrapper) WriteMsg(bytes []byte) error { + return w.wc.WriteMessage(websocket.BinaryMessage, bytes) +} + +func (w *wsWrapper) Close() error { + return w.wc.Close() +} + +var _ msgio.ReadWriteCloser = &wsWrapper{} + +func wsConnToMio(wc *websocket.Conn) msgio.ReadWriteCloser { + return &wsWrapper{ + wc: wc, + } +} + +func HandleNetBstoreWS(ctx context.Context, bs Blockstore, wc *websocket.Conn) *NetworkStoreHandler { + return HandleNetBstoreStream(ctx, bs, wsConnToMio(wc)) +} + +func NewNetworkStoreWS(wc *websocket.Conn) *NetworkStore { + return NewNetworkStore(wsConnToMio(wc)) +} diff --git a/venus-shared/types/market/client/retrieval_order.go b/venus-shared/types/market/client/retrieval_order.go index d447e444e0..b526e751f8 100644 --- a/venus-shared/types/market/client/retrieval_order.go +++ b/venus-shared/types/market/client/retrieval_order.go @@ -3,6 +3,7 @@ package client import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/filecoin-project/venus/venus-shared/types" @@ -23,8 +24,12 @@ type RetrievalOrder struct { Client address.Address Miner address.Address MinerPeer *retrievalmarket.RetrievalPeer + + RemoteStore *RemoteStoreID `json:"RemoteStore,omitempty"` } +type RemoteStoreID = uuid.UUID + type RestrievalRes struct { DealID retrievalmarket.DealID }