Skip to content

Commit

Permalink
Merge pull request #5808 from connext/testnet-prod
Browse files Browse the repository at this point in the history
prod <- testnet-prod Sync
  • Loading branch information
preethamr authored Mar 1, 2024
2 parents 902c305 + 4da7aac commit d49d9b4
Show file tree
Hide file tree
Showing 25 changed files with 485 additions and 80 deletions.
80 changes: 79 additions & 1 deletion packages/adapters/cache/src/lib/caches/routers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export class RoutersCache extends Cache {
// TODO: Implement configurable expiry times per domain.
// Default expiry time (in seconds) after which liquidity data is considered stale.
public static readonly DEFAULT_LIQUIDITY_EXPIRY = 30; // 30 seconds.
public static readonly DEFAULT_APPROVAL_EXPIRY = 24 * 60 * 60; // 24 hours.
private readonly prefix = "routers";

/**
Expand Down Expand Up @@ -59,4 +58,83 @@ export class RoutersCache extends Cache {
}),
);
}

/**
* Set last active time for a given router.
* @param router - Router address.
*/
public async setLastActive(router: string): Promise<void> {
const key = `${this.prefix}:active`;
await this.data.hset(key, router, getNtpTimeSeconds().toString());
}

/**
* Get the recorded last active time for a given router.
* @param router - Router address.
* @returns The timestamp if recorded, 0 if not found.
*/
public async getLastActive(router: string): Promise<number> {
const key = `${this.prefix}:active`;
const res = await this.data.hget(key, router);
const lastActiveTime = res ? +res : 0;
return lastActiveTime;
}

/**
* Set the last bid time for a given router.
* @param router - Router address.
* @param bid - The bid instance.
*/
public async setLastBidTime(
router: string,
bid: { originDomain: string; destinationDomain: string; asset: string },
): Promise<void> {
const key = `${this.prefix}:bid`;
const bidKey = `${bid.originDomain}:${bid.destinationDomain}:${bid.asset}`;
let lastBids: Record<string, string> = {};
const currentTimestamp = getNtpTimeSeconds().toString();
const res = await this.data.hget(key, router);
if (res) {
lastBids = JSON.parse(res);
}
lastBids[bidKey] = currentTimestamp;
await this.data.hset(key, router, JSON.stringify(lastBids));
}

/**
* Get the recorded last bid time for a given router.
* @param router - Router address.
* @returns A record of transfer path, undefined if not found
*/
public async getLastBidTime(router: string): Promise<Record<string, string> | undefined> {
const key = `${this.prefix}:bid`;
const res = await this.data.hget(key, router);
return res ? JSON.parse(res) : undefined;
}

/**
* Add a router to the list.
* @param router - Router address.
*/
public async addRouter(router: string): Promise<void> {
const activeKey = `${this.prefix}:active`;
const addressKey = `${this.prefix}:address`;
const res = await this.data.hget(activeKey, router);
if (!res) {
await this.data.hset(activeKey, router, getNtpTimeSeconds().toString());
await this.data.rpush(addressKey, router);
}
}

/**
* Get the recorded router addresses.
* @param offset - The start index.
* @param limit - The number of items to fetch.
* @returns The list of router address.
*/
public async getRouters(offset = 0, limit = 100): Promise<string[]> {
const addressKey = `${this.prefix}:address`;
const routers = await this.data.lrange(addressKey, offset, offset + limit - 1);
return routers;
}
}
77 changes: 52 additions & 25 deletions packages/adapters/cache/test/lib/caches/routers.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, expect, mock, getNtpTimeSeconds } from "@connext/nxtp-utils";
import { Logger, expect, mock, getNtpTimeSeconds, mkAddress } from "@connext/nxtp-utils";

import { RoutersCache } from "../../../src/index";
import { TimestampedCacheValue } from "../../../src/lib/entities";
Expand All @@ -8,7 +8,7 @@ const redis = new RedisMock();

