Skip to content

Commit

Permalink
Replace engine probes with direct calls
Browse files Browse the repository at this point in the history
  • Loading branch information
lixmal committed Jan 16, 2025
1 parent 481bbe8 commit 9761297
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 184 deletions.
2 changes: 1 addition & 1 deletion client/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command) error {
r.GetFullStatus()

connectClient := internal.NewConnectClient(ctx, config, r)
return connectClient.Run()
return connectClient.Run(nil)
}

func runInDaemonMode(ctx context.Context, cmd *cobra.Command) error {
Expand Down
17 changes: 6 additions & 11 deletions client/internal/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,8 @@ func NewConnectClient(
}

// Run with main logic.
func (c *ConnectClient) Run() error {
return c.run(MobileDependency{}, nil, nil)
}

// RunWithProbes runs the client's main logic with probes attached
func (c *ConnectClient) RunWithProbes(probes *ProbeHolder, runningChan chan error) error {
return c.run(MobileDependency{}, probes, runningChan)
func (c *ConnectClient) Run(runningChan chan error) error {
return c.run(MobileDependency{}, runningChan)
}

// RunOnAndroid with main logic on mobile system
Expand All @@ -84,7 +79,7 @@ func (c *ConnectClient) RunOnAndroid(
HostDNSAddresses: dnsAddresses,
DnsReadyListener: dnsReadyListener,
}
return c.run(mobileDependency, nil, nil)
return c.run(mobileDependency, nil)
}

func (c *ConnectClient) RunOniOS(
Expand All @@ -102,10 +97,10 @@ func (c *ConnectClient) RunOniOS(
DnsManager: dnsManager,
StateFilePath: stateFilePath,
}
return c.run(mobileDependency, nil, nil)
return c.run(mobileDependency, nil)
}

func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHolder, runningChan chan error) error {
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error {
defer func() {
if r := recover(); r != nil {
log.Panicf("Panic occurred: %v, stack trace: %s", r, string(debug.Stack()))
Expand Down Expand Up @@ -261,7 +256,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
checks := loginResp.GetChecks()

c.engineMutex.Lock()
c.engine = NewEngineWithProbes(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, probes, checks)
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
c.engine.SetNetworkMapPersistence(c.persistNetworkMap)
c.engineMutex.Unlock()

Expand Down
127 changes: 41 additions & 86 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ type Engine struct {

dnsServer dns.Server

probes *ProbeHolder

// checks are the client-applied posture checks that need to be evaluated on the client
checks []*mgmProto.Checks

Expand All @@ -196,7 +194,7 @@ type Peer struct {
WgAllowedIps string
}

// NewEngine creates a new Connection Engine
// NewEngine creates a new Connection Engine
func NewEngine(
clientCtx context.Context,
clientCancel context.CancelFunc,
Expand All @@ -207,33 +205,6 @@ func NewEngine(
mobileDep MobileDependency,
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
) *Engine {
return NewEngineWithProbes(
clientCtx,
clientCancel,
signalClient,
mgmClient,
relayManager,
config,
mobileDep,
statusRecorder,
nil,
checks,
)
}

// NewEngineWithProbes creates a new Connection Engine with probes attached
func NewEngineWithProbes(
clientCtx context.Context,
clientCancel context.CancelFunc,
signalClient signal.Client,
mgmClient mgm.Client,
relayManager *relayClient.Manager,
config *EngineConfig,
mobileDep MobileDependency,
statusRecorder *peer.Status,
probes *ProbeHolder,
checks []*mgmProto.Checks,
) *Engine {
engine := &Engine{
clientCtx: clientCtx,
Expand All @@ -251,7 +222,6 @@ func NewEngineWithProbes(
networkSerial: 0,
sshServerFunc: nbssh.DefaultSSHServer,
statusRecorder: statusRecorder,
probes: probes,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
}
Expand Down Expand Up @@ -450,7 +420,6 @@ func (e *Engine) Start() error {

e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveProbeEvents()

// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
Expand Down Expand Up @@ -1513,72 +1482,58 @@ func (e *Engine) getRosenpassAddr() string {
return ""
}

func (e *Engine) receiveProbeEvents() {
if e.probes == nil {
return
}
if e.probes.SignalProbe != nil {
go e.probes.SignalProbe.Receive(e.ctx, func() bool {
healthy := e.signal.IsHealthy()
log.Debugf("received signal probe request, healthy: %t", healthy)
return healthy
})
}
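// RunHealthProbes executes health checks for the Signal, Management and Relay services, refreshes
// the WireGuard peer statistics, and updates the status recorder with the latest states.
// It returns true only if the Signal, Management and Relay checks all succeed (a single failed
// STUN/TURN probe marks Relay as unhealthy); errors while refreshing WireGuard statistics are
// only logged and do not affect the result.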
func (e *Engine) RunHealthProbes() bool {
signalHealthy := e.signal.IsHealthy()
log.Debugf("signal health check: healthy=%t", signalHealthy)

if e.probes.MgmProbe != nil {
go e.probes.MgmProbe.Receive(e.ctx, func() bool {
healthy := e.mgmClient.IsHealthy()
log.Debugf("received management probe request, healthy: %t", healthy)
return healthy
})
}
managementHealthy := e.mgmClient.IsHealthy()
log.Debugf("management health check: healthy=%t", managementHealthy)

if e.probes.RelayProbe != nil {
go e.probes.RelayProbe.Receive(e.ctx, func() bool {
healthy := true
results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)

results := append(e.probeSTUNs(), e.probeTURNs()...)
e.statusRecorder.UpdateRelayStates(results)

// A single failed server will result in a "failed" probe
for _, res := range results {
if res.Err != nil {
healthy = false
break
}
}

log.Debugf("received relay probe request, healthy: %t", healthy)
return healthy
})
relayHealthy := true
for _, res := range results {
if res.Err != nil {
relayHealthy = false
break
}
}
log.Debugf("relay health check: healthy=%t", relayHealthy)

if e.probes.WgProbe != nil {
go e.probes.WgProbe.Receive(e.ctx, func() bool {
log.Debug("received wg probe request")

for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
}

return true
})
for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
continue
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}
}

allHealthy := signalHealthy && managementHealthy && relayHealthy
log.Debugf("all health checks completed: healthy=%t", allHealthy)
return allHealthy
}

func (e *Engine) probeSTUNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeSTUN, e.STUNs)
e.syncMsgMux.Lock()
stuns := slices.Clone(e.STUNs)
e.syncMsgMux.Unlock()

return relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns)
}

func (e *Engine) probeTURNs() []relay.ProbeResult {
return relay.ProbeAll(e.ctx, relay.ProbeTURN, e.TURNs)
e.syncMsgMux.Lock()
turns := slices.Clone(e.TURNs)
e.syncMsgMux.Unlock()

return relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)
}

func (e *Engine) restartEngine() {
Expand Down
58 changes: 0 additions & 58 deletions client/internal/probe.go

This file was deleted.

46 changes: 18 additions & 28 deletions client/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ type Server struct {
statusRecorder *peer.Status
sessionWatcher *internal.SessionWatcher

mgmProbe *internal.Probe
signalProbe *internal.Probe
relayProbe *internal.Probe
wgProbe *internal.Probe
lastProbe time.Time

lastProbe time.Time
persistNetworkMap bool
}

Expand All @@ -86,12 +81,7 @@ func New(ctx context.Context, configPath, logFile string) *Server {
latestConfigInput: internal.ConfigInput{
ConfigPath: configPath,
},
logFile: logFile,
mgmProbe: internal.NewProbe(),
signalProbe: internal.NewProbe(),
relayProbe: internal.NewProbe(),
wgProbe: internal.NewProbe(),

logFile: logFile,
persistNetworkMap: true,
}
}
Expand Down Expand Up @@ -202,14 +192,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Conf
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetNetworkMapPersistence(s.persistNetworkMap)

probes := internal.ProbeHolder{
MgmProbe: s.mgmProbe,
SignalProbe: s.signalProbe,
RelayProbe: s.relayProbe,
WgProbe: s.wgProbe,
}

err := s.connectClient.RunWithProbes(&probes, runningChan)
err := s.connectClient.Run(runningChan)
if err != nil {
log.Debugf("run client connection exited with error: %v. Will retry in the background", err)
}
Expand Down Expand Up @@ -676,9 +659,13 @@ func (s *Server) Down(ctx context.Context, _ *proto.DownRequest) (*proto.DownRes

// Status returns the daemon status
func (s *Server) Status(
_ context.Context,
ctx context.Context,
msg *proto.StatusRequest,
) (*proto.StatusResponse, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down Expand Up @@ -707,14 +694,17 @@ func (s *Server) Status(
}

func (s *Server) runProbes() {
if time.Since(s.lastProbe) > probeThreshold {
managementHealthy := s.mgmProbe.Probe()
signalHealthy := s.signalProbe.Probe()
relayHealthy := s.relayProbe.Probe()
wgProbe := s.wgProbe.Probe()
if s.connectClient == nil {
return
}

// Update last time only if all probes were successful
if managementHealthy && signalHealthy && relayHealthy && wgProbe {
engine := s.connectClient.Engine()
if engine == nil {
return
}

if time.Since(s.lastProbe) > probeThreshold {
if engine.RunHealthProbes() {
s.lastProbe = time.Now()
}
}
Expand Down

0 comments on commit 9761297

Please sign in to comment.