Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Odata opaque cursor #934

Merged
merged 10 commits into from
Sep 12, 2023
10 changes: 7 additions & 3 deletions lib/data/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,13 @@ const extractSelectedProperties = (query, properties) => {
};

// Pagination is done at the database level
const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tableCount) => {
const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tableCount, tableRemaining) => {
const serviceRoot = getServiceRoot(originalUrl);
const { limit, offset, shouldCount } = extractPaging(query);
const { limit, offset, shouldCount, skipToken } = extractPaging(query);
const selectedProperties = extractSelectedProperties(query, properties);

let isFirstEntity = true;
let lastUuid;
const rootStream = new Transform({
writableObjectMode: true, // we take a stream of objects from the db, but
readableObjectMode: false, // we put out a stream of text.
Expand All @@ -281,6 +282,8 @@ const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tab
this.push(',');
}

lastUuid = entity.uuid;

this.push(JSON.stringify(selectFields(entity, properties, selectedProperties)));

done();
Expand All @@ -290,8 +293,9 @@ const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tab
}, flush(done) {
this.push((isFirstEntity) ? '{"value":[],' : '],'); // open object or close row array.

const remaining = skipToken ? tableRemaining - limit : tableCount - (limit + offset);
// @odata.count and nextUrl.
const nextUrl = nextUrlFor(limit, offset, tableCount, originalUrl);
const nextUrl = nextUrlFor(remaining, originalUrl, { uuid: lastUuid });

this.push(jsonDataFooter({ table: 'Entities', domain, serviceRoot, nextUrl, count: (shouldCount ? tableCount.toString() : null) }));
done();
Expand Down
12 changes: 4 additions & 8 deletions lib/data/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
// on option.metadata even though they are not part of the form's own schema.
// So rather than try to inject them into the xml transformation below, we just
// formulate them here in advance:
if (!options.metadata || options.metadata.__id) {
root.__id = submission.instanceId;
}
root.__id = submission.instanceId;
if (table === 'Submissions') {
const systemObj = {
submissionDate: submission.createdAt,
Expand Down Expand Up @@ -175,7 +173,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
}

// bail out without doing any work if we are encrypted.
if (encrypted === true) return resolve(result);
if (encrypted === true) return resolve({ data: result, instanceId: submission.instanceId });

// we keep a dataStack, so we build an appropriate nested structure overall, and
// we can select the appropriate layer of that nesting at will.
Expand Down Expand Up @@ -220,9 +218,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

// create our new databag, push into result data, and set it as our result ptr.
const bag = generateDataFrame(schemaStack);
if (!options.metadata || options.metadata.__id) {
bag.__id = hashId(schemaStack, submission.instanceId);
}
bag.__id = hashId(schemaStack, submission.instanceId);
dataPtr[outname].push(bag);
dataStack.push(bag);

Expand Down Expand Up @@ -342,7 +338,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

if (schemaStack.hasExited()) {
parser.reset();
resolve(result);
resolve({ data: result, instanceId: submission.instanceId });
}
}
}, { xmlMode: true, decodeEntities: true });
Expand Down
93 changes: 79 additions & 14 deletions lib/formats/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const { sanitizeOdataIdentifier, without } = require('../util/util');
const { jsonDataFooter, extractOptions, nextUrlFor } = require('../util/odata');
const { submissionToOData, systemFields } = require('../data/odata');
const { SchemaStack } = require('../data/schema');
const { QueryOptions } = require('../util/db');


////////////////////////////////////////////////////////////////////////////////
// UTIL
Expand Down Expand Up @@ -52,9 +54,10 @@ const extractPathContext = (subpath) =>
const extractPaging = (table, query) => {
const parsedLimit = parseInt(query.$top, 10);
const limit = Number.isNaN(parsedLimit) ? Infinity : parsedLimit;
const offset = parseInt(query.$skip, 10) || 0;
const offset = (!query.$skiptoken && parseInt(query.$skip, 10)) || 0;
const shouldCount = isTrue(query.$count);
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount };
const skipToken = query.$skiptoken ? QueryOptions.parseSkiptoken(query.$skiptoken) : null;
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount, skipToken };

