Skip to content

Commit

Permalink
Made changes for Submission OData
Browse files Browse the repository at this point in the history
need to add more unit tests
  • Loading branch information
sadiqkhoja committed Jul 27, 2023
1 parent 0f7f11d commit d3a678c
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 135 deletions.
16 changes: 8 additions & 8 deletions lib/data/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
// on option.metadata even though they are not part of the form's own schema.
// So rather than try to inject them into the xml transformation below, we just
// formulate them herein advance:
if (!options.metadata || options.metadata.__id) {
root.__id = submission.instanceId;
}
// if (!options.metadata || options.metadata.__id) {
root.__id = submission.instanceId;
// }
if (table === 'Submissions') {
const systemObj = {
submissionDate: submission.createdAt,
Expand Down Expand Up @@ -175,7 +175,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
}

// bail out without doing any work if we are encrypted.
if (encrypted === true) return resolve(result);
if (encrypted === true) return resolve({ data: result, instanceId: submission.instanceId });

// we keep a dataStack, so we build an appropriate nested structure overall, and
// we can select the appropriate layer of that nesting at will.
Expand Down Expand Up @@ -220,9 +220,9 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

// create our new databag, push into result data, and set it as our result ptr.
const bag = generateDataFrame(schemaStack);
if (!options.metadata || options.metadata.__id) {
bag.__id = hashId(schemaStack, submission.instanceId);
}
// if (!options.metadata || options.metadata.__id) {
bag.__id = hashId(schemaStack, submission.instanceId);
// }
dataPtr[outname].push(bag);
dataStack.push(bag);

Expand Down Expand Up @@ -342,7 +342,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

if (schemaStack.hasExited()) {
parser.reset();
resolve(result);
resolve({ data: result, instanceId: submission.instanceId });
}
}
}, { xmlMode: true, decodeEntities: true });
Expand Down
85 changes: 71 additions & 14 deletions lib/formats/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const { sanitizeOdataIdentifier, without } = require('../util/util');
const { jsonDataFooter, extractOptions, nextUrlFor } = require('../util/odata');
const { submissionToOData, systemFields } = require('../data/odata');
const { SchemaStack } = require('../data/schema');
const { QueryOptions } = require('../util/db');


////////////////////////////////////////////////////////////////////////////////
// UTIL
Expand Down Expand Up @@ -52,9 +54,10 @@ const extractPathContext = (subpath) =>
const extractPaging = (table, query) => {
const parsedLimit = parseInt(query.$top, 10);
const limit = Number.isNaN(parsedLimit) ? Infinity : parsedLimit;
const offset = parseInt(query.$skip, 10) || 0;
const offset = (!query.$skiptoken && parseInt(query.$skip, 10)) || 0;
const shouldCount = isTrue(query.$count);
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount };
const skipToken = query.$skiptoken ? QueryOptions.parseSkiptoken(query.$skiptoken) : null;
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount, skipToken };

