From 55319fe79804b12c0b88849353307f37230204e9 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 19 Apr 2016 14:46:53 -0400 Subject: [PATCH] stream_base: expose `bytesRead` getter This will provide `bytesRead` data on consumed sockets. Fix: #3021 PR-URL: https://github.com/nodejs/node/pull/6284 Reviewed-By: Ben Noordhuis --- lib/net.js | 18 ++++++++++---- src/env.h | 1 + src/stream_base-inl.h | 17 +++++++++++++ src/stream_base.h | 13 ++++++++-- test/parallel/test-net-bytes-read.js | 37 ++++++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-net-bytes-read.js diff --git a/lib/net.js b/lib/net.js index 020f9bd4573f4d..6b4553b6092092 100644 --- a/lib/net.js +++ b/lib/net.js @@ -97,7 +97,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs; // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { self.destroyed = false; - self.bytesRead = 0; self._bytesDispatched = 0; self._sockname = null; @@ -112,6 +111,10 @@ function initSocketHandle(self) { } } + +const BYTES_READ = Symbol('bytesRead'); + + function Socket(options) { if (!(this instanceof Socket)) return new Socket(options); @@ -179,6 +182,9 @@ function Socket(options) { // Reserve properties this.server = null; this._server = null; + + // Used after `.destroy()` + this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); @@ -472,6 +478,9 @@ Socket.prototype._destroy = function(exception, cb) { if (this !== process.stderr) debug('close handle'); var isException = exception ? true : false; + // `bytesRead` should be accessible after `.destroy()` + this[BYTES_READ] = this._handle.bytesRead; + this._handle.close(function() { debug('emit close'); self.emit('close', isException); @@ -523,10 +532,6 @@ function onread(nread, buffer) { // will prevent this from being called again until _read() gets // called again. - // if it's not enough data, we'll just call handle.readStart() - // again right away. - self.bytesRead += nread; - // Optimization: emit the original buffer with end points var ret = self.push(buffer); @@ -582,6 +587,9 @@ Socket.prototype._getpeername = function() { return this._peername; }; +Socket.prototype.__defineGetter__('bytesRead', function() { + return this._handle ? this._handle.bytesRead : this[BYTES_READ]; +}); Socket.prototype.__defineGetter__('remoteAddress', function() { return this._getpeername().address; diff --git a/src/env.h b/src/env.h index ea5e8fea6edf62..1531d9911e310b 100644 --- a/src/env.h +++ b/src/env.h @@ -54,6 +54,7 @@ namespace node { V(buffer_string, "buffer") \ V(bytes_string, "bytes") \ V(bytes_parsed_string, "bytesParsed") \ + V(bytes_read_string, "bytesRead") \ V(callback_string, "callback") \ V(change_string, "change") \ V(oncertcb_string, "oncertcb") \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 81114a265e9a08..099e105334cedf 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -43,6 +43,13 @@ void StreamBase::AddMethods(Environment* env, v8::DEFAULT, attributes); + t->InstanceTemplate()->SetAccessor(env->bytes_read_string(), + GetBytesRead, + nullptr, + env->as_external(), + v8::DEFAULT, + attributes); + env->SetProtoMethod(t, "readStart", JSMethod); env->SetProtoMethod(t, "readStop", JSMethod); if ((flags & kFlagNoShutdown) == 0) @@ -79,6 +86,16 @@ void StreamBase::GetFD(Local key, } +template +void StreamBase::GetBytesRead(Local key, + const PropertyCallbackInfo& args) { + StreamBase* wrap = Unwrap(args.Holder()); + + // uint64_t -> double. 53bits is enough for all real cases. + args.GetReturnValue().Set(static_cast(wrap->bytes_read_)); +} + + template void StreamBase::GetExternal(Local key, const PropertyCallbackInfo& args) { diff --git a/src/stream_base.h b/src/stream_base.h index fad2ddd2e086f0..e722a208a8af68 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -136,7 +136,7 @@ class StreamResource { uv_handle_type pending, void* ctx); - StreamResource() { + StreamResource() : bytes_read_(0) { } virtual ~StreamResource() = default; @@ -160,9 +160,11 @@ class StreamResource { alloc_cb_.fn(size, buf, alloc_cb_.ctx); } - inline void OnRead(size_t nread, + inline void OnRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending = UV_UNKNOWN_HANDLE) { + if (nread > 0) + bytes_read_ += static_cast(nread); if (!read_cb_.is_empty()) read_cb_.fn(nread, buf, pending, read_cb_.ctx); } @@ -182,6 +184,9 @@ class StreamResource { Callback after_write_cb_; Callback alloc_cb_; Callback read_cb_; + uint64_t bytes_read_; + + friend class StreamBase; }; class StreamBase : public StreamResource { @@ -249,6 +254,10 @@ class StreamBase : public StreamResource { static void GetExternal(v8::Local key, const v8::PropertyCallbackInfo& args); + template + static void GetBytesRead(v8::Local key, + const v8::PropertyCallbackInfo& args); + template & args)> diff --git a/test/parallel/test-net-bytes-read.js b/test/parallel/test-net-bytes-read.js new file mode 100644 index 00000000000000..5473ca96ca2cb6 --- /dev/null +++ b/test/parallel/test-net-bytes-read.js @@ -0,0 +1,37 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); + +const big = Buffer(1024 * 1024); + +const server = net.createServer((socket) => { + socket.end(big); + server.close(); +}).listen(common.PORT, () => { + let prev = 0; + + function checkRaise(value) { + assert(value > prev); + prev = value; + } + + const socket = net.connect(common.PORT, () => { + socket.on('data', (chunk) => { + checkRaise(socket.bytesRead); + }); + + socket.on('end', common.mustCall(() => { + assert.equal(socket.bytesRead, prev); + assert.equal(big.length, prev); + })); + + socket.on('close', common.mustCall(() => { + assert(!socket._handle); + assert.equal(socket.bytesRead, prev); + assert.equal(big.length, prev); + })); + }); + socket.end(); +});