Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Add option to force nat into a specified reachability state #55

Merged
merged 3 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 57 additions & 23 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,66 @@ type AmbientAutoNAT struct {

subAddrUpdated event.Subscription
service *autoNATService
serviceCancel context.CancelFunc

emitReachabilityChanged event.Emitter
}

type StaticAutoNAT struct {
ctx context.Context
host host.Host
reachability network.Reachability
service *autoNATService
}

type autoNATResult struct {
network.Reachability
address ma.Multiaddr
}

// New creates a new NAT autodiscovery system attached to a host
func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
var err error
conf := new(config)
conf.host = h

if err := defaults(conf); err != nil {
if err = defaults(conf); err != nil {
return nil, err
}
if conf.addressFunc == nil {
conf.addressFunc = h.Addrs
}

for _, o := range options {
if err := o(conf); err != nil {
if err = o(conf); err != nil {
return nil, err
}
}
emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)

var service *autoNATService
if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
service, err = newAutoNATService(ctx, conf)
if err != nil {
return nil, err
}
}

if conf.forceReachability {
emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})

// The serice will only exist when reachability is public.
if service != nil {
service.Enable()
}
return &StaticAutoNAT{
ctx: ctx,
host: h,
reachability: conf.reachability,
service: service,
}, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Else emit unknown?


subAddrUpdated, _ := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)

as := &AmbientAutoNAT{
ctx: ctx,
Expand All @@ -83,20 +113,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
subAddrUpdated: subAddrUpdated,

emitReachabilityChanged: emitReachabilityChanged,
service: service,
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})

h.Network().Notify(as)
go as.background()

if conf.dialer != nil {
var err error
as.service, err = newAutoNATService(ctx, conf)
if err != nil {
return nil, err
}
}

return as, nil
}

Expand All @@ -115,7 +138,7 @@ func (as *AmbientAutoNAT) emitStatus() {
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load().(autoNATResult)
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT Status is not public")
return nil, errors.New("NAT status is not public")
}

return s.address, nil
Expand Down Expand Up @@ -225,10 +248,8 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
if currentStatus.Reachability != network.ReachabilityPublic {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if as.service != nil && !as.config.forceServer {
ctx, cancel := context.WithCancel(as.ctx)
go as.service.Enable(ctx)
as.serviceCancel = cancel
if as.service != nil {
as.service.Enable()
}
changed = true
} else if as.confidence < 3 {
Expand All @@ -255,9 +276,8 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
if as.serviceCancel != nil {
as.serviceCancel()
as.serviceCancel = nil
if as.service != nil {
as.service.Disable()
}
as.emitStatus()
}
Expand All @@ -275,9 +295,8 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
log.Debugf("NAT status is unknown")
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
if as.serviceCancel != nil {
as.serviceCancel()
as.serviceCancel = nil
if as.service != nil {
as.service.Disable()
}
as.emitStatus()
}
Expand Down Expand Up @@ -337,3 +356,18 @@ func shufflePeers(peers []peer.AddrInfo) {
peers[i], peers[j] = peers[j], peers[i]
}
}

func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
}

func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
if s.reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
addrs := s.host.Addrs()
if len(addrs) > 0 {
return s.host.Addrs()[0], nil
}
return nil, errors.New("No available address")
}
17 changes: 17 additions & 0 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,20 @@ func TestAutoNATObservationRecording(t *testing.T) {
}

}

func TestStaticNat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})

nat, err := New(ctx, h, WithReachability(network.ReachabilityPrivate))
if err != nil {
t.Fatal(err)
}
if nat.Status() != network.ReachabilityPrivate {
t.Fatalf("should be private")
}
expectEvent(t, s, network.ReachabilityPrivate)
}
20 changes: 15 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
type config struct {
host host.Host

addressFunc AddrFunc
dialer network.Network
forceServer bool
addressFunc AddrFunc
dialer network.Network
forceReachability bool
reachability network.Reachability

// client
bootDelay time.Duration
Expand Down Expand Up @@ -52,13 +53,22 @@ var defaults = func(c *config) error {
// make parallel connections, and as such will modify both the associated peerstore
// and terminate connections of this dialer. The dialer provided
// should be compatible (TCP/UDP) however with the transports of the libp2p network.
func EnableService(dialer network.Network, forceServer bool) Option {
func EnableService(dialer network.Network) Option {
return func(c *config) error {
if dialer == c.host.Network() || dialer.Peerstore() == c.host.Peerstore() {
return errors.New("dialer should not be that of the host")
}
c.dialer = dialer
c.forceServer = forceServer
return nil
}
}

// WithReachability overrides autonat to simply report an over-ridden reachability
// status.
func WithReachability(reachability network.Reachability) Option {
return func(c *config) error {
c.forceReachability = true
c.reachability = reachability
return nil
}
}
Expand Down
35 changes: 28 additions & 7 deletions svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ const P_CIRCUIT = 290

// AutoNATService provides NAT autodetection services to other peers
type autoNATService struct {
ctx context.Context
ctx context.Context
instance context.CancelFunc
instanceLock sync.Mutex

config *config

// rate limiter
running uint32
mx sync.Mutex
reqs map[peer.ID]int
globalReqs int
Expand All @@ -46,10 +49,6 @@ func newAutoNATService(ctx context.Context, c *config) (*autoNATService, error)
reqs: make(map[peer.ID]int),
}

if c.forceServer {
go as.Enable(ctx)
}

return as, nil
}

Expand Down Expand Up @@ -210,8 +209,30 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
return newDialResponseOK(ra)
}

// Enable the autoNAT service temporarily until the associated context is canceled.
func (as *autoNATService) Enable(ctx context.Context) {
// Enable the autoNAT service if it is not running.
func (as *autoNATService) Enable() {
as.instanceLock.Lock()
defer as.instanceLock.Unlock()
if as.instance != nil {
return
}
inst, cncl := context.WithCancel(as.ctx)
as.instance = cncl

go as.background(inst)
}

// Disable the autoNAT service if it is running.
func (as *autoNATService) Disable() {
as.instanceLock.Lock()
defer as.instanceLock.Unlock()
if as.instance != nil {
as.instance()
as.instance = nil
}
}

func (as *autoNATService) background(ctx context.Context) {
as.config.host.SetStreamHandler(AutoNATProto, as.handleStream)

timer := time.NewTimer(as.config.throttleResetPeriod)
Expand Down
5 changes: 3 additions & 2 deletions svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func makeAutoNATConfig(ctx context.Context, t *testing.T) *config {
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
c := config{host: h, dialer: dh.Network()}
_ = defaults(&c)
c.forceServer = true
c.forceReachability = true
return &c
}

Expand All @@ -30,6 +30,7 @@ func makeAutoNATService(ctx context.Context, t *testing.T, c *config) *autoNATSe
if err != nil {
t.Fatal(err)
}
as.Enable()

return as
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func TestAutoNATServiceStartup(t *testing.T) {

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
an, err := New(ctx, h, EnableService(dh.Network(), false))
an, err := New(ctx, h, EnableService(dh.Network()))
if err != nil {
t.Fatal(err)
}
Expand Down