Skip to content

Commit

Permalink
feat(txs-tracer-core): ✨ improve tx trace machine
Browse files Browse the repository at this point in the history
improved tx trace machine state structer and add websocket query to rpc node.
  • Loading branch information
DavideSegullo committed Mar 4, 2023
1 parent 2cb93a9 commit 31c6000
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 53 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
"scripts": {},
"private": true,
"dependencies": {
"@cosmjs/encoding": "^0.29.5",
"@cosmjs/stargate": "^0.29.5",
"@cosmjs/tendermint-rpc": "^0.29.5",
"xstate": "^4.37.0"
"xstate": "^4.37.0",
"xstream": "^11.14.0"
},
"devDependencies": {
"@nrwl/eslint-plugin-nx": "15.7.2",
Expand Down
1 change: 1 addition & 0 deletions packages/txs-tracer-core/src/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './types';
export * from './machines';
export * from './utils';
Original file line number Diff line number Diff line change
@@ -1,19 +1,60 @@
import { interpret } from 'xstate';
import { txTraceMachine } from './txs-trace-machine';

test('should eventually reach "pending"', () =>
new Promise<void>(done => {
const fetchService = interpret(txTraceMachine).onTransition(state => {
if (state.matches('pending')) {
done();
}
});
describe('txs-trace-machine', () => {
test('should eventually reach "pending_subscription"', () =>
new Promise<void>(done => {
const fetchService = interpret(
txTraceMachine.withContext({
...txTraceMachine.context,
}),
)
.onTransition(state => {
console.log(state.value);
if (state.matches('pending_subscription')) {
done();
}
})
.onEvent(event => {
console.log(event.type);
});

fetchService.start();
fetchService.start();

/*
* Send zero or more events to the service that should
* cause it to eventually reach its expected state
*/
fetchService.send({ type: 'TRACE' });
}));
/*
* Send zero or more events to the service that should
* cause it to eventually reach its expected state
*/
fetchService.send({ type: 'TRACE' });
}));

test(
'should eventually reach "results"',
() =>
new Promise<void>(done => {
const fetchService = interpret(
txTraceMachine.withContext({
...txTraceMachine.context,
subscribeTimeout: 1000,
query: "message.action='/osmosis.gamm.v1beta1.MsgSwapExactAmountIn'",
}),
).onTransition(state => {
if (state.matches('results')) {
console.log(state.context.txs);
done();
}
});

fetchService.start();

/*
* Send zero or more events to the service that should
* cause it to eventually reach its expected state
*/
fetchService.send({ type: 'TRACE' });
}),
{
timeout: 30_000,
},
);
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { createMachine, raise } from 'xstate';
import { Tendermint34Client } from '@cosmjs/tendermint-rpc';
import { TxTraceContext } from '../../types';
import { Tendermint34Client, TxEvent } from '@cosmjs/tendermint-rpc';
import { Listener, Stream } from 'xstream';
import { TxTraceContext, TxTraceEvents } from '../../types';
import { mapIndexedTx } from '../../utils';

export const txTraceMachine = createMachine(
{
Expand Down Expand Up @@ -37,12 +39,7 @@ export const txTraceMachine = createMachine(
},
},
},
pending: {
after: {
TX_TIMEOUT: {
target: 'pending',
},
},
pending_search_txs: {
on: {
TX_RESULTS: {
target: 'result',
Expand All @@ -55,39 +52,50 @@ export const txTraceMachine = createMachine(
},
},
},
pending_subscription: {
entry: ['subscribeToQuery'],
after: {
SUBSCRIBE_TIMEOUT: {
target: 'pending_search_txs',
},
},
on: {
TX_RESULTS: {
target: 'result',
},
CONNECTION_DISCONNECT: {
target: 'connection_error',
},
},
},
result: {
type: 'final',
data: (ctx, event) => ({
event,
txs: ctx.txs,
}),
entry: ['closeConnection'],
},
connected: {
on: {
SEND_QUERY_MESSAGE: { target: 'pending' },
SEND_QUERY_MESSAGE: { target: 'pending_subscription' },
},
},
closed: {
type: 'final',
entry: ctx => {
if (ctx.tendermintClient) {
ctx.tendermintClient.disconnect();

ctx.tendermintClient = undefined;
}
},
entry: ['closeConnection'],
},
connection_timeout: {
type: 'final',
entry: ctx => {
if (ctx.tendermintClient) {
ctx.tendermintClient.disconnect();

ctx.tendermintClient = undefined;
}
},
entry: ['closeConnection'],
},
not_found_error: {
type: 'final',
entry: ['closeConnection'],
},
connection_error: {
type: 'final',
entry: ['closeConnection'],
},
idle: {
on: {
Expand All @@ -99,30 +107,55 @@ export const txTraceMachine = createMachine(
},
schema: {
context: {} as TxTraceContext,
events: {} as
| { type: 'TX_RESULTS' }
| { type: 'CONNECTION_SUCCESS' }
| { type: 'CONNECTION_ERROR' }
| { type: 'SEND_QUERY_MESSAGE' }
| { type: 'CONNECTION_DISCONNECT' }
| { type: 'TRACE' }
| { type: 'TX_SEARCH_EMPTY' },
events: {} as TxTraceEvents,
},
context: {
tendermintClient: undefined,
txTimeout: 5000,
subscribeTimeout: 5000,
connectionTimeout: 5000,
websocketUrl: 'wss://rpc-osmosis.blockapsis.com',
query: "acknowledge_packet.packet_sequence='1753590'",
method: 'tx_search',
query: "acknowledge_packet.packet_sequence='1777404'",
txs: [],
},
predictableActionArguments: true,
preserveActionOrder: true,
},
{
actions: {
closeConnection: ctx => {
if (ctx.tendermintClient) {
ctx.tendermintClient.disconnect();

ctx.tendermintClient = undefined;
}
},
subscribeToQuery: ctx => {
let subscription: Stream<TxEvent>;
let listener: Listener<TxEvent>;

if (ctx.tendermintClient) {
subscription = ctx.tendermintClient.subscribeTx(ctx.query).take(1);

listener = {
next: txResult => {
ctx.txs = [mapIndexedTx(txResult)];
},
error: err => {
console.error(err);
raise('CONNECTION_DISCONNECT');
},
complete: () => {
raise('TX_RESULTS');
},
};

subscription.addListener(listener);
}
},
},
delays: {
TX_TIMEOUT: context => {
return context.txTimeout;
SUBSCRIBE_TIMEOUT: context => {
return context.subscribeTimeout;
},
CONNECTION_TIMEOUT: context => {
return context.connectionTimeout;
Expand Down
14 changes: 12 additions & 2 deletions packages/txs-tracer-core/src/lib/types/tx-trace.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import { IndexedTx } from '@cosmjs/stargate';
import { Tendermint34Client } from '@cosmjs/tendermint-rpc';

export interface TxTraceContext {
txTimeout: number;
subscribeTimeout: number;
connectionTimeout: number;
websocketUrl: string;
query: string;
method: string;
txs: IndexedTx[];
tendermintClient?: Tendermint34Client;
}

export type TxTraceEvents =
| { type: 'TX_RESULTS' }
| { type: 'CONNECTION_SUCCESS' }
| { type: 'CONNECTION_ERROR' }
| { type: 'SEND_QUERY_MESSAGE' }
| { type: 'CONNECTION_DISCONNECT' }
| { type: 'TRACE' }
| { type: 'TX_SEARCH_EMPTY' };
1 change: 1 addition & 0 deletions packages/txs-tracer-core/src/lib/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './tx';
14 changes: 14 additions & 0 deletions packages/txs-tracer-core/src/lib/utils/tx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { TxEvent, TxResponse } from '@cosmjs/tendermint-rpc';
import { fromTendermint34Event, IndexedTx } from '@cosmjs/stargate';
import { toHex } from '@cosmjs/encoding';

export const mapIndexedTx = (tx: TxResponse | TxEvent): IndexedTx => ({
height: tx.height,
hash: toHex(tx.hash).toUpperCase(),
code: tx.result.code,
events: tx.result.events.map(fromTendermint34Event),
rawLog: tx.result.log || '',
tx: tx.tx,
gasUsed: tx.result.gasUsed,
gasWanted: tx.result.gasWanted,
});
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 31c6000

Please sign in to comment.