describe("RoutersCache", () => {
const prefix = "routers";
// Helpers for accessing mock cache directly and altering state.
// // Helpers for accessing mock cache directly and altering state.
const mockRedisHelpers = {
setLiquidity: async (domain: string, router: string, asset: string, amount: string, timestamp?: number) =>
await redis.hset(
Expand Down Expand Up @@ -60,10 +60,10 @@ describe("RoutersCache", () => {
const asset = mock.asset.A.address;
const amount = "1234567890";

await mockRedisHelpers.setLiquidity(domain, router, asset, amount);
await cache.setLiquidity(domain, router, asset, amount);
const res = await cache.getLiquidity(domain, router, asset);

expect(res.toString()).to.be.eq(amount);
expect(res!.toString()).to.be.eq(amount);
});

it("sad: should return undefined if liquidity data does not exist", async () => {
Expand All @@ -80,17 +80,14 @@ describe("RoutersCache", () => {
const domain = mock.domain.A;
const router = mock.address.router;
const asset = mock.asset.A.address;

await mockRedisHelpers.setLiquidity(
domain,
router,
asset,
"123",
// Subtract another 10 secs to be safe.
"123", // Subtract another 10 secs to be safe.
getNtpTimeSeconds() - RoutersCache.DEFAULT_LIQUIDITY_EXPIRY - 10,
);
const res = await cache.getLiquidity(domain, router, asset);

expect(res).to.be.undefined;
});
});
Expand All @@ -101,33 +98,63 @@ describe("RoutersCache", () => {
const router = mock.address.router;
const asset = mock.asset.A.address;
const amount = "1234567890";
const currentTime = getNtpTimeSeconds();

await cache.setLiquidity(domain, router, asset, amount);
const res = await mockRedisHelpers.getLiquidity(domain, router, asset);
const res = await cache.getLiquidity(domain, router, asset);

expect(res.value).to.be.eq(amount);
expect(res.timestamp).to.be.a("number");
expect(res.timestamp).to.be.gte(currentTime);
expect(res!.toString()).to.be.eq(amount);
});

it("happy: should update existing liquidity amount, along with timestamp", async () => {
const domain = mock.domain.A;
const router = mock.address.router;
const asset = mock.asset.A.address;
const originalAmount = "1234567890";
const originalTimestamp = 123;
const newAmount = "9876543210";
const currentTime = getNtpTimeSeconds();

await mockRedisHelpers.setLiquidity(domain, router, asset, originalAmount, originalTimestamp);
await cache.setLiquidity(domain, router, asset, newAmount);

const res = await mockRedisHelpers.getLiquidity(domain, router, asset);
expect(res.value).to.be.eq(newAmount);
expect(res.timestamp).to.be.a("number");
expect(res.timestamp).to.not.be.eq(originalTimestamp);
expect(res.timestamp).to.be.gte(currentTime);

await cache.setLiquidity(domain, router, asset, originalAmount);
const res = await cache.getLiquidity(domain, router, asset);
expect(res?.toString()).to.be.eq(originalAmount);
});
});

describe("#setLastActive/getLastActive", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
const curTimestamp = getNtpTimeSeconds();
await cache.setLastActive(mockRouter1);
const lastActiveTimestamp1 = await cache.getLastActive(mockRouter1);
const lastActiveTimestamp2 = await cache.getLastActive(mockRouter2);
expect(+lastActiveTimestamp1).to.be.greaterThanOrEqual(curTimestamp);
expect(lastActiveTimestamp2).to.be.eq(0);
});
});

describe("#setLastBidTime/getLastBidTime", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
await cache.setLastBidTime(mockRouter1, { originDomain: "1111", destinationDomain: "2222", asset: "0xabc" });
const lastBidTimeForRouter1 = await cache.getLastBidTime(mockRouter1);
expect(lastBidTimeForRouter1).to.not.undefined;

const lastBidTimeForRouter2 = await cache.getLastBidTime(mockRouter2);
expect(lastBidTimeForRouter2).to.be.undefined;
});
});

describe("#addRouter/getRouters", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
await cache.addRouter(mockRouter1);
await cache.addRouter(mockRouter2);

// this shouldn't be added
await cache.addRouter(mockRouter1);

const routers = await cache.getRouters();
expect(routers).to.be.deep.eq([mockRouter1, mockRouter2]);
});
});
});
2 changes: 1 addition & 1 deletion packages/adapters/txservice/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@connext/nxtp-txservice",
"version": "2.3.0-alpha.1",
"version": "2.3.0-alpha.5",
"description": "Robust transaction sending service for a wallet configured across multiple chains. Will bump gas and reattempt transactions as needed",
"author": "Connext",
"license": "MIT",
Expand Down
4 changes: 2 additions & 2 deletions packages/agents/router/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const DEFAULT_CACHE_POLL_INTERVAL = 20_000;
const DEFAULT_AUCTION_ROUND_DEPTH = 3;

//Router MQ limits
export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 20;
export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 10;
const DEFAULT_ROUTER_MQ_HEARTBEAT_LIMIT = 10;
const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10;
export const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10;

