Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logs: add support for a start parameter when reading logs #1957

Merged
merged 2 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '../../../../lib/config.js';
import type {
DeviceLogsBackend,
HistoryOpts,
InternalDeviceLog,
LogContext,
LokiLogContext,
Expand Down Expand Up @@ -178,7 +179,7 @@ export class LokiBackend implements DeviceLogsBackend {
*/
public async history(
$ctx: LogContext,
count: number,
{ count, start }: HistoryOpts,
): Promise<OutputDeviceLog[]> {
const ctx = await assertLokiLogContext($ctx);

Expand All @@ -189,6 +190,7 @@ export class LokiBackend implements DeviceLogsBackend {
query: this.getDeviceQuery(ctx),
limit: Number.isFinite(count) ? count : 1000,
since: '30d',
...(start != null ? { start: `${BigInt(start) * 1000000n}` } : {}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be v ?

Suggested change
...(start != null ? { start: `${BigInt(start) * 1000000n}` } : {}),
...(typeof start === 'number' ? { start: `${BigInt(start) * 1000000n}` } : {}),

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm I just saw the tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's either undefined or a number, so both checks should be equivalent?

},
json: true,
gzip: LOKI_HISTORY_GZIP,
Expand Down
16 changes: 13 additions & 3 deletions src/features/device-logs/lib/backends/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { DAYS } from '@balena/env-parsing';
import type {
DeviceLogsBackend,
HistoryOpts,
InternalDeviceLog,
LogContext,
OutputDeviceLog,
Expand Down Expand Up @@ -152,7 +153,7 @@ export class RedisBackend implements DeviceLogsBackend {

public async history(
ctx: LogContext,
count: number,
{ count, start }: HistoryOpts,
): Promise<OutputDeviceLog[]> {
if (!this.connected) {
throw new ServiceUnavailableError();
Expand All @@ -163,8 +164,17 @@ export class RedisBackend implements DeviceLogsBackend {
count === Infinity ? 0 : -count,
-1,
);
const parsedLogs = await Promise.all(payloads.map(this.fromRedisLog));
return _.compact(parsedLogs);
return (await Promise.all(payloads.map(this.fromRedisLog))).filter(
(log): log is NonNullable<typeof log> => {
if (log == null) {
return false;
}
if (start == null) {
return true;
}
return log.createdAt >= start;
},
);
}

public get available(): boolean {
Expand Down
61 changes: 41 additions & 20 deletions src/features/device-logs/lib/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {

import type {
DeviceLogsBackend,
HistoryOpts,
LogContext,
OutputDeviceLog,
} from './struct.js';
Expand All @@ -29,6 +30,8 @@ import {
LOGS_READ_STREAM_FLUSH_INTERVAL,
NDJSON_CTYPE,
} from '../../../lib/config.js';
import { DAYS } from '@balena/env-parsing';
import { checkInt } from '../../../lib/utils.js';

const { NotFoundError } = errors;
const { api } = sbvrUtils;
Expand All @@ -45,16 +48,22 @@ export const read =
async (req: Request, res: Response) => {
try {
const ctx = await getReadContext(req);
if (req.query.stream === '1') {
await handleStreamingRead(ctx, req, res);
const isStreamingRead = req.query.stream === '1';
const count = getCount(
req.query.count as string | undefined,
isStreamingRead
? LOGS_DEFAULT_SUBSCRIPTION_COUNT
: LOGS_DEFAULT_HISTORY_COUNT,
);
const start = getStart(req.query.start as string | undefined, undefined);
if (isStreamingRead) {
await handleStreamingRead(ctx, req, res, { count, start });
onLogReadStreamInitialized?.(req);
} else {
const logs = await getHistory(
await getReadBackend(),
ctx,
req,
LOGS_DEFAULT_HISTORY_COUNT,
);
const logs = await getHistory(await getReadBackend(), ctx, {
count,
start,
});

res.json(logs);
}
Expand All @@ -71,6 +80,7 @@ async function handleStreamingRead(
ctx: LogContext,
req: Request,
res: Response,
{ count, start }: HistoryOpts,
): Promise<void> {
const backend = await getReadBackend();
let state: StreamState = StreamState.Buffering;
Expand Down Expand Up @@ -156,12 +166,7 @@ async function handleStreamingRead(
// Subscribe in parallel so we don't miss logs in between
backend.subscribe(ctx, onLog);
try {
let logs = await getHistory(
backend,
ctx,
req,
LOGS_DEFAULT_SUBSCRIPTION_COUNT,
);
let logs = await getHistory(backend, ctx, { count, start });

// We need this cast as typescript narrows to `StreamState.Buffering`
// because it ignores that during the `await` break it can be changed
Expand Down Expand Up @@ -218,21 +223,37 @@ function getCount(
return Math.min(count, LOGS_DEFAULT_RETENTION_LIMIT);
}

function getStart(
startParam: string | undefined,
defaultStart?: number,
): number | undefined {
let start: number | undefined;
if (typeof startParam !== 'string') {
start = defaultStart;
} else {
start = checkInt(startParam) || new Date(startParam).getTime();
if (isNaN(start)) {
start = defaultStart;
}
}
if (start == null) {
return start;
}
return Math.max(start, Date.now() - 30 * DAYS);
}

function getHistory(
backend: DeviceLogsBackend,
ctx: LogContext,
{ query }: Request,
defaultCount: number,
opts: HistoryOpts,
): Resolvable<OutputDeviceLog[]> {
const count = getCount(query.count as string | undefined, defaultCount);

// Optimize the case where the caller doesn't need any history
if (!count) {
if (!opts.count) {
return [];
}

// TODO: Implement `?since` filter here too in the next phase
return backend.history(ctx, count);
return backend.history(ctx, opts);
}

async function getReadContext(req: Request): Promise<LogContext> {
Expand Down
4 changes: 3 additions & 1 deletion src/features/device-logs/lib/struct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ export interface OldSupervisorLog {

export type Subscription = (log: OutputDeviceLog) => void;

export type HistoryOpts = { count: number; start?: number };

export interface DeviceLogsBackend {
history(ctx: LogContext, count: number): Promise<OutputDeviceLog[]>;
history(ctx: LogContext, opts: HistoryOpts): Promise<OutputDeviceLog[]>;
available: boolean;
/**
* `logs` will be mutated to empty and so must be handled synchronously
Expand Down
29 changes: 29 additions & 0 deletions test/06_device-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ from 'lodash';
import { expect } from 'chai';
import * as fixtures from './test-lib/fixtures.js';
import { supertest } from './test-lib/supertest.js';
import { DAYS } from '@balena/env-parsing';

const createLog = (extra = {}) => {
return {
Expand Down Expand Up @@ -163,7 +164,9 @@ export default () => {

// Stream Reading Logs

let dateBeforeStreamedLogs: Date;
it('should allow users to stream-read device logs with a JWT', async () => {
dateBeforeStreamedLogs = new Date();
const logChunks: string[] = [];
let extraLogsSent = 0;
const req = supertest(ctx.user)
Expand Down Expand Up @@ -225,5 +228,31 @@ export default () => {
'streamed log line 1',
]);
});

for (const fn of ['toISOString', 'getTime'] as const) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

;)

it(`should allow specifying logs start date as a ${fn}`, async () => {
const res = await supertest(ctx.user)
.get(
`/device/v2/${ctx.device.uuid}/logs?start=${dateBeforeStreamedLogs[fn]()}`,
)
.expect(200);

expect(res.body).to.have.lengthOf(2);
expect(res.body[0])
.to.have.property('message')
.equals('streamed log line 0');
expect(res.body[1])
.to.have.property('message')
.equals('streamed log line 1');

// And double check that putting the date further back does fetch all the expected logs..
const res2 = await supertest(ctx.user)
.get(
`/device/v2/${ctx.device.uuid}/logs?start=${new Date(dateBeforeStreamedLogs.getTime() - 1 * DAYS)[fn]()}`,
)
.expect(200);
expect(res2.body).to.have.lengthOf(8);
});
}
});
};
6 changes: 3 additions & 3 deletions test/13_loki-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export default () => {
const log = createLog();
const response = await loki.publish(ctx, [_.clone(log)]);
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history.at(-1)).to.deep.equal(convertToOutputLog(log));
});

Expand All @@ -73,7 +73,7 @@ export default () => {
];
const response = await loki.publish(ctx, _.cloneDeep(logs));
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history.slice(-5)).to.deep.equal(logs.map(convertToOutputLog));
});

Expand All @@ -84,7 +84,7 @@ export default () => {
const logs = [_.clone(log), _.clone(log), _.clone(log)];
const response = await loki.publish(ctx, _.cloneDeep(logs));
expect(response).to.be.not.null;
const history = await loki.history(ctx, 1000);
const history = await loki.history(ctx, { count: 1000 });
expect(history[1].timestamp).to.not.equal(log.timestamp);
});

Expand Down
Loading