diff --git a/benchmark/dgram/array-vs-concat.js b/benchmark/dgram/array-vs-concat.js index 669cf47df4..d260a48063 100644 --- a/benchmark/dgram/array-vs-concat.js +++ b/benchmark/dgram/array-vs-concat.js @@ -29,17 +29,25 @@ function main({ dur, len, num, type, chunks }) { function onsendConcat() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend); + } + }); } } function onsendMulti() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/multi-buffer.js b/benchmark/dgram/multi-buffer.js index a1c50551b8..7b69a82255 100644 --- a/benchmark/dgram/multi-buffer.js +++ b/benchmark/dgram/multi-buffer.js @@ -27,9 +27,13 @@ function main({ dur, len, num, type, chunks }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/offset-length.js b/benchmark/dgram/offset-length.js index 7c672acae2..696fa6a7a0 100644 --- a/benchmark/dgram/offset-length.js +++ b/benchmark/dgram/offset-length.js @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/benchmark/dgram/single-buffer.js b/benchmark/dgram/single-buffer.js index d183b9cd1d..5c95b17887 100644 --- a/benchmark/dgram/single-buffer.js +++ b/benchmark/dgram/single-buffer.js @@ -23,9 +23,13 @@ function main({ dur, len, num, type }) { function onsend() { if (sent++ % num === 0) { - for (var i = 0; i < num; i++) { - socket.send(chunk, PORT, '127.0.0.1', onsend); - } + // The setImmediate() is necessary to have event loop progress on OSes + // that only perform synchronous I/O on nonblocking UDP sockets. + setImmediate(() => { + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + }); } } diff --git a/lib/dgram.js b/lib/dgram.js index 923463cc2e..d6aa613402 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -214,7 +214,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) { if (arguments.length && typeof arguments[arguments.length - 1] === 'function') this.once('listening', arguments[arguments.length - 1]); - if (port instanceof UDP) { + if (port !== null && + typeof port === 'object' && + typeof port.recvStart === 'function') { replaceHandle(this, port); startListening(this); return this; @@ -666,6 +668,14 @@ function doSend(ex, self, ip, list, address, port, callback) { else err = state.handle.send(req, list, list.length, !!callback); + if (err >= 1) { + // Synchronous finish. The return code is msg_length + 1 so that we can + // distinguish between synchronous success and asynchronous success. + if (callback) + process.nextTick(callback, null, err - 1); + return; + } + if (err && callback) { // Don't emit as error, dgram_legacy.js compatibility const ex = exceptionWithHostPort(err, 'send', address, port); diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index c12dbd156e..74573ff640 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -40,6 +40,15 @@ const { owner_symbol, }, } = require('internal/async_hooks'); +const dgram = require('dgram'); +const internalDgram = require('internal/dgram'); + +const { + constants: { + UV_UDP_IPV6ONLY, + UV_UDP_REUSEADDR, + } +} = internalBinding('udp_wrap'); const { writeGeneric, @@ -61,7 +70,6 @@ const { const { codes: { ERR_INVALID_ARG_TYPE, - ERR_INVALID_ARG_VALUE, ERR_INVALID_OPT_VALUE, ERR_INVALID_CALLBACK, ERR_OUT_OF_RANGE, @@ -97,8 +105,6 @@ const { constants: { AF_INET, AF_INET6, - UV_UDP_IPV6ONLY, - UV_UDP_REUSEADDR, NGTCP2_ALPN_H3, NGTCP2_MAX_CIDLEN, NGTCP2_MIN_CIDLEN, @@ -171,6 +177,7 @@ const kSetSocket = Symbol('kSetSocket'); const kStreamClose = Symbol('kStreamClose'); const kStreamReset = Symbol('kStreamReset'); const kTrackWriteState = Symbol('kTrackWriteState'); +const kUDPHandleForTesting = Symbol('kUDPHandleForTesting'); const kVersionNegotiation = Symbol('kVersionNegotiation'); const kWriteGeneric = Symbol('kWriteGeneric'); @@ -228,11 +235,6 @@ function setTransportParams(config) { sessionConfig[IDX_QUIC_SESSION_CONFIG_COUNT] = flags; } -// Called when the socket has been bound and is ready for use -function onSocketReady(fd) { - this[owner_symbol][kReady](fd); -} - // Called when the socket is closed function onSocketClose() { this[owner_symbol].destroy(); @@ -443,7 +445,6 @@ function onSessionSilentClose(statelessReset, code, family) { // Register the callbacks with the QUIC internal binding. setCallbacks({ - onSocketReady, onSocketClose, onSocketError, onSocketServerBusy, @@ -627,6 +628,7 @@ class QuicSocket extends EventEmitter { #type = undefined; #alpn = undefined; #stats = undefined; + #udpSocket = undefined; constructor(options = {}) { const { @@ -674,11 +676,20 @@ class QuicSocket extends EventEmitter { const socketOptions = (validateAddress ? QUICSOCKET_OPTIONS_VALIDATE_ADDRESS : 0) | (validateAddressLRU ? QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU : 0); + this.#udpSocket = dgram.createSocket(type === AF_INET6 ? 'udp6' : 'udp4'); + if (typeof options[kUDPHandleForTesting] === 'object') { + this.#udpSocket.bind(options[kUDPHandleForTesting]); + this.#state = kSocketBound; + process.nextTick(() => this[kReady](undefined)); + } + const udpHandle = this.#udpSocket[internalDgram.kStateSymbol].handle; const handle = new QuicSocketHandle( + udpHandle, socketOptions, retryTokenTimeout, maxConnectionsPerHost); + udpHandle.quicSocket = handle; handle[owner_symbol] = this; this[async_id_symbol] = handle.getAsyncId(); this[kSetHandle](handle); @@ -735,7 +746,8 @@ class QuicSocket extends EventEmitter { const flags = (this.#reuseAddr ? UV_UDP_REUSEADDR : 0) || (this.#ipv6Only ? UV_UDP_IPV6ONLY : 0); - const ret = this[kHandle].bind(this.#type, ip, this.#port || 0, flags); + const udpHandle = this.#udpSocket[internalDgram.kStateSymbol].handle; + const ret = udpHandle.bind(ip, this.#port || 0, flags); if (ret) { this.destroy(exceptionWithHostPort(ret, 'bind', ip, this.#port || 0)); return; @@ -743,6 +755,8 @@ class QuicSocket extends EventEmitter { if (typeof callback === 'function') callback(); + + this[kReady](udpHandle.fd); } // The kReady function is called after the socket has been bound to the @@ -902,13 +916,18 @@ class QuicSocket extends EventEmitter { if (typeof callback === 'function') session.on('ready', callback); - this[kMaybeBind]( - connectAfterBind.bind( - this, - session, - this.#lookup, - address, - getSocketType(type))); + const afterBind = + connectAfterBind.bind( + this, + session, + this.#lookup, + address, + getSocketType(type)); + if (this.#state === kSocketBound) { + afterBind(); + } else { + this[kMaybeBind](afterBind); + } return session; } @@ -920,20 +939,23 @@ class QuicSocket extends EventEmitter { if (handle !== undefined) { this[kSetHandle](); handle[owner_symbol] = undefined; - handle.close((err) => { - // If an error occurs while attempting to close, it will take - // precedence over any original error specified on the args - // TODO(@jasnell): Alternatively we might set the original - // error as a property on the new error. - if (err) error = err; - - // Capture a copy of the stats as they will no longer be - // available once this function returns. - this.#stats = new BigInt64Array(handle.stats); - - if (error) process.nextTick(emit.bind(this, 'error', error)); - process.nextTick(emit.bind(this, 'close')); - }); + handle.ondone = () => { + this.#udpSocket.close((err) => { + // If an error occurs while attempting to close, it will take + // precedence over any original error specified on the args + // TODO(@jasnell): Alternatively we might set the original + // error as a property on the new error. + if (err) error = err; + + // Capture a copy of the stats as they will no longer be + // available once this function returns. + this.#stats = new BigInt64Array(handle.stats); + + if (error) process.nextTick(emit.bind(this, 'error', error)); + process.nextTick(emit.bind(this, 'close')); + }); + }; + handle.waitForPendingCallbacks(); } } @@ -1071,14 +1093,14 @@ class QuicSocket extends EventEmitter { ref() { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('ref'); - this[kHandle].ref(); + this.#udpSocket.ref(); return this; } unref() { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('unref'); - this[kHandle].unref(); + this.#udpSocket.unref(); return this; } @@ -1089,11 +1111,16 @@ class QuicSocket extends EventEmitter { get address() { const out = {}; if (this.#state !== kSocketDestroyed) { - const err = this[kHandle].getsockname(out); - // If err is returned, socket is not bound. - // Return empty object - if (err) - return {}; + try { + return this.#udpSocket.address(); + } catch (err) { + if (err.code === 'EBADF') { + // If there is an EBADF error, the socket is not bound. + // Return empty object + return {}; + } + throw err; + } } return out; } @@ -1121,85 +1148,49 @@ class QuicSocket extends EventEmitter { setTTL(ttl) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setTTL'); - if (typeof ttl !== 'number') - throw new ERR_INVALID_ARG_TYPE('ttl', 'number', ttl); - if (ttl < 1 || ttl > 255) - throw new ERR_INVALID_ARG_VALUE('ttl', ttl); - const err = this[kHandle].setTTL(ttl); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setTTL(ttl); return this; } setMulticastTTL(ttl) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastTTL'); - if (typeof ttl !== 'number') - throw new ERR_INVALID_ARG_TYPE('ttl', 'number', ttl); - if (ttl < 1 || ttl > 255) - throw new ERR_INVALID_ARG_VALUE('ttl', ttl); - const err = this[kHandle].setMulticastTTL(ttl); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastTTL(ttl); return this; } setBroadcast(on = true) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setBroadcast'); - if (typeof on !== 'boolean') - throw new ERR_INVALID_ARG_TYPE('on', 'boolean', on); - const err = this[kHandle].setBroadcast(on); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setBroadcast(on); return this; } setMulticastLoopback(on = true) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastLoopback'); - if (typeof on !== 'boolean') - throw new ERR_INVALID_ARG_TYPE('on', 'boolean', on); - const err = this[kHandle].setMulticastLoopback(on); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastLoopback(on); return this; } setMulticastInterface(iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastInterface'); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].setMulticastInterface(iface); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastInterface(iface); return this; } addMembership(address, iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('addMembership'); - if (typeof address !== 'string') - throw new ERR_INVALID_ARG_TYPE('address', 'string', address); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].addMembership(address, iface); - if (err) - throw errnoException(err, 'addMembership'); + this.#udpSocket.addMembership(address, iface); return this; } dropMembership(address, iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('dropMembership'); - if (typeof address !== 'string') - throw new ERR_INVALID_ARG_TYPE('address', 'string', address); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].dropMembership(address, iface); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.dropMembership(address, iface); return this; } @@ -2403,7 +2394,8 @@ function createSocket(options = {}) { } module.exports = { - createSocket + createSocket, + kUDPHandleForTesting }; /* eslint-enable no-use-before-define */ diff --git a/node.gyp b/node.gyp index c5fc225e0b..241f10a850 100644 --- a/node.gyp +++ b/node.gyp @@ -524,6 +524,7 @@ 'src/js_native_api_v8.h', 'src/js_native_api_v8_internals.h', 'src/js_stream.cc', + 'src/js_udp_wrap.cc', 'src/module_wrap.cc', 'src/node.cc', 'src/node_api.cc', diff --git a/src/async_wrap.h b/src/async_wrap.h index 3e70996f09..d312e7aae7 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -51,6 +51,7 @@ namespace node { V(HTTPINCOMINGMESSAGE) \ V(HTTPCLIENTREQUEST) \ V(JSSTREAM) \ + V(JSUDPWRAP) \ V(MESSAGEPORT) \ V(PIPECONNECTWRAP) \ V(PIPESERVERWRAP) \ @@ -60,6 +61,7 @@ namespace node { V(QUERYWRAP) \ V(QUICCLIENTSESSION) \ V(QUICSERVERSESSION) \ + V(QUICSENDWRAP) \ V(QUICSOCKET) \ V(QUICSTREAM) \ V(SHUTDOWNWRAP) \ diff --git a/src/env.h b/src/env.h index dcb40f948b..ef94dfc159 100644 --- a/src/env.h +++ b/src/env.h @@ -428,7 +428,6 @@ constexpr size_t kFsStatsBufferLength = # define QUIC_ENVIRONMENT_STRONG_PERSISTENT_VALUES(V) \ V(quic_on_socket_close_function, v8::Function) \ V(quic_on_socket_error_function, v8::Function) \ - V(quic_on_socket_ready_function, v8::Function) \ V(quic_on_socket_server_busy_function, v8::Function) \ V(quic_on_session_cert_function, v8::Function) \ V(quic_on_session_client_hello_function, v8::Function) \ diff --git a/src/handle_wrap.cc b/src/handle_wrap.cc index bd9a030368..d95b176721 100644 --- a/src/handle_wrap.cc +++ b/src/handle_wrap.cc @@ -131,6 +131,7 @@ void HandleWrap::OnClose(uv_handle_t* handle) { wrap->state_ = kClosed; wrap->OnClose(); + wrap->handle_wrap_queue_.Remove(); if (!wrap->persistent().IsEmpty() && wrap->object()->Has(env->context(), env->handle_onclose_symbol()) diff --git a/src/js_udp_wrap.cc b/src/js_udp_wrap.cc new file mode 100644 index 0000000000..76d9eb39b8 --- /dev/null +++ b/src/js_udp_wrap.cc @@ -0,0 +1,223 @@ +#include "udp_wrap.h" +#include "async_wrap-inl.h" +#include "node_errors.h" + +namespace node { + +using errors::TryCatchScope; +using v8::Array; +using v8::Context; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Int32; +using v8::Local; +using v8::Object; +using v8::String; +using v8::Value; + +class JSUDPWrap final : public UDPWrapBase, public AsyncWrap { + public: + JSUDPWrap(Environment* env, Local obj); + + int RecvStart() override; + int RecvStop() override; + ssize_t Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) override; + int GetPeerName(sockaddr* name, int* namelen) override; + int GetSockName(sockaddr* name, int* namelen) override; + int GetSockaddr(sockaddr* name, int* namelen, bool peer); + AsyncWrap* GetAsyncWrap() override { return this; } + + static void New(const FunctionCallbackInfo& args); + static void EmitReceived(const FunctionCallbackInfo& args); + static void OnSendDone(const FunctionCallbackInfo& args); + static void OnAfterBind(const FunctionCallbackInfo& args); + + static void Initialize(Local target, + Local unused, + Local context, + void* priv); + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(JSUDPWrap) + SET_SELF_SIZE(JSUDPWrap) +}; + +JSUDPWrap::JSUDPWrap(Environment* env, Local obj) + : AsyncWrap(env, obj, PROVIDER_JSUDPWRAP) { + MakeWeak(); + + obj->SetAlignedPointerInInternalField( + kUDPWrapBaseField, static_cast(this)); +} + +int JSUDPWrap::RecvStart() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int32_t value_int = UV_EPROTO; + if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) || + !value->Int32Value(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +int JSUDPWrap::RecvStop() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int32_t value_int = UV_EPROTO; + if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) || + !value->Int32Value(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +ssize_t JSUDPWrap::Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int64_t value_int = UV_EPROTO; + size_t total_len = 0; + + MaybeStackBuffer, 16> buffers(nbufs); + for (size_t i = 0; i < nbufs; i++) { + buffers[i] = Buffer::Copy(env(), bufs[i].base, bufs[i].len) + .ToLocalChecked(); + total_len += bufs[i].len; + } + + Local args[] = { + listener()->CreateSendWrap(total_len)->object(), + Array::New(env()->isolate(), buffers.out(), nbufs), + AddressToJS(env(), addr) + }; + + if (!MakeCallback(env()->onwrite_string(), arraysize(args), args) + .ToLocal(&value) || + !value->IntegerValue(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +int JSUDPWrap::GetPeerName(sockaddr* name, int* namelen) { + return GetSockaddr(name, namelen, true); +} + +int JSUDPWrap::GetSockName(sockaddr* name, int* namelen) { + return GetSockaddr(name, namelen, false); +} + +int JSUDPWrap::GetSockaddr(sockaddr* name, int* namelen, bool peer) { + // TODO(addaleax): Maybe turn this into a real JS-based method. + sockaddr_in addr_in; + CHECK_EQ(uv_ip4_addr("127.0.0.1", 1337, &addr_in), 0); + memcpy(name, &addr_in, + std::min(static_cast(*namelen), sizeof(addr_in))); + *namelen = sizeof(addr_in); + return 0; +} + +void JSUDPWrap::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args.IsConstructCall()); + new JSUDPWrap(env, args.Holder()); +} + +void JSUDPWrap::EmitReceived(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + Environment* env = wrap->env(); + + ArrayBufferViewContents buffer(args[0]); + const char* data = buffer.data(); + int len = buffer.length(); + + CHECK(args[1]->IsInt32()); // family + CHECK(args[2]->IsString()); // address + CHECK(args[3]->IsInt32()); // port + CHECK(args[4]->IsInt32()); // flags + int family = args[1].As()->Value() == 4 ? AF_INET : AF_INET6; + Utf8Value address(env->isolate(), args[2]); + int port = args[3].As()->Value(); + int flags = args[3].As()->Value(); + + sockaddr_storage addr; + CHECK_EQ(sockaddr_for_family(family, *address, port, &addr), 0); + + // Repeatedly ask the stream's owner for memory, copy the data that we + // just read from JS into those buffers and emit them as reads. + while (len != 0) { + uv_buf_t buf = wrap->listener()->OnAlloc(len); + ssize_t avail = len; + if (static_cast(buf.len) < avail) + avail = buf.len; + + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + wrap->listener()->OnRecv( + avail, buf, reinterpret_cast(&addr), flags); + } +} + +void JSUDPWrap::OnSendDone(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsInt32()); + ReqWrap* req_wrap; + ASSIGN_OR_RETURN_UNWRAP(&req_wrap, args[0].As()); + int status = args[1].As()->Value(); + + wrap->listener()->OnSendDone(req_wrap, status); +} + +void JSUDPWrap::OnAfterBind(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + wrap->listener()->OnAfterBind(); +} + +void JSUDPWrap::Initialize(Local target, + Local unused, + Local context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + Local t = env->NewFunctionTemplate(New); + Local js_udp_wrap_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "JSUDPWrap"); + t->SetClassName(js_udp_wrap_string); + t->InstanceTemplate() + ->SetInternalFieldCount(UDPWrapBase::kUDPWrapBaseField + 1); + t->Inherit(AsyncWrap::GetConstructorTemplate(env)); + + UDPWrapBase::AddMethods(env, t); + env->SetProtoMethod(t, "emitReceived", EmitReceived); + env->SetProtoMethod(t, "onSendDone", OnSendDone); + env->SetProtoMethod(t, "onAfterBind", OnAfterBind); + + target->Set(env->context(), + js_udp_wrap_string, + t->GetFunction(context).ToLocalChecked()).Check(); +} + + +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(js_udp_wrap, node::JSUDPWrap::Initialize) diff --git a/src/node_binding.cc b/src/node_binding.cc index 74b308383f..498f029da9 100644 --- a/src/node_binding.cc +++ b/src/node_binding.cc @@ -64,6 +64,7 @@ V(http_parser) \ V(inspector) \ V(js_stream) \ + V(js_udp_wrap) \ V(messaging) \ V(module_wrap) \ V(native_module) \ diff --git a/src/node_options.cc b/src/node_options.cc index 34efe6336a..4320493e08 100644 --- a/src/node_options.cc +++ b/src/node_options.cc @@ -449,6 +449,8 @@ EnvironmentOptionsParser::EnvironmentOptionsParser() { "write warnings to file instead of stderr", &EnvironmentOptions::redirect_warnings, kAllowedInEnvironment); + AddOption("--test-udp-no-try-send", "", // For testing only. + &EnvironmentOptions::test_udp_no_try_send); AddOption("--throw-deprecation", "throw an exception on deprecations", &EnvironmentOptions::throw_deprecation, diff --git a/src/node_options.h b/src/node_options.h index 780e669eb6..236f1aab92 100644 --- a/src/node_options.h +++ b/src/node_options.h @@ -134,6 +134,7 @@ class EnvironmentOptions : public Options { bool heap_prof = false; #endif // HAVE_INSPECTOR std::string redirect_warnings; + bool test_udp_no_try_send = false; bool throw_deprecation = false; bool trace_deprecation = false; bool trace_sync_io = false; diff --git a/src/node_quic.cc b/src/node_quic.cc index 510fa1da5e..47e029d49a 100755 --- a/src/node_quic.cc +++ b/src/node_quic.cc @@ -47,7 +47,6 @@ void QuicSetCallbacks(const FunctionCallbackInfo& args) { env->set_quic_on_##callback##_function(fn.As()); \ } while (0) - SETFUNCTION("onSocketReady", socket_ready); SETFUNCTION("onSocketClose", socket_close); SETFUNCTION("onSocketError", socket_error); SETFUNCTION("onSessionReady", session_ready); @@ -163,8 +162,6 @@ void Initialize(Local target, NODE_DEFINE_CONSTANT(constants, SSL_OP_SINGLE_ECDH_USE); NODE_DEFINE_CONSTANT(constants, TLS1_3_VERSION); NODE_DEFINE_CONSTANT(constants, UV_EBADF); - NODE_DEFINE_CONSTANT(constants, UV_UDP_IPV6ONLY); - NODE_DEFINE_CONSTANT(constants, UV_UDP_REUSEADDR); NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_ACTIVE_CONNECTION_ID_LIMIT); NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_MAX_STREAM_DATA_BIDI_LOCAL); diff --git a/src/node_quic_buffer.h b/src/node_quic_buffer.h index ec29699c8a..90feabdc4d 100644 --- a/src/node_quic_buffer.h +++ b/src/node_quic_buffer.h @@ -43,17 +43,14 @@ struct quic_buffer_chunk : public MemoryRetainer { size_t offset = 0; size_t roffset = 0; bool done_called = false; - v8::Global keep_alive; std::unique_ptr next; quic_buffer_chunk( MallocedBuffer&& buf_, - done_cb done_, - v8::Local keep_alive_) + done_cb done_) : quic_buffer_chunk(uv_buf_init(reinterpret_cast(buf_.data), buf_.size), - done_, - keep_alive_) { + done_) { data_buf = std::move(buf_); } @@ -61,12 +58,9 @@ struct quic_buffer_chunk : public MemoryRetainer { quic_buffer_chunk( uv_buf_t buf_, - done_cb done_, - v8::Local keep_alive_) + done_cb done_) : quic_buffer_chunk(buf_) { done = std::move(done_); - if (!keep_alive.IsEmpty()) - keep_alive.Reset(keep_alive_->GetIsolate(), keep_alive_); } ~quic_buffer_chunk() override { @@ -196,14 +190,11 @@ class QuicBuffer : public MemoryRetainer { // Push one or more uv_buf_t instances into the buffer. // the done_cb callback will be invoked when the last // uv_buf_t in the bufs array is consumed and popped out - // of the internal linked list. The keep_alive allows a reference to a - // JS object to be kept around until the final uv_buf_t - // is consumed. + // of the internal linked list. size_t Push( uv_buf_t* bufs, size_t nbufs, - done_cb done = default_quic_buffer_chunk_done, - v8::Local keep_alive = v8::Local()) { + done_cb done = default_quic_buffer_chunk_done) { size_t len = 0; if (nbufs == 0 || bufs == nullptr || IsEmptyBuffer(bufs[0])) { done(0); @@ -223,26 +214,23 @@ class QuicBuffer : public MemoryRetainer { length_ += bufs[n].len; rlength_ += bufs[n].len; len += bufs[n].len; - Push(bufs[n], done, keep_alive); + Push(bufs[n], done); return len; } // Push a single malloc buf into the buffer. // The done_cb will be invoked when the buf is consumed - // and popped out of the internal linked list. The keep_alive allows a - // reference to a JS object to be kept around until the - // final uv_buf_t is consumed. + // and popped out of the internal linked list. size_t Push( MallocedBuffer&& buffer, - done_cb done = default_quic_buffer_chunk_done, - v8::Local keep_alive = v8::Local()) { + done_cb done = default_quic_buffer_chunk_done) { if (buffer.size == 0) { done(0); return 0; } length_ += buffer.size; rlength_ += buffer.size; - Push(new quic_buffer_chunk(std::move(buffer), done, keep_alive)); + Push(new quic_buffer_chunk(std::move(buffer), done)); return buffer.size; } @@ -384,8 +372,8 @@ class QuicBuffer : public MemoryRetainer { Push(new quic_buffer_chunk(buf)); } - void Push(uv_buf_t buf, done_cb done, v8::Local keep_alive) { - Push(new quic_buffer_chunk(buf, done, keep_alive)); + void Push(uv_buf_t buf, done_cb done) { + Push(new quic_buffer_chunk(buf, done)); } bool Pop(int status = 0) { diff --git a/src/node_quic_socket.cc b/src/node_quic_socket.cc index 69a37a604b..f1f1716758 100644 --- a/src/node_quic_socket.cc +++ b/src/node_quic_socket.cc @@ -1,6 +1,7 @@ #include "async_wrap-inl.h" #include "debug_utils.h" #include "env-inl.h" +#include "memory_tracker-inl.h" #include "nghttp2/nghttp2.h" #include "node.h" #include "node_crypto.h" @@ -10,6 +11,7 @@ #include "node_quic_session-inl.h" #include "node_quic_socket.h" #include "node_quic_util.h" +#include "req_wrap-inl.h" #include "util.h" #include "uv.h" #include "v8.h" @@ -31,6 +33,7 @@ using v8::Isolate; using v8::Local; using v8::Number; using v8::Object; +using v8::ObjectTemplate; using v8::PropertyAttribute; using v8::String; using v8::Value; @@ -65,12 +68,11 @@ inline uint32_t GenerateReservedVersion( QuicSocket::QuicSocket( Environment* env, Local wrap, + Local udp_base_wrap, uint64_t retry_token_expiration, size_t max_connections_per_host, uint32_t options) : - HandleWrap(env, wrap, - reinterpret_cast(&handle_), - AsyncWrap::PROVIDER_QUICSOCKET), + AsyncWrap(env, wrap, AsyncWrap::PROVIDER_QUICSOCKET), alloc_info_(MakeAllocator()), options_(options), max_connections_per_host_(max_connections_per_host), @@ -80,7 +82,15 @@ QuicSocket::QuicSocket( env->isolate(), sizeof(socket_stats_) / sizeof(uint64_t), reinterpret_cast(&socket_stats_)) { - CHECK_EQ(uv_udp_init(env->event_loop(), &handle_), 0); + MakeWeak(); + + udp_ = static_cast( + udp_base_wrap->GetAlignedPointerFromInternalField( + UDPWrapBase::kUDPWrapBaseField)); + CHECK_NOT_NULL(udp_); + udp_->set_listener(this); + udp_strong_ptr_.reset(udp_->GetAsyncWrap()); + Debug(this, "New QuicSocket created."); EntropySource(token_secret_.data(), token_secret_.size()); @@ -147,100 +157,18 @@ void QuicSocket::AssociateCID( dcid_to_scid_.emplace(cid->ToStr(), scid->ToStr()); } -int QuicSocket::Bind( - const char* address, - uint32_t port, - uint32_t flags, - int family) { - Debug(this, - "Binding to address %s, port %d, with flags %d, and family %d", - address, port, flags, family); - - HandleScope scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - - sockaddr_storage addr; - int err = SocketAddress::ToSockAddr(family, address, port, &addr); - if (err != 0) - return err; - - Local arg = Undefined(env()->isolate()); - - err = - uv_udp_bind( - &handle_, - reinterpret_cast(&addr), - flags); - if (err != 0) { - Debug(this, "Bind failed. Error %d", err); - arg = Integer::New(env()->isolate(), err); - MakeCallback(env()->quic_on_socket_error_function(), 1, &arg); - return 0; - } - - local_address_.Set(&handle_); +void QuicSocket::OnAfterBind() { + sockaddr_storage addr_buf; + sockaddr* addr = reinterpret_cast(&addr_buf); + int addrlen = sizeof(addr_buf); -#if !defined(_WIN32) - int fd = UV_EBADF; - uv_fileno(reinterpret_cast(&handle_), &fd); - if (fd != UV_EBADF) - arg = Integer::New(env()->isolate(), fd); -#endif + CHECK_EQ(udp_->GetSockName(addr, &addrlen), 0); + local_address_.Copy(addr); + Debug(this, "Socket bound"); - MakeCallback(env()->quic_on_socket_ready_function(), 1, &arg); socket_stats_.bound_at = uv_hrtime(); - return 0; } -// If there are no pending QuicSocket::SendWrap callbacks, the -// QuicSocket instance will be closed immediately and the -// close callback will be invoked. Otherwise, the QuicSocket -// will be marked as pending close and will close as soon as -// the final remaining QuicSocket::SendWrap callback is invoked. -// This design ensures that packets that have been sent down to -// the libuv level are processed even tho we are shutting down. -// -// TODO(@jasnell): We will want to implement an additional function -// that will close things down immediately, canceling any still -// pending operations. -void QuicSocket::Close(Local close_callback) { - if (!IsInitialized() || IsFlagSet(QUICSOCKET_FLAGS_PENDING_CLOSE)) - return; - SetFlag(QUICSOCKET_FLAGS_PENDING_CLOSE); - Debug(this, "Closing"); - - CHECK_EQ(false, persistent().IsEmpty()); - if (!close_callback.IsEmpty() && close_callback->IsFunction()) { - object()->Set(env()->context(), - env()->handle_onclose_symbol(), - close_callback).Check(); - } - - // Attempt to close immediately. - MaybeClose(); -} - -// A QuicSocket can close if there are no pending udp send -// callbacks and QuicSocket::Close() has been called. -void QuicSocket::MaybeClose() { - if (!IsInitialized() || - !IsFlagSet(QUICSOCKET_FLAGS_PENDING_CLOSE) || - HasPendingCallbacks()) - return; - - CHECK_EQ(false, persistent().IsEmpty()); - - Debug(this, "Closing the libuv handle"); - - // Close the libuv handle first. The OnClose handler - // will free the QuicSocket instance after it invokes - // the close callback, letting the JavaScript side know - // that the handle is being freed. - uv_close(GetHandle(), OnClose); - MarkAsClosing(); -} - - void QuicSocket::DisassociateCID(QuicCID* cid) { Debug(this, "Removing associations for cid %s", cid->ToHex().c_str()); dcid_to_scid_.erase(cid->ToStr()); @@ -276,38 +204,41 @@ void QuicSocket::StopListening() { SetFlag(QUICSOCKET_FLAGS_SERVER_LISTENING, false); } -void QuicSocket::OnAlloc( - uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - QuicSocket* socket = - ContainerOf(&QuicSocket::handle_, reinterpret_cast(handle)); - *buf = socket->env()->AllocateManaged(suggested_size).release(); +void QuicSocket::WaitForPendingCallbacks() { + if (!HasPendingCallbacks()) { + Debug(this, "No pending callbacks, calling ondone immediately"); + MakeCallback(env()->ondone_string(), 0, nullptr); + return; + } + SetFlag(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS); + Debug(this, "Waiting for pending callbacks"); +} + +uv_buf_t QuicSocket::OnAlloc(size_t suggested_size) { + return env()->AllocateManaged(suggested_size).release(); } void QuicSocket::OnRecv( - uv_udp_t* handle, ssize_t nread, - const uv_buf_t* buf_, + const uv_buf_t& buf_, const struct sockaddr* addr, unsigned int flags) { - QuicSocket* socket = ContainerOf(&QuicSocket::handle_, handle); - AllocatedBuffer buf(socket->env(), *buf_); + AllocatedBuffer buf(env(), buf_); if (nread == 0) return; if (nread < 0) { - Debug(socket, "Reading data from UDP socket failed. Error %d", nread); - Environment* env = socket->env(); + Debug(this, "Reading data from UDP socket failed. Error %d", nread); + Environment* env = this->env(); HandleScope scope(env->isolate()); Context::Scope context_scope(env->context()); Local arg = Number::New(env->isolate(), static_cast(nread)); - socket->MakeCallback(env->quic_on_socket_error_function(), 1, &arg); + MakeCallback(env->quic_on_socket_error_function(), 1, &arg); return; } - socket->Receive(nread, std::move(buf), addr, flags); + Receive(nread, std::move(buf), addr, flags); } void QuicSocket::Receive( @@ -435,14 +366,11 @@ void QuicSocket::Receive( } int QuicSocket::ReceiveStart() { - int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv); - if (err == UV_EALREADY) - err = 0; - return err; + return udp_->RecvStart(); } int QuicSocket::ReceiveStop() { - return uv_udp_recv_stop(&handle_); + return udp_->RecvStop(); } void QuicSocket::RemoveSession(QuicCID* cid, const sockaddr* addr) { @@ -494,18 +422,13 @@ void QuicSocket::SendInitialConnectionClose( &alloc_info_, nullptr); - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV6, - "initial cc"); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV6); ssize_t nwrite = ngtcp2_conn_write_connection_close( conn, *path, - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV6, error_code, uv_hrtime()); @@ -514,10 +437,10 @@ void QuicSocket::SendInitialConnectionClose( // is serialized. We won't be using this one any longer. ngtcp2_conn_del(conn); - if (nwrite > 0) { - req->SetLength(nwrite); - if (req->Send() != 0) delete req; // TODO(addaleax): Better error handling? - } + if (nwrite <= 0) + return; + buf.Realloc(nwrite); + Send(addr, std::move(buf), "initial cc"); } void QuicSocket::SendVersionNegotiation( @@ -525,13 +448,6 @@ void QuicSocket::SendVersionNegotiation( QuicCID* dcid, QuicCID* scid, const sockaddr* addr) { - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV6, - "version negotiation"); - std::array sv; sv[0] = GenerateReservedVersion(addr, version); sv[1] = NGTCP2_PROTO_VER; @@ -539,8 +455,9 @@ void QuicSocket::SendVersionNegotiation( uint8_t unused_random; EntropySource(&unused_random, 1); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV6); ssize_t nwrite = ngtcp2_pkt_write_version_negotiation( - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV6, unused_random, dcid->data(), @@ -549,10 +466,10 @@ void QuicSocket::SendVersionNegotiation( scid->length(), sv.data(), sv.size()); - if (nwrite < 0) + if (nwrite <= 0) return; - req->SetLength(nwrite); - if (req->Send() != 0) delete req; // TODO(addaleax): Better error handling? + buf.Realloc(nwrite); + Send(addr, std::move(buf), "version negotiation"); } bool QuicSocket::SendRetry( @@ -560,13 +477,6 @@ bool QuicSocket::SendRetry( QuicCID* dcid, QuicCID* scid, const sockaddr* addr) { - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV4, - "retry"); - std::array token; size_t tokenlen = token.size(); @@ -592,9 +502,10 @@ bool QuicSocket::SendRetry( EntropySource(hd.scid.data, NGTCP2_SV_SCIDLEN); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV4); ssize_t nwrite = ngtcp2_pkt_write_retry( - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV4, &hd, **dcid, @@ -602,12 +513,8 @@ bool QuicSocket::SendRetry( tokenlen); if (nwrite <= 0) return false; - - req->SetLength(nwrite); - - int err = req->Send(); - if (err != 0) delete req; - return err == 0; + buf.Realloc(nwrite); + return Send(addr, std::move(buf), "retry") == 0; } namespace { @@ -771,39 +678,30 @@ void QuicSocket::SetServerBusy(bool on) { MakeCallback(env()->quic_on_socket_server_busy_function(), 1, &arg); } -int QuicSocket::SetTTL(int ttl) { - Debug(this, "Setting UDP TTL to %d", ttl); - return uv_udp_set_ttl(&handle_, ttl); -} - -int QuicSocket::SetMulticastTTL(int ttl) { - Debug(this, "Setting UDP Multicast TTL to %d", ttl); - return uv_udp_set_multicast_ttl(&handle_, ttl); -} - -int QuicSocket::SetBroadcast(bool on) { - Debug(this, "Turning UDP Broadcast %s", on ? "on" : "off"); - return uv_udp_set_broadcast(&handle_, on ? 1 : 0); -} - -int QuicSocket::SetMulticastLoopback(bool on) { - Debug(this, "Turning UDP Multicast Loopback %s", on ? "on" : "off"); - return uv_udp_set_multicast_loop(&handle_, on ? 1 : 0); +QuicSocket::SendWrap::SendWrap( + Environment* env, + Local req_wrap_obj, + size_t total_length) + : ReqWrap(env, req_wrap_obj, PROVIDER_QUICSOCKET), + total_length_(total_length) { } -int QuicSocket::SetMulticastInterface(const char* iface) { - Debug(this, "Setting the UDP Multicast Interface to %s", iface); - return uv_udp_set_multicast_interface(&handle_, iface); +std::string QuicSocket::SendWrap::MemoryInfoName() const { + return "QuicSendWrap"; } -int QuicSocket::AddMembership(const char* address, const char* iface) { - Debug(this, "Joining UDP group: address %s, iface %s", address, iface); - return uv_udp_set_membership(&handle_, address, iface, UV_JOIN_GROUP); +void QuicSocket::SendWrap::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("session", session_); + tracker->TrackField("buffer", buffer_); + tracker->TrackField("data", data_); } -int QuicSocket::DropMembership(const char* address, const char* iface) { - Debug(this, "Leaving UDP group: address %s, iface %s", address, iface); - return uv_udp_set_membership(&handle_, address, iface, UV_LEAVE_GROUP); +ReqWrap* QuicSocket::CreateSendWrap(size_t msg_size) { + HandleScope handle_scope(env()->isolate()); + Local obj; + if (!env()->quicsocketsendwrap_constructor_template() + ->NewInstance(env()->context()).ToLocal(&obj)) return nullptr; + return last_created_send_wrap_ = new SendWrap(env(), obj, msg_size); } int QuicSocket::SendPacket( @@ -818,190 +716,135 @@ int QuicSocket::SendPacket( if (buffer->Length() == 0 || buffer->RemainingLength() == 0) return 0; - char host[INET6_ADDRSTRLEN]; - SocketAddress::GetAddress(dest, host, sizeof(host)); - Debug(this, "Sending to %s at port %d", host, SocketAddress::GetPort(dest)); - - QuicSocket::SendWrap* wrap = - new QuicSocket::SendWrap( - this, - dest, - buffer, - session, - diagnostic_label); - int err = wrap->Send(); - if (err != 0) delete wrap; - return err; -} + { + char host[INET6_ADDRSTRLEN]; + SocketAddress::GetAddress(dest, host, sizeof(host)); + Debug(this, "Sending to %s at port %d", host, SocketAddress::GetPort(dest)); + } -void QuicSocket::OnSend( - int status, - size_t length, - const char* diagnostic_label) { - IncrementSocketStat( - length, - &socket_stats_, - &socket_stats::bytes_sent); - IncrementSocketStat( - 1, - &socket_stats_, - &socket_stats::packets_sent); + // Remaining Length should never be zero at this point + CHECK_GT(buffer->RemainingLength(), 0); - Debug(this, "Packet sent status: %d (label: %s)", - status, - diagnostic_label != nullptr ? diagnostic_label : "unspecified"); + std::vector vec; + size_t total_length; + size_t len = buffer->DrainInto(&vec, &total_length); - DecrementPendingCallbacks(); - MaybeClose(); -} + // len should never be zero + CHECK_GT(len, 0); -QuicSocket::SendWrapBase::SendWrapBase( - QuicSocket* socket, - const sockaddr* dest, - const char* diagnostic_label) : - socket_(socket), - diagnostic_label_(diagnostic_label) { - req_.data = this; - address_.Copy(dest); - socket->IncrementPendingCallbacks(); -} + Debug(this, "Sending %" PRIu64 " bytes (label: %s)", + total_length, diagnostic_label); + // If DiagnosticPacketLoss returns true, it will call Done() internally + if (UNLIKELY(IsDiagnosticPacketLoss(tx_loss_))) { + Debug(this, "Simulating transmitted packet loss."); + return 0; + } -void QuicSocket::SendWrapBase::OnSend(uv_udp_send_t* req, int status) { - std::unique_ptr wrap( - static_cast(req->data)); - wrap->Done(status); -} + last_created_send_wrap_ = nullptr; + int err = udp_->Send(vec.data(), vec.size(), dest); -bool QuicSocket::SendWrapBase::IsDiagnosticPacketLoss() { - if (Socket()->IsDiagnosticPacketLoss(Socket()->tx_loss_)) { - Debug(Socket(), "Simulating transmitted packet loss."); - Done(0); - return true; - } - return false; -} + Debug(this, "Advancing read head %" PRIu64 " status = %d", + total_length, err); + buffer->SeekHeadOffset(total_length); -void QuicSocket::SendWrapBase::Done(int status) { - socket_->env()->DecreaseWaitingRequestCounter(); - socket_->OnSend(status, Length(), diagnostic_label()); -} + if (err != 0) { + if (err > 0) err = 0; + OnSend(err, total_length, buffer, diagnostic_label); + } else { + IncrementPendingCallbacks(); -QuicSocket::SendWrapStack::SendWrapStack( - QuicSocket* socket, - const sockaddr* dest, - size_t len, - const char* diagnostic_label) : - SendWrapBase(socket, dest, diagnostic_label) { - buf_.AllocateSufficientStorage(len); + CHECK_NOT_NULL(last_created_send_wrap_); + last_created_send_wrap_->set_diagnostic_label(diagnostic_label); + last_created_send_wrap_->set_quic_buffer(buffer); + last_created_send_wrap_->set_session(session); + } + return err; } -int QuicSocket::SendWrapStack::Send() { - Debug(Socket(), "Sending %" PRIu64 " bytes (label: %s)", - buf_.length(), - diagnostic_label()); +int QuicSocket::Send(const sockaddr* addr, + MallocedBuffer&& buf, + const char* diagnostic_label) { + Debug(this, "Sending %" PRIu64 " bytes (label: %s)", + buf.size, + diagnostic_label); - CHECK_GT(buf_.length(), 0); + CHECK_GT(buf.size, 0); // If DiagnosticPacketLoss returns true, it will call Done() internally - if (UNLIKELY(IsDiagnosticPacketLoss())) + if (UNLIKELY(IsDiagnosticPacketLoss(tx_loss_))) { + Debug(this, "Simulating transmitted packet loss."); return 0; + } - uv_buf_t buf = - uv_buf_init( - reinterpret_cast(*buf_), - buf_.length()); - - int err = uv_udp_send( - req(), - &Socket()->handle_, - &buf, 1, - **Address(), - OnSend); - // As this does not inherit from ReqWrap, we have to manage the request - // counter manually. - if (err == 0) Socket()->env()->IncreaseWaitingRequestCounter(); - return err; -} - -// The QuicSocket::SendWrap will maintain a std::weak_ref -// pointer to the buffer given to it. -QuicSocket::SendWrap::SendWrap( - QuicSocket* socket, - SocketAddress* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label) - : SendWrap(socket, **dest, buffer, session, diagnostic_label) {} + uv_buf_t buf_send = uv_buf_init(buf.data, buf.size); -QuicSocket::SendWrap::SendWrap( - QuicSocket* socket, - const sockaddr* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label) - : SendWrapBase(socket, dest, diagnostic_label), - buffer_(buffer), - session_(session) {} - -void QuicSocket::SendWrap::Done(int status) { - // If the weak_ref to the QuicBuffer is still valid - // consume the data, otherwise, do nothing - if (status == 0) { - Debug(Socket(), "Consuming %" PRId64 " bytes (label: %s)", - length_, - diagnostic_label()); - buffer_->Consume(length_); + last_created_send_wrap_ = nullptr; + int err = udp_->Send(&buf_send, 1, addr); + if (err != 0) { + if (err > 0) err = 0; + OnSend(err, buf.size, nullptr, diagnostic_label); } else { - Debug(Socket(), "Cancelling %" PRId64 " bytes (status: %d, label: %s)", - length_, - status, - diagnostic_label()); - buffer_->Cancel(status); + IncrementPendingCallbacks(); + + CHECK_NOT_NULL(last_created_send_wrap_); + last_created_send_wrap_->set_diagnostic_label(diagnostic_label); + last_created_send_wrap_->set_data(std::move(buf)); } - SendWrapBase::Done(status); + return err; } -// Sending will take the current content of the QuicBuffer -// and forward it off to the uv_udp_t handle. -int QuicSocket::SendWrap::Send() { - // Remaining Length should never be zero at this point - CHECK_GT(buffer_->RemainingLength(), 0); - - std::vector vec; - size_t len = buffer_->DrainInto(&vec, &length_); - - // len should never be zero - CHECK_GT(len, 0); +void QuicSocket::OnSend( + int status, + size_t length, + QuicBuffer* buffer, + const char* diagnostic_label) { + if (buffer != nullptr) { + // If the weak_ref to the QuicBuffer is still valid + // consume the data, otherwise, do nothing + if (status == 0) { + Debug(this, "Consuming %" PRId64 " bytes (label: %s)", + length, + diagnostic_label); + buffer->Consume(length); + } else { + Debug(this, "Cancelling %" PRId64 " bytes (status: %d, label: %s)", + length, + status, + diagnostic_label); + buffer->Cancel(status); + } + } - Debug(Socket(), - "Sending %" PRIu64 " bytes (label: %s)", - length_, - diagnostic_label()); + IncrementSocketStat( + length, + &socket_stats_, + &socket_stats::bytes_sent); + IncrementSocketStat( + 1, + &socket_stats_, + &socket_stats::packets_sent); - // If DiagnosticPacketLoss returns true, it will call Done() internally - if (UNLIKELY(IsDiagnosticPacketLoss())) - return 0; + Debug(this, "Packet sent status: %d (label: %s)", + status, + diagnostic_label != nullptr ? diagnostic_label : "unspecified"); - int err = uv_udp_send( - req(), - &(Socket()->handle_), - vec.data(), - vec.size(), - **Address(), - OnSend); - - if (err == 0) { - // As this does not inherit from ReqWrap, we have to manage the request - // counter manually. - Socket()->env()->IncreaseWaitingRequestCounter(); - Debug(Socket(), "Advancing read head %" PRIu64, length_); - buffer_->SeekHeadOffset(length_); + if (!HasPendingCallbacks() && + IsFlagSet(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS)) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + MakeCallback(env()->ondone_string(), 0, nullptr); } - return err; } - +void QuicSocket::OnSendDone(ReqWrap* wrap, int status) { + std::unique_ptr req_wrap(static_cast(wrap)); + DecrementPendingCallbacks(); + OnSend(status, + req_wrap->total_length(), + req_wrap->quic_buffer(), + req_wrap->diagnostic_label()); +} bool QuicSocket::IsDiagnosticPacketLoss(double prob) { if (LIKELY(prob == 0.0)) return false; @@ -1032,14 +875,17 @@ namespace { void NewQuicSocket(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args.IsConstructCall()); + CHECK(args[0]->IsObject()); + CHECK_GE(args[0].As()->InternalFieldCount(), + UDPWrapBase::kUDPWrapBaseField); uint32_t options; uint32_t retry_token_expiration; uint32_t max_connections_per_host; - if (!args[0]->Uint32Value(env->context()).To(&options) || - !args[1]->Uint32Value(env->context()).To(&retry_token_expiration) || - !args[2]->Uint32Value(env->context()).To(&max_connections_per_host)) { + if (!args[1]->Uint32Value(env->context()).To(&options) || + !args[2]->Uint32Value(env->context()).To(&retry_token_expiration) || + !args[3]->Uint32Value(env->context()).To(&max_connections_per_host)) { return; } CHECK_GE(retry_token_expiration, MIN_RETRYTOKEN_EXPIRATION); @@ -1048,6 +894,7 @@ void NewQuicSocket(const FunctionCallbackInfo& args) { new QuicSocket( env, args.This(), + args[0].As(), retry_token_expiration, max_connections_per_host, options); @@ -1075,60 +922,12 @@ void QuicSocketSetDiagnosticPacketLoss( socket->SetDiagnosticPacketLoss(rx, tx); } -void QuicSocketAddMembership(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 2); - CHECK(args[0]->IsString()); - CHECK(args[1]->IsString()); - - Utf8Value address(env->isolate(), args[0]); - Utf8Value iface(env->isolate(), args[1]); - args.GetReturnValue().Set(socket->AddMembership(*address, *iface)); -} - -void QuicSocketBind(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - - CHECK_EQ(args.Length(), 4); - - node::Utf8Value address(args.GetIsolate(), args[1]); - int32_t type; - uint32_t port, flags; - if (!args[0]->Int32Value(env->context()).To(&type) || - !args[2]->Uint32Value(env->context()).To(&port) || - !args[3]->Uint32Value(env->context()).To(&flags)) - return; - CHECK(type == AF_INET || type == AF_INET6); - - args.GetReturnValue().Set(socket->Bind(*address, port, flags, type)); -} - void QuicSocketDestroy(const FunctionCallbackInfo& args) { QuicSocket* socket; ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); socket->ReceiveStop(); } -void QuicSocketDropMembership(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 2); - CHECK(args[0]->IsString()); - CHECK(args[1]->IsString()); - - Utf8Value address(env->isolate(), args[0]); - Utf8Value iface(env->isolate(), args[1]); - args.GetReturnValue().Set(socket->DropMembership(*address, *iface)); -} - void QuicSocketListen(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); QuicSocket* socket; @@ -1193,34 +992,6 @@ void QuicSocketReceiveStop(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(socket->ReceiveStop()); } -void QuicSocketSetBroadcast(const FunctionCallbackInfo& args) { - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - args.GetReturnValue().Set(socket->SetBroadcast(args[0]->IsTrue())); -} - -void QuicSocketSetMulticastInterface(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - CHECK(args[0]->IsString()); - - Utf8Value iface(env->isolate(), args[0]); - args.GetReturnValue().Set(socket->SetMulticastInterface(*iface)); -} - -void QuicSocketSetMulticastLoopback(const FunctionCallbackInfo& args) { - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - args.GetReturnValue().Set(socket->SetMulticastLoopback(args[0]->IsTrue())); -} - void QuicSocketSetServerBusy(const FunctionCallbackInfo& args) { QuicSocket* socket; ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); @@ -1228,29 +999,13 @@ void QuicSocketSetServerBusy(const FunctionCallbackInfo& args) { socket->SetServerBusy(args[0]->IsTrue()); } -void QuicSocketSetMulticastTTL(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); +void QuicSocketWaitForPendingCallbacks( + const FunctionCallbackInfo& args) { QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - int ttl; - if (!args[0]->Int32Value(env->context()).To(&ttl)) - return; - args.GetReturnValue().Set(socket->SetMulticastTTL(ttl)); + ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); + socket->WaitForPendingCallbacks(); } -void QuicSocketSetTTL(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - int ttl; - if (!args[0]->Int32Value(env->context()).To(&ttl)) - return; - args.GetReturnValue().Set(socket->SetTTL(ttl)); -} } // namespace void QuicSocket::Initialize( @@ -1263,21 +1018,9 @@ void QuicSocket::Initialize( socket->SetClassName(class_name); socket->InstanceTemplate()->SetInternalFieldCount(1); socket->InstanceTemplate()->Set(env->owner_symbol(), Null(isolate)); - env->SetProtoMethod(socket, - "addMembership", - QuicSocketAddMembership); - env->SetProtoMethod(socket, - "bind", - QuicSocketBind); env->SetProtoMethod(socket, "destroy", QuicSocketDestroy); - env->SetProtoMethod(socket, - "dropMembership", - QuicSocketDropMembership); - env->SetProtoMethod(socket, - "getsockname", - node::GetSockOrPeerName); env->SetProtoMethod(socket, "listen", QuicSocketListen); @@ -1290,30 +1033,23 @@ void QuicSocket::Initialize( env->SetProtoMethod(socket, "setDiagnosticPacketLoss", QuicSocketSetDiagnosticPacketLoss); - env->SetProtoMethod(socket, - "setTTL", - QuicSocketSetTTL); - env->SetProtoMethod(socket, - "setBroadcast", - QuicSocketSetBroadcast); - env->SetProtoMethod(socket, - "setMulticastInterface", - QuicSocketSetMulticastInterface); - env->SetProtoMethod(socket, - "setMulticastTTL", - QuicSocketSetMulticastTTL); - env->SetProtoMethod(socket, - "setMulticastLoopback", - QuicSocketSetMulticastLoopback); env->SetProtoMethod(socket, "setServerBusy", QuicSocketSetServerBusy); env->SetProtoMethod(socket, "stopListening", QuicSocketStopListening); + env->SetProtoMethod(socket, + "waitForPendingCallbacks", + QuicSocketWaitForPendingCallbacks); socket->Inherit(HandleWrap::GetConstructorTemplate(env)); target->Set(context, class_name, socket->GetFunction(env->context()).ToLocalChecked()).FromJust(); + + // TODO(addaleax): None of these templates actually are constructor templates. + Local sendwrap_template = ObjectTemplate::New(isolate); + sendwrap_template->SetInternalFieldCount(1); + env->set_quicsocketsendwrap_constructor_template(sendwrap_template); } } // namespace quic diff --git a/src/node_quic_socket.h b/src/node_quic_socket.h index f659b73d2c..09d419fb2b 100644 --- a/src/node_quic_socket.h +++ b/src/node_quic_socket.h @@ -10,7 +10,7 @@ #include "node_quic_session.h" #include "node_quic_util.h" #include "env.h" -#include "handle_wrap.h" +#include "udp_wrap.h" #include "v8.h" #include "uv.h" @@ -43,7 +43,8 @@ enum QuicSocketOptions : uint32_t { QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU = 0x2, }; -class QuicSocket : public HandleWrap, +class QuicSocket : public AsyncWrap, + public UDPListener, public mem::NgLibMemoryManager { public: static void Initialize( @@ -54,6 +55,7 @@ class QuicSocket : public HandleWrap, QuicSocket( Environment* env, Local wrap, + Local udp_base_wrap, uint64_t retry_token_expiration, size_t max_connections_per_host, uint32_t options = 0); @@ -61,30 +63,16 @@ class QuicSocket : public HandleWrap, SocketAddress* GetLocalAddress() { return &local_address_; } - void Close( - v8::Local close_callback = v8::Local()) override; - void MaybeClose(); - int AddMembership( - const char* address, - const char* iface); void AddSession( QuicCID* cid, BaseObjectPtr session); void AssociateCID( QuicCID* cid, QuicCID* scid); - int Bind( - const char* address, - uint32_t port, - uint32_t flags, - int family); void DisassociateCID( QuicCID* cid); - int DropMembership( - const char* address, - const char* iface); void Listen( crypto::SecureContext* context, const sockaddr* preferred_address = nullptr, @@ -97,16 +85,6 @@ class QuicSocket : public HandleWrap, const sockaddr* addr); void ReportSendError( int error); - int SetBroadcast( - bool on); - int SetMulticastInterface( - const char* iface); - int SetMulticastLoopback( - bool on); - int SetMulticastTTL( - int ttl); - int SetTTL( - int ttl); int SendPacket( const sockaddr* dest, QuicBuffer* buf, @@ -115,13 +93,12 @@ class QuicSocket : public HandleWrap, void SetServerBusy(bool on); void SetDiagnosticPacketLoss(double rx = 0.0, double tx = 0.0); void StopListening(); + void WaitForPendingCallbacks(); crypto::SecureContext* GetServerSecureContext() { return server_secure_context_; } - const uv_udp_t* operator*() const { return &handle_; } - void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(QuicSocket) SET_SELF_SIZE(QuicSocket) @@ -131,6 +108,16 @@ class QuicSocket : public HandleWrap, void IncreaseAllocatedSize(size_t size); void DecreaseAllocatedSize(size_t size); + // Implementation for UDPWrapListener + uv_buf_t OnAlloc(size_t suggested_size) override; + void OnRecv(ssize_t nread, + const uv_buf_t& buf, + const sockaddr* addr, + unsigned int flags) override; + ReqWrap* CreateSendWrap(size_t msg_size) override; + void OnSendDone(ReqWrap* wrap, int status) override; + void OnAfterBind() override; + private: static void OnAlloc( uv_handle_t* handle, @@ -165,6 +152,7 @@ class QuicSocket : public HandleWrap, void OnSend( int status, size_t length, + QuicBuffer* buffer, const char* diagnostic_label); void SetValidatedAddress(const sockaddr* addr); @@ -192,25 +180,17 @@ class QuicSocket : public HandleWrap, void DecrementPendingCallbacks() { pending_callbacks_--; } bool HasPendingCallbacks() { return pending_callbacks_ > 0; } - template - friend void node::GetSockOrPeerName( - const v8::FunctionCallbackInfo&); - // Returns true if, and only if, diagnostic packet loss is enabled // and the current packet should be artificially considered lost. bool IsDiagnosticPacketLoss(double prob); - // Fields and TypeDefs - typedef uv_udp_t HandleType; - enum QuicSocketFlags : uint32_t { QUICSOCKET_FLAGS_NONE = 0x0, // Indicates that the QuicSocket has entered a graceful // closing phase, indicating that no additional QUICSOCKET_FLAGS_GRACEFUL_CLOSE = 0x1, - QUICSOCKET_FLAGS_PENDING_CLOSE = 0x2, + QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS = 0x2, QUICSOCKET_FLAGS_SERVER_LISTENING = 0x4, QUICSOCKET_FLAGS_SERVER_BUSY = 0x8, }; @@ -238,7 +218,8 @@ class QuicSocket : public HandleWrap, } ngtcp2_mem alloc_info_; - uv_udp_t handle_; + UDPWrapBase* udp_; + BaseObjectPtr udp_strong_ptr_; uint32_t flags_ = QUICSOCKET_FLAGS_NONE; uint32_t options_; uint32_t server_options_; @@ -330,96 +311,37 @@ class QuicSocket : public HandleWrap, access(a, mems...) += delta; } - class SendWrapBase { + class SendWrap : public ReqWrap { public: - SendWrapBase( - QuicSocket* socket, - const sockaddr* dest, - const char* diagnostic_label = nullptr); - - virtual ~SendWrapBase() = default; - - virtual void Done(int status); - - virtual int Send() = 0; - - uv_udp_send_t* operator*() { return &req_; } - - uv_udp_send_t* req() { return &req_; } - - QuicSocket* Socket() { return socket_.get(); } - - SocketAddress* Address() { return &address_; } - + SendWrap(Environment* env, + v8::Local req_wrap_obj, + size_t total_length_); + + void set_data(MallocedBuffer&& data) { data_ = std::move(data); } + void set_quic_buffer(QuicBuffer* buffer) { buffer_ = buffer; } + void set_session(BaseObjectPtr session) { session_ = session; } + void set_diagnostic_label(const char* label) { diagnostic_label_ = label; } + QuicBuffer* quic_buffer() const { return buffer_; } const char* diagnostic_label() const { return diagnostic_label_; } + size_t total_length() const { return total_length_; } - static void OnSend( - uv_udp_send_t* req, - int status); - - virtual size_t Length() = 0; - - bool IsDiagnosticPacketLoss(); + SET_SELF_SIZE(SendWrap); + std::string MemoryInfoName() const override; + void MemoryInfo(MemoryTracker* tracker) const override; private: - uv_udp_send_t req_; - BaseObjectPtr socket_; - SocketAddress address_; - const char* diagnostic_label_; - }; - - // The SendWrap drains the given QuicBuffer and sends it to the - // uv_udp_t handle. When the async operation completes, the done_cb - // is invoked with the status and the user_data forwarded on. - class SendWrap : public SendWrapBase { - public: - SendWrap( - QuicSocket* socket, - SocketAddress* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label = nullptr); - - SendWrap( - QuicSocket* socket, - const sockaddr* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label = nullptr); - - void Done(int status) override; - - int Send() override; - - size_t Length() override { return length_; } - - private: - QuicBuffer* buffer_; BaseObjectPtr session_; - size_t length_ = 0; + QuicBuffer* buffer_ = nullptr; + MallocedBuffer data_; + const char* diagnostic_label_ = nullptr; + size_t total_length_; }; - class SendWrapStack : public SendWrapBase { - public: - SendWrapStack( - QuicSocket* socket, - const sockaddr* dest, - size_t len, - const char* diagnostic_label = nullptr); - - int Send() override; - - uint8_t* buffer() { return *buf_; } - - void SetLength(size_t len) { - buf_.SetLength(len); - } + SendWrap* last_created_send_wrap_ = nullptr; - size_t Length() override { return buf_.length(); } - - private: - MaybeStackBuffer buf_; - }; + int Send(const sockaddr* addr, + MallocedBuffer&& data, + const char* diagnostic_label = "unspecified"); }; } // namespace quic diff --git a/src/node_quic_stream.cc b/src/node_quic_stream.cc index fa6df71669..ad6a2f8b28 100644 --- a/src/node_quic_stream.cc +++ b/src/node_quic_stream.cc @@ -160,6 +160,7 @@ int QuicStream::DoWrite( return 0; } + BaseObjectPtr strong_ref{req_wrap->GetAsyncWrap()}; // The list of buffers will be appended onto streambuf_ without // copying. Those will remain in that buffer until the serialized // stream frames are acknowledged. @@ -167,7 +168,7 @@ int QuicStream::DoWrite( streambuf_.Push( bufs, nbufs, - [req_wrap](int status) { + [req_wrap, strong_ref](int status) { // This callback function will be invoked once this // complete batch of buffers has been acknowledged // by the peer. This will have the side effect of @@ -179,8 +180,7 @@ int QuicStream::DoWrite( // also means that writes will be significantly // less performant unless written in batches. req_wrap->Done(status); - }, - req_wrap->object()); + }); Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers", length, nbufs); IncrementStat(length, &stream_stats_, &stream_stats::bytes_sent); diff --git a/src/node_quic_util.cc b/src/node_quic_util.cc index 75ebdde819..0b4c6938ac 100644 --- a/src/node_quic_util.cc +++ b/src/node_quic_util.cc @@ -22,10 +22,6 @@ void Timer::OnTimeout(uv_timer_t* timer) { t->fn_(); } -void Timer::CleanupHook(void* data) { - Free(static_cast(data)); -} - ngtcp2_crypto_level from_ossl_level(OSSL_ENCRYPTION_LEVEL ossl_level) { switch (ossl_level) { case ssl_encryption_initial: diff --git a/src/node_quic_util.h b/src/node_quic_util.h index 0f7ecbbd3a..768216b558 100644 --- a/src/node_quic_util.h +++ b/src/node_quic_util.h @@ -271,14 +271,6 @@ class SocketAddress { memcpy(&address_, source, GetAddressLen(source)); } - void Set(uv_udp_t* handle) { - int addrlen = sizeof(address_); - CHECK_EQ(uv_udp_getsockname( - handle, - reinterpret_cast(&address_), - &addrlen), 0); - } - void Update(const ngtcp2_addr* addr) { memcpy(&address_, addr->addr, addr->addrlen); } @@ -387,16 +379,10 @@ void IncrementStat( class Timer final : public MemoryRetainer { public: explicit Timer(Environment* env, std::function fn) - : stopped_(false), - env_(env), + : env_(env), fn_(fn) { uv_timer_init(env_->event_loop(), &timer_); timer_.data = this; - env->AddCleanupHook(CleanupHook, this); - } - - ~Timer() override { - env_->RemoveCleanupHook(CleanupHook, this); } // Stops the timer with the side effect of the timer no longer being usable. @@ -429,9 +415,8 @@ class Timer final : public MemoryRetainer { private: static void OnTimeout(uv_timer_t* timer); - static void CleanupHook(void* data); - bool stopped_; + bool stopped_ = false; Environment* env_; std::function fn_; uv_timer_t timer_; diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 65af037d15..a9cece7fc2 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -35,7 +35,8 @@ inline StreamReq* StreamReq::FromObject(v8::Local req_wrap_obj) { inline void StreamReq::Dispose() { object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr); - delete this; + BaseObjectPtr destroy_me{GetAsyncWrap()}; + destroy_me->Detach(); } inline v8::Local StreamReq::object() { diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 3c66db2155..f76e277dec 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env, } -inline bool SendWrap::have_callback() const { +bool SendWrap::have_callback() const { return have_callback_; } +UDPListener::~UDPListener() { + if (wrap_ != nullptr) + wrap_->set_listener(nullptr); +} + +UDPWrapBase::~UDPWrapBase() { + set_listener(nullptr); +} + +UDPListener* UDPWrapBase::listener() const { + CHECK_NOT_NULL(listener_); + return listener_; +} + +void UDPWrapBase::set_listener(UDPListener* listener) { + if (listener_ != nullptr) + listener_->wrap_ = nullptr; + listener_ = listener; + if (listener_ != nullptr) { + CHECK_NULL(listener_->wrap_); + listener_->wrap_ = this; + } +} + +UDPWrapBase* UDPWrapBase::FromObject(Local obj) { + CHECK_GT(obj->InternalFieldCount(), kUDPWrapBaseField); + return static_cast( + obj->GetAlignedPointerFromInternalField(kUDPWrapBaseField)); +} + +void UDPWrapBase::AddMethods(Environment* env, Local t) { + env->SetProtoMethod(t, "recvStart", RecvStart); + env->SetProtoMethod(t, "recvStop", RecvStop); +} UDPWrap::UDPWrap(Environment* env, Local object) : HandleWrap(env, object, reinterpret_cast(&handle_), AsyncWrap::PROVIDER_UDPWRAP) { + object->SetAlignedPointerInInternalField( + kUDPWrapBaseField, static_cast(this)); + int r = uv_udp_init(env->event_loop(), &handle_); CHECK_EQ(r, 0); // can't fail anyway + + set_listener(this); } @@ -91,7 +130,7 @@ void UDPWrap::Initialize(Local target, Environment* env = Environment::GetCurrent(context); Local t = env->NewFunctionTemplate(New); - t->InstanceTemplate()->SetInternalFieldCount(1); + t->InstanceTemplate()->SetInternalFieldCount(kUDPWrapBaseField + 1); Local udpString = FIXED_ONE_BYTE_STRING(env->isolate(), "UDP"); t->SetClassName(udpString); @@ -112,6 +151,7 @@ void UDPWrap::Initialize(Local target, Local(), attributes); + UDPWrapBase::AddMethods(env, t); env->SetProtoMethod(t, "open", Open); env->SetProtoMethod(t, "bind", Bind); env->SetProtoMethod(t, "connect", Connect); @@ -120,8 +160,6 @@ void UDPWrap::Initialize(Local target, env->SetProtoMethod(t, "connect6", Connect6); env->SetProtoMethod(t, "send6", Send6); env->SetProtoMethod(t, "disconnect", Disconnect); - env->SetProtoMethod(t, "recvStart", RecvStart); - env->SetProtoMethod(t, "recvStop", RecvStop); env->SetProtoMethod(t, "getpeername", GetSockOrPeerName); env->SetProtoMethod(t, "getsockname", @@ -156,6 +194,7 @@ void UDPWrap::Initialize(Local target, Local constants = Object::New(env->isolate()); NODE_DEFINE_CONSTANT(constants, UV_UDP_IPV6ONLY); + NODE_DEFINE_CONSTANT(constants, UV_UDP_REUSEADDR); target->Set(context, env->constants_string(), constants).Check(); @@ -216,6 +255,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo& args, int family) { flags); } + if (err == 0) + wrap->listener()->OnAfterBind(); + args.GetReturnValue().Set(err); } @@ -422,19 +464,14 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { CHECK(args[3]->IsBoolean()); } - Local req_wrap_obj = args[0].As(); Local chunks = args[1].As(); // it is faster to fetch the length of the // array in js-land size_t count = args[2].As()->Value(); - const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue(); - SendWrap* req_wrap; - { - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap); - req_wrap = new SendWrap(env, req_wrap_obj, have_callback); - } - size_t msg_size = 0; + wrap->current_send_req_wrap_ = args[0].As(); + wrap->current_send_has_callback_ = + sendto ? args[5]->IsTrue() : args[3]->IsTrue(); MaybeStackBuffer bufs(count); @@ -445,11 +482,8 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { size_t length = Buffer::Length(chunk); bufs[i] = uv_buf_init(Buffer::Data(chunk), length); - msg_size += length; } - req_wrap->msg_size = msg_size; - int err = 0; struct sockaddr_storage addr_storage; sockaddr* addr = nullptr; @@ -463,20 +497,78 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { } if (err == 0) { - err = req_wrap->Dispatch(uv_udp_send, - &wrap->handle_, - *bufs, - count, - addr, - OnSend); + err = wrap->Send(*bufs, count, addr); } - if (err) - delete req_wrap; - args.GetReturnValue().Set(err); } +ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr, + size_t count, + const sockaddr* addr) { + if (IsHandleClosing()) return UV_EBADF; + + size_t msg_size = 0; + for (size_t i = 0; i < count; i++) + msg_size += bufs_ptr[i].len; + + int err = 0; + if (!UNLIKELY(env()->options()->test_udp_no_try_send)) { + err = uv_udp_try_send(&handle_, bufs_ptr, count, addr); + if (err == UV_ENOSYS || err == UV_EAGAIN) { + err = 0; + } else if (err >= 0) { + size_t sent = err; + while (count > 0 && bufs_ptr->len <= sent) { + sent -= bufs_ptr->len; + bufs_ptr++; + count--; + } + if (count > 0) { + CHECK_LT(sent, bufs_ptr->len); + bufs_ptr->base += sent; + bufs_ptr->len -= sent; + } else { + CHECK_EQ(static_cast(err), msg_size); + // + 1 so that the JS side can distinguish 0-length async sends from + // 0-length sync sends. + return msg_size + 1; + } + } + } + + if (err == 0) { + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this); + ReqWrap* req_wrap = listener()->CreateSendWrap(msg_size); + if (req_wrap == nullptr) return UV_ENOSYS; + + err = req_wrap->Dispatch( + uv_udp_send, + &handle_, + bufs_ptr, + count, + addr, + uv_udp_send_cb{[](uv_udp_send_t* req, int status) { + UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle); + self->listener()->OnSendDone( + ReqWrap::from_req(req), status); + }}); + if (err) + delete req_wrap; + } + + return err; +} + + +ReqWrap* UDPWrap::CreateSendWrap(size_t msg_size) { + SendWrap* req_wrap = new SendWrap(env(), + current_send_req_wrap_, + current_send_has_callback_); + req_wrap->msg_size = msg_size; + return req_wrap; +} + void UDPWrap::Send(const FunctionCallbackInfo& args) { DoSend(args, AF_INET); @@ -488,31 +580,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo& args) { } -void UDPWrap::RecvStart(const FunctionCallbackInfo& args) { - UDPWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, - args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv); +AsyncWrap* UDPWrap::GetAsyncWrap() { + return this; +} + +int UDPWrap::GetPeerName(sockaddr* name, int* namelen) { + return uv_udp_getpeername(&handle_, name, namelen); +} + +int UDPWrap::GetSockName(sockaddr* name, int* namelen) { + return uv_udp_getsockname(&handle_, name, namelen); +} + +void UDPWrapBase::RecvStart(const FunctionCallbackInfo& args) { + UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder()); + args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart()); +} + +int UDPWrap::RecvStart() { + if (IsHandleClosing()) return UV_EBADF; + int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv); // UV_EALREADY means that the socket is already bound but that's okay if (err == UV_EALREADY) err = 0; - args.GetReturnValue().Set(err); + return err; } -void UDPWrap::RecvStop(const FunctionCallbackInfo& args) { - UDPWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, - args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - int r = uv_udp_recv_stop(&wrap->handle_); - args.GetReturnValue().Set(r); +void UDPWrapBase::RecvStop(const FunctionCallbackInfo& args) { + UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder()); + args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop()); +} + +int UDPWrap::RecvStop() { + if (IsHandleClosing()) return UV_EBADF; + return uv_udp_recv_stop(&handle_); } -void UDPWrap::OnSend(uv_udp_send_t* req, int status) { - std::unique_ptr req_wrap{static_cast(req->data)}; +void UDPWrap::OnSendDone(ReqWrap* req, int status) { + std::unique_ptr req_wrap{static_cast(req)}; if (req_wrap->have_callback()) { Environment* env = req_wrap->env(); HandleScope handle_scope(env->isolate()); @@ -529,19 +636,30 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) { void UDPWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - UDPWrap* wrap = static_cast(handle->data); - *buf = wrap->env()->AllocateManaged(suggested_size).release(); + UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, + reinterpret_cast(handle)); + *buf = wrap->listener()->OnAlloc(suggested_size); +} + +uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) { + return env()->AllocateManaged(suggested_size).release(); } void UDPWrap::OnRecv(uv_udp_t* handle, ssize_t nread, - const uv_buf_t* buf_, - const struct sockaddr* addr, + const uv_buf_t* buf, + const sockaddr* addr, unsigned int flags) { - UDPWrap* wrap = static_cast(handle->data); - Environment* env = wrap->env(); + UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle); + wrap->listener()->OnRecv(nread, *buf, addr, flags); +} - AllocatedBuffer buf(env, *buf_); +void UDPWrap::OnRecv(ssize_t nread, + const uv_buf_t& buf_, + const sockaddr* addr, + unsigned int flags) { + Environment* env = this->env(); + AllocatedBuffer buf(env, buf_); if (nread == 0 && addr == nullptr) { return; } @@ -549,23 +667,22 @@ void UDPWrap::OnRecv(uv_udp_t* handle, HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - Local wrap_obj = wrap->object(); Local argv[] = { Integer::New(env->isolate(), nread), - wrap_obj, + object(), Undefined(env->isolate()), Undefined(env->isolate()) }; if (nread < 0) { - wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv); + MakeCallback(env->onmessage_string(), arraysize(argv), argv); return; } buf.Resize(nread); argv[2] = buf.ToBuffer().ToLocalChecked(); argv[3] = AddressToJS(env, addr); - wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv); + MakeCallback(env->onmessage_string(), arraysize(argv), argv); } MaybeLocal UDPWrap::Instantiate(Environment* env, diff --git a/src/udp_wrap.h b/src/udp_wrap.h index fb2a9362cd..5a6cb605d5 100644 --- a/src/udp_wrap.h +++ b/src/udp_wrap.h @@ -32,7 +32,91 @@ namespace node { -class UDPWrap: public HandleWrap { +class UDPWrapBase; + +// A listener that can be attached to an `UDPWrapBase` object and generally +// manages its I/O activity. This is similar to `StreamListener`. +class UDPListener { + public: + virtual ~UDPListener(); + + // Called right before data is received from the socket. Must return a + // buffer suitable for reading data into, that is then passed to OnRecv. + virtual uv_buf_t OnAlloc(size_t suggested_size) = 0; + + // Called right after data is received from the socket, and includes + // information about the source address. If `nread` is negative, an error + // has occurred, and it represents a libuv error code. + virtual void OnRecv(ssize_t nread, + const uv_buf_t& buf, + const sockaddr* addr, + unsigned int flags) = 0; + + // Called when an asynchronous request for writing data is created. + // The `msg_size` value contains the total size of the data to be sent, + // but may be ignored by the implementation of this Method. + // The return value is later passed to OnSendDone. + virtual ReqWrap* CreateSendWrap(size_t msg_size) = 0; + + // Called when an asynchronous request for writing data has finished. + // If status is negative, an error has occurred, and it represents a libuv + // error code. + virtual void OnSendDone(ReqWrap* wrap, int status) = 0; + + // Optional callback that is called after the socket has been bound. + virtual void OnAfterBind() {} + + inline UDPWrapBase* udp() const { return wrap_; } + + protected: + UDPWrapBase* wrap_ = nullptr; + + friend class UDPWrapBase; +}; + +class UDPWrapBase { + public: + static constexpr int kUDPWrapBaseField = 1; + + virtual ~UDPWrapBase(); + + // Start emitting OnAlloc() + OnRecv() events on the listener. + virtual int RecvStart() = 0; + + // Stop emitting OnAlloc() + OnRecv() events on the listener. + virtual int RecvStop() = 0; + + // Send a chunk of data over this socket. This may call CreateSendWrap() + // on the listener if an async transmission is necessary. + virtual ssize_t Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) = 0; + + // Stores the sockaddr for the peer in `name`. + virtual int GetPeerName(sockaddr* name, int* namelen) = 0; + + // Stores the sockaddr for the local socket in `name`. + virtual int GetSockName(sockaddr* name, int* namelen) = 0; + + // Returns an AsyncWrap object with the same lifetime as this object. + virtual AsyncWrap* GetAsyncWrap() = 0; + + void set_listener(UDPListener* listener); + UDPListener* listener() const; + + static UDPWrapBase* FromObject(v8::Local obj); + + static void RecvStart(const v8::FunctionCallbackInfo& args); + static void RecvStop(const v8::FunctionCallbackInfo& args); + static void AddMethods(Environment* env, v8::Local t); + + private: + UDPListener* listener_ = nullptr; +}; + +class UDPWrap final : public HandleWrap, + public UDPWrapBase, + public UDPListener { public: enum SocketType { SOCKET @@ -51,8 +135,6 @@ class UDPWrap: public HandleWrap { static void Connect6(const v8::FunctionCallbackInfo& args); static void Send6(const v8::FunctionCallbackInfo& args); static void Disconnect(const v8::FunctionCallbackInfo& args); - static void RecvStart(const v8::FunctionCallbackInfo& args); - static void RecvStop(const v8::FunctionCallbackInfo& args); static void AddMembership(const v8::FunctionCallbackInfo& args); static void DropMembership(const v8::FunctionCallbackInfo& args); static void SetMulticastInterface( @@ -64,6 +146,25 @@ class UDPWrap: public HandleWrap { static void SetTTL(const v8::FunctionCallbackInfo& args); static void BufferSize(const v8::FunctionCallbackInfo& args); + // UDPListener implementation + uv_buf_t OnAlloc(size_t suggested_size) override; + void OnRecv(ssize_t nread, + const uv_buf_t& buf, + const sockaddr* addr, + unsigned int flags) override; + ReqWrap* CreateSendWrap(size_t msg_size) override; + void OnSendDone(ReqWrap* wrap, int status) override; + + // UDPWrapBase implementation + int RecvStart() override; + int RecvStop() override; + ssize_t Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) override; + int GetPeerName(sockaddr* name, int* namelen) override; + int GetSockName(sockaddr* name, int* namelen) override; + AsyncWrap* GetAsyncWrap() override; + static v8::MaybeLocal Instantiate(Environment* env, AsyncWrap* parent, SocketType type); @@ -92,7 +193,6 @@ class UDPWrap: public HandleWrap { static void OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); - static void OnSend(uv_udp_send_t* req, int status); static void OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, @@ -100,8 +200,16 @@ class UDPWrap: public HandleWrap { unsigned int flags); uv_udp_t handle_; + + bool current_send_has_callback_; + v8::Local current_send_req_wrap_; }; +int sockaddr_for_family(int address_family, + const char* address, + const unsigned short port, + sockaddr_storage* addr); + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/test/async-hooks/test-udpsendwrap.js b/test/async-hooks/test-udpsendwrap.js index 25b7eb6103..f1403e3226 100644 --- a/test/async-hooks/test-udpsendwrap.js +++ b/test/async-hooks/test-udpsendwrap.js @@ -1,3 +1,4 @@ +// Flags: --test-udp-no-try-send 'use strict'; const common = require('../common'); diff --git a/test/common/README.md b/test/common/README.md index 415f05a867..8d6c85f5d5 100644 --- a/test/common/README.md +++ b/test/common/README.md @@ -21,6 +21,7 @@ This directory contains modules used to test the Node.js implementation. * [Report module](#report-module) * [tick module](#tick-module) * [tmpdir module](#tmpdir-module) +* [UDP pair helper](#udp-pair-helper) * [WPT module](#wpt-module) ## Benchmark Module @@ -912,6 +913,19 @@ listener to process `'beforeExit'`. If a file needs to be left open until Node.js completes, use a child process and call `refresh()` only in the parent. +## UDP pair helper + +The `common/udppair` module exports a function `makeUDPPair` and a class +`FakeUDPWrap`. + +`FakeUDPWrap` emits `'send'` events when data is to be sent on it, and provides +an `emitReceived()` API for acting as if data has been received on it. + +`makeUDPPair` returns an object `{ clientSide, serverSide }` where each side +is an `FakeUDPWrap` connected to the other side. + +There is no difference between client or server side beyond their names. + ## WPT Module ### harness diff --git a/test/common/udppair.js b/test/common/udppair.js new file mode 100644 index 0000000000..0178d722fd --- /dev/null +++ b/test/common/udppair.js @@ -0,0 +1,100 @@ +/* eslint-disable node-core/require-common-first, node-core/required-modules */ +'use strict'; +const { internalBinding } = require('internal/test/binding'); +const { JSUDPWrap } = internalBinding('js_udp_wrap'); +const EventEmitter = require('events'); + +class FakeUDPWrap extends EventEmitter { + constructor() { + super(); + + this._handle = new JSUDPWrap(); + + this._handle.onreadstart = () => this._startReading(); + this._handle.onreadstop = () => this._stopReading(); + this._handle.onwrite = + (wrap, buffers, addr) => this._write(wrap, buffers, addr); + this._handle.getsockname = (obj) => { + Object.assign(obj, { address: '127.0.0.1', family: 'IPv4', port: 1337 }); + return 0; + }; + + this.reading = false; + this.bufferedReceived = []; + this.emitBufferedImmediate = null; + } + + _emitBuffered = () => { + if (!this.reading) return; + if (this.bufferedReceived.length > 0) { + this.emitReceived(this.bufferedReceived.shift()); + this.emitBufferedImmediate = setImmediate(this._emitBuffered); + } else { + this.emit('wantRead'); + } + }; + + _startReading() { + this.reading = true; + this.emitBufferedImmediate = setImmediate(this._emitBuffered); + } + + _stopReading() { + this.reading = false; + clearImmediate(this.emitBufferedImmediate); + } + + _write(wrap, buffers, addr) { + this.emit('send', { buffers, addr }); + setImmediate(() => this._handle.onSendDone(wrap, 0)); + } + + afterBind() { + this._handle.onAfterBind(); + } + + emitReceived(info) { + if (!this.reading) { + this.bufferedReceived.push(info); + return; + } + + const { + buffers, + addr: { + family = 4, + address = '127.0.0.1', + port = 1337, + }, + flags = 0 + } = info; + + let familyInt; + switch (family) { + case 'IPv4': familyInt = 4; break; + case 'IPv6': familyInt = 6; break; + default: throw new Error('bad family'); + } + + for (const buffer of buffers) { + this._handle.emitReceived(buffer, familyInt, address, port, flags); + } + } +} + +function makeUDPPair() { + const serverSide = new FakeUDPWrap(); + const clientSide = new FakeUDPWrap(); + + serverSide.on('send', + (chk) => setImmediate(() => clientSide.emitReceived(chk))); + clientSide.on('send', + (chk) => setImmediate(() => serverSide.emitReceived(chk))); + + return { serverSide, clientSide }; +} + +module.exports = { + FakeUDPWrap, + makeUDPPair +}; diff --git a/test/parallel/test-dgram-send-callback-recursive.js b/test/parallel/test-dgram-send-callback-recursive.js index 835fa332df..1a4c7c84fc 100644 --- a/test/parallel/test-dgram-send-callback-recursive.js +++ b/test/parallel/test-dgram-send-callback-recursive.js @@ -22,7 +22,7 @@ function onsend() { client.on('listening', function() { port = this.address().port; - setImmediate(function() { + process.nextTick(() => { async = true; }); diff --git a/test/parallel/test-quic-ipv6only.js b/test/parallel/test-quic-ipv6only.js index c6b93ac704..977c37621d 100644 --- a/test/parallel/test-quic-ipv6only.js +++ b/test/parallel/test-quic-ipv6only.js @@ -27,9 +27,8 @@ const kALPN = 'zzz'; common.expectsError({ code: 'EINVAL', type: Error, - // TODO(@oyyd): Currently we can't know the exact "syscall" so that it's - // undefined here. - message: 'undefined EINVAL', + message: 'bind EINVAL 0.0.0.0', + syscall: 'bind' })(err); })); diff --git a/test/parallel/test-quic-quicsocket.js b/test/parallel/test-quic-quicsocket.js index 80130033aa..deffe35d9b 100644 --- a/test/parallel/test-quic-quicsocket.js +++ b/test/parallel/test-quic-quicsocket.js @@ -124,20 +124,10 @@ socket.on('ready', common.mustCall(() => { socket.setBroadcast(); socket.setBroadcast(true); socket.setBroadcast(false); - [1, 'test', {}, NaN, 1n, null].forEach((i) => { - assert.throws(() => socket.setBroadcast(i), { - code: 'ERR_INVALID_ARG_TYPE' - }); - }); socket.setMulticastLoopback(); socket.setMulticastLoopback(true); socket.setMulticastLoopback(false); - [1, 'test', {}, NaN, 1n, null].forEach((i) => { - assert.throws(() => socket.setMulticastLoopback(i), { - code: 'ERR_INVALID_ARG_TYPE' - }); - }); socket.setMulticastInterface('0.0.0.0'); diff --git a/test/parallel/test-quic-with-fake-udp.js b/test/parallel/test-quic-with-fake-udp.js new file mode 100644 index 0000000000..d1a2624f46 --- /dev/null +++ b/test/parallel/test-quic-with-fake-udp.js @@ -0,0 +1,72 @@ +// Flags: --expose-internals +'use strict'; +const common = require('../common'); +const { makeUDPPair } = require('../common/udppair'); +const assert = require('assert'); +const quic = require('quic'); +const { kUDPHandleForTesting } = require('internal/quic/core'); + +const fixtures = require('../common/fixtures'); +const key = fixtures.readKey('agent1-key.pem', 'binary'); +const cert = fixtures.readKey('agent1-cert.pem', 'binary'); +const ca = fixtures.readKey('ca1-cert.pem', 'binary'); + +const { serverSide, clientSide } = makeUDPPair(); + +const server = quic.createSocket({ + port: 0, validateAddress: true, [kUDPHandleForTesting]: serverSide._handle +}); + +serverSide.afterBind(); +server.listen({ + key, + cert, + ca, + rejectUnauthorized: false, + maxCryptoBuffer: 4096, + alpn: 'meow' +}); + +server.on('session', common.mustCall((session) => { + session.on('secure', common.mustCall((servername, alpn, cipher) => { + const stream = session.openStream({ halfOpen: false }); + stream.end('Hi!'); + stream.on('data', common.mustNotCall()); + stream.on('finish', common.mustCall()); + stream.on('close', common.mustNotCall()); + stream.on('end', common.mustNotCall()); + })); + + session.on('close', common.mustNotCall()); +})); + +server.on('ready', common.mustCall(() => { + const client = quic.createSocket({ + port: 0, + [kUDPHandleForTesting]: clientSide._handle, + client: { + key, + cert, + ca, + alpn: 'meow' + } + }); + clientSide.afterBind(); + + const req = client.connect({ + address: 'localhost', + port: server.address.port + }); + + req.on('stream', common.mustCall((stream) => { + stream.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'Hi!'); + })); + + stream.on('end', common.mustCall()); + })); + + req.on('close', common.mustNotCall()); +})); + +server.on('close', common.mustNotCall()); diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 39fd8f6183..2abe7f61d7 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -1,5 +1,5 @@ 'use strict'; -// Flags: --expose-gc --expose-internals --no-warnings +// Flags: --expose-gc --expose-internals --no-warnings --test-udp-no-try-send const common = require('../common'); const { internalBinding } = require('internal/test/binding');