// Sequencer and Cartographer default urls
const SEQUENCER_URLS: Record<string, any> = {
Expand Down
31 changes: 10 additions & 21 deletions packages/agents/router/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import { ChainData, createMethodContext, Logger, RequestContext } from "@connext
import rabbit from "foo-foo-mq";

import { MQConnectionClosed, MQConnectionFailed } from "./errors";
import { DEFAULT_ROUTER_MQ_RETRY_LIMIT } from "./config";

export const XCALL_QUEUE = "xcalls";
export const MQ_EXCHANGE = "router";
export const XCALL_MESSAGE_TYPE = "xcall";

let routerRetryLimit = DEFAULT_ROUTER_MQ_RETRY_LIMIT;

export const setupCache = async (
host: string | undefined,
port: number | undefined,
Expand Down Expand Up @@ -45,40 +42,32 @@ export const setupMq = async (
const methodContext = createMethodContext("setupMq");
// Disable reply queues
const replyQueue = false;

logger.info("Message queue setup in progress...", requestContext, methodContext, { uri });

await rabbit.configure({
connection: { uri, replyQueue, heartbeat, failAfter, retryLimit },
queues: [{ name: XCALL_QUEUE, limit }],
exchanges: [{ name: MQ_EXCHANGE, type: "direct" }],
bindings: [{ exchange: MQ_EXCHANGE, target: XCALL_QUEUE, keys: [XCALL_QUEUE] }],
});

await rabbit.on("closed", function () {
await rabbit.on("closed", async function () {
throw new MQConnectionClosed();
});

await rabbit.on("failed", async function () {
if (routerRetryLimit > 0) {
routerRetryLimit--;
logger.warn("MQ connection failed, retrying", requestContext, methodContext, {
uri,
routerRetryLimit,
});
try {
await rabbit.retry();
} catch (err: unknown) {
throw new MQConnectionFailed(err as Error);
}
} else {
throw new MQConnectionFailed();
}
throw new MQConnectionFailed();
});

await rabbit.on("unreachable", async function () {
// throw new MQConnectionUnreachable();
logger.warn("MQ is unreachable, retrying connection", requestContext, methodContext, {
throw new MQConnectionFailed();
});

await rabbit.on("connected", function () {
logger.info("Connected to MQ!", requestContext, methodContext, {
uri,
});
await rabbit.retry();
});

logger.info("Message queue setup is done!", requestContext, methodContext, {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { createLoggingContext, jsonifyError } from "@connext/nxtp-utils";
import interval from "interval-promise";

import { retryXCalls } from "../../operations";
import { retryXCalls, sendStatusToSequencer } from "../../operations";
import { getMissingXCalls, getXCalls } from "../../operations/getXCalls";
import { getContext } from "../../publisher";

export const bindSubgraph = async (_pollInterval?: number) => {
const { config, logger } = getContext();
const { requestContext, methodContext } = createLoggingContext(bindSubgraph.name);
const pollInterval = _pollInterval ?? config.polling.subgraph;

interval(async (_, stop) => {
if (config.mode.cleanup) {
stop();
Expand Down Expand Up @@ -65,4 +66,22 @@ export const bindSubgraph = async (_pollInterval?: number) => {
}
}
}, pollInterval);

interval(async (_, stop) => {
if (config.mode.cleanup) {
stop();
} else {
try {
// 4. Sends status to sequencer at a regular inverval
await sendStatusToSequencer();
} catch (e: unknown) {
logger.error(
"Error sending status to sequencer, waiting for next loop",
requestContext,
methodContext,
jsonifyError(e as Error),
);
}
}
}, pollInterval);
};
3 changes: 3 additions & 0 deletions packages/agents/router/src/tasks/publisher/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import { StoreManager } from "@connext/nxtp-adapters-cache";
import Rabbit from "foo-foo-mq";

import { NxtpRouterConfig } from "../../config";
import { Web3Signer } from "@connext/nxtp-adapters-web3signer";
import { Wallet } from "ethers";

export type AppContext = {
logger: Logger;
adapters: {
// Stateful interfaces for peripherals.
wallet: Wallet | Web3Signer; // Used for signing metatxs for bids.
cache: StoreManager; // Used to cache important data locally.
subgraph: SubgraphReader; // Aggregates subgraphs in a FallbackSubgraph for each chain.
mqClient: typeof Rabbit;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { getXCalls } from "./getXCalls";
export { retryXCalls } from "./retryXCalls";
export { sendStatusToSequencer } from "./status";
Loading

0 comments on commit d49d9b4

Please sign in to comment.