Skip to content

Commit

Permalink
Improve I/O of internal executables (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
timostamm authored Dec 5, 2023
1 parent ce54cf1 commit 47c76a7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 86 deletions.
150 changes: 83 additions & 67 deletions packages/protobuf-conformance/src/conformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from "./gen/conformance/conformance_pb.js";
import { TestAllTypesProto3 } from "./gen/google/protobuf/test_messages_proto3_pb.js";
import { TestAllTypesProto2 } from "./gen/google/protobuf/test_messages_proto2_pb.js";
import { readSync, writeSync } from "fs";
import type { MessageType } from "@bufbuild/protobuf";
import {
Any,
createRegistry,
Expand All @@ -33,6 +33,7 @@ import {
Timestamp,
Value,
} from "@bufbuild/protobuf";
import type { Writable } from "node:stream";

const registry = createRegistry(
Value,
Expand All @@ -46,17 +47,24 @@ const registry = createRegistry(
Any,
);

function main() {
void main();

async function main() {
let testCount = 0;
try {
while (testIo(test)) {
const requests = readMessages(process.stdin, ConformanceRequest);
const responses = processMessages(requests, (request) => {
testCount += 1;
}
return new ConformanceResponse({
result: test(request),
});
});
await writeMessages(process.stdout, responses);
} catch (e) {
process.stderr.write(
`conformance.ts: exiting after ${testCount} tests: ${String(e)}`,
() => process.exit(1),
);
process.exit(1);
}
}

Expand Down Expand Up @@ -146,75 +154,83 @@ function test(request: ConformanceRequest): ConformanceResponse["result"] {
}
}

// Returns true if the test ran successfully, false on legitimate EOF.
// If EOF is encountered in an unexpected place, raises IOError.
function testIo(
test: (request: ConformanceRequest) => ConformanceResponse["result"],
): boolean {
setBlockingStdout();
const requestLengthBuf = readBuffer(4);
if (requestLengthBuf === "EOF") {
return false;
// Reads length-prefixed messages from a stream.
async function* readMessages<T extends Message<T>>(
stream: AsyncIterable<Uint8Array>,
type: MessageType<T>,
): AsyncIterable<T> {
// append chunk to buffer, returning updated buffer
function append(buffer: Uint8Array, chunk: Uint8Array): Uint8Array {
const n = new Uint8Array(buffer.byteLength + chunk.byteLength);
n.set(buffer);
n.set(chunk, buffer.byteLength);
return n;
}
const requestLength = requestLengthBuf.readInt32LE(0);
const serializedRequest = readBuffer(requestLength);
if (serializedRequest === "EOF") {
throw "Failed to read request.";
}
const request = ConformanceRequest.fromBinary(serializedRequest);
const response = new ConformanceResponse();
response.result = test(request);
const serializedResponse = response.toBinary();
const responseLengthBuf = Buffer.alloc(4);
responseLengthBuf.writeInt32LE(serializedResponse.length, 0);
writeBuffer(responseLengthBuf);
writeBuffer(Buffer.from(serializedResponse));
return true;
}

// Read a buffer of N bytes from stdin.
function readBuffer(bytes: number): Buffer | "EOF" {
const buf = Buffer.alloc(bytes);
let read = 0;
try {
read = readSync(0, buf, 0, bytes, null);
} catch (e) {
throw `failed to read from stdin: ${String(e)}`;
}
if (read !== bytes) {
if (read === 0) {
return "EOF";
let buffer = new Uint8Array(0);
for await (const chunk of stream) {
buffer = append(buffer, chunk);
for (;;) {
if (buffer.byteLength < 4) {
// size is incomplete, buffer more data
break;
}
const size = new DataView(buffer.buffer).getInt32(0, true);
if (buffer.byteLength < 4 + size) {
// message is incomplete, buffer more data
break;
}
yield type.fromBinary(buffer.subarray(4, 4 + size));
buffer = buffer.subarray(4 + size);
}
throw "premature EOF on stdin.";
}
return buf;
if (buffer.byteLength > 0) {
throw new Error("incomplete data");
}
}

// Write a buffer to stdout.
function writeBuffer(buffer: Buffer): void {
let totalWritten = 0;
while (totalWritten < buffer.length) {
totalWritten += writeSync(
1,
buffer,
totalWritten,
buffer.length - totalWritten,
);
}
// Returns a new iterable that processes each element of the input.
function processMessages<I extends Message<I>, O extends Message<O> = I>(
requests: AsyncIterable<I>,
processor: (req: I) => O,
): AsyncIterable<O> {
const source = requests[Symbol.asyncIterator]();
return {
[Symbol.asyncIterator]() {
return {
async next() {
const s = await source.next();
if (s.done === true) {
return {
done: true,
value: undefined,
};
}
return {
done: false,
value: processor(s.value),
};
},
};
},
};
}

// Fixes https://github.com/timostamm/protobuf-ts/issues/134
// Node is buffering chunks to stdout, meaning that for big generated
// files the CodeGeneratorResponse will not reach protoc completely.
// To fix this, we set stdout to block using the internal private
// method setBlocking(true)
function setBlockingStdout(): void {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access,@typescript-eslint/no-explicit-any,@typescript-eslint/no-unsafe-assignment
const stdoutHandle = (process.stdout as any)._handle;
if (stdoutHandle !== undefined) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-member-access
stdoutHandle.setBlocking(true);
// Writes length-prefixed messages to a stream.
async function writeMessages(
stream: Writable,
messages: AsyncIterable<Message> | (() => AsyncIterable<Message>),
) {
const input = typeof messages == "function" ? messages() : messages;
for await (const message of input) {
const bytes = message.toBinary();
await new Promise<void>((resolve, reject) => {
const lengthBytes = new Uint8Array(4);
new DataView(lengthBytes.buffer).setInt32(0, bytes.length, true);
stream.write(lengthBytes, (err) => (err ? reject(err) : resolve()));
});
await new Promise<void>((resolve, reject) => {
stream.write(bytes, (err) => (err ? reject(err) : resolve()));
});
}
}

main();
3 changes: 1 addition & 2 deletions packages/upstream-protobuf/bin/conformance_test_runner.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ upstream
});
})
.catch((reason) => {
stderr.write(String(reason) + "\n");
exit(1);
stderr.write(String(reason) + "\n", () => exit(1));
});
3 changes: 1 addition & 2 deletions packages/upstream-protobuf/bin/protoc.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ upstream
});
})
.catch((reason) => {
stderr.write(String(reason) + "\n");
exit(1);
stderr.write(String(reason) + "\n", () => exit(1));
});
7 changes: 3 additions & 4 deletions packages/upstream-protobuf/bin/upstream-files.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ async function main(args) {
protoInclude = await upstream.getTestProtoInclude();
break;
default:
exitUsage();
return exitUsage();
}
stdout.write(protoInclude.files.join(" "));
}

