Skip to content

Commit

Permalink
feat: new grpc call for subscribring alerts such as low balance (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsercano committed Nov 18, 2020
1 parent 74a59dc commit a448e24
Show file tree
Hide file tree
Showing 20 changed files with 1,910 additions and 381 deletions.
75 changes: 75 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 3 additions & 24 deletions lib/cli/commands/streamorders.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ServiceError, status } from 'grpc';
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { setTimeoutPromise } from '../../utils/utils';
import { loadXudClient } from '../command';
import { onStreamError, waitForClient } from '../utils';

export const command = 'streamorders [existing]';

Expand All @@ -26,20 +25,8 @@ const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for orders');
streamOrders(argv);
}
});
waitForClient(client, argv, ensureConnection, streamOrders, printError);
};

const streamOrders = (argv: Arguments<any>) => {
Expand All @@ -57,15 +44,7 @@ const streamOrders = (argv: Arguments<any>) => {
// adding end, close, error events only once,
// since they'll be thrown for three of subscriptions in the corresponding cases, catching once is enough.
ordersSubscription.on('end', reconnect.bind(undefined, argv));
ordersSubscription.on('error', async (err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection(argv);
});
ordersSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));

const swapsRequest = new xudrpc.SubscribeSwapsRequest();
swapsRequest.setIncludeTaker(true);
Expand Down
51 changes: 51 additions & 0 deletions lib/cli/commands/subscribealerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { loadXudClient } from '../command';
import { AlertType } from '../../constants/enums';
import { onStreamError, waitForClient } from '../utils';

export const command = 'subscribealerts';

export const describe = 'subscribe alerts such as low balance';

export const builder = (argv: Argv) => argv
.option('pretty', {
type: 'boolean',
})
.example('$0 subscribealerts -j', 'prints alert payload in a JSON structure')
.example('$0 subscribealerts', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}

waitForClient(client, argv, ensureConnection, subscribeAlerts, printError);
};

const subscribeAlerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.json) {
console.log(JSON.stringify(alert, undefined, 2));
} else {
console.log(`${AlertType[alert.getType()]}: ${alert.getMessage()}`);
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
31 changes: 31 additions & 0 deletions lib/cli/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import colors from 'colors/safe';
import { accessSync, watch } from 'fs';
import os from 'os';
import path from 'path';
import { XudClient } from '../proto/xudrpc_grpc_pb';
import { Arguments } from 'yargs';
import { ServiceError, status } from 'grpc';
import { setTimeoutPromise } from '../utils/utils';

const SATOSHIS_PER_COIN = 10 ** 8;

Expand Down Expand Up @@ -80,3 +84,30 @@ export const waitForCert = (certPath: string) => {
}
});
};

export const waitForClient = (client: XudClient, argv: Arguments, ensureConnection: Function, successCallback: Function, printError?: boolean) => {
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for alerts');
successCallback(argv);
}
});
};

export const onStreamError = async (ensureConnection: Function, err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection();
};
48 changes: 46 additions & 2 deletions lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from 'assert';
import http from 'http';
import { SwapClientType, SwapRole, SwapState } from '../constants/enums';
import { ChannelSide, SwapClientType, SwapRole, SwapState } from '../constants/enums';
import { CurrencyInstance } from '../db/types';
import { XudError } from '../types';
import Logger from '../Logger';
Expand All @@ -14,7 +14,7 @@ import SwapClient, {
PaymentStatus,
WithdrawArguments,
} from '../swaps/SwapClient';
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities } from '../swaps/types';
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities, ChannelBalanceAlert } from '../swaps/types';
import { UnitConverter } from '../utils/UnitConverter';
import errors, { errorCodes } from './errors';
import {
Expand Down Expand Up @@ -46,13 +46,15 @@ interface ConnextClient {
on(event: 'htlcAccepted', listener: (rHash: string, amount: number, currency: string) => void): this;
on(event: 'connectionVerified', listener: (swapClientInfo: SwapClientInfo) => void): this;
on(event: 'depositConfirmed', listener: (hash: string) => void): this;
on(event: 'lowBalance', listener: (alert: ChannelBalanceAlert) => void): this;
once(event: 'initialized', listener: () => void): this;
emit(event: 'htlcAccepted', rHash: string, amount: number, currency: string): boolean;
emit(event: 'connectionVerified', swapClientInfo: SwapClientInfo): boolean;
emit(event: 'initialized'): boolean;
emit(event: 'preimage', preimageRequest: ProvidePreimageEvent): void;
emit(event: 'transferReceived', transferReceivedRequest: TransferReceivedEvent): void;
emit(event: 'depositConfirmed', hash: string): void;
emit(event: 'lowBalance', alert: ChannelBalanceAlert): boolean;
}

/**
Expand Down Expand Up @@ -332,11 +334,53 @@ class ConnextClient extends SwapClient {
channelBalancePromises.push(this.channelBalance(currency));
}
await Promise.all(channelBalancePromises);

for (const [currency, address] of this.tokenAddresses) {
const remoteBalance = this.inboundAmounts.get(currency) || 0;
const localBalance = this.outboundAmounts.get(currency) || 0;
const totalBalance = remoteBalance + localBalance;
const alertThreshold = totalBalance * 0.1;

this.checkLowBalance(
remoteBalance,
localBalance,
totalBalance,
alertThreshold,
currency,
address,
this.emit,
);
}
} catch (e) {
this.logger.error('failed to update total outbound capacity', e);
}
}

private checkLowBalance = (remoteBalance: number, localBalance: number, totalBalance: number,
alertThreshold: number, currency: string, channelPoint: string, emit: Function) => {
if (localBalance < alertThreshold) {
emit('lowBalance', {
totalBalance,
currency,
channelPoint,
side: ChannelSide.Local,
sideBalance: localBalance,
bound: 10,
});
}

if (remoteBalance < alertThreshold) {
emit('lowBalance', {
totalBalance,
currency,
channelPoint,
side: ChannelSide.Remote,
sideBalance: remoteBalance,
bound: 10,
});
}
}

protected getTokenAddress(currency: string): string {
const tokenAdress = this.tokenAddresses.get(currency);
if (!tokenAdress) {
Expand Down
9 changes: 9 additions & 0 deletions lib/constants/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,12 @@ export enum DisconnectionReason {
AuthFailureInvalidSignature = 12,
WireProtocolErr = 13,
}

export enum AlertType {
LowBalance = 0,
}

export enum ChannelSide {
Remote = 0,
Local = 1,
}
Loading

0 comments on commit a448e24

Please sign in to comment.