Skip to content

Commit ffd49aa

Browse files
committed
Add PeerReadyNotifier actor
We add an actor that waits for a given peer to be connected and ready to process payments. This is useful in the context of async payments for the receiver's LSP.
1 parent 1e252e5 commit ffd49aa

File tree

10 files changed

+390
-17
lines changed

10 files changed

+390
-17
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ object MessageRelay {
7373

7474
def waitForPreviousPeer(messageId: ByteVector32, switchboard: ActorRef, nextNodeId: PublicKey, msg: OnionMessage, replyTo_opt: Option[typed.ActorRef[Status]]): Behavior[Command] = {
7575
Behaviors.receivePartial {
76-
case (context, WrappedPeerInfo(PeerInfo(_, _, _, _, channels))) if channels > 0 =>
76+
case (context, WrappedPeerInfo(PeerInfo(_, _, _, _, channels))) if channels.nonEmpty =>
7777
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), nextNodeId)
7878
waitForNextPeer(messageId, msg, replyTo_opt)
7979
case _ =>
@@ -84,7 +84,7 @@ object MessageRelay {
8484

8585
def waitForNextPeer(messageId: ByteVector32, msg: OnionMessage, replyTo_opt: Option[typed.ActorRef[Status]]): Behavior[Command] = {
8686
Behaviors.receiveMessagePartial {
87-
case WrappedPeerInfo(PeerInfo(peer, _, _, _, channels)) if channels > 0 =>
87+
case WrappedPeerInfo(PeerInfo(peer, _, _, _, channels)) if channels.nonEmpty =>
8888
peer ! Peer.RelayOnionMessage(messageId, msg, replyTo_opt)
8989
Behaviors.stopped
9090
case _ =>

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
324324
replyTo ! PeerInfo(self, remoteNodeId, stateName, d match {
325325
case c: ConnectedData => Some(c.address)
326326
case _ => None
327-
}, d.channels.values.toSet.size) // we use toSet to dedup because a channel can have a TemporaryChannelId + a ChannelId
327+
}, d.channels.values.toSet)
328328
stay()
329329

330330
case Event(_: Peer.OutgoingMessage, _) => stay() // we got disconnected or reconnected and this message was for the previous connection
@@ -548,7 +548,7 @@ object Peer {
548548
sealed trait PeerInfoResponse {
549549
def nodeId: PublicKey
550550
}
551-
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Int) extends PeerInfoResponse
551+
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Set[ActorRef]) extends PeerInfoResponse
552552
case class PeerNotFound(nodeId: PublicKey) extends PeerInfoResponse { override def toString: String = s"peer $nodeId not found" }
553553

554554
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2022 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.io
18+
19+
import akka.actor.typed.eventstream.EventStream
20+
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
21+
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
22+
import akka.actor.typed.{ActorRef, Behavior}
23+
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
24+
import fr.acinq.eclair.blockchain.CurrentBlockHeight
25+
import fr.acinq.eclair.{BlockHeight, Logs, channel}
26+
27+
import scala.concurrent.duration.{DurationInt, FiniteDuration}
28+
29+
/**
30+
* This actor waits for a given peer to be online and ready to process payments.
31+
* It automatically stops after the timeout provided.
32+
*/
33+
object PeerReadyNotifier {
34+
35+
// @formatter:off
36+
sealed trait Command
37+
case class NotifyWhenPeerReady(replyTo: ActorRef[Result]) extends Command
38+
private case object PeerNotConnected extends Command
39+
private case class SomePeerConnected(nodeId: PublicKey) extends Command
40+
private case class PeerChannels(channels: Set[akka.actor.ActorRef]) extends Command
41+
private case class NewBlockNotTimedOut(currentBlockHeight: BlockHeight) extends Command
42+
private case object CheckChannelsReady extends Command
43+
private case class ChannelStates(states: Seq[channel.ChannelState]) extends Command
44+
private case object Timeout extends Command
45+
46+
sealed trait Result
47+
case class PeerReady(remoteNodeId: PublicKey, channelsCount: Int) extends Result
48+
case class PeerUnavailable(remoteNodeId: PublicKey) extends Result
49+
// @formatter:on
50+
51+
def apply(remoteNodeId: PublicKey, switchboard: ActorRef[Switchboard.GetPeerInfo], timeout_opt: Option[Either[FiniteDuration, BlockHeight]]): Behavior[Command] = {
52+
Behaviors.setup { context =>
53+
Behaviors.withTimers { timers =>
54+
Behaviors.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
55+
Behaviors.receiveMessagePartial {
56+
case NotifyWhenPeerReady(replyTo) =>
57+
timeout_opt.foreach {
58+
case Left(d) => timers.startSingleTimer(Timeout, d)
59+
case Right(h) => context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight] {
60+
case cbc if h <= cbc.blockHeight => Timeout
61+
case cbc => NewBlockNotTimedOut(cbc.blockHeight)
62+
})
63+
}
64+
waitForPeerConnected(replyTo, remoteNodeId, switchboard, context, timers)
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
def waitForPeerConnected(replyTo: ActorRef[Result], remoteNodeId: PublicKey, switchboard: ActorRef[Switchboard.GetPeerInfo], context: ActorContext[Command], timers: TimerScheduler[Command]): Behavior[Command] = {
72+
// In case the peer is not currently connected, we will wait for them to connect instead of regularly polling the
73+
// switchboard. This makes more sense for long timeouts such as the ones used for async payments.
74+
val peerConnectedAdapter = context.messageAdapter[PeerConnected](pc => SomePeerConnected(pc.nodeId))
75+
context.system.eventStream ! EventStream.Subscribe(peerConnectedAdapter)
76+
val peerInfoAdapter = context.messageAdapter[Peer.PeerInfoResponse] {
77+
// We receive this when we don't have any channel to the given peer and are not currently connected to them.
78+
// In that case we still want to wait for a connection, because we may want to open a channel to them.
79+
case _: Peer.PeerNotFound => PeerNotConnected
80+
case info: Peer.PeerInfo if info.state != Peer.CONNECTED => PeerNotConnected
81+
case info: Peer.PeerInfo => PeerChannels(info.channels)
82+
}
83+
// We check whether the peer is already connected.
84+
switchboard ! Switchboard.GetPeerInfo(peerInfoAdapter, remoteNodeId)
85+
Behaviors.receiveMessagePartial {
86+
case PeerNotConnected =>
87+
context.log.debug("peer is not connected yet")
88+
Behaviors.same
89+
case SomePeerConnected(nodeId) =>
90+
if (nodeId == remoteNodeId) {
91+
switchboard ! Switchboard.GetPeerInfo(peerInfoAdapter, remoteNodeId)
92+
}
93+
Behaviors.same
94+
case PeerChannels(channels) =>
95+
if (channels.isEmpty) {
96+
context.log.info("peer is ready with {} channels", channels.size)
97+
replyTo ! PeerReady(remoteNodeId, 0)
98+
Behaviors.stopped
99+
} else {
100+
context.log.debug("peer is connected with {} channels", channels.size)
101+
waitForChannelsReady(replyTo, remoteNodeId, channels, context, timers)
102+
}
103+
case NewBlockNotTimedOut(currentBlockHeight) =>
104+
context.log.debug("waiting for peer to connect at block {}", currentBlockHeight)
105+
Behaviors.same
106+
case Timeout =>
107+
context.log.info("timed out waiting for peer to be ready")
108+
replyTo ! PeerUnavailable(remoteNodeId)
109+
Behaviors.stopped
110+
}
111+
}
112+
113+
def waitForChannelsReady(replyTo: ActorRef[Result], remoteNodeId: PublicKey, channels: Set[akka.actor.ActorRef], context: ActorContext[Command], timers: TimerScheduler[Command]): Behavior[Command] = {
114+
timers.startTimerWithFixedDelay(CheckChannelsReady, initialDelay = 50 millis, delay = 1 second)
115+
Behaviors.receiveMessagePartial {
116+
case CheckChannelsReady =>
117+
context.spawnAnonymous(ChannelStatesCollector(context.self, channels))
118+
Behaviors.same
119+
case ChannelStates(states) =>
120+
if (states.forall(isChannelReady)) {
121+
replyTo ! PeerReady(remoteNodeId, channels.size)
122+
Behaviors.stopped
123+
} else {
124+
context.log.debug("peer has {} channels that are not ready", states.count(s => !isChannelReady(s)))
125+
Behaviors.same
126+
}
127+
case NewBlockNotTimedOut(currentBlockHeight) =>
128+
context.log.debug("waiting for channels to be ready at block {}", currentBlockHeight)
129+
Behaviors.same
130+
case SomePeerConnected(_) =>
131+
Behaviors.same
132+
case Timeout =>
133+
context.log.info("timed out waiting for channels to be ready")
134+
replyTo ! PeerUnavailable(remoteNodeId)
135+
Behaviors.stopped
136+
}
137+
}
138+
139+
// We use an exhaustive pattern matching here to ensure we explicitly handle future new channel states.
140+
private def isChannelReady(state: channel.ChannelState): Boolean = state match {
141+
case channel.WAIT_FOR_INIT_INTERNAL => false
142+
case channel.WAIT_FOR_INIT_SINGLE_FUNDED_CHANNEL => false
143+
case channel.WAIT_FOR_INIT_DUAL_FUNDED_CHANNEL => false
144+
case channel.OFFLINE => false
145+
case channel.SYNCING => false
146+
case channel.WAIT_FOR_OPEN_CHANNEL => true
147+
case channel.WAIT_FOR_ACCEPT_CHANNEL => true
148+
case channel.WAIT_FOR_FUNDING_INTERNAL => true
149+
case channel.WAIT_FOR_FUNDING_CREATED => true
150+
case channel.WAIT_FOR_FUNDING_SIGNED => true
151+
case channel.WAIT_FOR_FUNDING_CONFIRMED => true
152+
case channel.WAIT_FOR_CHANNEL_READY => true
153+
case channel.WAIT_FOR_OPEN_DUAL_FUNDED_CHANNEL => true
154+
case channel.WAIT_FOR_ACCEPT_DUAL_FUNDED_CHANNEL => true
155+
case channel.WAIT_FOR_DUAL_FUNDING_CREATED => true
156+
case channel.WAIT_FOR_DUAL_FUNDING_CONFIRMED => true
157+
case channel.WAIT_FOR_DUAL_FUNDING_READY => true
158+
case channel.NORMAL => true
159+
case channel.SHUTDOWN => true
160+
case channel.NEGOTIATING => true
161+
case channel.CLOSING => true
162+
case channel.CLOSED => true
163+
case channel.WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT => true
164+
case channel.ERR_INFORMATION_LEAK => true
165+
}
166+
167+
object ChannelStatesCollector {
168+
169+
// @formatter:off
170+
sealed trait Command
171+
private final case class WrappedChannelState(wrapped: channel.RES_GET_CHANNEL_STATE) extends Command
172+
// @formatter:on
173+
174+
def apply(replyTo: ActorRef[ChannelStates], channels: Set[akka.actor.ActorRef]): Behavior[Command] = {
175+
Behaviors.setup { context =>
176+
val channelStateAdapter = context.messageAdapter[channel.RES_GET_CHANNEL_STATE](WrappedChannelState)
177+
channels.foreach(c => c ! channel.CMD_GET_CHANNEL_STATE(channelStateAdapter.toClassic))
178+
collect(replyTo, Nil, channels.size)
179+
}
180+
}
181+
182+
private def collect(replyTo: ActorRef[ChannelStates], received: Seq[channel.ChannelState], remaining: Int): Behavior[Command] = {
183+
Behaviors.receiveMessage {
184+
case WrappedChannelState(wrapped) => remaining match {
185+
case 1 =>
186+
replyTo ! ChannelStates(received :+ wrapped.state)
187+
Behaviors.stopped
188+
case _ =>
189+
collect(replyTo, received :+ wrapped.state, remaining - 1)
190+
}
191+
}
192+
}
193+
194+
}
195+
196+
}

eclair-core/src/main/scala/fr/acinq/eclair/json/JsonSerializers.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ private case class GlobalBalanceJson(total: Btc, onChain: CorrectedOnChainBalanc
462462
object GlobalBalanceSerializer extends ConvertClassSerializer[GlobalBalance](b => GlobalBalanceJson(b.total, b.onChain, b.offChain))
463463

464464
private case class PeerInfoJson(nodeId: PublicKey, state: String, address: Option[String], channels: Int)
465-
object PeerInfoSerializer extends ConvertClassSerializer[Peer.PeerInfo](peerInfo => PeerInfoJson(peerInfo.nodeId, peerInfo.state.toString, peerInfo.address.map(_.toString), peerInfo.channels))
465+
object PeerInfoSerializer extends ConvertClassSerializer[Peer.PeerInfo](peerInfo => PeerInfoJson(peerInfo.nodeId, peerInfo.state.toString, peerInfo.address.map(_.toString), peerInfo.channels.size))
466466

467467
private[json] case class MessageReceivedJson(pathId: Option[ByteVector], encodedReplyPath: Option[String], replyPath: Option[BlindedRoute], unknownTlvs: Map[String, ByteVector])
468468
object OnionMessageReceivedSerializer extends ConvertClassSerializer[OnionMessages.ReceiveMessage](m => MessageReceivedJson(m.finalPayload.pathId_opt, m.finalPayload.replyPath_opt.map(route => blindedRouteCodec.encode(route.blindedRoute).require.bytes.toHex), m.finalPayload.replyPath_opt.map(_.blindedRoute), m.finalPayload.records.unknown.map(tlv => tlv.tag.toString -> tlv.value).toMap))

eclair-core/src/test/scala/fr/acinq/eclair/io/MessageRelaySpec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
103103

104104
val getPeerInfo = switchboard.expectMsgType[GetPeerInfo]
105105
assert(getPeerInfo.remoteNodeId == previousNodeId)
106-
getPeerInfo.replyTo ! PeerInfo(peer.ref.toClassic, previousNodeId, Peer.CONNECTED, None, 0)
106+
getPeerInfo.replyTo ! PeerInfo(peer.ref.toClassic, previousNodeId, Peer.CONNECTED, None, Set.empty)
107107

108108
probe.expectMessage(AgainstPolicy(messageId, RelayChannelsOnly))
109109
peer.expectNoMessage(100 millis)
@@ -119,7 +119,7 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
119119

120120
val getPeerInfo1 = switchboard.expectMsgType[GetPeerInfo]
121121
assert(getPeerInfo1.remoteNodeId == previousNodeId)
122-
getPeerInfo1.replyTo ! PeerInfo(peer.ref.toClassic, previousNodeId, Peer.CONNECTED, None, 1)
122+
getPeerInfo1.replyTo ! PeerInfo(peer.ref.toClassic, previousNodeId, Peer.CONNECTED, None, Set(TestProbe()(system.classicSystem).ref))
123123

124124
val getPeerInfo2 = switchboard.expectMsgType[GetPeerInfo]
125125
assert(getPeerInfo2.remoteNodeId == bobId)
@@ -139,11 +139,11 @@ class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("app
139139

140140
val getPeerInfo1 = switchboard.expectMsgType[GetPeerInfo]
141141
assert(getPeerInfo1.remoteNodeId == previousNodeId)
142-
getPeerInfo1.replyTo ! PeerInfo(TestProbe()(system.classicSystem).ref, previousNodeId, Peer.CONNECTED, None, 1)
142+
getPeerInfo1.replyTo ! PeerInfo(TestProbe()(system.classicSystem).ref, previousNodeId, Peer.CONNECTED, None, Set(TestProbe()(system.classicSystem).ref))
143143

144144
val getPeerInfo2 = switchboard.expectMsgType[GetPeerInfo]
145145
assert(getPeerInfo2.remoteNodeId == bobId)
146-
getPeerInfo2.replyTo ! PeerInfo(peer.ref.toClassic, bobId, Peer.CONNECTED, None, 2)
146+
getPeerInfo2.replyTo ! PeerInfo(peer.ref.toClassic, bobId, Peer.CONNECTED, None, Set(0, 1).map(_ => TestProbe()(system.classicSystem).ref))
147147

148148
assert(peer.expectMessageType[Peer.RelayOnionMessage].msg == message)
149149
}

0 commit comments

Comments
 (0)