Skip to content

Commit

Permalink
tiered put/get implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Aug 10, 2014
1 parent 9f76043 commit 67ddab1
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 97 deletions.
6 changes: 3 additions & 3 deletions peer/peer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package peer

import (
"time"
"sync"
"time"

b58 "github.com/jbenet/go-base58"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-multihash"
b58 "github.com/jbenet/go-base58"

"bytes"
)
Expand All @@ -33,7 +33,7 @@ type Peer struct {
ID ID
Addresses []*ma.Multiaddr

latency time.Duration
latency time.Duration
latenLock sync.RWMutex
}

Expand Down
21 changes: 21 additions & 0 deletions routing/dht/DHTMessage.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package dht

import (
peer "github.com/jbenet/go-ipfs/peer"
)

// A helper struct to make working with protobuf types easier
type DHTMessage struct {
Type PBDHTMessage_MessageType
Expand All @@ -8,6 +12,20 @@ type DHTMessage struct {
Response bool
Id uint64
Success bool
Peers []*peer.Peer
}

// peerInfo converts a peer into its protobuf wire form, carrying the
// peer's first known address and its ID.
func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
	addrStr, err := p.Addresses[0].String()
	if err != nil {
		// Temp: what situations could cause this?
		panic(err)
	}
	idStr := string(p.ID)
	return &PBDHTMessage_PBPeer{
		Addr: &addrStr,
		Id:   &idStr,
	}
}

func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
Expand All @@ -21,6 +39,9 @@ func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes.Response = &m.Response
pmes.Id = &m.Id
pmes.Success = &m.Success
for _, p := range m.Peers {
pmes.Peers = append(pmes.Peers, peerInfo(p))
}

return pmes
}
137 changes: 113 additions & 24 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
kb "github.com/jbenet/go-ipfs/routing/kbucket"

ma "github.com/jbenet/go-multiaddr"

Expand All @@ -37,7 +37,6 @@ type IpfsDHT struct {
datastore ds.Datastore

// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex

Expand Down Expand Up @@ -67,7 +66,7 @@ type listenInfo struct {
eol time.Time
}

// Create a new DHT object with the given peer as the 'local' host
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
if p == nil {
return nil, errors.New("nil peer passed to NewDHT()")
Expand Down Expand Up @@ -111,7 +110,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
// NOTE: this should be done better...
err = dht.Ping(npeer, time.Second*2)
if err != nil {
return nil, errors.New("Failed to ping newly connected peer.")
return nil, errors.New("failed to ping newly connected peer")
}

return npeer, nil
Expand Down Expand Up @@ -227,7 +226,7 @@ func (dht *IpfsDHT) cleanExpiredListeners() {
dht.listenLock.Unlock()
}

func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
pmes := DHTMessage{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Expand All @@ -242,26 +241,32 @@ func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error

func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *DHTMessage
i_val, err := dht.datastore.Get(dskey)
resp := &DHTMessage{
Response: true,
Id: pmes.GetId(),
Key: pmes.GetKey(),
}
iVal, err := dht.datastore.Get(dskey)
if err == nil {
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
}
resp.Success = true
resp.Value = iVal.([]byte)
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
// Check if we know any providers for the requested value
provs, ok := dht.providers[u.Key(pmes.GetKey())]
if ok && len(provs) > 0 {
for _, prov := range provs {
resp.Peers = append(resp.Peers, prov.Value)
}
resp.Success = true
} else {
// No providers?
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp.Peers = []*peer.Peer{closer}
}
} else {
//temp: what other errors can a datastore throw?
panic(err)

This comment has been minimized.

Copy link
@jbenet

jbenet Aug 12, 2014

Member

this should probably just return that error as to the client.

This comment has been minimized.

Copy link
@whyrusleeping

whyrusleeping Aug 12, 2014

Author Member

So send it on to dht.network.Chan.Errors?

}

mes := swarm.NewMessage(p, resp.ToProtobuf())
Expand Down Expand Up @@ -397,6 +402,7 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
close(list.resp)
}

// Check whether or not the dht is currently listening for mesid
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
dht.listenLock.RLock()
li, ok := dht.listeners[mesid]
Expand Down Expand Up @@ -424,6 +430,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
dht.providerLock.Unlock()
}

// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Lock()
if dht.IsListening(pmes.GetId()) {
Expand All @@ -434,7 +441,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Unlock()

seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
listenChan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)

for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
Expand All @@ -453,7 +460,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
case <-after:
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
case req_resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
Expand All @@ -477,6 +484,77 @@ out:
dht.network.Chan.Outgoing <- mes
}

