Skip to content

Commit

Permalink
Fix: Added streaming implementation for web fetch adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Binozo committed Feb 7, 2025
1 parent 1fc19e0 commit 3cb934e
Showing 1 changed file with 86 additions and 44 deletions.
130 changes: 86 additions & 44 deletions plugins/web_adapter/lib/src/adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ class BrowserHttpClientAdapter implements HttpClientAdapter {
receiveStopwatch = Stopwatch();
}

final Map<String, List<String>> headers = {};
final _IterableHeaders responseHeaders = response.headers as _IterableHeaders;
responseHeaders.forEach(
(String value, String header, [JSAny? _]) {
headers[header.toLowerCase()] = [value];
}.toJS,
);

final BytesBuilder receivedBody = BytesBuilder();
final int totalResponseLength = int.tryParse(
response.headers.get(Headers.contentLengthHeader) ?? '-1',
Expand All @@ -187,57 +195,91 @@ class BrowserHttpClientAdapter implements HttpClientAdapter {
receiveStopwatch?.start();

final web.ReadableStreamDefaultReader reader = response.body!.getReader() as web.ReadableStreamDefaultReader;
StreamController<Uint8List>? dataStreamController;
if (options.responseType == ResponseType.stream) {
dataStreamController = StreamController(
onCancel: () {
// Abort
abortController.abort();
},
);
}

int totalRead = 0;
while (true) {
final web.ReadableStreamReadResult chunk = await reader.read().toDart;
if (chunk.done) {
break;
Future readResponse() async {
int totalRead = 0;
while (true) {
final web.ReadableStreamReadResult chunk = await reader.read().toDart;
if (chunk.done) {
dataStreamController?.close();
break;
}

if (receiveStopwatch != null && receiveStopwatch.elapsed > receiveTimeout) {
receiveStopwatch.stop();
abortController.abort();

completer.completeError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
StackTrace.current,
);
}

// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/read#examples
final Uint8List payload = (chunk.value as JSUint8Array).toDart;
totalRead += payload.lengthInBytes;
if (options.responseType == ResponseType.stream) {
dataStreamController!.add(payload);
} else {
receivedBody.add(payload);
}
if (options.onReceiveProgress != null) {
options.onReceiveProgress!(totalRead, totalResponseLength);
}
}
}

if (receiveStopwatch != null && receiveStopwatch.elapsed > receiveTimeout) {
receiveStopwatch.stop();
abortController.abort();

completer.completeError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
StackTrace.current,
);
}
if (options.responseType == ResponseType.stream) {
readResponse();

// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/read#examples
final Uint8List payload = (chunk.value as JSUint8Array).toDart;
totalRead += payload.lengthInBytes;
receivedBody.add(payload);
if (options.onReceiveProgress != null) {
options.onReceiveProgress!(totalRead, totalResponseLength);
}
completer.complete(
ResponseBody(
dataStreamController!.stream,
response.status,
statusMessage: response.statusText,
headers: headers,
isRedirect: response.redirected,
),
);
} else {
await readResponse();

completer.complete(
ResponseBody.fromBytes(
receivedBody.toBytes(),
response.status,
statusMessage: response.statusText,
headers: headers,
isRedirect: response.redirected,
),
);
}
} else {
// No response data

completer.complete(
ResponseBody.fromBytes(
Uint8List(0),
response.status,
statusMessage: response.statusText,
headers: headers,
isRedirect: response.redirected,
),
);
}

// Done

final Map<String, List<String>> headers = {};
final _IterableHeaders responseHeaders = response.headers as _IterableHeaders;
responseHeaders.forEach(
(String value, String header, [JSAny? _]) {
headers[header.toLowerCase()] = [value];
}.toJS,
);

completer.complete(
ResponseBody.fromBytes(
receivedBody.toBytes(),
response.status,
statusMessage: response.statusText,
headers: headers,
isRedirect: response.redirected,
),
);

return completer.future.whenComplete(() {
_abortables.remove(abortController);
});
Expand Down

0 comments on commit 3cb934e

Please sign in to comment.