Skip to content

Commit

Permalink
stream: support stream by figi and auto-reconnect, see #4 #6
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalets committed Jul 31, 2022
1 parent eca8c52 commit b91a2c7
Show file tree
Hide file tree
Showing 6 changed files with 480 additions and 91 deletions.
30 changes: 22 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,34 @@ const { candles } = await api.marketdata.getCandles({
Для работы со стримом сделана обертка `api.stream`:
```ts
// подписка на свечи
api.stream.market.watch({ candles: [
{ figi: 'BBG00QPYJ5H0', interval: SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE }
]});

// обработка событий
api.stream.market.on('data', data => console.log('stream data', data));
const unsubscribe = await api.stream.market.candles({
instruments: [
{ figi: 'BBG00QPYJ5H0', interval: SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE }
],
waitingClose: false,
}, candle => console.log(candle));

// обработка дополнительных событий
api.stream.market.on('error', error => console.log('stream error', error));
api.stream.market.on('close', error => console.log('stream closed, reason:', error));

// закрыть соединение через 3 сек
setTimeout(() => api.stream.market.cancel(), 3000);
// отписаться
await unsubscribe();

// закрыть соединение
await api.stream.market.cancel();
```
> Примечание: со стримом можно работать и напрямую через `api.marketdataStream`. Но там `AsyncIterable`, которые менее удобны (имхо)
По умолчанию стрим автоматически переподключается при потере соединения ([#4](https://github.com/vitalets/tinkoff-invest-api/issues/4)). Чтобы это отключить, установите `api.stream.market.options.autoReconnect = false`.

Стримы доступны по следующим сущностям:
* `.candles(request, handler)`
* `.trades(request, handler)`
* `.orderBook(request, handler)`
* `.lastPrice(request, handler)`
* `.info(request, handler)`

### Универсальный счет
Для бесшовной работы со счетами в бою и песочнице сделан универсальный интерфейс `TinkoffAccount`.

Expand Down
18 changes: 11 additions & 7 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Базовый класс обертки на bi-directional grpc stream.
*/
import { on, EventEmitter } from 'node:events';
import { on, EventEmitter, once } from 'node:events';
import TypedEmitter from 'typed-emitter';
import { TinkoffInvestApi } from '../api.js';

Expand Down Expand Up @@ -44,10 +44,14 @@ export abstract class BaseStream<Req, Res> {
/**
* Отмена запроса.
*/
cancel() {
this.sendSubscriptionRequest(CLOSE_STREAM_VALUE);
// todo: remove all listeners?
// todo: make async and return promise
async cancel() {
if (this.connected) {
this.sendRequest(CLOSE_STREAM_VALUE);
const [ error ] = await once(this.emitter, 'close');
return error;
// todo: remove all data listeners?
// this.emitter.removeAllListeners('data');
}
}

protected async *createRequestIterable() {
Expand All @@ -61,11 +65,11 @@ export abstract class BaseStream<Req, Res> {
}
}

protected sendSubscriptionRequest(req: Req | CloseReq) {
protected sendRequest(req: Req | CloseReq) {
this.emitter.emit('request', req);
}

protected async loop(call: AsyncIterable<Res>) {
protected async waitEvents(call: AsyncIterable<Res>) {
this.connected = true;
this.emitter.emit('open');
let error: Error | undefined = undefined;
Expand Down
110 changes: 110 additions & 0 deletions src/stream/market-subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Класс подписки в стриме market.
* Позволяет ожидать результата подписки и вызывать обработчики для figi, заданных при подписке.
* See: https://github.com/vitalets/tinkoff-invest-api/issues/6
*/
import {
Candle,
CandleSubscription,
InfoSubscription,
LastPrice,
LastPriceSubscription,
MarketDataRequest,
MarketDataResponse,
OrderBook,
OrderBookSubscription,
SubscriptionAction,
SubscriptionStatus,
Trade,
TradeSubscription,
TradingStatus
} from '../generated/marketdata.js';

type ResponseSubscription = CandleSubscription
| TradeSubscription
| OrderBookSubscription
| LastPriceSubscription
| InfoSubscription;
type ResponseData = Candle | Trade | OrderBook | LastPrice | TradingStatus;

type MarketSubscriptionOptions<S, D> = {
buildRequest: (subscriptionAction: SubscriptionAction) => MarketDataRequest,
buildResponse: (res: MarketDataResponse) => UniversalMarketResponse<S, D>,
dataHandler: (data: D) => unknown,
requestKeys: string[],
}

/**
* Универсальный ответ (одинаковые поля для разных типов подписок)
*/
export type UniversalMarketResponse<S, D> = {
trackingId?: string;
subscriptions?: S[];
// ключи в статусе подписки, по которым определяем, что статус для нужного запроса подписки
subscriptionKeys?: string[];
data?: D;
// ключ данных в ответе, по которому определяем, что данные для нужного обработчика
dataKey?: string;
}

export class MarketSubscription<S extends ResponseSubscription, D extends ResponseData> {
protected waitingStatusResolve?: () => unknown;
protected waitingStatusReject?: (error: Error) => unknown;

constructor(protected options: MarketSubscriptionOptions<S, D>) {
this.handler = this.handler.bind(this);
}

getRequest(subscriptionAction: SubscriptionAction) {
return this.options.buildRequest(subscriptionAction);
}

handler(res: MarketDataResponse) {
const uniRes = this.options.buildResponse(res);
this.statusHandler(uniRes);
this.dataHandler(uniRes);
}

async waitStatus() {
try {
await new Promise<void>((resolve, reject) => {
this.waitingStatusResolve = resolve;
this.waitingStatusReject = reject;
});
} finally {
this.waitingStatusResolve = undefined;
this.waitingStatusReject = undefined;
}
}

// eslint-disable-next-line complexity
protected statusHandler({ subscriptions, subscriptionKeys, trackingId }: UniversalMarketResponse<S, D>) {
if (!this.waitingStatusResolve || !this.waitingStatusReject) return;
if (!subscriptions || !subscriptionKeys) return;
if (subscriptionKeys.sort().join() !== this.options.requestKeys.sort().join()) return;
const errorSubscriptions = subscriptions
.filter(s => s.subscriptionStatus !== SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS);
if (errorSubscriptions.length) {
const error = this.buildSubscriptionError(errorSubscriptions, trackingId);
this.waitingStatusReject(error);
} else {
this.waitingStatusResolve();
}
}

protected dataHandler({ data, dataKey }: UniversalMarketResponse<S, D>) {
if (!data || !dataKey) return;
if (this.options.requestKeys.includes(dataKey)) {
this.options.dataHandler(data);
}
}

protected buildSubscriptionError(errorSubscriptions: S[], trackingId?: string) {
const lines = errorSubscriptions.map(s => `${s.figi}: status ${s.subscriptionStatus}`);
return new Error([
'Subscription error:',
...lines,
`TrackingId: ${trackingId}`,
].join('\n'));
}
}
Loading

0 comments on commit b91a2c7

Please sign in to comment.