Merge pull request #208 from jbenet/ctxcloserify
ContextCloserify
jbenet committed Oct 26, 2014
2 parents 08edaf8 + d79ebe6 commit be1c10f
Showing 13 changed files with 261 additions and 279 deletions.
112 changes: 46 additions & 66 deletions core/core.go
@@ -26,6 +26,7 @@ import (
 	routing "github.com/jbenet/go-ipfs/routing"
 	dht "github.com/jbenet/go-ipfs/routing/dht"
 	u "github.com/jbenet/go-ipfs/util"
+	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
 )

 var log = u.Logger("core")
@@ -71,119 +72,98 @@ type IpfsNode struct {

 	// the pinning manager
 	Pinning pin.Pinner
+
+	ctxc.ContextCloser
 }

 // NewIpfsNode constructs a new IpfsNode based on the given config.
-func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
-	// derive this from a higher context.
-	// cancel if we need to fail early.
-	ctx, cancel := context.WithCancel(context.TODO())
+func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
 	success := false // flip to true after all sub-system inits succeed
 	defer func() {
-		if !success {
-			cancel()
+		if !success && n != nil {
+			n.Close()
 		}
 	}()

 	if cfg == nil {
 		return nil, fmt.Errorf("configuration required")
 	}

-	d, err := makeDatastore(cfg.Datastore)
-	if err != nil {
+	// derive this from a higher context.
+	ctx := context.TODO()
+	n = &IpfsNode{
+		Config:        cfg,
+		ContextCloser: ctxc.NewContextCloser(ctx, nil),
+	}
+
+	// setup datastore.
+	if n.Datastore, err = makeDatastore(cfg.Datastore); err != nil {
 		return nil, err
 	}

-	peerstore := peer.NewPeerstore()
-	local, err := initIdentity(cfg, peerstore, online)
+	// setup peerstore + local peer identity
+	n.Peerstore = peer.NewPeerstore()
+	n.Identity, err = initIdentity(n.Config, n.Peerstore, online)
 	if err != nil {
 		return nil, err
 	}

-	// FIXME(brian): This is a bit dangerous. If any of the vars declared in
-	// this block are assigned inside of the "if online" block using the ":="
-	// declaration syntax, the compiler permits re-declaration. This is rather
-	// undesirable
-	var (
-		net inet.Network
-		// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
-		route           *dht.IpfsDHT
-		exchangeSession exchange.Interface
-		diagnostics     *diag.Diagnostics
-		network         inet.Network
-	)
-
 	// setup online services
 	if online {

-		dhtService := netservice.NewService(nil)      // nil handler for now, need to patch it
-		exchangeService := netservice.NewService(nil) // nil handler for now, need to patch it
-		diagService := netservice.NewService(nil)
-
-		if err := dhtService.Start(ctx); err != nil {
-			return nil, err
-		}
-		if err := exchangeService.Start(ctx); err != nil {
-			return nil, err
-		}
-		if err := diagService.Start(ctx); err != nil {
-			return nil, err
-		}
+		dhtService := netservice.NewService(ctx, nil)      // nil handler for now, need to patch it
+		exchangeService := netservice.NewService(ctx, nil) // nil handler for now, need to patch it
+		diagService := netservice.NewService(ctx, nil)     // nil handler for now, need to patch it

-		net, err = inet.NewIpfsNetwork(ctx, local, peerstore, &mux.ProtocolMap{
+		muxMap := &mux.ProtocolMap{
 			mux.ProtocolID_Routing:    dhtService,
 			mux.ProtocolID_Exchange:   exchangeService,
 			mux.ProtocolID_Diagnostic: diagService,
 			// add protocol services here.
-		})
+		}

+		// setup the network
+		n.Network, err = inet.NewIpfsNetwork(ctx, n.Identity, n.Peerstore, muxMap)
 		if err != nil {
 			return nil, err
 		}
-		network = net
+		n.AddCloserChild(n.Network)

-		diagnostics = diag.NewDiagnostics(local, net, diagService)
-		diagService.SetHandler(diagnostics)
+		// setup diagnostics service
+		n.Diagnostics = diag.NewDiagnostics(n.Identity, n.Network, diagService)
+		diagService.SetHandler(n.Diagnostics)

-		route = dht.NewDHT(ctx, local, peerstore, net, dhtService, d)
+		// setup routing service
+		dhtRouting := dht.NewDHT(ctx, n.Identity, n.Peerstore, n.Network, dhtService, n.Datastore)
 		// TODO(brian): perform this inside NewDHT factory method
-		dhtService.SetHandler(route) // wire the handler to the service.
+		dhtService.SetHandler(dhtRouting) // wire the handler to the service.
+		n.Routing = dhtRouting
+		n.AddCloserChild(dhtRouting)

+		// setup exchange service
 		const alwaysSendToPeer = true // use YesManStrategy
-		exchangeSession = bitswap.NetMessageSession(ctx, local, net, exchangeService, route, d, alwaysSendToPeer)
+		n.Exchange = bitswap.NetMessageSession(ctx, n.Identity, n.Network, exchangeService, n.Routing, n.Datastore, alwaysSendToPeer)
 		// ok, this function call is ridiculous o/ consider making it simpler.

 		// TODO(brian): pass a context to initConnections
-		go initConnections(ctx, cfg, peerstore, route)
+		go initConnections(ctx, n.Config, n.Peerstore, dhtRouting)
 	}

 	// TODO(brian): when offline instantiate the BlockService with a bitswap
 	// session that simply doesn't return blocks
-	bs, err := bserv.NewBlockService(d, exchangeSession)
+	n.Blocks, err = bserv.NewBlockService(n.Datastore, n.Exchange)
 	if err != nil {
 		return nil, err
 	}

-	dag := merkledag.NewDAGService(bs)
-	ns := namesys.NewNameSystem(route)
-	p, err := pin.LoadPinner(d, dag)
+	n.DAG = merkledag.NewDAGService(n.Blocks)
+	n.Namesys = namesys.NewNameSystem(n.Routing)
+	n.Pinning, err = pin.LoadPinner(n.Datastore, n.DAG)
 	if err != nil {
-		p = pin.NewPinner(d, dag)
+		n.Pinning = pin.NewPinner(n.Datastore, n.DAG)
 	}

 	success = true
-	return &IpfsNode{
-		Config:      cfg,
-		Peerstore:   peerstore,
-		Datastore:   d,
-		Blocks:      bs,
-		DAG:         dag,
-		Resolver:    &path.Resolver{DAG: dag},
-		Exchange:    exchangeSession,
-		Identity:    local,
-		Routing:     route,
-		Namesys:     ns,
-		Diagnostics: diagnostics,
-		Network:     network,
-		Pinning:     p,
-	}, nil
+	return n, nil
 }

 func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (peer.Peer, error) {
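For readers unfamiliar with the ctxcloser package, here is a minimal, self-contained sketch of the pattern the constructor now relies on. It is inferred purely from the calls visible in this diff (NewContextCloser, Close, Closing, Children, AddCloserChild); the real util/ctxcloser implementation differs in detail (its constructor also takes a CloseFunc, passed as nil throughout this change), and the standard-library context import below is a stand-in for the vendored go.net/context.

```go
package main

import (
	"context" // stand-in for the vendored code.google.com/p/go.net/context
	"fmt"
	"sync"
)

// ContextCloser ties a cancellable context to a WaitGroup of children,
// so Close() can signal shutdown and then wait for everything to wind down.
type ContextCloser struct {
	ctx      context.Context
	cancel   context.CancelFunc
	children sync.WaitGroup
	closers  []*ContextCloser
}

// NewContextCloser derives a cancellable context from ctx.
func NewContextCloser(ctx context.Context) *ContextCloser {
	ctx, cancel := context.WithCancel(ctx)
	return &ContextCloser{ctx: ctx, cancel: cancel}
}

// Closing returns a channel closed once shutdown begins.
func (c *ContextCloser) Closing() <-chan struct{} { return c.ctx.Done() }

// Children is the WaitGroup goroutines register on before starting.
func (c *ContextCloser) Children() *sync.WaitGroup { return &c.children }

// AddCloserChild registers a sub-closer to be closed along with this one.
func (c *ContextCloser) AddCloserChild(child *ContextCloser) {
	c.closers = append(c.closers, child)
}

// Close cancels the context, closes registered children, and waits.
func (c *ContextCloser) Close() error {
	c.cancel()
	for _, child := range c.closers {
		child.Close()
	}
	c.children.Wait()
	return nil
}

func main() {
	node := NewContextCloser(context.Background())
	node.Children().Add(1)
	go func() {
		defer node.Children().Done()
		<-node.Closing() // wind down when the node closes
		fmt.Println("subsystem stopped")
	}()
	// Mirrors the defer in NewIpfsNode: on failure, n.Close() tears the
	// whole tree down instead of a loose cancel() that waits for nothing.
	node.Close()
}
```

This is why the diff above can delete the hand-rolled cancel/defer bookkeeping: once every subsystem hangs off the node's closer tree, a single Close on the node suffices.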
5 changes: 2 additions & 3 deletions net/interface.go
@@ -5,10 +5,12 @@ import (
 	mux "github.com/jbenet/go-ipfs/net/mux"
 	srv "github.com/jbenet/go-ipfs/net/service"
 	peer "github.com/jbenet/go-ipfs/peer"
+	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
 )

 // Network is the interface IPFS uses for connecting to the world.
 type Network interface {
+	ctxc.ContextCloser

 	// Listen handles incoming connections on given Multiaddr.
 	// Listen(*ma.Muliaddr) error
@@ -35,9 +37,6 @@ type Network interface {

 	// SendMessage sends given Message out
 	SendMessage(msg.NetMessage) error
-
-	// Close terminates all network operation
-	Close() error
 }

 // Sender interface for network services.
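The interface change above swaps a hand-written Close() error method for an embedded ctxc.ContextCloser. A small illustrative sketch of why that matters, using hypothetical names rather than the actual go-ipfs definitions:

```go
package main

import "fmt"

// Closer is a toy stand-in for ctxc.ContextCloser's method set.
type Closer interface {
	Close() error
	Closing() <-chan struct{}
}

// Network gains Close/Closing by embedding, so every implementation
// participates in the close tree without redeclaring the methods, and a
// parent can register it with something like n.AddCloserChild(n.Network).
type Network interface {
	Closer
	SendMessage(msg string) error
}

// fakeNetwork is a demonstration-only implementation.
type fakeNetwork struct {
	closing chan struct{}
}

func (f *fakeNetwork) Close() error             { close(f.closing); return nil }
func (f *fakeNetwork) Closing() <-chan struct{} { return f.closing }

func (f *fakeNetwork) SendMessage(msg string) error {
	select {
	case <-f.closing:
		return fmt.Errorf("network closed")
	default:
		fmt.Println("sent:", msg)
		return nil
	}
}

func main() {
	var n Network = &fakeNetwork{closing: make(chan struct{})}
	n.SendMessage("hello")
	n.Close()
	fmt.Println(n.SendMessage("too late")) // network closed
}
```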
90 changes: 32 additions & 58 deletions net/mux/mux.go
@@ -7,13 +7,16 @@ import (
 	msg "github.com/jbenet/go-ipfs/net/message"
 	pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
 	u "github.com/jbenet/go-ipfs/util"
+	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"

 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 )

 var log = u.Logger("muxer")

+// ProtocolIDs used to identify each protocol.
+// These should probably be defined elsewhere.
 var (
 	ProtocolID_Routing  = pb.ProtocolID_Routing
 	ProtocolID_Exchange = pb.ProtocolID_Exchange
@@ -38,57 +41,40 @@ type Muxer struct {
 	// Protocols are the multiplexed services.
 	Protocols ProtocolMap

-	// cancel is the function to stop the Muxer
-	cancel context.CancelFunc
-	ctx    context.Context
-	wg     sync.WaitGroup
-
 	bwiLock sync.Mutex
 	bwIn    uint64

 	bwoLock sync.Mutex
 	bwOut   uint64

 	*msg.Pipe
+	ctxc.ContextCloser
 }

 // NewMuxer constructs a muxer given a protocol map.
-func NewMuxer(mp ProtocolMap) *Muxer {
-	return &Muxer{
-		Protocols: mp,
-		Pipe:      msg.NewPipe(10),
+func NewMuxer(ctx context.Context, mp ProtocolMap) *Muxer {
+	m := &Muxer{
+		Protocols:     mp,
+		Pipe:          msg.NewPipe(10),
+		ContextCloser: ctxc.NewContextCloser(ctx, nil),
 	}
-}
-
-// GetPipe implements the Protocol interface
-func (m *Muxer) GetPipe() *msg.Pipe {
-	return m.Pipe
-}
-
-// Start kicks off the Muxer goroutines.
-func (m *Muxer) Start(ctx context.Context) error {
-	if m == nil {
-		panic("nix muxer")
-	}
-
-	if m.cancel != nil {
-		return errors.New("Muxer already started.")
-	}
-
-	// make a cancellable context.
-	m.ctx, m.cancel = context.WithCancel(ctx)
-	m.wg = sync.WaitGroup{}

-	m.wg.Add(1)
+	m.Children().Add(1)
 	go m.handleIncomingMessages()
 	for pid, proto := range m.Protocols {
-		m.wg.Add(1)
+		m.Children().Add(1)
 		go m.handleOutgoingMessages(pid, proto)
 	}

-	return nil
+	return m
 }

+// GetPipe implements the Protocol interface
+func (m *Muxer) GetPipe() *msg.Pipe {
+	return m.Pipe
+}
+
 // GetBandwidthTotals return the in/out bandwidth measured over this muxer.
 func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
 	m.bwiLock.Lock()
 	in = m.bwIn
@@ -100,19 +86,6 @@ func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
 	return
 }

-// Stop stops muxer activity.
-func (m *Muxer) Stop() {
-	if m.cancel == nil {
-		panic("muxer stopped twice.")
-	}
-	// issue cancel, and wipe func.
-	m.cancel()
-	m.cancel = context.CancelFunc(nil)
-
-	// wait for everything to wind down.
-	m.wg.Wait()
-}
-
 // AddProtocol adds a Protocol with given ProtocolID to the Muxer.
 func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
 	if _, found := m.Protocols[pid]; found {
@@ -126,28 +99,26 @@ func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {

 // handleIncoming consumes the messages on the m.Incoming channel and
 // routes them appropriately (to the protocols).
 func (m *Muxer) handleIncomingMessages() {
-	defer m.wg.Done()
+	defer m.Children().Done()

 	for {
-		if m == nil {
-			panic("nil muxer")
-		}
-
 		select {
+		case <-m.Closing():
+			return
+
 		case msg, more := <-m.Incoming:
 			if !more {
 				return
 			}
+			m.Children().Add(1)
 			go m.handleIncomingMessage(msg)
-
-		case <-m.ctx.Done():
-			return
 		}
 	}
 }

 // handleIncomingMessage routes message to the appropriate protocol.
 func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
+	defer m.Children().Done()
+
 	m.bwiLock.Lock()
 	// TODO: compensate for overhead
@@ -169,33 +140,35 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {

 	select {
 	case proto.GetPipe().Incoming <- m2:
-	case <-m.ctx.Done():
-		log.Error(m.ctx.Err())
+	case <-m.Closing():
 		return
 	}
 }

 // handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
 // wraps them and sends them out.
 func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
-	defer m.wg.Done()
+	defer m.Children().Done()

 	for {
 		select {
 		case msg, more := <-proto.GetPipe().Outgoing:
 			if !more {
 				return
 			}
+			m.Children().Add(1)
 			go m.handleOutgoingMessage(pid, msg)

-		case <-m.ctx.Done():
+		case <-m.Closing():
 			return
 		}
 	}
 }

 // handleOutgoingMessage wraps out a message and sends it out the
 func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
+	defer m.Children().Done()
+
 	data, err := wrapData(m1.Data(), pid)
 	if err != nil {
 		log.Errorf("muxer serializing error: %v", err)
@@ -204,13 +177,14 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {

 	m.bwoLock.Lock()
 	// TODO: compensate for overhead
+	// TODO(jbenet): switch this to a goroutine to prevent sync waiting.
 	m.bwOut += uint64(len(data))
 	m.bwoLock.Unlock()

 	m2 := msg.New(m1.Peer(), data)
 	select {
 	case m.GetPipe().Outgoing <- m2:
-	case <-m.ctx.Done():
+	case <-m.Closing():
 		return
 	}
 }
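The recurring shape in the muxer, register on Children() before spawning and select on Closing() in every loop, is what lets ContextCloser.Close drain in-flight handlers. A minimal runnable sketch of that shape, with illustrative names rather than the actual muxer types:

```go
package main

import (
	"fmt"
	"sync"
)

// mux is a toy stand-in for Muxer: one incoming channel, a closing
// signal, and a WaitGroup ("children") covering every goroutine.
type mux struct {
	closing  chan struct{}
	children sync.WaitGroup
	incoming chan string
}

func (m *mux) handleIncomingMessages() {
	defer m.children.Done()
	for {
		select {
		case <-m.closing: // shutdown path, like <-m.Closing()
			return
		case msg, more := <-m.incoming:
			if !more {
				return
			}
			m.children.Add(1) // register BEFORE spawning, as the muxer does
			go m.handleIncomingMessage(msg)
		}
	}
}

func (m *mux) handleIncomingMessage(msg string) {
	defer m.children.Done()
	fmt.Println("routed:", msg)
}

func main() {
	m := &mux{closing: make(chan struct{}), incoming: make(chan string, 1)}
	m.children.Add(1)
	go m.handleIncomingMessages()

	m.incoming <- "ping"
	close(m.incoming) // end of stream; Close() would close m.closing instead
	m.children.Wait() // the drain step inside ContextCloser.Close
	fmt.Println("muxer fully stopped")
}
```

Calling Add(1) in the parent rather than inside the new goroutine closes the window where a Close could observe a zero count while a handler is still being launched; that ordering is exactly what the diff introduces at each go statement.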