Skip to content

Commit

Permalink
src: improve StreamBase write throughput
Browse files Browse the repository at this point in the history
Improve performance by transferring information about write status
to JS through an `AliasedBuffer`, rather than object properties
set from C++.

PR-URL: #23843
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
  • Loading branch information
addaleax authored and targos committed Oct 28, 2018
1 parent 0a23538 commit f01518e
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 37 deletions.
2 changes: 1 addition & 1 deletion benchmark/net/net-c2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const common = require('../common.js');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5]
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-wrap-js-stream-passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const common = require('../common.js');
const { PassThrough } = require('stream');

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
}, {
Expand Down
5 changes: 3 additions & 2 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
Expand Down Expand Up @@ -716,10 +717,10 @@ function setupChannel(target, channel) {
}

var req = new WriteWrap();
req.async = false;

var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);
var wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

if (err === 0) {
if (handle) {
Expand All @@ -729,7 +730,7 @@ function setupChannel(target, channel) {
obj.postSend(message, handle, options, callback, target);
}

if (req.async) {
if (wasAsyncWrite) {
req.oncomplete = function() {
control.unref();
if (typeof callback === 'function')
Expand Down
22 changes: 20 additions & 2 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
Expand All @@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) {

switch (encoding) {
case 'buffer':
return handle.writeBuffer(req, data);
{
const ret = handle.writeBuffer(req, data);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = data;
return ret;
}
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
Expand All @@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) {
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
{
const buffer = Buffer.from(data, encoding);
const ret = handle.writeBuffer(req, buffer);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = buffer;
return ret;
}
}
}

Expand All @@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) {
req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;
req.bytes = 0;
req.buffer = null;

return req;
}
Expand Down Expand Up @@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) {
}

function afterWriteDispatched(self, req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];

if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);

Expand Down
2 changes: 0 additions & 2 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,8 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(address_string, "address") \
V(aliases_string, "aliases") \
V(args_string, "args") \
V(async, "async") \
V(async_ids_stack_string, "async_ids_stack") \
V(buffer_string, "buffer") \
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
Expand Down
33 changes: 7 additions & 26 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ namespace node {

using v8::Array;
using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::String;
using v8::Value;
Expand Down Expand Up @@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
return Shutdown(req_wrap_obj);
}

inline void SetWriteResultPropertiesOnWrapObject(
Environment* env,
Local<Object> req_wrap_obj,
const StreamWriteResult& res) {
req_wrap_obj->Set(
env->context(),
env->bytes_string(),
Number::New(env->isolate(), res.bytes)).FromJust();
req_wrap_obj->Set(
env->context(),
env->async(),
Boolean::New(env->isolate(), res.async)).FromJust();
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
env_->stream_base_state()[kBytesWritten] = res.bytes;
env_->stream_base_state()[kLastWriteWasAsync] = res.async;
}

int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
}
Expand All @@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
buf.len = Buffer::Length(args[1]);

StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);

if (res.async)
req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);

return res.err;
}
Expand Down Expand Up @@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {

// Immediate failure or success
if (err != 0 || count == 0) {
req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
.FromJust();
req_wrap_obj->Set(env->context(),
env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size))
.FromJust();
SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
return err;
}

Expand Down Expand Up @@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
res.bytes += synchronously_written;

SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(data.release(), data_size);
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,17 @@ class StreamBase : public StreamResource {
enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
kNumStreamBaseStateFields
};

private:
Environment* env_;
EmitToJSStreamListener default_listener_;

void SetWriteResult(const StreamWriteResult& res);

friend class WriteWrap;
friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields.
Expand Down
2 changes: 2 additions & 0 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local<Object> target,

NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
NODE_DEFINE_CONSTANT(target, kBytesWritten);
NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
env->stream_base_state().GetJSArray()).FromJust();
}
Expand Down
2 changes: 1 addition & 1 deletion test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check
const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000));
if (err)
throw new Error(`write failed: ${getSystemErrorName(err)}`);
if (!wreq.async) {
if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) {
testUninitialized(wreq, 'WriteWrap');
// Synchronous finish. Write more data until we hit an
// asynchronous write.
Expand Down

0 comments on commit f01518e

Please sign in to comment.