feat: contract log query #1601

Draft
wants to merge 11 commits into base: develop
14 changes: 14 additions & 0 deletions docs/openapi.yaml
@@ -1006,6 +1006,20 @@ paths:
schema:
type: string
example: "SP6P4EJF0VG8V0RB3TQQKJBHDQKEF6NVRD1KZE3C.satoshibles"
- name: contains
in: query
description: Optional stringified JSON to select only results that contain the given JSON
required: false
schema:
type: string
example: '{"attachment":{"metadata":{"op":"name-register"}}}'
- name: filter_path
in: query
description: Optional [`jsonpath` expression](https://www.postgresql.org/docs/14/functions-json.html#FUNCTIONS-SQLJSON-PATH) to select only results that contain items matching the expression
required: false
schema:
type: string
example: '$.attachment.metadata?(@.op=="name-register")'
- name: limit
in: query
description: max number of contract events to fetch
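
For illustration, here is a minimal client-side sketch of how the two new parameters would be used together, assuming the events endpoint documented above is served at /extended/v1/contract/{contract_id}/events (the host, mount path, and fetch wiring below are assumptions, not part of this diff); run it inside an async context:

// Hypothetical request: fetch only events whose value contains the given JSON fragment.
const base = 'https://api.example.com/extended/v1/contract'; // assumed host and mount path
const contractId = 'SP6P4EJF0VG8V0RB3TQQKJBHDQKEF6NVRD1KZE3C.satoshibles';
const params = new URLSearchParams({
  contains: JSON.stringify({ attachment: { metadata: { op: 'name-register' } } }),
  limit: '20',
});
// Alternatively: params.set('filter_path', '$.attachment.metadata ? (@.op == "name-register")');
const response = await fetch(`${base}/${contractId}/events?${params}`);
const events = await response.json();
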
14 changes: 14 additions & 0 deletions migrations/1680181889941_contract_log_json.js
@@ -0,0 +1,14 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
exports.up = async pgm => {
pgm.addColumn('contract_logs', {
value_json: {
type: 'jsonb',
},
});
}

/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
exports.down = pgm => {
pgm.dropIndex('contract_logs', 'value_json_path_ops_idx');
Collaborator review comment: This index is never created, is this an old line?

pgm.dropColumn('contract_logs', 'value_json');
}
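
Regarding the comment above: the backfill in src/datastore/migrations.ts (further down in this diff) creates contract_logs_jsonpathops_idx and contract_logs_jsonops_idx, so if the down migration is meant to mirror those, a sketch could look like the following (index names taken from that file; whether dropping them belongs here depends on how the backfill ends up being shipped):

/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
exports.down = pgm => {
  // Drop the indexes created by the JSON backfill, if present, then the column itself.
  pgm.sql('DROP INDEX IF EXISTS contract_logs_jsonpathops_idx');
  pgm.sql('DROP INDEX IF EXISTS contract_logs_jsonops_idx');
  pgm.dropColumn('contract_logs', 'value_json');
}
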
78 changes: 78 additions & 0 deletions src/api/query-helpers.ts
@@ -11,6 +11,84 @@ function handleBadRequest(res: Response, next: NextFunction, errorMessage: strin
throw error;
}

export function validateJsonPathQuery<TRequired extends boolean>(
req: Request,
res: Response,
next: NextFunction,
paramName: string,
args: { paramRequired: TRequired; maxCharLength: number }
): TRequired extends true ? string | never : string | null {
if (!(paramName in req.query)) {
if (args.paramRequired) {
handleBadRequest(res, next, `Request is missing required "${paramName}" query parameter`);
} else {
return null as TRequired extends true ? string | never : string | null;
}
}
const jsonPathInput = req.query[paramName];
if (typeof jsonPathInput !== 'string') {
handleBadRequest(
res,
next,
`Unexpected type for '${paramName}' parameter: ${JSON.stringify(jsonPathInput)}`
);
}

const maxCharLength = args.maxCharLength;

if (jsonPathInput.length > maxCharLength) {
handleBadRequest(
res,
next,
`JsonPath parameter '${paramName}' is invalid: char length exceeded, max=${maxCharLength}, received=${jsonPathInput.length}`
);
}

const disallowedOperation = containsDisallowedJsonPathOperation(jsonPathInput);
if (disallowedOperation) {
handleBadRequest(
res,
next,
`JsonPath parameter '${paramName}' is invalid: contains disallowed operation '${disallowedOperation.operation}'`
);
}

return jsonPathInput;
}

/**
* Disallow operations that could be used to perform expensive queries.
* See https://www.postgresql.org/docs/14/functions-json.html
*/
export function containsDisallowedJsonPathOperation(
jsonPath: string
): false | { operation: string } {
const normalizedPath = jsonPath.replace(/\s+/g, '').toLowerCase();
const hasDisallowedOperations: [() => boolean, string][] = [
[() => normalizedPath.includes('.*'), '.* wildcard accessor'],
[() => normalizedPath.includes('[*]'), '[*] wildcard array accessor'],
[() => /\[\d+to(\d+|last)\]/.test(normalizedPath), '[n to m] array range accessor'],
[() => /\[[^\]]*\([^\)]*\)[^\]]*\]/.test(normalizedPath), '[()] array expression accessor'],
[() => normalizedPath.includes('.type('), '.type()'],
[() => normalizedPath.includes('.size('), '.size()'],
[() => normalizedPath.includes('.double('), '.double()'],
[() => normalizedPath.includes('.ceiling('), '.ceiling()'],
[() => normalizedPath.includes('.floor('), '.floor()'],
[() => normalizedPath.includes('.abs('), '.abs()'],
[() => normalizedPath.includes('.datetime('), '.datetime()'],
[() => normalizedPath.includes('.keyvalue('), '.keyvalue()'],
[() => normalizedPath.includes('isunknown'), 'is unknown'],
[() => normalizedPath.includes('like_regex'), 'like_regex'],
[() => normalizedPath.includes('startswith'), 'starts with'],
];
for (const [hasDisallowedOperation, disallowedOperationName] of hasDisallowedOperations) {
if (hasDisallowedOperation()) {
return { operation: disallowedOperationName };
}
}
return false;
}
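
As a quick sanity check on the guard above, here is how it behaves on the documented filter_path example versus a wildcard path (illustrative only; return values follow the implementation as written):

// Allowed: the jsonpath example from docs/openapi.yaml
containsDisallowedJsonPathOperation('$.attachment.metadata ? (@.op == "name-register")');
// => false

// Rejected: wildcard accessors are on the disallowed list
containsDisallowedJsonPathOperation('$.attachment.*');
// => { operation: '.* wildcard accessor' }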

export function booleanValueForParam(
req: Request,
res: Response,
34 changes: 32 additions & 2 deletions src/api/routes/contract.ts
@@ -2,7 +2,7 @@ import * as express from 'express';
import { asyncHandler } from '../async-handler';
import { getPagingQueryLimit, parsePagingQueryInput, ResourceType } from '../pagination';
import { parseDbEvent } from '../controllers/db-controller';
import { parseTraitAbi } from '../query-helpers';
import { parseTraitAbi, validateJsonPathQuery } from '../query-helpers';
import { PgStore } from '../../datastore/pg-store';

export function createContractRouter(db: PgStore): express.Router {
@@ -50,14 +50,44 @@ export function createContractRouter(db: PgStore): express.Router {

router.get(
'/:contract_id/events',
asyncHandler(async (req, res) => {
asyncHandler(async (req, res, next) => {
const { contract_id } = req.params;
const limit = getPagingQueryLimit(ResourceType.Contract, req.query.limit);
const offset = parsePagingQueryInput(req.query.offset ?? 0);

const filterPath = validateJsonPathQuery(req, res, next, 'filter_path', {
paramRequired: false,
maxCharLength: 200,
});

const containsJsonQuery = req.query['contains'];
if (containsJsonQuery && typeof containsJsonQuery !== 'string') {
res.status(400).json({ error: `'contains' query param must be a string` });
return;
}
let containsJson: any | undefined;
const maxContainsJsonCharLength = 200;
if (containsJsonQuery) {
if (containsJsonQuery.length > maxContainsJsonCharLength) {
res.status(400).json({
error: `'contains' query param value exceeds ${maxContainsJsonCharLength} character limit`,
});
return;
}
try {
containsJson = JSON.parse(containsJsonQuery);
} catch (error) {
res.status(400).json({ error: `'contains' query param value must be valid JSON` });
return;
}
}

const eventsQuery = await db.getSmartContractEvents({
contractId: contract_id,
limit,
offset,
filterPath,
containsJson,
});
if (!eventsQuery.found) {
res.status(404).json({ error: `cannot find events for contract by ID: ${contract_id}` });
1 change: 1 addition & 0 deletions src/datastore/common.ts
@@ -1379,6 +1379,7 @@ export interface SmartContractEventInsertValues {
contract_identifier: string;
topic: string;
value: PgBytea;
value_json: string;
Collaborator review comment: It would be best if we set this as string | null IMO to avoid storing empty strings

}
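
A one-line sketch of the reviewer's suggestion above for SmartContractEventInsertValues.value_json, so inserts could pass NULL rather than an empty string (hypothetical, pending the author's decision):

value_json: string | null;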

export interface BurnchainRewardInsertValues {
109 changes: 107 additions & 2 deletions src/datastore/migrations.ts
@@ -1,7 +1,16 @@
import * as path from 'path';
import PgMigrate, { RunnerOption } from 'node-pg-migrate';
import { Client } from 'pg';
import { APP_DIR, isDevEnv, isTestEnv, logError, logger, REPO_DIR } from '../helpers';
import { Client, QueryResultRow } from 'pg';
import * as PgCursor from 'pg-cursor';
import {
APP_DIR,
clarityValueToCompactJson,
isDevEnv,
isTestEnv,
logError,
logger,
REPO_DIR,
} from '../helpers';
import { getPgClientConfig, PgClientConfig } from './connection-legacy';
import { connectPostgres, PgServer } from './connection';
import { databaseHasData } from './event-requests';
@@ -43,6 +52,7 @@ export async function runMigrations(
runnerOpts.schema = clientConfig.schema;
}
await PgMigrate(runnerOpts);
await completeSqlMigrations(client, clientConfig);
} catch (error) {
logError(`Error running pg-migrate`, error);
throw error;
@@ -99,3 +109,98 @@ export async function dangerousDropAllTables(opts?: {
await sql.end();
}
}

// Function to finish running sql migrations that are too complex for the node-pg-migrate library.
async function completeSqlMigrations(client: Client, clientConfig: PgClientConfig) {
try {
await client.query('BEGIN');
await complete_1680181889941_contract_log_json(client, clientConfig);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
}
}

async function* pgCursorQuery<R extends QueryResultRow = any>(args: {
clientConfig: PgClientConfig;
queryText: string;
queryValues?: any[];
batchSize: number;
}) {
const cursorClient = new Client(args.clientConfig);
try {
await cursorClient.connect();
const cursor = new PgCursor<R>(args.queryText, args.queryValues);
const cursorQuery = cursorClient.query(cursor);
let rows: R[] = [];
do {
rows = await new Promise((resolve, reject) => {
cursorQuery.read(args.batchSize, (error, rows) => (error ? reject(error) : resolve(rows)));
});
yield* rows;
} while (rows.length > 0);
} finally {
await cursorClient.end();
}
}

async function complete_1680181889941_contract_log_json(
Collaborator review comment:
Is there a pressing issue that would require us to ship this feature with this kind of migration? This code looks good to me (and I like this experimentation as you specified in the PR desc), but I worry that it could create some problems if we want to add newer migrations that rely on the new columns or something. IMO it would be easier and more maintainable to just ship this as a regular new migration and rely on replay, or to explicitly mark this code as a "patch" somehow which we could drop once we switch to a new v8 release.

What do you think?

client: Client,
clientConfig: PgClientConfig
) {
  // Determine if this migration has already been run by checking if the new column is nullable.
const result = await client.query(`
SELECT is_nullable FROM information_schema.columns
WHERE table_name = 'contract_logs' AND column_name = 'value_json'
`);
const migrationNeeded = result.rows[0].is_nullable === 'YES';
if (!migrationNeeded) {
return;
}
logger.info(`Running migration 1680181889941_contract_log_json..`);

const contractLogsCursor = pgCursorQuery<{ id: string; value: string }>({
clientConfig,
queryText: 'SELECT id, value FROM contract_logs',
batchSize: 1000,
});

const rowCountQuery = await client.query<{ count: number }>(
'SELECT COUNT(*)::integer FROM contract_logs'
);
const totalRowCount = rowCountQuery.rows[0].count;
let rowsProcessed = 0;
let lastPercentComplete = 0;
const percentLogInterval = 3;

for await (const row of contractLogsCursor) {
const clarityValJson = JSON.stringify(clarityValueToCompactJson(row.value));
await client.query({
name: 'update_contract_log_json',
text: 'UPDATE contract_logs SET value_json = $1 WHERE id = $2',
values: [clarityValJson, row.id],
});
rowsProcessed++;
const percentComplete = Math.round((rowsProcessed / totalRowCount) * 100);
if (percentComplete > lastPercentComplete + percentLogInterval) {
lastPercentComplete = percentComplete;
logger.info(`Running migration 1680181889941_contract_log_json.. ${percentComplete}%`);
}
}

logger.info(`Running migration 1680181889941_contract_log_json.. set NOT NULL`);
await client.query(`ALTER TABLE contract_logs ALTER COLUMN value_json SET NOT NULL`);

logger.info('Running migration 1680181889941_contract_log_json.. creating jsonb_path_ops index');
await client.query(
`CREATE INDEX contract_logs_jsonpathops_idx ON contract_logs USING GIN (value_json jsonb_path_ops)`
);

logger.info('Running migration 1680181889941_contract_log_json.. creating jsonb_ops index');
await client.query(
`CREATE INDEX contract_logs_jsonops_idx ON contract_logs USING GIN (value_json jsonb_ops)`
);

logger.info(`Running migration 1680181889941_contract_log_json.. 100%`);
}
10 changes: 10 additions & 0 deletions src/datastore/pg-store.ts
@@ -2094,11 +2094,18 @@
contractId,
limit,
offset,
filterPath,
containsJson,
}: {
contractId: string;
limit: number;
offset: number;
filterPath: string | null;
containsJson: any | undefined;
}): Promise<FoundOrNot<DbSmartContractEvent[]>> {
const hasFilterPath = filterPath !== null;
const hasJsonContains = containsJson !== undefined;

const logResults = await this.sql<
{
event_index: number;
@@ -2108,12 +2115,15 @@
contract_identifier: string;
topic: string;
value: string;
value_json: any;
}[]
>`
SELECT
event_index, tx_id, tx_index, block_height, contract_identifier, topic, value
FROM contract_logs
WHERE canonical = true AND microblock_canonical = true AND contract_identifier = ${contractId}
${hasFilterPath ? this.sql`AND value_json @? ${filterPath}::jsonpath` : this.sql``}
${hasJsonContains ? this.sql`AND value_json @> ${containsJson}::jsonb` : this.sql``}
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
LIMIT ${limit}
OFFSET ${offset}
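
For reference, a sketch of how the route handler above would invoke this method once both filters are supplied (the values mirror the openapi examples; this call is illustrative and not part of the diff):

const eventsQuery = await db.getSmartContractEvents({
  contractId: 'SP6P4EJF0VG8V0RB3TQQKJBHDQKEF6NVRD1KZE3C.satoshibles',
  limit: 20,
  offset: 0,
  // validated filter_path query param
  filterPath: '$.attachment.metadata ? (@.op == "name-register")',
  // parsed contains query param
  containsJson: { attachment: { metadata: { op: 'name-register' } } },
});
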
12 changes: 11 additions & 1 deletion src/datastore/pg-write-store.ts
@@ -1,4 +1,12 @@
import { logger, logError, getOrAdd, batchIterate, isProdEnv, I32_MAX } from '../helpers';
import {
logger,
logError,
getOrAdd,
batchIterate,
isProdEnv,
I32_MAX,
clarityValueToCompactJson,
} from '../helpers';
import {
DbBlock,
DbTx,
@@ -1228,6 +1236,7 @@ export class PgWriteStore extends PgStore {
contract_identifier: event.contract_identifier,
topic: event.topic,
value: event.value,
value_json: JSON.stringify(clarityValueToCompactJson(event.value)),
}));
const res = await sql`
INSERT INTO contract_logs ${sql(values)}
@@ -1253,6 +1262,7 @@
contract_identifier: event.contract_identifier,
topic: event.topic,
value: event.value,
value_json: JSON.stringify(clarityValueToCompactJson(event.value)),
};
await sql`
INSERT INTO contract_logs ${sql(values)}