Skip to content

Commit

Permalink
just curious
Browse files Browse the repository at this point in the history
  • Loading branch information
kevmoo committed Dec 14, 2024
1 parent 116eb1f commit 3df98f8
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 305 deletions.
19 changes: 11 additions & 8 deletions pkgs/bazel_worker/benchmark/benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ Future<void> main() async {
var path = 'blaze-bin/some/path/to/a/file/that/is/an/input/$i';
workRequest
..arguments.add('--input=$path')
..inputs.add(Input(
path: '',
digest: List.filled(70, 0x11),
));
..inputs.add(Input(path: '', digest: List.filled(70, 0x11)));
}

// Serialize it.
Expand All @@ -24,14 +21,20 @@ Future<void> main() async {
print('Request has $length requestBytes.');

// Add the length in front base 128 encoded as in the worker protocol.
requestBytes =
Uint8List.fromList(requestBytes.toList()..insertAll(0, _varInt(length)));
requestBytes = Uint8List.fromList(
requestBytes.toList()..insertAll(0, _varInt(length)),
);

// Split into 10000 byte chunks.
var lists = <Uint8List>[];
for (var i = 0; i < requestBytes.length; i += 10000) {
lists.add(Uint8List.sublistView(
requestBytes, i, min(i + 10000, requestBytes.length)));
lists.add(
Uint8List.sublistView(
requestBytes,
i,
min(i + 10000, requestBytes.length),
),
);
}

// Time `AsyncMessageGrouper` and deserialization.
Expand Down
5 changes: 4 additions & 1 deletion pkgs/bazel_worker/e2e_test/bin/async_worker_in_isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import 'package:e2e_test/forwards_to_isolate_async_worker.dart';
Future main(List<String> args, [SendPort? message]) async {
var receivePort = ReceivePort();
await Isolate.spawnUri(
Uri.file('async_worker.dart'), [], receivePort.sendPort);
Uri.file('async_worker.dart'),
[],
receivePort.sendPort,
);

var worker = await ForwardsToIsolateAsyncWorker.create(receivePort);
await worker.run();
Expand Down
7 changes: 2 additions & 5 deletions pkgs/bazel_worker/e2e_test/lib/async_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@ import 'package:bazel_worker/bazel_worker.dart';
class ExampleAsyncWorker extends AsyncWorkerLoop {
/// Set [sendPort] to run in an isolate.
ExampleAsyncWorker([SendPort? sendPort])
: super(connection: AsyncWorkerConnection(sendPort: sendPort));
: super(connection: AsyncWorkerConnection(sendPort: sendPort));

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
return WorkResponse(
exitCode: 0,
output: request.arguments.join('\n'),
);
return WorkResponse(exitCode: 0, output: request.arguments.join('\n'));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ class ForwardsToIsolateAsyncWorker extends AsyncWorkerLoop {
final IsolateDriverConnection _isolateDriverConnection;

static Future<ForwardsToIsolateAsyncWorker> create(
ReceivePort receivePort) async {
ReceivePort receivePort,
) async {
return ForwardsToIsolateAsyncWorker(
await IsolateDriverConnection.create(receivePort));
await IsolateDriverConnection.create(receivePort),
);
}

ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
Expand Down
2 changes: 1 addition & 1 deletion pkgs/bazel_worker/e2e_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: e2e_test
publish_to: none

environment:
sdk: ^3.4.0
sdk: ^3.7.0-0

dependencies:
bazel_worker:
Expand Down
18 changes: 11 additions & 7 deletions pkgs/bazel_worker/e2e_test/test/e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import 'package:test/test.dart';

void main() {
var dart = p.join(sdkPath, 'bin', 'dart');
runE2eTestForWorker('sync worker',
() => Process.start(dart, [p.join('bin', 'sync_worker.dart')]));
runE2eTestForWorker('async worker',
() => Process.start(dart, [p.join('bin', 'async_worker.dart')]));
runE2eTestForWorker(
'async worker in isolate',
() =>
Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]));
'sync worker',
() => Process.start(dart, [p.join('bin', 'sync_worker.dart')]),
);
runE2eTestForWorker(
'async worker',
() => Process.start(dart, [p.join('bin', 'async_worker.dart')]),
);
runE2eTestForWorker(
'async worker in isolate',
() => Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]),
);
}