// getValueSingle asks a single peer for the value stored under key.
//
// It sends a GET_VALUE request and waits up to timeout for the reply.
// A successful reply carries either the value itself or a list of
// providers; in the latter case the lookup is delegated to
// getFromProviderList. A reply with Success == false returns
// u.ErrSearchIncomplete alongside whatever partial value was sent.
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration) ([]byte, error) {
	pmes := DHTMessage{
		Type: PBDHTMessage_GET_VALUE,
		Key:  string(key),
		Id:   GenerateMessageID(),
	}
	// NOTE(review): the listener lives for a fixed minute regardless of
	// timeout — if timeout > 1m the listener can expire before the select
	// below gives up. Confirm this is intended.
	responseChan := dht.ListenFor(pmes.Id, 1, time.Minute)

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	dht.network.Chan.Outgoing <- mes

	// Wait for either the response or a timeout.
	timeup := time.After(timeout)
	select {
	case <-timeup:
		dht.Unlisten(pmes.Id)
		return nil, u.ErrTimeout
	case resp, ok := <-responseChan:
		if !ok {
			u.PErr("response channel closed before timeout, please investigate.")
			return nil, u.ErrTimeout
		}
		pmesOut := new(PBDHTMessage)
		err := proto.Unmarshal(resp.Data, pmesOut)
		if err != nil {
			return nil, err
		}
		if !pmesOut.GetSuccess() {
			// The remote peer could not complete the search; pass along
			// whatever it did send.
			return pmesOut.GetValue(), u.ErrSearchIncomplete
		}
		// TODO: debate moving this logic out of this function to be handled by the caller
		if pmesOut.Value == nil {
			// We were given provider[s] rather than the value itself.
			return dht.getFromProviderList(key, timeout, pmesOut.GetPeers())
		}
		// We were given the value directly.
		return pmesOut.GetValue(), nil
	}
}

// TODO: I'm not certain on this implementation: we get a list of providers
// from someone — what do we do with it? Connect to each of them? Randomly
// pick one to get the value from? Or connect to one at a time until we get
// a successful connection and request the value from it?
//
// getFromProviderList tries each advertised provider in turn until one
// yields the value for key. Providers we do not already know are dialed
// via their advertised address first. Failures are logged and skipped;
// u.ErrNotFound is returned when every provider fails.
func (dht *IpfsDHT) getFromProviderList(key u.Key, timeout time.Duration, provlist []*PBDHTMessage_PBPeer) ([]byte, error) {
	for _, prov := range provlist {
		provPeer, _ := dht.Find(peer.ID(prov.GetId()))
		if provPeer == nil {
			// Not in any routing table; dial the advertised address.
			maddr, err := ma.NewMultiaddr(prov.GetAddr())
			if err != nil {
				u.PErr("getValue error: %s", err)
				continue
			}
			provPeer, err = dht.Connect(maddr)
			if err != nil {
				u.PErr("getValue error: %s", err)
				continue
			}
		}
		data, err := dht.getValueSingle(provPeer, key, timeout)
		if err != nil {
			u.DErr("getFromProvs error: %s", err)
			continue
		}
		return data, nil
	}
	return nil, u.ErrNotFound
}

func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
Expand All @@ -495,3 +573,14 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
dht.network.Drop(removed)
}
}

// Find searches every routing table this DHT maintains for a peer with
// the given ID. It returns the peer and the table it was found in, or
// (nil, nil) when no table knows the peer.
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
	for _, table := range dht.routes {
		if p := table.Find(id); p != nil {
			return p, table
		}
	}
	return nil, nil
}
84 changes: 80 additions & 4 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,21 @@ func TestValueGetSet(t *testing.T) {
dht_a.Start()
dht_b.Start()

go func() {
select {
case err := <-dht_a.network.Chan.Errors:
t.Fatal(err)
case err := <-dht_b.network.Chan.Errors:
t.Fatal(err)
}
}()

_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}

err = dht_a.PutValue("hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
dht_a.PutValue("hello", []byte("world"))

val, err := dht_a.GetValue("hello", time.Second*2)
if err != nil {
Expand Down Expand Up @@ -179,3 +185,73 @@ func TestProvides(t *testing.T) {
dhts[i].Halt()
}
}

// TestLayeredGet verifies that a GET on a node that does not hold a value
// is satisfied via provider records: node 3 stores and provides "hello",
// and node 0 (connected only through node 1) must still retrieve it.
func TestLayeredGet(t *testing.T) {
	u.Debug = false

	const numPeers = 4

	// Build four peers on consecutive local TCP ports, each with its
	// own running DHT.
	var (
		addrs []*ma.Multiaddr
		peers []*peer.Peer
		dhts  []*IpfsDHT
	)
	for i := 0; i < numPeers; i++ {
		a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
		if err != nil {
			t.Fatal(err)
		}
		addrs = append(addrs, a)

		p := new(peer.Peer)
		p.AddAddress(a)
		p.ID = peer.ID([]byte(fmt.Sprintf("peer_%d", i)))
		peers = append(peers, p)

		d, err := NewDHT(p)
		if err != nil {
			t.Fatal(err)
		}
		dhts = append(dhts, d)
		d.Start()
	}

	// Wire up the topology: 0-1, 1-2, 1-3.
	links := [][2]int{{0, 1}, {1, 2}, {1, 3}}
	for _, l := range links {
		if _, err := dhts[l[0]].Connect(addrs[l[1]]); err != nil {
			t.Fatal(err)
		}
	}

	if err := dhts[3].PutLocal(u.Key("hello"), []byte("world")); err != nil {
		t.Fatal(err)
	}
	if err := dhts[3].Provide(u.Key("hello")); err != nil {
		t.Fatal(err)
	}

	// Give the provide announcement a moment to propagate.
	time.Sleep(time.Millisecond * 60)

	val, err := dhts[0].GetValue(u.Key("hello"), time.Second)
	if err != nil {
		t.Fatal(err)
	}
	if string(val) != "world" {
		t.Fatal("Got incorrect value.")
	}

	for _, d := range dhts {
		d.Halt()
	}
}
Loading

0 comments on commit 67ddab1

Please sign in to comment.