Skip to content

Commit

Permalink
refactor: ensure new kv-store is used on the server (#11662)
Browse files Browse the repository at this point in the history
This PR refactors the last few stores (other than PXE and old merkle
trees) to the new async store interface
  • Loading branch information
alexghr authored Feb 4, 2025
1 parent d3c31d8 commit aee1420
Show file tree
Hide file tree
Showing 22 changed files with 161 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,18 @@ StatsResponse LMDBStoreWrapper::get_stats()

BoolResponse LMDBStoreWrapper::close()
{
// prevent this store from receiving further messages
_msg_processor.close();

{
// close all of the open read cursors
std::lock_guard cursors(_cursor_mutex);
_cursors.clear();
}

// and finally close the database handle
_store.reset(nullptr);

return { true };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class AsyncMessageProcessor {
// complete on an separate thread
auto deferred = std::make_shared<Napi::Promise::Deferred>(env);

if (info.Length() < 1) {
if (!open) {
deferred->Reject(Napi::TypeError::New(env, "Message processor is closed").Value());
} else if (info.Length() < 1) {
deferred->Reject(Napi::TypeError::New(env, "Wrong number of arguments").Value());
} else if (!info[0].IsBuffer()) {
deferred->Reject(Napi::TypeError::New(env, "Argument must be a buffer").Value());
Expand All @@ -82,8 +84,11 @@ class AsyncMessageProcessor {
return deferred->Promise();
}

void close() { open = false; }

private:
bb::messaging::MessageDispatcher dispatcher;
bool open = true;

template <typename P, typename R>
void _register_handler(uint32_t msgType, const std::function<R(const P&, const msgpack::object&)>& fn)
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec/src/cli/cmds/start_archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { createBlobSinkClient } from '@aztec/blob-sink/client';
import { ArchiverApiSchema } from '@aztec/circuit-types';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type DataStoreConfig, dataConfigMappings } from '@aztec/kv-store/config';
import { createStore } from '@aztec/kv-store/lmdb';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { getConfigEnvVars as getTelemetryClientConfig, initTelemetryClient } from '@aztec/telemetry-client';

import { extractRelevantOptions } from '../util.js';
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { P2PBootstrapApiSchema } from '@aztec/circuit-types';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn, createLogger } from '@aztec/foundation/log';
import { createStore } from '@aztec/kv-store/lmdb';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { type BootnodeConfig, BootstrapNode, bootnodeConfigMappings } from '@aztec/p2p';
import { getConfigEnvVars as getTelemetryClientConfig, initTelemetryClient } from '@aztec/telemetry-client';

Expand Down
6 changes: 3 additions & 3 deletions yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import { Fr } from '@aztec/foundation/fields';
import { BlobWithIndex } from '../types/index.js';
import { type BlobStore } from './interface.js';

export function describeBlobStore(getBlobStore: () => BlobStore) {
export function describeBlobStore(getBlobStore: () => Promise<BlobStore>) {
let blobStore: BlobStore;

beforeEach(() => {
blobStore = getBlobStore();
beforeEach(async () => {
blobStore = await getBlobStore();
});

it('should store and retrieve a blob', async () => {
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/blob-sink/src/blobstore/disk_blob_store.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { openTmpStore } from '@aztec/kv-store/lmdb';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';

import { describeBlobStore } from './blob_store_test_suite.js';
import { DiskBlobStore } from './disk_blob_store.js';

describe('DiskBlobStore', () => {
describeBlobStore(() => new DiskBlobStore(openTmpStore()));
describeBlobStore(async () => new DiskBlobStore(await openTmpStore('test')));
});
17 changes: 8 additions & 9 deletions yarn-project/blob-sink/src/blobstore/disk_blob_store.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';
import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store';

import { type BlobWithIndex, BlobsWithIndexes } from '../types/index.js';
import { type BlobStore } from './interface.js';

export class DiskBlobStore implements BlobStore {
blobs: AztecMap<string, Buffer>;
blobs: AztecAsyncMap<string, Buffer>;

constructor(store: AztecKVStore) {
constructor(store: AztecAsyncKVStore) {
this.blobs = store.openMap('blobs');
}

public getBlobSidecars(blockId: string, indices?: number[]): Promise<BlobWithIndex[] | undefined> {
const blobBuffer = this.blobs.get(`${blockId}`);
public async getBlobSidecars(blockId: string, indices?: number[]): Promise<BlobWithIndex[] | undefined> {
const blobBuffer = await this.blobs.getAsync(`${blockId}`);
if (!blobBuffer) {
return Promise.resolve(undefined);
return undefined;
}

const blobsWithIndexes = BlobsWithIndexes.fromBuffer(blobBuffer);
if (indices) {
// If indices are provided, return the blobs at the specified indices
return Promise.resolve(blobsWithIndexes.getBlobsFromIndices(indices));
return blobsWithIndexes.getBlobsFromIndices(indices);
}
// If no indices are provided, return all blobs
return Promise.resolve(blobsWithIndexes.blobs);
return blobsWithIndexes.blobs;
}

public async addBlobSidecars(blockId: string, blobSidecars: BlobWithIndex[]): Promise<void> {
await this.blobs.set(blockId, new BlobsWithIndexes(blobSidecars).toBuffer());
return Promise.resolve();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import { describeBlobStore } from './blob_store_test_suite.js';
import { MemoryBlobStore } from './memory_blob_store.js';

describe('MemoryBlobStore', () => {
describeBlobStore(() => new MemoryBlobStore());
describeBlobStore(() => Promise.resolve(new MemoryBlobStore()));
});
6 changes: 3 additions & 3 deletions yarn-project/blob-sink/src/server/factory.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { type AztecKVStore } from '@aztec/kv-store';
import { createStore } from '@aztec/kv-store/lmdb';
import { type AztecAsyncKVStore } from '@aztec/kv-store';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { type BlobSinkConfig } from './config.js';
import { BlobSinkServer } from './server.js';

// If data store settings are provided, the store is created and returned.
// Otherwise, undefined is returned and an in memory store will be used.
async function getDataStoreConfig(config?: BlobSinkConfig): Promise<AztecKVStore | undefined> {
async function getDataStoreConfig(config?: BlobSinkConfig): Promise<AztecAsyncKVStore | undefined> {
if (!config?.dataStoreConfig) {
return undefined;
}
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/blob-sink/src/server/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Blob } from '@aztec/foundation/blob';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { type AztecKVStore } from '@aztec/kv-store';
import { type AztecAsyncKVStore } from '@aztec/kv-store';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import express, { type Express, type Request, type Response, json } from 'express';
Expand Down Expand Up @@ -32,7 +32,7 @@ export class BlobSinkServer {
private metrics: BlobSinkMetrics;
private log: Logger = createLogger('aztec:blob-sink');

constructor(config?: BlobSinkConfig, store?: AztecKVStore, telemetry: TelemetryClient = getTelemetryClient()) {
constructor(config?: BlobSinkConfig, store?: AztecAsyncKVStore, telemetry: TelemetryClient = getTelemetryClient()) {
this.port = config?.port ?? 5052; // 5052 is beacon chain default http port
this.app = express();

Expand Down
10 changes: 10 additions & 0 deletions yarn-project/kv-store/src/lmdb-v2/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,13 @@ export async function openTmpStore(

return AztecLMDBStoreV2.new(dataDir, dbMapSizeKb, maxReaders, cleanup, log);
}

export function openStoreAt(
dataDir: string,
dbMapSizeKb = 10 * 1_024 * 1_024, // 10GB
maxReaders = MAX_READERS,
log: Logger = createLogger('kv-store:lmdb-v2'),
): Promise<AztecLMDBStoreV2> {
log.debug(`Opening data store at: ${dataDir} with size: ${dbMapSizeKb} KB (LMDB v2)`);
return AztecLMDBStoreV2.new(dataDir, dbMapSizeKb, maxReaders, undefined, log);
}
32 changes: 29 additions & 3 deletions yarn-project/kv-store/src/lmdb-v2/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { LMDBSingleValue } from './singleton.js';
import { WriteTransaction } from './write_transaction.js';

export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel {
private open = false;
private channel: MsgpackChannel<LMDBMessageType, LMDBRequestBody, LMDBResponseBody>;
private writerCtx = new AsyncLocalStorage<WriteTransaction>();
private writerQueue = new SerialQueue();
Expand All @@ -43,18 +44,24 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel {
this.availableCursors = new Semaphore(maxReaders - 1);
}

public get dataDirectory(): string {
return this.dataDir;
}

private async start() {
this.writerQueue.start();

await this.sendMessage(LMDBMessageType.OPEN_DATABASE, {
await this.channel.sendMessage(LMDBMessageType.OPEN_DATABASE, {
db: Database.DATA,
uniqueKeys: true,
});

await this.sendMessage(LMDBMessageType.OPEN_DATABASE, {
await this.channel.sendMessage(LMDBMessageType.OPEN_DATABASE, {
db: Database.INDEX,
uniqueKeys: false,
});

this.open = true;
}

public static async new(
Expand All @@ -70,10 +77,16 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel {
}

public getReadTx(): ReadTransaction {
if (!this.open) {
throw new Error('Store is closed');
}
return new ReadTransaction(this);
}

public getCurrentWriteTx(): WriteTransaction | undefined {
if (!this.open) {
throw new Error('Store is closed');
}
const currentWrite = this.writerCtx.getStore();
return currentWrite;
}
Expand Down Expand Up @@ -105,6 +118,10 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel {
async transactionAsync<T extends Exclude<any, Promise<any>>>(
callback: (tx: WriteTransaction) => Promise<T>,
): Promise<T> {
if (!this.open) {
throw new Error('Store is closed');
}

// transactionAsync might be called recursively
// send any writes to the parent tx, but don't close it
// if the callback throws then the parent tx will rollback automatically
Expand Down Expand Up @@ -144,14 +161,23 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel {
}

async close() {
if (!this.open) {
// already closed
return;
}
this.open = false;
await this.writerQueue.cancel();
await this.sendMessage(LMDBMessageType.CLOSE, undefined);
await this.channel.sendMessage(LMDBMessageType.CLOSE, undefined);
}

public async sendMessage<T extends LMDBMessageType>(
msgType: T,
body: LMDBRequestBody[T],
): Promise<LMDBResponseBody[T]> {
if (!this.open) {
throw new Error('Store is closed');
}

if (msgType === LMDBMessageType.START_CURSOR) {
await this.availableCursors.acquire();
}
Expand Down
6 changes: 3 additions & 3 deletions yarn-project/kv-store/src/stores/l2_tips_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type L2Block } from '@aztec/circuit-types';
import { type BlockHeader, Fr } from '@aztec/circuits.js';
import { times } from '@aztec/foundation/collection';
import { type AztecAsyncKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/lmdb';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';

import { expect } from 'chai';

Expand All @@ -12,8 +12,8 @@ describe('L2TipsStore', () => {
let kvStore: AztecAsyncKVStore;
let tipsStore: L2TipsStore;

beforeEach(() => {
kvStore = openTmpStore(true);
beforeEach(async () => {
kvStore = await openTmpStore('test', true);
tipsStore = new L2TipsStore(kvStore, 'test');
});

Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p-bootstrap/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createLogger } from '@aztec/foundation/log';
import { createStore } from '@aztec/kv-store/lmdb';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { type BootnodeConfig, BootstrapNode } from '@aztec/p2p';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

Expand Down
10 changes: 5 additions & 5 deletions yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import {
} from '@aztec/circuit-types';
import { type EpochCache } from '@aztec/epoch-cache';
import { createLogger } from '@aztec/foundation/log';
import { type AztecKVStore } from '@aztec/kv-store';
import { type AztecAsyncKVStore } from '@aztec/kv-store';
import { type DataStoreConfig } from '@aztec/kv-store/config';
import { createStore as createStoreV2 } from '@aztec/kv-store/lmdb-v2';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import { P2PClient } from '../client/p2p_client.js';
Expand All @@ -26,7 +26,7 @@ import { configureP2PClientAddresses, createLibP2PPeerIdFromPrivateKey, getPeerI

type P2PClientDeps<T extends P2PClientType> = {
txPool?: TxPool;
store?: AztecKVStore;
store?: AztecAsyncKVStore;
attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined;
epochProofQuotePool?: EpochProofQuotePool;
};
Expand All @@ -43,8 +43,8 @@ export const createP2PClient = async <T extends P2PClientType>(
) => {
let config = { ..._config };
const logger = createLogger('p2p');
const store = await createStoreV2('p2p-v2', config, createLogger('p2p:lmdb-v2'));
const archive = await createStoreV2('p2p-archive', config, createLogger('p2p-archive:lmdb-v2'));
const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb-v2')));
const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb-v2'));

const mempools: MemPools<T> = {
txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit),
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { type EpochCache } from '@aztec/epoch-cache';
import { timesParallel } from '@aztec/foundation/collection';
import { type DataStoreConfig } from '@aztec/kv-store/config';
import { openTmpStore } from '@aztec/kv-store/lmdb';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import { gossipsub } from '@chainsafe/libp2p-gossipsub';
Expand Down Expand Up @@ -263,7 +263,7 @@ export async function createBootstrapNode(

async function startBootstrapNode(config: BootnodeConfig, telemetry: TelemetryClient) {
// Open an ephemeral store that will only exist in memory
const store = openTmpStore(true);
const store = await openTmpStore('bootstrap-node', true);
const bootstrapNode = new BootstrapNode(store, telemetry);
await bootstrapNode.start(config);
return bootstrapNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import {
import { type EpochCache } from '@aztec/epoch-cache';
import { createLogger } from '@aztec/foundation/log';
import { sleep } from '@aztec/foundation/sleep';
import { type AztecKVStore } from '@aztec/kv-store';
import { type AztecAsyncKVStore } from '@aztec/kv-store';
import { type DataStoreConfig } from '@aztec/kv-store/config';
import { openTmpStore } from '@aztec/kv-store/lmdb';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';

import { SignableENR } from '@chainsafe/enr';
import { describe, expect, it, jest } from '@jest/globals';
Expand All @@ -38,7 +38,7 @@ function generatePeerIdPrivateKeys(numberOfPeers: number): string[] {
const peerIdPrivateKeys: string[] = [];
for (let i = 0; i < numberOfPeers; i++) {
// magic number is multiaddr prefix: https://multiformats.io/multiaddr/
peerIdPrivateKeys.push('08021220' + generatePrivateKey().substr(2, 66));
peerIdPrivateKeys.push('08021220' + generatePrivateKey().slice(2, 68));
}
return peerIdPrivateKeys;
}
Expand All @@ -51,7 +51,7 @@ describe('Req Resp p2p client integration', () => {
let epochProofQuotePool: MockProxy<EpochProofQuotePool>;
let epochCache: MockProxy<EpochCache>;
let l2BlockSource: MockL2BlockSource;
let kvStore: AztecKVStore;
let kvStore: AztecAsyncKVStore;
let worldState: WorldStateSynchronizer;
let proofVerifier: ClientProtocolCircuitVerifier;
const logger = createLogger('p2p:test:client-integration');
Expand Down Expand Up @@ -120,7 +120,7 @@ describe('Req Resp p2p client integration', () => {
await l2BlockSource.createBlocks(100);

proofVerifier = alwaysTrueVerifier ? new AlwaysTrueCircuitVerifier() : new AlwaysFalseCircuitVerifier();
kvStore = openTmpStore();
kvStore = await openTmpStore('test');
const deps = {
txPool: txPool as unknown as TxPool,
attestationPool: attestationPool as unknown as AttestationPool,
Expand Down
Loading

0 comments on commit aee1420

Please sign in to comment.