Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm/pss: Exorcise evil bitwise logic and revert kad consts
Browse files Browse the repository at this point in the history
  • Loading branch information
nolash committed Nov 20, 2018
1 parent 3e6d23e commit b13b05c
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 70 deletions.
24 changes: 7 additions & 17 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/pot"
)

const (
defaultMaxProxDisplay = 16
defaultMinProxBinSize = 2
defaultMinBinSize = 2
defaultMaxBinSize = 4
defaultRetryInterval = 4200000000 // 4.2 sec
defaultMaxRetries = 42
defaultRetryExponent = 2
)

/*
Taking the proximity order relative to a fix point x classifies the points in
Expand Down Expand Up @@ -78,13 +68,13 @@ type KadParams struct {
// NewKadParams returns a params struct with default values
func NewKadParams() *KadParams {
return &KadParams{
MaxProxDisplay: defaultMaxProxDisplay,
MinProxBinSize: defaultMinProxBinSize,
MinBinSize: defaultMinBinSize,
MaxBinSize: defaultMaxBinSize,
RetryInterval: defaultRetryInterval,
MaxRetries: defaultMaxRetries,
RetryExponent: defaultRetryExponent,
MaxProxDisplay: 16,
MinProxBinSize: 2,
MinBinSize: 2,
MaxBinSize: 4,
RetryInterval: 4200000000, // 4.2 sec
MaxRetries: 42,
RetryExponent: 2,
}
}

Expand Down
28 changes: 13 additions & 15 deletions swarm/pss/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,22 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool

psssub := notifier.CreateSubscription()

hndlr := &handler{
f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
apimsg := &APIMsg{
Msg: hexutil.Bytes(msg),
Asymmetric: asymmetric,
Key: keyid,
}
if err := notifier.Notify(psssub.ID, apimsg); err != nil {
log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg))
}
return nil
},
}
hndlr := NewHandler(func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
apimsg := &APIMsg{
Msg: hexutil.Bytes(msg),
Asymmetric: asymmetric,
Key: keyid,
}
if err := notifier.Notify(psssub.ID, apimsg); err != nil {
log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg))
}
return nil
})
if raw {
hndlr.caps |= handlerCapRaw
hndlr.caps.raw = true
}
if prox {
hndlr.caps |= handlerCapProx
hndlr.caps.prox = true
}

deregf := pssapi.Register(&topic, hndlr)
Expand Down
52 changes: 37 additions & 15 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type Pss struct {
handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
handlersMu sync.RWMutex
hashPool sync.Pool
topicHandlerCaps map[Topic]byte // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)
topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)

// process
quitC chan struct{}
Expand Down Expand Up @@ -183,7 +183,8 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,

handlers: make(map[Topic]map[*handler]bool),
topicHandlerCaps: make(map[Topic]byte),
topicHandlerCaps: make(map[Topic]*handlerCaps),

hashPool: sync.Pool{
New: func() interface{} {
return sha3.NewKeccak256()
Expand Down Expand Up @@ -324,20 +325,36 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
p.handlers[*topic] = handlers
log.Debug("registered handler", "caps", hndlr.caps)
}
if hndlr.caps == nil {
hndlr.caps = &handlerCaps{}
}
handlers[hndlr] = true
p.topicHandlerCaps[*topic] |= hndlr.caps
if _, ok := p.topicHandlerCaps[*topic]; !ok {
p.topicHandlerCaps[*topic] = &handlerCaps{}
}
if !p.topicHandlerCaps[*topic].raw && hndlr.caps.raw {
p.topicHandlerCaps[*topic].raw = true
}
if !p.topicHandlerCaps[*topic].prox && hndlr.caps.prox {
p.topicHandlerCaps[*topic].prox = true
}
return func() { p.deregister(topic, hndlr) }
}
func (p *Pss) deregister(topic *Topic, hndlr *handler) {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
handlers := p.handlers[*topic]
if len(handlers) == 1 {
if len(handlers) > 1 {
delete(p.handlers, *topic)
// topic caps might have changed now that a handler is gone
var caps byte
caps := &handlerCaps{}
for h := range handlers {
caps |= h.caps
if h.caps.raw {
caps.raw = true
}
if h.caps.prox {
caps.prox = true
}
}
p.topicHandlerCaps[*topic] = caps
return
Expand Down Expand Up @@ -379,7 +396,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
// raw is simplest handler contingency to check, so check that first
var isRaw bool
if pssmsg.isRaw() {
if p.topicHandlerCaps[psstopic]&handlerCapRaw == 0 {
if !p.topicHandlerCaps[psstopic].raw {
log.Debug("No handler for raw message", "topic", psstopic)
return nil
}
Expand All @@ -390,7 +407,10 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
// - no prox handler on message and partial address matches
// - prox handler on message and we are in prox regardless of partial address match
// store this result so we don't calculate again on every handler
isProx := p.topicHandlerCaps[psstopic]&handlerCapProx != 0
var isProx bool
if _, ok := p.topicHandlerCaps[psstopic]; ok {
isProx = p.topicHandlerCaps[psstopic].prox
}
isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
if !isRecipient {
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
Expand Down Expand Up @@ -457,12 +477,12 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw
handlers := p.getHandlers(topic)
peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
for h := range handlers {
if h.caps&handlerCapRaw == 0 && raw {
log.Trace("norawhandler")
if !h.caps.raw && raw {
log.Warn("norawhandler")
continue
}
if h.caps&handlerCapProx == 0 && prox {
log.Trace("noproxhandler")
if !h.caps.prox && prox {
log.Warn("noproxhandler")
continue
}
err := (h.f)(payload, peer, asymmetric, keyid)
Expand Down Expand Up @@ -750,7 +770,7 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
if err != nil {
return err
}
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 {
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
return p.process(pssMsg, true, true)
}
return nil
Expand Down Expand Up @@ -858,8 +878,10 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
if err != nil {
return err
}
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 {
return p.process(pssMsg, true, true)
if _, ok := p.topicHandlerCaps[topic]; ok {
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
return p.process(pssMsg, true, true)
}
}
return nil
}
Expand Down
32 changes: 22 additions & 10 deletions swarm/pss/pss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,11 @@ func TestProxShortCircuit(t *testing.T) {
}
topic := BytesToTopic([]byte{0x2a})
hndlrProxDereg := ps.Register(&topic, &handler{
f: rawHandlerFunc,
caps: handlerCapProx | handlerCapRaw,
f: rawHandlerFunc,
caps: &handlerCaps{
raw: true,
prox: true,
},
})
defer hndlrProxDereg()

Expand Down Expand Up @@ -553,8 +556,11 @@ func TestAddressMatchProx(t *testing.T) {
// register it marking prox capability
topic := BytesToTopic([]byte{0x2a})
hndlrProxDereg := ps.Register(&topic, &handler{
f: rawHandlerFunc,
caps: handlerCapProx | handlerCapRaw,
f: rawHandlerFunc,
caps: &handlerCaps{
raw: true,
prox: true,
},
})

// test the distances
Expand Down Expand Up @@ -583,8 +589,10 @@ func TestAddressMatchProx(t *testing.T) {

// now add a non prox-capable handler and test
ps.Register(&topic, &handler{
f: rawHandlerFunc,
caps: handlerCapRaw,
f: rawHandlerFunc,
caps: &handlerCaps{
raw: true,
},
})
receives = 0
prevReceive = 0
Expand Down Expand Up @@ -982,8 +990,10 @@ func TestRawAllow(t *testing.T) {

// now wrap the same handler function with raw capabilities and register it
hndlrRaw := &handler{
f: rawHandlerFunc,
caps: handlerCapRaw,
f: rawHandlerFunc,
caps: &handlerCaps{
raw: true,
},
}
deregRawHandler := ps.Register(&topic, hndlrRaw)

Expand Down Expand Up @@ -1974,8 +1984,10 @@ func newServices(allowRaw bool) adapters.Services {
SetHandshakeController(ps, NewHandshakeParams())
}
ps.Register(&PingTopic, &handler{
f: pp.Handle,
caps: handlerCapRaw,
f: pp.Handle,
caps: &handlerCaps{
raw: true,
},
})
ps.addAPI(rpc.API{
Namespace: "psstest",
Expand Down
23 changes: 10 additions & 13 deletions swarm/pss/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ const (
pssControlRaw = 1 << 1
)

const (
handlerCapRaw = 1 << 0
handlerCapProx = 1 << 1
)

var (
topicHashMutex = sync.Mutex{}
topicHashFunc = storage.MakeHashFunc("SHA256")()
Expand Down Expand Up @@ -167,32 +162,34 @@ func (msg *PssMsg) String() string {
// Implementations of this type are passed to Pss.Register together with a topic,
type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error

type handlerCaps struct {
raw bool
prox bool
}

// Handler defines code to be executed upon reception of content.
type handler struct {
f HandlerFunc
caps byte
//raw bool // if true, will allow raw messages to be handled
//prox bool // if true, explicit recipient address will be truncated to minproxsize depth
caps *handlerCaps
}

// NewHandler returns a new message handler
func NewHandler(f HandlerFunc) *handler {
return &handler{
f: f,
f: f,
caps: &handlerCaps{},
}
}

// WithRaw is a chainable method that allows raw messages to be handled.
func (h *handler) WithRaw() *handler {
//h.raw = true
h.caps |= handlerCapRaw
h.caps.raw = true
return h
}

// WithProxBin is a chainable method that allows sending messages with full addresses to neighbourhoods using the kademlia depth as reference
func (h *handler) WithProxBin() *handler {
//h.prox = true
h.caps |= handlerCapProx
h.caps.prox = true
return h
}

Expand Down

0 comments on commit b13b05c

Please sign in to comment.