Skip to content

Commit

Permalink
Channel range queries: send back node announcements (#1108)
Browse files Browse the repository at this point in the history
* Channel Range Queries: send back node announcements if requested

This PR adds support for sending back node announcements when replying to channel range queries:
- when explicitly requested (bit is set in the optional query flag)
- when query flags are not used and a channel announcement is sent (as per the BOLTs)

A new configuration option `request-node-announcements` has been added in the `router` section. If set to true, we
will request node announcements when we receive a channel id (through channel range queries) that we don't know of.
This is a setting that we will probably turn off on mobile devices.

* Increase tests timeouts

There is now more work to do.

* Test query sync with and without node announcements

* Router: minor fix

* Router: rework query handling
  • Loading branch information
sstone authored and pm47 committed Aug 26, 2019
1 parent 0780fc2 commit 92d9f2a
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 100 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ eclair {
channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration
broadcast-interval = 60 seconds // see BOLT #7
init-timeout = 5 minutes
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know

// the values below will be used to perform route searching
path-finding {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ object NodeParams {
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
randomizeRouteSelection = config.getBoolean("router.randomize-route-selection"),
requestNodeAnnouncements = config.getBoolean("router.request-node-announcements"),
searchMaxRouteLength = config.getInt("router.path-finding.max-route-length"),
searchMaxCltv = config.getInt("router.path-finding.max-cltv"),
searchMaxFeeBase = Satoshi(config.getLong("router.path-finding.fee-threshold-sat")),
Expand Down
194 changes: 145 additions & 49 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import scala.util.{Random, Try}
case class RouterConf(randomizeRouteSelection: Boolean,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
searchMaxFeeBase: Satoshi,
searchMaxFeePct: Double,
searchMaxRouteLength: Int,
Expand Down Expand Up @@ -535,7 +536,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption)
val flag = computeFlag(d.channels, d.updates)(head, timestamps.headOption, checksums.headOption, nodeParams.routerConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
Expand All @@ -549,7 +550,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) 1 else 0)
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
Expand All @@ -573,26 +574,29 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

case Event(PeerRoutingMessage(transport, _, routingMessage@QueryShortChannelIds(chainHash, shortChannelIds, queryFlags_opt)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
val (channelCount, updatesCount) = shortChannelIds.array
.zipWithIndex
.foldLeft((0, 0)) {
case ((c, u), (shortChannelId, idx)) =>
var c1 = c
var u1 = u
val flag = routingMessage.queryFlags_opt.map(_.array(idx)).getOrElse(QueryShortChannelIdsTlv.QueryFlagType.INCLUDE_ALL)
d.channels.get(shortChannelId) match {
case None => log.warning("received query for shortChannelId={} that we don't have", shortChannelId)
case Some(ca) =>
if (QueryShortChannelIdsTlv.QueryFlagType.includeAnnouncement(flag)) {
transport ! ca
c1 = c1 + 1
}
if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => transport ! u; u1 = u1 + 1 }
if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => transport ! u; u1 = u1 + 1 }
}
(c1, u1)
val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])

var channelCount = 0
var updateCount = 0
var nodeCount = 0

Router.handleQuery(d.nodes, d.channels, d.updates)(
shortChannelIds.array,
flags,
ca => {
channelCount = channelCount + 1
transport ! ca
},
cu => {
updateCount = updateCount + 1
transport ! cu
},
na => {
nodeCount = nodeCount + 1
transport ! na
}
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates", shortChannelIds.array.size, channelCount, updatesCount)
)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount)
transport ! ReplyShortChannelIdsEnd(chainHash, 1)
stay

Expand Down Expand Up @@ -853,43 +857,135 @@ object Router {
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks)
}

