Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MPP sender with payment loop #30

Merged
merged 1 commit into from
May 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions PickhardtPayments.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# #PickhardtPayments

## Work in progress!

See https://arxiv.org/abs/2107.05322.
Please reach out to me on Twitter (https://twitter.com/c_otto83) to discuss more about this!

The implementation is based on the piecewise linearization approach:
https://lists.linuxfoundation.org/pipermail/lightning-dev/2022-March/003510.html.

# Requirements
1. Currently (as of v0.14.3-beta, May 2022) lnd does not allow sending a replacement shard once a shard of an active MPP
fails. This, sadly, is necessary to complete MPPs that regularly run into temporary channel failures due to lack of
funds. See https://github.com/lightningnetwork/lnd/issues/5746 for a (possible) fix. You might want to stick to
testnet until this is properly fixed!
2. The graph algorithm implementation used to do the heavy lifting currently is only supported for amd64 (x86_64) on
Linux, Windows, and Mac systems. See https://github.com/C-Otto/lnd-manageJ/issues/13.
3. You need to enable middleware support in lnd: add a section `[rpcmiddleware]` with `rpcmiddleware.enable=true` to
your `lnd.conf`, restart lnd and restart lnd-manageJ. Once enabled, lnd-manageJ will spy on every RPC request and
response, without changing/blocking any of the data. However, despite the read-only configuration, requests may
fail because of this if lnd-manageJ does not respond in time (crash, shutdown, ...).
See https://github.com/lightningnetwork/lnd/issues/6409.

# Fee Rate Weight
The following endpoints allow you to specify a fee rate weight.
The default fee rate weight is 0, which optimizes the computation for reliability and ignores fees.

Any value > 0 takes fees into account. Pick higher fee rate weights to compute cheaper routes.
Note that the probability is still taken into account, even with high fee rate weights. As such, a massive channel
may be picked, even though it charges a high fee rate.

A value of 1 seems to be a good compromise (using the default quantization value)

# Configuration options
You can configure the following values in the `[pickhardt-payments]` section of your `~/.config/lnd-manageJ.conf`
configuration file:

* `liquidity_information_max_age_in_seconds` (default 600, 10 minutes):
* lower/upper bound information observed from payment failures are only kept this long
* this information is kept for each pair of peers
* once any value (lower bound, upper bound, amount in-flight) is updated, the "age" is reset
* `use_mission_control` (default: false)
* regularly augment upper bound information based on information provided by lnd, as part of "mission control"
* this is not as helpful, as lnd-manageJ collects the same information in real-time
* `quantization` (default 10000, in satoshis):
* only consider payment shards with a multiple of this number to lower computational complexity: when sending 20k
sat with a quantization of 10k sat, either one shard worth 20k sat is attempted, or two shards worth 10k
* when sending amounts lower than the configured quantization, the amount itself is used as the quantization
* even if the amount you try to send is not divisible by the configured quantization, the resulting MPP still covers
the whole amount
* `piecewise_linear_approximations` (default: 5):
* this corresponds to `N` in the paper

# MPP computation

You can compute an MPP based on #PickhardtPayments using any of the following endpoints:

* `/beta/pickhardt-payments/from/{source}/to/{target}/amount/{amount}/fee-rate-weight/{feeRateWeight}`
* compute an MPP from the given node `source` to the given node `target`
* `/beta/pickhardt-payments/from/{source}/to/{target}/amount/{amount}`
* as above, with default fee rate weight 0
* `/beta/pickhardt-payments/to/{pubkey}/amount/{amount}/fee-rate-weight/{feeRateWeight}`
* originate payments from the own node
* `/beta/pickhardt-payments/to/{pubkey}/amount/{amount}`
* as above, with default fee rate weight 0

# Paying invoices

Warning: Don't do this on mainnet, yet! This is very much work in progress.

* `/beta/pickhardt-payments/pay-payment-request/{paymentRequest}/fee-rate-weight/{feeRateWeight}`
* Pay the given payment request (also known as invoice) using the configured fee rate weight.
* `/beta/pickhardt-payments/pay-payment-request/{paymentRequest}`
* as above, with default fee rate weight 0

The response shows a somewhat readable representation of the payment progress, including the final result.
5 changes: 5 additions & 0 deletions model/src/main/java/de/cotto/lndmanagej/model/Coins.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@ public String toString() {
double coins = BigDecimal.valueOf(milliSatoshis, SCALE).doubleValue();
return String.format(Locale.ENGLISH, "%,.3f", coins);
}

public String toStringSat() {
long coins = BigDecimal.valueOf(milliSatoshis(), SCALE).longValue();
return String.format(Locale.ENGLISH, "%,d", coins);
}
}
9 changes: 0 additions & 9 deletions model/src/main/java/de/cotto/lndmanagej/model/Route.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,4 @@ private static Coins getFeesForEdgeAndAmount(Coins amountWithFees, EdgeWithLiqui
Coins relativeFees = Coins.ofMilliSatoshis(feeRate * amountWithFees.milliSatoshis() / 1_000_000);
return baseFeeForHop.add(relativeFees);
}