return Object.assign(result, (table === 'Submissions')
? { doLimit: Infinity, doOffset: 0 }
Expand Down Expand Up @@ -366,38 +369,66 @@ const edmxForEntities = (datasetName, properties) => {
// originalUrl: String is the request URL; we need it as well to formulate response URLs.
// query: Object is the Express Request query object indicating request querystring parameters.
// inStream: Stream[Row] is the postgres Submissions rowstream.
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount) => {
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount, tableRemaining) => {
// cache values we'll need repeatedly.
const serviceRoot = getServiceRoot(originalUrl);
const { limit, offset, doLimit, doOffset, shouldCount } = extractPaging(table, query);
const { doLimit, doOffset, shouldCount, skipToken } = extractPaging(table, query);
const options = extractOptions(query);
const isSubTable = table !== 'Submissions';

// make sure the target table actually exists.
// TODO: now that this doesn't require schema computation, should we move it up
// to the service level, along with the equivalent for singleRowToOData? probably.
if (!verifyTablePath(table.split('.'), fields)) throw Problem.user.notFound();

// write the header, then transform and stream each row.
// To count total number of items for subtable (repeats)
let counted = 0;
// To count items added to the downstream, required only for subtable
let added = 0;
// To count remaining items in case of subtable
let remainingItems = 0;

// skipToken is created based on the following two variables
let lastInstanceId = null;
let lastRepeatId = null;

// For Submissions table, it is true because cursor is handled at database level
let cursorPredicate = !isSubTable || !skipToken;

const parserStream = new Transform({
writableObjectMode: true, // we take a stream of objects from the db, but
readableObjectMode: false, // we put out a stream of text.
transform(row, _, done) {
// per row, we do our asynchronous parsing, jam the result onto the
// text resultstream, and call done to indicate that the row is processed.
submissionToOData(fields, table, row, options).then((data) => {
submissionToOData(fields, table, row, options).then(({ data, instanceId }) => {

const parentIdProperty = data[0] ? Object.keys(data[0]).find(p => /^__.*-id$/.test(p)) : null;

// In case of subtable we are reading all Submissions without pagination because we have to
// count repeat items in each Submission
for (const field of data) {

// if $select is there and parentId is not requested then remove it
const fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
let fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
// if $select is there and __id is not requested then remove it
fieldRefined = options.metadata && !options.metadata.__id ? without(['__id'], fieldRefined) : fieldRefined;

if (added === doLimit) remainingItems += 1;

if ((counted >= doOffset) && (counted < (doOffset + doLimit))) {
this.push((counted === doOffset) ? '{"value":[' : ','); // header or fencepost.
if (added < doLimit && counted >= doOffset && cursorPredicate) {
this.push((added === 0) ? '{"value":[' : ','); // header or fencepost.
this.push(JSON.stringify(fieldRefined));
lastInstanceId = instanceId;
if (isSubTable) lastRepeatId = field.__id;
added += 1;
}

// Controls the rows to be skipped based on skipToken
// Once set to true remains true
cursorPredicate = cursorPredicate || skipToken.repeatId === field.__id;

counted += 1;
}
done(); // signifies that this stream element is fully processed.
Expand All @@ -406,12 +437,20 @@ const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, t
flush(done) {
// flush is called just before the transform stream is done and closed; we write
// our footer information, close the object, and tell the stream we are done.
this.push((counted <= doOffset) ? '{"value":[],' : '],'); // open object or close row array.
this.push((added === 0) ? '{"value":[],' : '],'); // open object or close row array.

// if we were given an explicit count, use it from here out, to create
// @odata.count and nextUrl.
const totalCount = (tableCount != null) ? tableCount : counted;
const nextUrl = nextUrlFor(limit, offset, totalCount, originalUrl);

// How many items are remaining for the next page?
// if there aren't any then we don't need nextUrl
const remaining = (tableRemaining != null) ? tableRemaining - added : remainingItems;

let skipTokenData = { instanceId: lastInstanceId };
if (isSubTable) skipTokenData = { repeatId: lastRepeatId };

const nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);

// we do toString on the totalCount because mustache won't bother rendering
// the block if it sees integer zero.
Expand Down Expand Up @@ -448,8 +487,10 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const table = tableParts.join('.');
if (!verifyTablePath(tableParts, fields)) throw Problem.user.notFound();

const isSubTable = table !== 'Submissions';

// extract all our fields first, the field extractor doesn't know about target contexts.
return submissionToOData(fields, table, row, options).then((subrows) => {
return submissionToOData(fields, table, row, options).then(({ data: subrows, instanceId }) => {
// now we actually filter to the requested set. we actually only need to compare
// the very last specified id, since it is fully unique.
const filterContextIdx = targetContext.reduce(((extant, pair, idx) => ((pair[1] != null) ? idx : extant)), -1);
Expand All @@ -462,12 +503,36 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const count = filtered.length;

// now we can process $top/$skip/$count:
const { limit, offset, shouldCount } = extractPaging(table, query);
const nextUrl = nextUrlFor(limit, offset, count, originalUrl);
const paging = extractPaging(table, query);
const { limit, shouldCount, skipToken } = paging;
let { offset } = paging;

if (skipToken) {
offset = filtered.findIndex(s => skipToken.repeatId === s.__id);
}

const pared = filtered.slice(offset, offset + limit);

let nextUrl = null;

if (pared.length > 0) {
const remaining = count - (offset + limit);

let skipTokenData = {
instanceId
};

if (isSubTable) skipTokenData = { repeatId: pared[pared.length - 1].__id };

nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);
}


// if $select is there and parentId is not requested then remove it
let paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;

// if $select is there and __id is not requested then remove it
const paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;
paredRefined = options.metadata && !options.metadata.__id ? paredRefined.map(p => without(['__id'], p)) : paredRefined;

// and finally splice together and return our result:
const dataContents = paredRefined.map(JSON.stringify).join(',');
Expand Down
2 changes: 1 addition & 1 deletion lib/http/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ const isJsonType = (x) => /(^|,)(application\/json|json)($|;|,)/i.test(x);
const isXmlType = (x) => /(^|,)(application\/(atom(svc)?\+)?xml|atom|xml)($|;|,)/i.test(x);

// various supported odata constants:
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select' ];
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select', '$skiptoken' ];
const supportedFormats = {
json: [ 'application/json', 'json' ],
xml: [ 'application/xml', 'atom' ]
Expand Down
29 changes: 26 additions & 3 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ INNER JOIN
SELECT "entityId", (COUNT(id) - 1) AS "updates" FROM entity_defs GROUP BY "entityId"
) stats ON stats."entityId"=entity_defs."entityId"
LEFT JOIN actors ON entities."creatorId"=actors.id
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM entities WHERE "uuid" = ${options.skiptoken.uuid}) AS cursor
ON entities."createdAt" <= cursor."createdAt" AND entities.id < cursor.id
`: sql``}
WHERE
entities."datasetId" = ${datasetId}
AND entities."deletedAt" IS NULL
Expand All @@ -324,10 +329,28 @@ ORDER BY entities."createdAt" DESC, entities.id DESC
${page(options)}`)
.then(stream.map(_exportUnjoiner));

const countByDatasetId = (datasetId, options = QueryOptions.none) => ({ oneFirst }) => oneFirst(sql`
SELECT count(*) FROM entities
const countByDatasetId = (datasetId, options = QueryOptions.none) => ({ one }) => one(sql`
SELECT * FROM

(
SELECT count(*) count FROM entities
WHERE "datasetId" = ${datasetId}
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}
) AS "all"

CROSS JOIN
(
SELECT COUNT(*) remaining FROM entities
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM entities WHERE "uuid" = ${options.skiptoken.uuid}) AS cursor
ON entities."createdAt" <= cursor."createdAt" AND entities.id < cursor.id
`: sql``}
WHERE "datasetId" = ${datasetId}
AND ${odataFilter(options.filter, odataToColumnMap)}`);
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}
) AS skiptoken`);


////////////////////////////////////////////////////////////////////////////////
Expand Down
23 changes: 20 additions & 3 deletions lib/model/query/submissions.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,21 @@ const getById = (submissionId) => ({ maybeOne }) =>
maybeOne(sql`select * from submissions where id=${submissionId} and "deletedAt" is null`)
.then(map(construct(Submission)));

const countByFormId = (formId, draft, options = QueryOptions.none) => ({ oneFirst }) => oneFirst(sql`
select count(*) from submissions
where ${equals({ formId, draft })} and "deletedAt" is null and ${odataFilter(options.filter, odataToColumnMap)}`);
const countByFormId = (formId, draft, options = QueryOptions.none) => ({ one }) => one(sql`
SELECT * FROM
( SELECT COUNT(*) count FROM submissions
WHERE ${equals({ formId, draft })} AND "deletedAt" IS NULL AND ${odataFilter(options.filter, odataToColumnMap)}) AS "all"
CROSS JOIN
( SELECT COUNT(*) remaining FROM submissions
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM submissions WHERE "instanceId" = ${options.skiptoken.instanceId}) AS cursor
ON submissions."createdAt" <= cursor."createdAt" AND submissions.id < cursor.id
`: sql``}
WHERE ${equals({ formId, draft })}
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}) AS skiptoken
`);

const verifyVersion = (formId, rootId, instanceId, draft) => ({ maybeOne }) => maybeOne(sql`
select true from submissions
Expand Down Expand Up @@ -337,6 +349,11 @@ inner join
(select "submissionId", (count(id) - 1) as count from submission_defs
group by "submissionId") as edits
on edits."submissionId"=submission_defs."submissionId"
${options.skiptoken && !options.skiptoken.repeatId ? sql` -- in case of subtable we are fetching all Submissions without pagination
inner join
(select id, "createdAt" from submissions where "instanceId" = ${options.skiptoken.instanceId}) as cursor
on submissions."createdAt" <= cursor."createdAt" and submissions.id < cursor.id
`: sql``}
where
${encrypted ? sql`(form_defs."encKeyId" is null or form_defs."encKeyId" in (${sql.join(keyIds, sql`,`)})) and` : sql``}
${odataFilter(options.filter, options.isSubmissionsTable ? odataToColumnMap : odataSubTableToColumnMap)} and
Expand Down
4 changes: 2 additions & 2 deletions lib/resources/datasets.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ module.exports = (service, endpoint) => {
.then((project) => auth.canOrReject('dataset.list', project))
.then(() => Datasets.getList(params.id, queryOptions))));

service.get('/projects/:projectId/datasets/:name', endpoint(({ Datasets }, { params, auth }) =>
Datasets.get(params.projectId, params.name)
service.get('/projects/:projectId/datasets/:name', endpoint(({ Datasets }, { params, auth, queryOptions }) =>
Datasets.get(params.projectId, params.name, true, queryOptions.extended)
.then(getOrNotFound)
.then((dataset) => auth.canOrReject('dataset.read', dataset)
.then(() => Datasets.getMetadata(dataset)))));
Expand Down
4 changes: 2 additions & 2 deletions lib/resources/odata-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ module.exports = (service, endpoint) => {
const dataset = await Datasets.get(params.projectId, params.name, true).then(getOrNotFound);
const properties = await Datasets.getProperties(dataset.id);
const options = QueryOptions.fromODataRequestEntities(query);
const count = await Entities.countByDatasetId(dataset.id, options);
const { count, remaining } = await Entities.countByDatasetId(dataset.id, options);
const entities = await Entities.streamForExport(dataset.id, options);
return json(streamEntityOdata(entities, properties, env.domain, originalUrl, query, count));
return json(streamEntityOdata(entities, properties, env.domain, originalUrl, query, count, remaining));
}));


Expand Down
6 changes: 3 additions & 3 deletions lib/resources/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ module.exports = (service, endpoint) => {
Forms.getFields(form.def.id).then(selectFields(query, params.table)),
Submissions.streamForExport(form.id, draft, undefined, options),
((params.table === 'Submissions') && options.hasPaging())
? Submissions.countByFormId(form.id, draft, options) : resolve(null)
? Submissions.countByFormId(form.id, draft, options) : resolve({})
])
.then(([fields, stream, count]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count)));
.then(([fields, stream, { count, remaining }]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count, remaining)));
})));
};

Expand Down
Loading