Skip to content

Commit

Permalink
Logs: add support for a start parameter when reading logs
Browse files Browse the repository at this point in the history
This can either be an ISO date or a milliseconds timestamp

Change-type: minor
  • Loading branch information
Page- committed Feb 18, 2025
1 parent aa90528 commit 8c695b7
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 15 deletions.
4 changes: 3 additions & 1 deletion src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '../../../../lib/config.js';
import type {
DeviceLogsBackend,
HistoryOpts,
InternalDeviceLog,
LogContext,
LokiLogContext,
Expand Down Expand Up @@ -178,7 +179,7 @@ export class LokiBackend implements DeviceLogsBackend {
*/
public async history(
$ctx: LogContext,
count: number,
{ count, start }: HistoryOpts,
): Promise<OutputDeviceLog[]> {
const ctx = await assertLokiLogContext($ctx);

Expand All @@ -189,6 +190,7 @@ export class LokiBackend implements DeviceLogsBackend {
query: this.getDeviceQuery(ctx),
limit: Number.isFinite(count) ? count : 1000,
since: '30d',
...(start != null ? { start: `${BigInt(start) * 1000000n}` } : {}),
},
json: true,
gzip: LOKI_HISTORY_GZIP,
Expand Down
16 changes: 13 additions & 3 deletions src/features/device-logs/lib/backends/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { DAYS } from '@balena/env-parsing';
import type {
DeviceLogsBackend,
HistoryOpts,
InternalDeviceLog,
LogContext,
OutputDeviceLog,
Expand Down Expand Up @@ -152,7 +153,7 @@ export class RedisBackend implements DeviceLogsBackend {

public async history(
ctx: LogContext,
count: number,
{ count, start }: HistoryOpts,
): Promise<OutputDeviceLog[]> {
if (!this.connected) {
throw new ServiceUnavailableError();
Expand All @@ -163,8 +164,17 @@ export class RedisBackend implements DeviceLogsBackend {
count === Infinity ? 0 : -count,
-1,
);
const parsedLogs = await Promise.all(payloads.map(this.fromRedisLog));
return _.compact(parsedLogs);
return (await Promise.all(payloads.map(this.fromRedisLog))).filter(
(log): log is NonNullable<typeof log> => {
if (log == null) {
return false;
}
if (start == null) {
return true;
}
return log.createdAt >= start;
},
);
}

public get available(): boolean {
Expand Down
40 changes: 33 additions & 7 deletions src/features/device-logs/lib/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {

import type {
DeviceLogsBackend,
HistoryOpts,
LogContext,
OutputDeviceLog,
} from './struct.js';
Expand All @@ -29,6 +30,8 @@ import {
LOGS_READ_STREAM_FLUSH_INTERVAL,
NDJSON_CTYPE,
} from '../../../lib/config.js';
import { DAYS } from '@balena/env-parsing';
import { checkInt } from '../../../lib/utils.js';

const { NotFoundError } = errors;
const { api } = sbvrUtils;
Expand All @@ -52,11 +55,15 @@ export const read =
? LOGS_DEFAULT_SUBSCRIPTION_COUNT
: LOGS_DEFAULT_HISTORY_COUNT,
);
const start = getStart(req.query.start as string | undefined, undefined);
if (isStreamingRead) {
await handleStreamingRead(ctx, req, res, count);
await handleStreamingRead(ctx, req, res, { count, start });
onLogReadStreamInitialized?.(req);
} else {
const logs = await getHistory(await getReadBackend(), ctx, count);
const logs = await getHistory(await getReadBackend(), ctx, {
count,
start,
});

res.json(logs);
}
Expand All @@ -73,7 +80,7 @@ async function handleStreamingRead(
ctx: LogContext,
req: Request,
res: Response,
count: number,
{ count, start }: HistoryOpts,
): Promise<void> {
const backend = await getReadBackend();
let state: StreamState = StreamState.Buffering;
Expand Down Expand Up @@ -159,7 +166,7 @@ async function handleStreamingRead(
// Subscribe in parallel so we don't miss logs in between
backend.subscribe(ctx, onLog);
try {
let logs = await getHistory(backend, ctx, count);
let logs = await getHistory(backend, ctx, { count, start });

// We need this cast as typescript narrows to `StreamState.Buffering`
// because it ignores that during the `await` break it can be changed
Expand Down Expand Up @@ -216,18 +223,37 @@ function getCount(
return Math.min(count, LOGS_DEFAULT_RETENTION_LIMIT);
}

function getStart(
startParam: string | undefined,
defaultStart?: number,
): number | undefined {
let start: number | undefined;
if (typeof startParam !== 'string') {
start = defaultStart;
} else {
start = checkInt(startParam) || new Date(startParam).getTime();
if (isNaN(start)) {
start = defaultStart;
}
}
if (start == null) {
return start;
}
return Math.max(start, Date.now() - 30 * DAYS);
}

function getHistory(
backend: DeviceLogsBackend,
ctx: LogContext,
count: number,
opts: HistoryOpts,
): Resolvable<OutputDeviceLog[]> {
// Optimize the case where the caller doesn't need any history
if (!count) {
if (!opts.count) {
return [];
}

// TODO: Implement `?since` filter here too in the next phase
return backend.history(ctx, count);
return backend.history(ctx, opts);
}

async function getReadContext(req: Request): Promise<LogContext> {
Expand Down
4 changes: 3 additions & 1 deletion src/features/device-logs/lib/struct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ export interface OldSupervisorLog {

export type Subscription = (log: OutputDeviceLog) => void;

export type HistoryOpts = { count: number; start?: number };

export interface DeviceLogsBackend {
history(ctx: LogContext, count: number): Promise<OutputDeviceLog[]>;
history(ctx: LogContext, opts: HistoryOpts): Promise<OutputDeviceLog[]>;
available: boolean;
/**
* `logs` will be mutated to empty and so must be handled synchronously
Expand Down
29 changes: 29 additions & 0 deletions test/06_device-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ from 'lodash';
import { expect } from 'chai';
import * as fixtures from './test-lib/fixtures.js';
import { supertest } from './test-lib/supertest.js';
import { DAYS } from '@balena/env-parsing';

const createLog = (extra = {}) => {
return {
Expand Down Expand Up @@ -163,7 +164,9 @@ export default () => {

// Stream Reading Logs

let dateBeforeStreamedLogs: Date;
it('should allow users to stream-read device logs with a JWT', async () => {
dateBeforeStreamedLogs = new Date();
const logChunks: string[] = [];
let extraLogsSent = 0;
const req = supertest(ctx.user)
Expand Down Expand Up @@ -225,5 +228,31 @@ export default () => {
'streamed log line 1',
]);
});

for (const fn of ['toISOString', 'getTime'] as const) {
it(`should allow specifying logs start date as a ${fn}`, async () => {
const res = await supertest(ctx.user)
.get(
`/device/v2/${ctx.device.uuid}/logs?start=${dateBeforeStreamedLogs[fn]()}`,
)
.expect(200);

expect(res.body).to.have.lengthOf(2);
expect(res.body[0])
.to.have.property('message')
.equals('streamed log line 0');
expect(res.body[1])
.to.have.property('message')
.equals('streamed log line 1');

// And double check that putting the date further back does fetch all the expected logs..
const res2 = await supertest(ctx.user)
.get(
`/device/v2/${ctx.device.uuid}/logs?start=${new Date(dateBeforeStreamedLogs.getTime() - 1 * DAYS)[fn]()}`,
)
.expect(200);
expect(res2.body).to.have.lengthOf(8);
});
}
});
};
6 changes: 3 additions & 3 deletions test/13_loki-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export default () => {
const log = createLog();
const response = await loki.publish(ctx, [_.clone(log)]);
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history.at(-1)).to.deep.equal(convertToOutputLog(log));
});

Expand All @@ -73,7 +73,7 @@ export default () => {
];
const response = await loki.publish(ctx, _.cloneDeep(logs));
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history.slice(-5)).to.deep.equal(logs.map(convertToOutputLog));
});

Expand All @@ -84,7 +84,7 @@ export default () => {
const logs = [_.clone(log), _.clone(log), _.clone(log)];
const response = await loki.publish(ctx, _.cloneDeep(logs));
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history[1].timestamp).to.not.equal(log.timestamp);
});

Expand Down

0 comments on commit 8c695b7

Please sign in to comment.