void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
Expand Down
9 changes: 5 additions & 4 deletions pkgs/bazel_worker/example/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import 'package:bazel_worker/driver.dart';
void main() async {
var scratchSpace = await Directory.systemTemp.createTemp();
var driver = BazelWorkerDriver(
() => Process.start(Platform.resolvedExecutable,
[Platform.script.resolve('worker.dart').toFilePath()],
workingDirectory: scratchSpace.path),
maxWorkers: 4);
() => Process.start(Platform.resolvedExecutable, [
Platform.script.resolve('worker.dart').toFilePath(),
], workingDirectory: scratchSpace.path),
maxWorkers: 4,
);
var response = await driver.doWork(WorkRequest(arguments: ['foo']));
if (response.exitCode != EXIT_CODE_OK) {
print('Worker request failed');
Expand Down
23 changes: 14 additions & 9 deletions pkgs/bazel_worker/lib/src/async_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class AsyncMessageGrouper implements MessageGrouper {
int _messagePos = 0;

AsyncMessageGrouper(Stream<List<int>> inputStream)
: _inputQueue = StreamQueue(inputStream);
: _inputQueue = StreamQueue(inputStream);

/// Returns the next full message that is received, or null if none are left.
@override
Future<List<int>?> get next async {
try {
// Loop while there is data in the input buffer or the input stream.
while (
_inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) {
while (_inputBufferPos != _inputBuffer.length ||
await _inputQueue.hasNext) {
// If the input buffer is empty fill it from the input stream.
if (_inputBufferPos == _inputBuffer.length) {
_inputBuffer = await _inputQueue.next;
Expand Down Expand Up @@ -86,13 +86,18 @@ class AsyncMessageGrouper implements MessageGrouper {
// Copy as much as possible from the input buffer. Limit is the
// smaller of the remaining length to fill in the message and the
// remaining length in the buffer.
var lengthToCopy = min(_message.length - _messagePos,
_inputBuffer.length - _inputBufferPos);
var lengthToCopy = min(
_message.length - _messagePos,
_inputBuffer.length - _inputBufferPos,
);
_message.setRange(
_messagePos,
_messagePos + lengthToCopy,
_inputBuffer.sublist(
_inputBufferPos, _inputBufferPos + lengthToCopy));
_messagePos,
_messagePos + lengthToCopy,
_inputBuffer.sublist(
_inputBufferPos,
_inputBufferPos + lengthToCopy,
),
);
_messagePos += lengthToCopy;
_inputBufferPos += lengthToCopy;

Expand Down
156 changes: 85 additions & 71 deletions pkgs/bazel_worker/lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,25 @@ class BazelWorkerDriver {
/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;

BazelWorkerDriver(this._spawnWorker,
{int? maxIdleWorkers, int? maxWorkers, int? maxRetries})
: _maxIdleWorkers = maxIdleWorkers ?? 4,
_maxWorkers = maxWorkers ?? 4,
_maxRetries = maxRetries ?? 4;
BazelWorkerDriver(
this._spawnWorker, {
int? maxIdleWorkers,
int? maxWorkers,
int? maxRetries,
}) : _maxIdleWorkers = maxIdleWorkers ?? 4,
_maxWorkers = maxWorkers ?? 4,
_maxRetries = maxRetries ?? 4;

/// Waits for an available worker, and then sends [WorkRequest] to it.
///
/// If [trackWork] is provided it will be invoked with a [Future] once the
/// [request] has been actually sent to the worker. This allows the caller
/// to determine when actual work is being done versus just waiting for an
/// available worker.
Future<WorkResponse> doWork(WorkRequest request,
{void Function(Future<WorkResponse?>)? trackWork}) {
Future<WorkResponse> doWork(
WorkRequest request, {
void Function(Future<WorkResponse?>)? trackWork,
}) {
var attempt = _WorkAttempt(request, trackWork: trackWork);
_workQueue.add(attempt);
_runWorkQueue();
Expand All @@ -69,9 +74,11 @@ class BazelWorkerDriver {
for (var worker in _readyWorkers.toList()) {
_killWorker(worker);
}
await Future.wait(_spawningWorkers.map((worker) async {
_killWorker(await worker);
}));
await Future.wait(
_spawningWorkers.map((worker) async {
_killWorker(await worker);
}),
);
}

/// Runs as many items in [_workQueue] as possible given the number of
Expand All @@ -88,8 +95,10 @@ class BazelWorkerDriver {
if (_workQueue.isEmpty) return;
if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
if (_numWorkers > _maxWorkers) {
throw StateError('Internal error, created to many workers. Please '
'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
throw StateError(
'Internal error, created to many workers. Please '
'file a bug at https://github.com/dart-lang/bazel_worker/issues/new',
);
}

// At this point we definitely want to run a task, we just need to decide
Expand All @@ -102,29 +111,31 @@ class BazelWorkerDriver {
// work queue.
var futureWorker = _spawnWorker();
_spawningWorkers.add(futureWorker);
futureWorker.then((worker) {
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
var connection = StdDriverConnection.forWorker(worker);
_workerConnections[worker] = connection;
_runWorker(worker, attempt);

// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
//
// We don't use `exitCode` because it is null for detached processes (
// which is common for workers).
connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
});
}).onError<Object>((e, s) {
_spawningWorkers.remove(futureWorker);
if (attempt.responseCompleter.isCompleted) return;
attempt.responseCompleter.completeError(e, s);
});
futureWorker
.then((worker) {
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
var connection = StdDriverConnection.forWorker(worker);
_workerConnections[worker] = connection;
_runWorker(worker, attempt);

// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
//
// We don't use `exitCode` because it is null for detached processes (
// which is common for workers).
connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
});
})
.onError<Object>((e, s) {
_spawningWorkers.remove(futureWorker);
if (attempt.responseCompleter.isCompleted) return;
attempt.responseCompleter.completeError(e, s);
});
}
// Recursively calls itself until one of the bail out conditions are met.
_runWorkQueue();
Expand All @@ -137,48 +148,51 @@ class BazelWorkerDriver {
void _runWorker(Process worker, _WorkAttempt attempt) {
var rescheduled = false;

runZonedGuarded(() async {
var connection = _workerConnections[worker]!;
runZonedGuarded(
() async {
var connection = _workerConnections[worker]!;

connection.writeRequest(attempt.request);
var responseFuture = connection.readResponse();
if (attempt.trackWork != null) {
attempt.trackWork!(responseFuture);
}
var response = await responseFuture;

// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
connection.writeRequest(attempt.request);
var responseFuture = connection.readResponse();
if (attempt.trackWork != null) {
attempt.trackWork!(responseFuture);
}
var response = await responseFuture;

// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.',
);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
},
(e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse(
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.',
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
}, (e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
});
},
);
}

/// Performs post-work cleanup for [worker].
Expand Down
13 changes: 8 additions & 5 deletions pkgs/bazel_worker/lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ class StdDriverConnection implements DriverConnection {

Future<void> get done => _messageGrouper.done;

StdDriverConnection(
{Stream<List<int>>? inputStream, StreamSink<List<int>>? outputStream})
: _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
StdDriverConnection({
Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;

factory StdDriverConnection.forWorker(Process worker) => StdDriverConnection(
inputStream: worker.stdout, outputStream: worker.stdin);
inputStream: worker.stdout,
outputStream: worker.stdin,
);

/// Note: This will attempts to recover from invalid proto messages by parsing
/// them as strings. This is a common error case for workers (they print a
Expand Down
Loading

0 comments on commit 3df98f8

Please sign in to comment.