Skip to content

Commit

Permalink
Relay to channel with lowest possible balance (#784)
Browse files Browse the repository at this point in the history
* relay to channel with lowest possible balance

Our current channel selection is very simplistic: we relay to the
channel with the largest balance. As time goes by, this leads to all
channels having the same balance.

A better strategy is to relay to the channel which has the smallest
balance but has enough to process the payment. This way we save larger
channels for larger payments, and also on average channels get depleted
one after the other.

* added tests...

...and found bugs!

Note that there is something fishy in BOLT 4, filed a PR:
lightning/bolts#538

Also, first try of softwaremill's quicklens lib (in scope test for now)

* minor: fixed typo (h/t @btcontract)
  • Loading branch information
pm47 authored Jan 21, 2019
1 parent 3f72b44 commit 3aa5754
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 20 deletions.
6 changes: 6 additions & 0 deletions eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
<version>${guava.version}</version>
</dependency>
<!-- TESTS -->
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
<artifactId>quicklens_${scala.version.short}</artifactId>
<version>1.4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.whisk</groupId>
<artifactId>docker-testkit-scalatest_${scala.version.short}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
self ! TickRefreshChannelUpdate
}
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat)) // note that remoteCommit.toRemote == toLocal
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat, commitments1.availableBalanceForSendMsat)) // note that remoteCommit.toRemote == toLocal
// we expect a quick response from our peer
setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer = context.parent), timeout = nodeParams.revocationTimeout, repeat = false)
handleCommandSuccess(sender, store(d.copy(commitments = commitments1))) sending commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ case class ChannelFailed(channel: ActorRef, channelId: BinaryData, remoteNodeId:
case class NetworkFeePaid(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, tx: Transaction, fee: Satoshi, txType: String) extends ChannelEvent

// NB: this event is only sent when the channel is available
case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long) extends ChannelEvent
case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long, availableBalanceForSendMsat: Long) extends ChannelEvent

case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, data: Data) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ case class Commitments(localParams: LocalParams, remoteParams: RemoteParams,
def addRemoteProposal(proposal: UpdateMessage): Commitments = Commitments.addRemoteProposal(this, proposal)

def announceChannel: Boolean = (channelFlags & 0x01) != 0

def availableBalanceForSendMsat: Long = {
val reduced = CommitmentSpec.reduce(remoteCommit.spec, remoteChanges.acked, localChanges.proposed)
val fees = if (localParams.isFunder) Transactions.commitTxFee(Satoshi(remoteParams.dustLimitSatoshis), reduced).amount else 0
reduced.toRemoteMsat / 1000 - remoteParams.channelReserveSatoshis - fees
}
}

object Commitments {
Expand Down
31 changes: 15 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/payment/Relayer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,15 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR

case LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments) =>
log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val availableLocalBalance = commitments.remoteCommit.spec.toRemoteMsat // note that remoteCommit.toRemote == toLocal
context become main(channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, availableLocalBalance)), node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId))
context become main(channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, commitments.availableBalanceForSendMsat)), node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId))

case LocalChannelDown(_, channelId, shortChannelId, remoteNodeId) =>
log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
context become main(channelUpdates - shortChannelId, node2channels.removeBinding(remoteNodeId, shortChannelId))