/**
* @return never
* @return void
*/
function exitUsage() {
stderr.write(`USAGE: upstream-files wkt|conformance|test\n`);
exit(1);
stderr.write(`USAGE: upstream-files wkt|conformance|test\n`, () => exit(1));
}
7 changes: 3 additions & 4 deletions packages/upstream-protobuf/bin/upstream-include.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ async function main(args) {
protoInclude = await upstream.getTestProtoInclude();
break;
default:
exitUsage();
return exitUsage();
}
stdout.write(protoInclude.dir);
}

/**
* @return never
* @return void
*/
function exitUsage() {
stderr.write(`USAGE: upstream-include wkt|conformance|test\n`);
exit(1);
stderr.write(`USAGE: upstream-include wkt|conformance|test\n`, () => exit(1));
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async function main(args) {
allowPositionals: true,
}));
} catch {
exitUsage();
return exitUsage();
}
const upstream = new UpstreamProtobuf();
const defaults = await upstream.getFeatureSetDefaults(min, max);
Expand All @@ -47,8 +47,10 @@ async function main(args) {
const content = readFileSync(path, "utf-8");
const r = inject(content, ` "${defaults.toString("base64url")}" `);
if (!r.ok) {
stderr.write(`Error injecting into ${path}: ${r.message}\n`);
exit(1);
stderr.write(`Error injecting into ${path}: ${r.message}\n`, () =>
exit(1),
);
return;
}
if (r.newContent === content) {
stdout.write(`- ${path} - no changes\n`);
Expand Down Expand Up @@ -92,11 +94,11 @@ function inject(content, contentToInject) {
}

/**
* @return never
* @return void
*/
function exitUsage() {
stderr.write(
`USAGE: upstream-inject-feature-defaults [--min <mininum supported edition>] [--max <maximum supported edition>] <file-to-inject-into>\n`,
() => exit(1),
);
exit(1);
}
3 changes: 1 addition & 2 deletions packages/upstream-protobuf/bin/upstream-warmup.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const upstream = new UpstreamProtobuf();
upstream.warmup().then(
() => exit(0),
(reason) => {
stderr.write(`${String(reason)}\n`);
exit(1);
stderr.write(`${String(reason)}\n`, () => exit(1));
},
);

0 comments on commit 47c76a7

Please sign in to comment.