def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(
shortChannelId: ShortChannelId,
timestamps_opt: Option[ReplyChannelRangeTlv.Timestamps],
checksums_opt: Option[ReplyChannelRangeTlv.Checksums]): Long = {
import QueryShortChannelIdsTlv.QueryFlagType
var flag = 0L
(timestamps_opt, checksums_opt) match {
case (Some(theirTimestamps), Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
def shouldRequestUpdate(ourTimestamp: Long, ourChecksum: Long, theirTimestamp_opt: Option[Long], theirChecksum_opt: Option[Long]): Boolean = {
(theirTimestamp_opt, theirChecksum_opt) match {
case (Some(theirTimestamp), Some(theirChecksum)) =>
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is different from ours, or it is the same but ours is about to be stale
// - it is not stale itself
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && (ourChecksums.checksum1 != theirChecksums.checksum1 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && (ourChecksums.checksum2 != theirChecksums.checksum2 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (Some(theirTimestamps), None) if channels.contains(shortChannelId) =>
val (ourTimestamps, _) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is not stale itself
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (_, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// - it is not stale
val theirsIsMoreRecent = ourTimestamp < theirTimestamp
val theirsIsDifferent = ourChecksum != theirChecksum
val oursIsAlmostStale = isAlmostStale(ourTimestamp)
val theirsIsStale = isStale(theirTimestamp)
theirsIsMoreRecent && (theirsIsDifferent || oursIsAlmostStale) && !theirsIsStale
case (Some(theirTimestamp), None) =>
val theirsIsMoreRecent = ourTimestamp < theirTimestamp
val theirsIsStale = isStale(theirTimestamp)
theirsIsMoreRecent && !theirsIsStale
case (None, Some(theirChecksum)) =>
// this should not happen as we will not ask for checksums without asking for timestamps too
if (ourChecksums.checksum1 != theirChecksums.checksum1 && theirChecksums.checksum1 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourChecksums.checksum2 != theirChecksums.checksum2 && theirChecksums.checksum2 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, None) if channels.contains(shortChannelId) =>
// we know this channel: we only request their channel updates
flag = QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
val theirsIsDifferent = theirChecksum != 0 && ourChecksum != theirChecksum
theirsIsDifferent
case _ =>
// we don't know this channel: we request everything
flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
// they did not include timestamp or checksum => ask for the update
true
}
}

def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(
shortChannelId: ShortChannelId,
theirTimestamps_opt: Option[ReplyChannelRangeTlv.Timestamps],
theirChecksums_opt: Option[ReplyChannelRangeTlv.Checksums],
includeNodeAnnouncements: Boolean): Long = {
import QueryShortChannelIdsTlv.QueryFlagType._

val flag = channels.contains(shortChannelId) match {
case false if includeNodeAnnouncements =>
INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2 | INCLUDE_NODE_ANNOUNCEMENT_1 | INCLUDE_NODE_ANNOUNCEMENT_2
case false =>
INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2
case true =>
// we already know this channel
val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// if they don't provide timestamps or checksums, we set appropriate default values:
// - we assume their timestamp is more recent than ours by setting timestamp = Long.MaxValue
// - we assume their update is different from ours by setting checkum = Long.MaxValue (NB: our default value for checksum is 0)
val shouldRequestUpdate1 = shouldRequestUpdate(ourTimestamps.timestamp1, ourChecksums.checksum1, theirTimestamps_opt.map(_.timestamp1), theirChecksums_opt.map(_.checksum1))
val shouldRequestUpdate2 = shouldRequestUpdate(ourTimestamps.timestamp2, ourChecksums.checksum2, theirTimestamps_opt.map(_.timestamp2), theirChecksums_opt.map(_.checksum2))
val flagUpdate1 = if (shouldRequestUpdate1) INCLUDE_CHANNEL_UPDATE_1 else 0
val flagUpdate2 = if (shouldRequestUpdate2) INCLUDE_CHANNEL_UPDATE_2 else 0
flagUpdate1 | flagUpdate2
}
flag
}

/**
* Handle a query message, which includes a list of channel ids and flags.
*
* @param nodes node id -> node announcement
* @param channels channel id -> channel announcement
* @param updates channel description -> channel update
* @param ids list of channel ids
* @param flags list of query flags, either empty one flag per channel id
* @param onChannel called when a channel announcement matches (i.e. its bit is set in the query flag and we have it)
* @param onUpdate called when a channel update matches
* @param onNode called when a node announcement matches
*
*/
def handleQuery(nodes: Map[PublicKey, NodeAnnouncement],
channels: SortedMap[ShortChannelId, ChannelAnnouncement],
updates: Map[ChannelDesc, ChannelUpdate])(
ids: List[ShortChannelId],
flags: List[Long],
onChannel: ChannelAnnouncement => Unit,
onUpdate: ChannelUpdate => Unit,
onNode: NodeAnnouncement => Unit): Unit = {
import QueryShortChannelIdsTlv.QueryFlagType

// we loop over channel ids and query flag. We track node Ids for node announcement
// we've already sent to avoid sending them multiple times, as requested by the BOLTs
@tailrec
def loop(ids: List[ShortChannelId], flags: List[Long], numca: Int = 0, numcu: Int = 0, nodesSent: Set[PublicKey] = Set.empty[PublicKey]): (Int, Int, Int) = ids match {
case Nil => (numca, numcu, nodesSent.size)
case head :: tail if !channels.contains(head) =>
//log.warning("received query for shortChannelId={} that we don't have", head)
loop(tail, flags.drop(1), numca, numcu, nodesSent)
case head :: tail =>
var numca1 = numca
var numcu1 = numcu
var sent1 = nodesSent
val ca = channels(head)
val flag_opt = flags.headOption
// no flag means send everything

val includeChannel = flag_opt.forall(QueryFlagType.includeChannelAnnouncement)
val includeUpdate1 = flag_opt.forall(QueryFlagType.includeUpdate1)
val includeUpdate2 = flag_opt.forall(QueryFlagType.includeUpdate2)
val includeNode1 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement1)
val includeNode2 = flag_opt.forall(QueryFlagType.includeNodeAnnouncement2)

if (includeChannel) {
onChannel(ca)
}
if (includeUpdate1) {
updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u =>
onUpdate(u)
}
}
if (includeUpdate2) {
updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u =>
onUpdate(u)
}
}
if (includeNode1 && !sent1.contains(ca.nodeId1)) {
nodes.get(ca.nodeId1).foreach { n =>
onNode(n)
sent1 = sent1 + ca.nodeId1
}
}
if (includeNode2 && !sent1.contains(ca.nodeId2)) {
nodes.get(ca.nodeId2).foreach { n =>
onNode(n)
sent1 = sent1 + ca.nodeId2
}
}
loop(tail, flags.drop(1), numca1, numcu1, sent1)
}

loop(ids, flags)
}

/**
* Returns overall progress on synchronization
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fr.acinq.eclair.wire

import fr.acinq.eclair.UInt64
import fr.acinq.eclair.wire.CommonCodecs.{shortchannelid, varint, varintoverflow}
import fr.acinq.eclair.wire.CommonCodecs.{varint, varintoverflow}
import scodec.Codec
import scodec.codecs.{byte, discriminated, list, provide, variableSizeBytesLong, zlib}

Expand All @@ -20,13 +20,18 @@ object QueryShortChannelIdsTlv {
val INCLUDE_CHANNEL_ANNOUNCEMENT: Long = 1
val INCLUDE_CHANNEL_UPDATE_1: Long = 2
val INCLUDE_CHANNEL_UPDATE_2: Long = 4
val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2)
val INCLUDE_NODE_ANNOUNCEMENT_1: Long = 8
val INCLUDE_NODE_ANNOUNCEMENT_2: Long = 16

def includeAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0
def includeChannelAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0

def includeUpdate1(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_1) != 0

def includeUpdate2(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_2) != 0

def includeNodeAnnouncement1(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_1) != 0

def includeNodeAnnouncement2(flag: Long) = (flag & INCLUDE_NODE_ANNOUNCEMENT_2) != 0
}

val encodedQueryFlagsCodec: Codec[EncodedQueryFlags] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object TestConstants {
randomizeRouteSelection = false,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
searchMaxFeeBase = Satoshi(21),
searchMaxFeePct = 0.03,
searchMaxCltv = 2016,
Expand Down Expand Up @@ -176,6 +177,7 @@ object TestConstants {
randomizeRouteSelection = false,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
searchMaxFeeBase = Satoshi(21),
searchMaxFeePct = 0.03,
searchMaxCltv = 2016,
Expand Down
Loading

0 comments on commit 92d9f2a

Please sign in to comment.