case AvailableBalanceChanged(_, _, shortChannelId, localBalanceMsat) =>
case AvailableBalanceChanged(_, _, shortChannelId, _, availableBalanceForSendMsat) =>
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(availableBalanceMsat = localBalanceMsat))
case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(availableBalanceMsat = availableBalanceForSendMsat))
case None => channelUpdates // we only consider the balance if we have the channel_update
}
context become main(channelUpdates1, node2channels)
Expand Down Expand Up @@ -200,7 +199,7 @@ object Relayer {
sealed trait NextPayload
case class FinalPayload(add: UpdateAddHtlc, payload: PerHopPayload) extends NextPayload
case class RelayPayload(add: UpdateAddHtlc, payload: PerHopPayload, nextPacket: Sphinx.Packet) extends NextPayload {
val relayFeeSatoshi = add.amountMsat - payload.amtToForward
val relayFeeMsat = add.amountMsat - payload.amtToForward
val expiryDelta = add.cltvExpiry - payload.outgoingCltvValue
}
// @formatter:on
Expand Down Expand Up @@ -264,19 +263,19 @@ object Relayer {
case Some(channelUpdate) if !Announcements.isEnabled(channelUpdate.channelFlags) =>
Left(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true))
case Some(channelUpdate) if payload.amtToForward < channelUpdate.htlcMinimumMsat =>
Left(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(add.amountMsat, channelUpdate)), commit = true))
Left(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amtToForward, channelUpdate)), commit = true))
case Some(channelUpdate) if relayPayload.expiryDelta != channelUpdate.cltvExpiryDelta =>
Left(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(add.cltvExpiry, channelUpdate)), commit = true))
case Some(channelUpdate) if relayPayload.relayFeeSatoshi < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amtToForward) =>
Left(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltvValue, channelUpdate)), commit = true))
case Some(channelUpdate) if relayPayload.relayFeeMsat < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amtToForward) =>
Left(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
case Some(channelUpdate) =>
val isRedirected = (channelUpdate.shortChannelId != payload.shortChannelId) // we may decide to use another channel (to the same node) that the one requested
val isRedirected = (channelUpdate.shortChannelId != payload.shortChannelId) // we may decide to use another channel (to the same node) from the one requested
Right(CMD_ADD_HTLC(payload.amtToForward, add.paymentHash, payload.outgoingCltvValue, nextPacket.serialize, upstream_opt = Some(add), commit = true, redirected = isRedirected))
}
}

/**
* Select a channel to the same node to the relay the payment to, that has the highest balance and is compatible in
* Select a channel to the same node to the relay the payment to, that has the lowest balance and is compatible in
* terms of fees, expiry_delta, etc.
*
* If no suitable channel is found we default to the originally requested channel.
Expand All @@ -293,24 +292,24 @@ object Relayer {
log.debug(s"selecting next channel for htlc #{} paymentHash={} from channelId={} to requestedShortChannelId={}", add.id, add.paymentHash, add.channelId, requestedShortChannelId)
// first we find out what is the next node
channelUpdates.get(requestedShortChannelId) match {
case Some(OutgoingChannel(nextNodeId, _, requestedChannelId)) =>
case Some(OutgoingChannel(nextNodeId, _, _)) =>
log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId)
// then we retrieve all known channels to this node
val candidateChannels = node2channels.get(nextNodeId).getOrElse(Set.empty)
val candidateChannels = node2channels.get(nextNodeId).getOrElse(Set.empty[ShortChannelId])
// and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.map {
case shortChannelId =>
.map { shortChannelId =>
val channelInfo_opt = channelUpdates.get(shortChannelId)
val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate)
val relayResult = handleRelay(relayPayload, channelUpdate_opt)
log.debug(s"candidate channel for htlc #${add.id} paymentHash=${add.paymentHash}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo_opt.map(_.availableBalanceMsat).getOrElse(""), channelUpdate_opt.getOrElse(""), relayResult)
(shortChannelId, channelInfo_opt, relayResult)
}
.collect { case (shortChannelId, Some(channelInfo), Right(_)) => (shortChannelId, channelInfo.availableBalanceMsat) }
.filter(_._2 > relayPayload.payload.amtToForward) // we only keep channels that have enough balance to handle this payment
.toList // needed for ordering
.sortBy(_._2) // we want to use the channel with the highest available balance
.lastOption match {
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
.headOption match {
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
preferredShortChannelId
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package fr.acinq.eclair.payment

import akka.http.impl.util.DefaultNoLogging
import fr.acinq.bitcoin.Block
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.channel.{CMD_ADD_HTLC, CMD_FAIL_HTLC}
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.payment.Relayer.{OutgoingChannel, RelayPayload}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ShortChannelId, randomBytes, randomKey}
import org.scalatest.FunSuite

import scala.collection.mutable

class ChannelSelectionSpec extends FunSuite {

/**
* This is just a simplified helper function with random values for fields we are not using here
*/
def dummyUpdate(shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true) =
Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey, randomKey.publicKey, shortChannelId, cltvExpiryDelta, htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, htlcMaximumMsat, enable)