@Override
public String toString() {
return "Route{" +
"edgesWithLiquidityInformation=" + edgesWithLiquidityInformation +
", amount=" + amount +
", feesForHops=" + feesForHops +
'}';
}
}
10 changes: 10 additions & 0 deletions model/src/test/java/de/cotto/lndmanagej/model/CoinsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ void justSatoshi() {
assertThat(Coins.ofSatoshis(12_345)).hasToString("12,345.000");
}

@Test
void toStringSat() {
assertThat(Coins.ofSatoshis(12_345).toStringSat()).isEqualTo("12,345");
}

@Test
void toStringSat_with_milli_sat() {
assertThat(Coins.ofMilliSatoshis(12_345_999).toStringSat()).isEqualTo("12,345");
}

@Test
void justMilliCoins() {
assertThat(Coins.ofSatoshis(12_300_000)).hasToString("12,300,000.000");
Expand Down
3 changes: 3 additions & 0 deletions pickhardt-payments/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ dependencies {
implementation 'com.google.ortools:ortools-java:9.3.10497'
implementation 'org.eclipse.collections:eclipse-collections:11.0.0'
testImplementation testFixtures(project(':model'))
integrationTestImplementation project(':backend')
integrationTestImplementation project(':grpc-adapter')
integrationTestImplementation testFixtures(project(':model'))
testFixturesImplementation testFixtures(project(':model'))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package de.cotto.lndmanagej.pickhardtpayments;

import de.cotto.lndmanagej.grpc.SendToRouteObserver;
import de.cotto.lndmanagej.model.Coins;
import de.cotto.lndmanagej.model.HexString;
import de.cotto.lndmanagej.service.LiquidityInformationUpdater;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static de.cotto.lndmanagej.model.DecodedPaymentRequestFixtures.DECODED_PAYMENT_REQUEST;
import static de.cotto.lndmanagej.model.RouteFixtures.ROUTE;
import static org.assertj.core.api.Assertions.assertThatCode;

@ExtendWith(MockitoExtension.class)
class MultiPathPaymentObserverIT {
private static final HexString PAYMENT_HASH = DECODED_PAYMENT_REQUEST.paymentHash();
private static final Duration DURATION = Duration.ofSeconds(1);
private static final Coins IN_FLIGHT = Coins.ofSatoshis(100);

@InjectMocks
private MultiPathPaymentObserver multiPathPaymentObserver;

@Mock
@SuppressWarnings("unused")
private LiquidityInformationUpdater liquidityInformationUpdater;

private final Executor executor = Executors.newCachedThreadPool();

@Test
@Timeout(30)
void waitForInFlightChange_stuck() {
multiPathPaymentObserver.getFor(ROUTE, PAYMENT_HASH);
assertThatCode(
() -> multiPathPaymentObserver.waitForInFlightChange(DURATION, PAYMENT_HASH, IN_FLIGHT)
).doesNotThrowAnyException();
}

@Test
@Timeout(value = 900, unit = TimeUnit.MILLISECONDS)
void waitForInFlightChange_changed_from_other_thread() {
SendToRouteObserver sendToRouteObserver = multiPathPaymentObserver.getFor(ROUTE, PAYMENT_HASH);
executor.execute(() -> unlockAfterSomeMilliSeconds(sendToRouteObserver));

assertThatCode(
() -> multiPathPaymentObserver.waitForInFlightChange(DURATION, PAYMENT_HASH, IN_FLIGHT)
).doesNotThrowAnyException();
}

@Test
@Timeout(value = 900, unit = TimeUnit.MILLISECONDS)
void waitForInFlightChange_two_waiting_changed_from_other_thread() {
SendToRouteObserver sendToRouteObserver = multiPathPaymentObserver.getFor(ROUTE, PAYMENT_HASH);
executor.execute(() -> unlockAfterSomeMilliSeconds(sendToRouteObserver));
executor.execute(() -> multiPathPaymentObserver.waitForInFlightChange(DURATION, PAYMENT_HASH, IN_FLIGHT));

assertThatCode(
() -> multiPathPaymentObserver.waitForInFlightChange(DURATION, PAYMENT_HASH, IN_FLIGHT)
).doesNotThrowAnyException();
}

private void unlockAfterSomeMilliSeconds(SendToRouteObserver sendToRouteObserver) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
sendToRouteObserver.onError(new NullPointerException());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ public Flows getOptimalFlows(Pubkey source, Pubkey target, Coins amount, int fee
private int getQuantization(Coins amount) {
int quantization = configurationService.getIntegerValue(QUANTIZATION)
.orElse(DEFAULT_QUANTIZATION);
if (amount.satoshis() < quantization) {
return (int) amount.satoshis();
}
return quantization;
return (int) Math.min(amount.satoshis(), quantization);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@
import de.cotto.lndmanagej.service.LiquidityInformationUpdater;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

@Component
public class MultiPathPaymentObserver {
private final LiquidityInformationUpdater liquidityInformationUpdater;
private final Map<HexString, PaymentInformation> map = new ConcurrentHashMap<>();
private final Map<HexString, CountDownLatch> latches = new ConcurrentHashMap<>();

public MultiPathPaymentObserver(LiquidityInformationUpdater liquidityInformationUpdater) {
this.liquidityInformationUpdater = liquidityInformationUpdater;
Expand All @@ -39,12 +43,19 @@ public boolean isSettled(HexString paymentHash) {
return get(paymentHash).settled();
}

public boolean isFailed(HexString paymentHash) {
return get(paymentHash).failed();
public Optional<FailureCode> getFailureCode(HexString paymentHash) {
return get(paymentHash).failureCode();
}

private void addInFlight(HexString paymentHash, Coins amount) {
update(paymentHash, value -> value.withAdditionalInFlight(amount));
latches.compute(paymentHash, (key, value) -> {
if (value == null) {
return null;
}
value.countDown();
return null;
});
}

private PaymentInformation get(HexString paymentHash) {
Expand Down Expand Up @@ -77,6 +88,23 @@ private List<PaymentAttemptHop> topPaymentAttemptHops(Route route) {
return result;
}

public void waitForInFlightChange(Duration timeout, HexString paymentHash, Coins referenceInFlight) {
while (getInFlight(paymentHash).equals(referenceInFlight)) {
try {
boolean changedWithinTimeout = getLatch(paymentHash).await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!changedWithinTimeout) {
return;
}
} catch (InterruptedException ignored) {
return;
}
}
}

private CountDownLatch getLatch(HexString paymentHash) {
return latches.compute(paymentHash, (key, value) -> value == null ? new CountDownLatch(1) : value);
}

private class SendToRouteObserverImpl implements SendToRouteObserver {
private final Route route;
private final HexString paymentHash;
Expand All @@ -97,7 +125,7 @@ public void onError(Throwable throwable) {
public void onValue(HexString preimage, FailureCode failureCode) {
if (preimage.equals(HexString.EMPTY)) {
if (failureCode.isErrorFromFinalNode()) {
update(paymentHash, PaymentInformation::withIsFailed);
update(paymentHash, paymentInformation -> paymentInformation.withFailureCode(failureCode));
}
} else {
update(paymentHash, PaymentInformation::withIsSettled);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,27 @@
package de.cotto.lndmanagej.pickhardtpayments;

import de.cotto.lndmanagej.grpc.GrpcPayments;
import de.cotto.lndmanagej.grpc.GrpcSendToRoute;
import de.cotto.lndmanagej.grpc.SendToRouteObserver;
import de.cotto.lndmanagej.model.Coins;
import de.cotto.lndmanagej.model.DecodedPaymentRequest;
import de.cotto.lndmanagej.model.HexString;
import de.cotto.lndmanagej.model.Pubkey;
import de.cotto.lndmanagej.model.Route;
import de.cotto.lndmanagej.pickhardtpayments.model.MultiPathPayment;
import de.cotto.lndmanagej.pickhardtpayments.model.PaymentStatus;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MultiPathPaymentSender {
private final GrpcPayments grpcPayments;
private final GrpcSendToRoute grpcSendToRoute;
private final MultiPathPaymentSplitter multiPathPaymentSplitter;
private final MultiPathPaymentObserver multiPathPaymentObserver;
private final PaymentLoop paymentLoop;

public MultiPathPaymentSender(
GrpcPayments grpcPayments,
GrpcSendToRoute grpcSendToRoute,
MultiPathPaymentSplitter multiPathPaymentSplitter,
MultiPathPaymentObserver multiPathPaymentObserver
) {
public MultiPathPaymentSender(GrpcPayments grpcPayments, PaymentLoop paymentLoop) {
this.grpcPayments = grpcPayments;
this.grpcSendToRoute = grpcSendToRoute;
this.multiPathPaymentSplitter = multiPathPaymentSplitter;
this.multiPathPaymentObserver = multiPathPaymentObserver;
this.paymentLoop = paymentLoop;
}

public MultiPathPayment payPaymentRequest(String paymentRequest, int feeRateWeight) {
public PaymentStatus payPaymentRequest(String paymentRequest, int feeRateWeight) {
DecodedPaymentRequest decodedPaymentRequest = grpcPayments.decodePaymentRequest(paymentRequest).orElse(null);
if (decodedPaymentRequest == null) {
return MultiPathPayment.FAILURE;
}
Pubkey destination = decodedPaymentRequest.destination();
Coins amount = decodedPaymentRequest.amount();
MultiPathPayment multiPathPayment =
multiPathPaymentSplitter.getMultiPathPaymentTo(destination, amount, feeRateWeight);
List<Route> routes = multiPathPayment.routes();
HexString paymentHash = decodedPaymentRequest.paymentHash();
for (Route route : routes) {
SendToRouteObserver sendToRouteObserver = multiPathPaymentObserver.getFor(route, paymentHash);
grpcSendToRoute.sendToRoute(route, decodedPaymentRequest, sendToRouteObserver);
return PaymentStatus.UNABLE_TO_DECODE_PAYMENT_REQUEST;
}
return multiPathPayment;
PaymentStatus paymentStatus = new PaymentStatus(decodedPaymentRequest.paymentHash());
paymentLoop.start(decodedPaymentRequest, feeRateWeight, paymentStatus);
return paymentStatus;
}
}
Loading