Skip to content
This repository has been archived by the owner on Aug 28, 2019. It is now read-only.

Commit

Permalink
feat(api): subscribe to swap events (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 authored Dec 22, 2018
1 parent 9fc446f commit 47b1f41
Show file tree
Hide file tree
Showing 13 changed files with 1,376 additions and 62 deletions.
2 changes: 1 addition & 1 deletion lib/BoltzMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class BoltzMiddleware {

private api: Api;

constructor(argv: Arguments) {
constructor(argv: Arguments<any>) {
this.config = new Config();
this.config.init(argv);

Expand Down
2 changes: 1 addition & 1 deletion lib/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Config {
];
}

public init = (argv: Arguments) => {
public init = (argv: Arguments<any>) => {
this.parseParameters(argv);
}

Expand Down
3 changes: 3 additions & 0 deletions lib/api/Api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Api {

constructor(private logger: Logger, private config: ApiConfig, service: Service) {
this.app = express();

this.app.use(cors());
this.app.use(express.json());

Expand All @@ -35,6 +36,8 @@ class Api {
this.app.route('/broadcasttransaction').post(controller.broadcastTransaction);

this.app.route('/createswap').post(controller.createSwap);

this.app.route('/swapstatus').get(controller.swapStatus);
}
}

Expand Down
32 changes: 29 additions & 3 deletions lib/api/Controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@ import { Request, Response } from 'express';
import Service from '../service/Service';

class Controller {
constructor(private service: Service) {}
// A map between the ids and HTTP responses of all pending swaps
private pendingSwaps = new Map<string, Response>();

constructor(private service: Service) {
this.service.on('swap.update', (id: string, message: string) => {
const response = this.pendingSwaps.get(id);

if (response) {
response.write(`data: ${JSON.stringify({ message })}\n\n`);
}
});
}

public getPairs = async (_req: Request, res: Response) => {
const response = this.service.getPairs();
Expand All @@ -18,7 +29,6 @@ class Controller {

const response = await this.service.getTransaction(currency, transactionHash);
this.successResponse(res, response);

} catch (error) {
this.writeErrorResponse(res, error);
}
Expand All @@ -33,7 +43,6 @@ class Controller {

const response = await this.service.broadcastTransaction(currency, transactionHex);
this.successResponse(res, response);

} catch (error) {
this.writeErrorResponse(res, error);
}
Expand All @@ -50,7 +59,24 @@ class Controller {

const response = await this.service.createSwap(pairId, orderSide, invoice, refundPublicKey);
this.swapCreatedResponse(res, response);
} catch (error) {
this.writeErrorResponse(res, error);
}
}

public swapStatus = (req: Request, res: Response) => {
try {
const { id } = this.validateBody(req.query, [
{ name: 'id', type: 'string' },
]);

res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});

this.pendingSwaps.set(id, res);
} catch (error) {
this.writeErrorResponse(res, error);
}
Expand Down
86 changes: 76 additions & 10 deletions lib/boltz/BoltzClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import fs from 'fs';
import grpc from 'grpc';
import grpc, { ClientReadableStream } from 'grpc';
import BaseClient from '../BaseClient';
import Logger from '../Logger';
import Errors from './Errors';
Expand All @@ -25,13 +25,25 @@ interface BoltzMethodIndex extends GrpcClient {
[methodName: string]: Function;
}

interface BoltzClient {
on(event: 'transaction.confirmed', listener: (transactionHash: string, outputAddress: string) => void): this;
emit(event: 'transaction.confirmed', transactionHash: string, outputAddress: string): boolean;

on(even: 'invoice.paid', listener: (invoice: string) => void): this;
emit(event: 'invoice.paid', invoice: string): boolean;
}

// TODO: reconnect after stream emits error
class BoltzClient extends BaseClient {
private uri!: string;
private credentials!: grpc.ChannelCredentials;

private lightning!: GrpcClient | BoltzMethodIndex;
private boltz!: GrpcClient | BoltzMethodIndex;
private meta!: grpc.Metadata;

private transactionSubscription?: ClientReadableStream<boltzrpc.SubscribeTransactionsResponse>;
private invoicesSubscription?: ClientReadableStream<boltzrpc.SubscribeInvoicesResponse>;

constructor(private logger: Logger, config: BoltzConfig) {
super();

Expand All @@ -50,11 +62,11 @@ class BoltzClient extends BaseClient {
}

/**
* Connects to Boltz
* Connects to Boltz and subscribes to confirmed transactions and paid invoices afterwards
*/
public connect = async () => {
if (!this.isConnected()) {
this.lightning = new GrpcClient(this.uri, this.credentials);
this.boltz = new GrpcClient(this.uri, this.credentials);

try {
const getInfo = await this.getInfo();
Expand All @@ -65,9 +77,11 @@ class BoltzClient extends BaseClient {
this.setClientStatus(ClientStatus.Connected);
this.clearReconnectTimer();

this.subscribeTransactions();
this.subscribeInvoices();
} catch (error) {
this.logger.error(`Could not connect to Boltz: ${error.message}`);
this.logger.verbose(`Retrying in ${this.RECONNECT_INTERVAL} ms`);
this.logger.info(`Retrying in ${this.RECONNECT_INTERVAL} ms`);

this.setClientStatus(ClientStatus.Disconnected);
this.reconnectionTimer = setTimeout(this.connect, this.RECONNECT_INTERVAL);
Expand All @@ -81,12 +95,16 @@ class BoltzClient extends BaseClient {
public disconnect = async () => {
this.clearReconnectTimer();

this.lightning.close();
if (this.transactionSubscription) {
this.transactionSubscription.cancel();
}

this.boltz.close();
}

private unaryCall = <T, U>(methodName: string, params: T): Promise<U> => {
return new Promise((resolve, reject) => {
(this.lightning as BoltzMethodIndex)[methodName](params, this.meta, (err: grpc.ServiceError, response: GrpcResponse) => {
(this.boltz as BoltzMethodIndex)[methodName](params, this.meta, (err: grpc.ServiceError, response: GrpcResponse) => {
if (err) {
reject(err);
} else {
Expand All @@ -112,7 +130,7 @@ class BoltzClient extends BaseClient {
request.setCurrency(currency);
request.setTransactionHash(transactionHash);

return this.unaryCall<boltzrpc.GetTransactionRequest, boltzrpc.GetTransactionResponse>('getTransaction', request);
return this.unaryCall<boltzrpc.GetTransactionRequest, boltzrpc.GetTransactionResponse.AsObject>('getTransaction', request);
}

/**
Expand All @@ -124,7 +142,55 @@ class BoltzClient extends BaseClient {
request.setCurrency(currency);
request.setTransactionHex(transactionHex);

return this.unaryCall<boltzrpc.BroadcastTransactionRequest, boltzrpc.BroadcastTransactionResponse>('broadcastTransaction', request);
return this.unaryCall<boltzrpc.BroadcastTransactionRequest, boltzrpc.BroadcastTransactionResponse.AsObject>('broadcastTransaction', request);
}

/**
* Adds an entry to the list of addresses SubscribeTransactions listens to
*/
public listenOnAddress = (currency: string, address: string) => {
const request = new boltzrpc.ListenOnAddressRequest();

request.setCurrency(currency);
request.setAddress(address);

return this.unaryCall<boltzrpc.ListenOnAddressRequest, boltzrpc.ListenOnAddressResponse.AsObject>('listenOnAddress', request);
}

/**
* Subscribes to a stream of confirmed transactions to addresses that were specified with "ListenOnAddress"
*/
public subscribeTransactions = () => {
if (this.transactionSubscription) {
this.transactionSubscription.cancel();
}

this.transactionSubscription = this.boltz.subscribeTransactions(new boltzrpc.SubscribeTransactionsRequest(), this.meta)
.on('data', (response: boltzrpc.SubscribeTransactionsResponse) => {
this.logger.silly(`Found transaction to address ${response.getOutputAddress()} confirmed: ${response.getTransactionHash()}`);
this.emit('transaction.confirmed', response.getTransactionHash(), response.getOutputAddress());
})
.on('error', (error) => {
this.logger.error(`Transaction subscription errored: ${stringify(error)}`);
});
}

/**
* Subscribes to a stream of invoices paid by Boltz
*/
public subscribeInvoices = () => {
if (this.invoicesSubscription) {
this.invoicesSubscription.cancel();
}

this.invoicesSubscription = this.boltz.subscribeInvoices(new boltzrpc.SubscribeInvoicesRequest(), this.meta)
.on('data', (response: boltzrpc.SubscribeInvoicesResponse) => {
this.logger.silly(`Paid invoice: ${response.getInvoice()}`);
this.emit('invoice.paid', response.getInvoice());
})
.on('error', (error) => {
this.logger.error(`Invoice subscription errored: ${stringify(error)}`);
});
}

/**
Expand All @@ -146,7 +212,7 @@ class BoltzClient extends BaseClient {
request.setOutputType(outputType);
}

return this.unaryCall<boltzrpc.CreateSwapRequest, boltzrpc.CreateSwapResponse>('createSwap', request);
return this.unaryCall<boltzrpc.CreateSwapRequest, boltzrpc.CreateSwapResponse.AsObject>('createSwap', request);
}
}

Expand Down
47 changes: 47 additions & 0 deletions lib/proto/boltzrpc_grpc_pb.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 47b1f41

Please sign in to comment.