test("handle relay") {
val relayPayload = RelayPayload(
add = UpdateAddHtlc(randomBytes(32), 42, 1000000, randomBytes(32), 70, ""),
payload = PerHopPayload(ShortChannelId(12345), amtToForward = 998900, outgoingCltvValue = 60),
nextPacket = Sphinx.LAST_PACKET // just a placeholder
)

val channelUpdate = dummyUpdate(ShortChannelId(12345), 10, 100, 1000, 100, 10000000, true)

implicit val log = DefaultNoLogging

// nominal case
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = false)))
// redirected to preferred channel
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate.copy(shortChannelId = ShortChannelId(1111)))) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = true)))
// no channel_update
assert(Relayer.handleRelay(relayPayload, channelUpdate_opt = None) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true)))
// channel disabled
val channelUpdate_disabled = channelUpdate.copy(channelFlags = Announcements.makeChannelFlags(true, enable = false))
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate_disabled)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true)))
// amount too low
val relayPayload_toolow = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 99))
assert(Relayer.handleRelay(relayPayload_toolow, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amtToForward, channelUpdate)), commit = true)))
// incorrect cltv expiry
val relayPayload_incorrectcltv = relayPayload.copy(payload = relayPayload.payload.copy(outgoingCltvValue = 42))
assert(Relayer.handleRelay(relayPayload_incorrectcltv, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltvValue, channelUpdate)), commit = true)))
// insufficient fee
val relayPayload_insufficientfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 998910))
assert(Relayer.handleRelay(relayPayload_insufficientfee, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true)))
// note that a generous fee is ok!
val relayPayload_highfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 900000))
assert(Relayer.handleRelay(relayPayload_highfee, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload_highfee.payload.amtToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltvValue, relayPayload_highfee.nextPacket.serialize, upstream_opt = Some(relayPayload.add), commit = true, redirected = false)))
}

test("relay channel selection") {

val relayPayload = RelayPayload(
add = UpdateAddHtlc(randomBytes(32), 42, 1000000, randomBytes(32), 70, ""),
payload = PerHopPayload(ShortChannelId(12345), amtToForward = 998900, outgoingCltvValue = 60),
nextPacket = Sphinx.LAST_PACKET // just a placeholder
)

val (a, b) = (randomKey.publicKey, randomKey.publicKey)
val channelUpdate = dummyUpdate(ShortChannelId(12345), 10, 100, 1000, 100, 10000000, true)

val channelUpdates = Map(
ShortChannelId(11111) -> OutgoingChannel(a, channelUpdate, 100000000),
ShortChannelId(12345) -> OutgoingChannel(a, channelUpdate, 20000000),
ShortChannelId(22222) -> OutgoingChannel(a, channelUpdate, 10000000),
ShortChannelId(33333) -> OutgoingChannel(a, channelUpdate, 100000),
ShortChannelId(44444) -> OutgoingChannel(b, channelUpdate, 1000000)
)

val node2channels = new mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId]
node2channels.put(a, mutable.Set(ShortChannelId(12345), ShortChannelId(11111), ShortChannelId(22222), ShortChannelId(33333)))
node2channels.put(b, mutable.Set(ShortChannelId(44444)))

implicit val log = DefaultNoLogging

import com.softwaremill.quicklens._

// select the channel to the same node, with the lowest balance but still high enough to handle the payment
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels) === ShortChannelId(22222))
// higher amount payment (have to increased incoming htlc amount for fees to be sufficient)
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.add.amountMsat).setTo(60000000).modify(_.payload.amtToForward).setTo(50000000), channelUpdates, node2channels) === ShortChannelId(11111))
// lower amount payment
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000), channelUpdates, node2channels) === ShortChannelId(33333))
// payment too high, no suitable channel, we keep the requested one
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000000000), channelUpdates, node2channels) === ShortChannelId(12345))
// invalid cltv expiry, no suitable channel, we keep the requested one
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.outgoingCltvValue).setTo(40), channelUpdates, node2channels) === ShortChannelId(12345))

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ class RelayerSpec extends TestkitBaseClass {
val channelId_ab: BinaryData = randomBytes(32)
val channelId_bc: BinaryData = randomBytes(32)

def makeCommitments(channelId: BinaryData) = Commitments(null, null, 0.toByte, null,
def makeCommitments(channelId: BinaryData) = new Commitments(null, null, 0.toByte, null,
RemoteCommit(42, CommitmentSpec(Set.empty, 20000, 5000000, 100000000), "00" * 32, randomKey.toPoint),
null, null, 0, 0, Map.empty, null, null, null, channelId)
null, null, 0, 0, Map.empty, null, null, null, channelId) {
override def availableBalanceForSendMsat: Long = remoteCommit.spec.toRemoteMsat // approximation
}

test("relay an htlc-add") { f =>
import f._
Expand Down

0 comments on commit 3aa5754

Please sign in to comment.