return Object.assign(result, (table === 'Submissions')
? { doLimit: Infinity, doOffset: 0 }
Expand Down Expand Up @@ -366,11 +369,12 @@ const edmxForEntities = (datasetName, properties) => {
// originalUrl: String is the request URL; we need it as well to formulate response URLs.
// query: Object is the Express Request query object indicating request querystring parameters.
// inStream: Stream[Row] is the postgres Submissions rowstream.
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount) => {
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount, tableRemaining) => {
// cache values we'll need repeatedly.
const serviceRoot = getServiceRoot(originalUrl);
const { limit, offset, doLimit, doOffset, shouldCount } = extractPaging(table, query);
const { doLimit, doOffset, shouldCount, skipToken } = extractPaging(table, query);
const options = extractOptions(query);
const isSubTable = table !== 'Submissions';

// make sure the target table actually exists.
// TODO: now that this doesn't require schema computation, should we move it up
Expand All @@ -379,25 +383,47 @@ const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, t

// write the header, then transform and stream each row.
let counted = 0;
let lastInstanceId = null;
let lastRepeatId = null;
let added = 0;
let remainingItems = 0;

// For Submissions table, it is true because cursor is handled at database level
let cursorPredicate = !isSubTable || !skipToken;

const parserStream = new Transform({
writableObjectMode: true, // we take a stream of objects from the db, but
readableObjectMode: false, // we put out a stream of text.
transform(row, _, done) {
// per row, we do our asynchronous parsing, jam the result onto the
// text resultstream, and call done to indicate that the row is processed.
submissionToOData(fields, table, row, options).then((data) => {
submissionToOData(fields, table, row, options).then(({ data, instanceId }) => {

const parentIdProperty = data[0] ? Object.keys(data[0]).find(p => /^__.*-id$/.test(p)) : null;

// In case of subtable we are reading all Submissions without pagination because we have to
// count repeat items in each Submission
for (const field of data) {

// if $select is there and parentId is not requested then remove it
const fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
let fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
// if $select is there and __id is not requested then remove it
fieldRefined = options.metadata && !options.metadata.__id ? without(['__id'], fieldRefined) : fieldRefined;

if ((counted >= doOffset) && (counted < (doOffset + doLimit))) {
this.push((counted === doOffset) ? '{"value":[' : ','); // header or fencepost.
if (added === doLimit) remainingItems += 1;

if (added < doLimit && counted >= doOffset && cursorPredicate) {
this.push((added === 0) ? '{"value":[' : ','); // header or fencepost.
this.push(JSON.stringify(fieldRefined));
lastInstanceId = instanceId;
if (isSubTable) lastRepeatId = field.__id;
added += 1;
}

// Controls the rows to be skipped based on skipToken
// Once set to true remains true
cursorPredicate = cursorPredicate || (skipToken.instanceId === instanceId && skipToken.repeatId === field.__id);

counted += 1;
}
done(); // signifies that this stream element is fully processed.
Expand All @@ -406,12 +432,17 @@ const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, t
flush(done) {
// flush is called just before the transform stream is done and closed; we write
// our footer information, close the object, and tell the stream we are done.
this.push((counted <= doOffset) ? '{"value":[],' : '],'); // open object or close row array.
this.push((added === 0) ? '{"value":[],' : '],'); // open object or close row array.

// if we were given an explicit count, use it from here out, to create
// @odata.count and nextUrl.
const remaining = (tableRemaining != null) ? tableRemaining - added : remainingItems;
const totalCount = (tableCount != null) ? tableCount : counted;
const nextUrl = nextUrlFor(limit, offset, totalCount, originalUrl);

const skipTokenData = { instanceId: lastInstanceId };
if (isSubTable) skipTokenData.repeatId = lastRepeatId;

const nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);

// we do toString on the totalCount because mustache won't bother rendering
// the block if it sees integer zero.
Expand Down Expand Up @@ -448,8 +479,10 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const table = tableParts.join('.');
if (!verifyTablePath(tableParts, fields)) throw Problem.user.notFound();

const isSubTable = table !== 'Submissions';

// extract all our fields first, the field extractor doesn't know about target contexts.
return submissionToOData(fields, table, row, options).then((subrows) => {
return submissionToOData(fields, table, row, options).then(({ data: subrows, instanceId }) => {
// now we actually filter to the requested set. we actually only need to compare
// the very last specified id, since it is fully unique.
const filterContextIdx = targetContext.reduce(((extant, pair, idx) => ((pair[1] != null) ? idx : extant)), -1);
Expand All @@ -462,12 +495,36 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const count = filtered.length;

// now we can process $top/$skip/$count:
const { limit, offset, shouldCount } = extractPaging(table, query);
const nextUrl = nextUrlFor(limit, offset, count, originalUrl);
const paging = extractPaging(table, query);
const { limit, shouldCount, skipToken } = paging;
let { offset } = paging;

if (skipToken) {
offset = filtered.fiindIndex(s => skipToken.repeatId === s.__id);
}

const pared = filtered.slice(offset, offset + limit);

let nextUrl = null;

if (pared.length > 0) {
const remaining = count - (offset + limit);

const skipTokenData = {
instanceId
};

if (isSubTable) skipTokenData.repeatId = pared[pared.length - 1].__id;

nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);
}


// if $select is there and parentId is not requested then remove it
let paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;

// if $select is there and parentId is not requested then remove it
const paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;
paredRefined = options.metadata && !options.metadata.__id ? paredRefined.map(p => without(['__id'], p)) : paredRefined;

// and finally splice together and return our result:
const dataContents = paredRefined.map(JSON.stringify).join(',');
Expand Down
2 changes: 1 addition & 1 deletion lib/http/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ const isJsonType = (x) => /(^|,)(application\/json|json)($|;|,)/i.test(x);
const isXmlType = (x) => /(^|,)(application\/(atom(svc)?\+)?xml|atom|xml)($|;|,)/i.test(x);

// various supported odata constants:
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select' ];
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select', '$skiptoken' ];
const supportedFormats = {
json: [ 'application/json', 'json' ],
xml: [ 'application/xml', 'atom' ]
Expand Down
23 changes: 20 additions & 3 deletions lib/model/query/submissions.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,21 @@ const getById = (submissionId) => ({ maybeOne }) =>
maybeOne(sql`select * from submissions where id=${submissionId} and "deletedAt" is null`)
.then(map(construct(Submission)));

const countByFormId = (formId, draft, options = QueryOptions.none) => ({ oneFirst }) => oneFirst(sql`
select count(*) from submissions
where ${equals({ formId, draft })} and "deletedAt" is null and ${odataFilter(options.filter, odataToColumnMap)}`);
const countByFormId = (formId, draft, options = QueryOptions.none) => ({ one }) => one(sql`
SELECT * FROM
( SELECT COUNT(*) count FROM submissions
WHERE ${equals({ formId, draft })} AND "deletedAt" IS NULL AND ${odataFilter(options.filter, odataToColumnMap)}) AS "all"
CROSS JOIN
( SELECT COUNT(*) remaining FROM submissions
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM submissions WHERE "instanceId" = ${options.skiptoken.instanceId}) AS cursor
ON submissions."createdAt" <= cursor."createdAt" AND submissions.id < cursor.id
`: sql``}
WHERE ${equals({ formId, draft })}
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}) AS skiptoken
`);

const verifyVersion = (formId, rootId, instanceId, draft) => ({ maybeOne }) => maybeOne(sql`
select true from submissions
Expand Down Expand Up @@ -335,6 +347,11 @@ inner join
(select "submissionId", (count(id) - 1) as count from submission_defs
group by "submissionId") as edits
on edits."submissionId"=submission_defs."submissionId"
${options.skiptoken && !options.skiptoken.repeatId ? sql` -- in case of subtable we are fetching all Submissions without pagination
inner join
(select id, "createdAt" from submissions where "instanceId" = ${options.skiptoken.instanceId}) as cursor
on submissions."createdAt" <= cursor."createdAt" and submissions.id < cursor.id
`: sql``}
where
${encrypted ? sql`(form_defs."encKeyId" is null or form_defs."encKeyId" in (${sql.join(keyIds, sql`,`)})) and` : sql``}
${odataFilter(options.filter, options.isSubmissionsTable ? odataToColumnMap : odataSubTableToColumnMap)} and
Expand Down
6 changes: 3 additions & 3 deletions lib/resources/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ module.exports = (service, endpoint) => {
Forms.getFields(form.def.id).then(selectFields(query, params.table)),
Submissions.streamForExport(form.id, draft, undefined, options),
((params.table === 'Submissions') && options.hasPaging())
? Submissions.countByFormId(form.id, draft, options) : resolve(null)
? Submissions.countByFormId(form.id, draft, options) : resolve({})
])
.then(([fields, stream, count]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count)));
.then(([fields, stream, { count, remaining }]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count, remaining)));
})));
};

Expand Down
26 changes: 22 additions & 4 deletions lib/util/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const { reject } = require('./promise');
const Problem = require('./problem');
const Option = require('./option');
const { PartialPipe, mapStream } = require('./stream');
const { construct } = require('./util');
const { construct, base64ToUtf8, utf8ToBase64 } = require('./util');
const { isTrue, isFalse } = require('./http');

const { Transform } = require('stream');
Expand Down Expand Up @@ -222,11 +222,17 @@ const equals = (obj) => {

const page = (options) => {
const parts = [];
if (options.offset != null) parts.push(sql`offset ${options.offset}`);
if (options.offset != null && !options.skiptoken) parts.push(sql`offset ${options.offset}`);
if (options.limit != null) parts.push(sql`limit ${options.limit}`);
return parts.length ? sql.join(parts, sql` `) : nothing;
};

const greaterThan = (k, v) => {
if (!k || !v) return sql`true`;

return sql`${sql.identifier(k.split('.'))} > ${v}`;
};

////////////////////////////////////////
// query func decorator
//
Expand Down Expand Up @@ -360,15 +366,27 @@ class QueryOptions {
return f(this.args[arg]);
}

static parseSkiptoken(token) {
const jsonString = base64ToUtf8(token);
return JSON.parse(jsonString);
}

static getSkiptoken(data) {
const jsonString = JSON.stringify(data);
return utf8ToBase64(jsonString);
}

static fromODataRequest(params, query) {
const result = { extended: true };
result.isSubmissionsTable = params.table === 'Submissions';
if ((params.table === 'Submissions') && (query.$skip != null))
if ((params.table === 'Submissions') && (!query.$skiptoken) && (query.$skip != null))
result.offset = parseInt(query.$skip, 10);
if ((params.table === 'Submissions') && (query.$top != null))
result.limit = parseInt(query.$top, 10);
if (query.$filter != null)
result.filter = query.$filter;
if ((params.table === 'Submissions') && (query.$skiptoken != null))
result.skiptoken = QueryOptions.parseSkiptoken(query.$skiptoken);

return new QueryOptions(result);
}
Expand Down Expand Up @@ -532,7 +550,7 @@ const postgresErrorToProblem = (x) => {

module.exports = {
connectionString, connectionObject,
unjoiner, extender, equals, page, queryFuncs,
unjoiner, extender, equals, greaterThan, page, queryFuncs,
insert, insertMany, updater, markDeleted, markUndeleted,
QueryOptions,
postgresErrorToProblem
Expand Down
9 changes: 5 additions & 4 deletions lib/util/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { max } = Math;
const Problem = require('./problem');
const { parse, render } = require('mustache');
const { isTrue, urlWithQueryParams } = require('./http');
const { QueryOptions } = require('./db');

const template = (body) => {
parse(body); // caches template for future perf.
Expand Down Expand Up @@ -49,10 +50,10 @@ const getServiceRoot = (subpath) => {
// Given limit: Int, offset: Int, count: Int, originalUrl: String, calculates
// what the nextUrl should be to supply server-driven paging (11.2.5.7). Returns
// url: String?
const nextUrlFor = (limit, offset, count, originalUrl) =>
((offset + limit >= count)
? null
: urlWithQueryParams(originalUrl, { $skip: (offset + limit), $top: null }));
const nextUrlFor = (remaining, originalUrl, skipTokenData) => ((!skipTokenData || remaining <= 0)
? null
: urlWithQueryParams(originalUrl, { $skip: null, $skiptoken: QueryOptions.getSkiptoken(skipTokenData) }));


// Given a querystring object, returns an object of relevant OData options. Right
// now that is only { wkt: Bool, expand: String, metadata: Array }
Expand Down
14 changes: 14 additions & 0 deletions lib/util/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ const pickAll = (keys, obj) => {
return result;
};

// source: https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem

function base64ToUtf8(base64) {
const binString = atob(base64);
const bytes = Uint8Array.from(binString, (m) => m.codePointAt(0));
return new TextDecoder().decode(bytes);
}

function utf8ToBase64(string) {
const bytes = new TextEncoder().encode(string);
const binString = Array.from(bytes, (x) => String.fromCodePoint(x)).join('');
return btoa(binString);
}

////////////////////////////////////////
// CLASSES
Expand All @@ -76,6 +89,7 @@ module.exports = {
noop, noargs,
isBlank, isPresent, blankStringToNull, sanitizeOdataIdentifier,
printPairs, without, pickAll,
base64ToUtf8, utf8ToBase64,
construct
};

2 changes: 1 addition & 1 deletion test/integration/api/odata-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ describe('api: /datasets/:name.svc', () => {
await asAlice.get('/v1/projects/1/datasets/people.svc/Entities?$top=1')
.expect(200)
.then(({ body }) => {
body['@odata.nextLink'].should.be.equal('http://localhost:8989/v1/projects/1/datasets/people.svc/Entities?%24skip=1');
body['@odata.nextLink'].should.be.equal('http://localhost:8989/0?%24skiptoken=Mg%3D%3D');
});
}));

Expand Down
Loading

0 comments on commit d3a678c

Please sign in to comment.