Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src,stream: remove *Check*() calls from non-Initialize() functions #40425

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {

const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(
env->context(),
env->error_string(), OneByteString(env->isolate(), msg)).Check();
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return UV_EBUSY;
}
ClearError();
}

Expand Down Expand Up @@ -203,9 +205,11 @@ StreamWriteResult StreamBase::Write(

const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).Check();
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
ClearError();
}

Expand Down Expand Up @@ -279,10 +283,12 @@ void StreamReq::Done(int status, const char* error_str) {
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
async_wrap->object()->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str))
.Check();
if (async_wrap->object()->Set(
env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str)).IsNothing()) {
return;
}
}

OnDone(status);
Expand Down
73 changes: 46 additions & 27 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,40 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
return -1;

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
Local<String> string;
if (!chunk->ToString(context).ToLocal(&string))
return -1;
Local<Value> next_chunk;
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
return -1;
enum encoding encoding = ParseEncoding(isolate, next_chunk);
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(isolate, string, encoding).To(&chunk_size))
return 0;
else if (!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size))
return 0;
if ((encoding == UTF8 &&
string->Length() > 65535 &&
!StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size)) {
return -1;
}
storage_size += chunk_size;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i).ToLocal(&chunk))
return -1;
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
Expand All @@ -145,7 +154,9 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
return -1;

// Write buffer
if (Buffer::HasInstance(chunk)) {
Expand All @@ -160,9 +171,13 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
size_t str_size = (bs ? bs->ByteLength() : 0) - offset;

Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
Local<String> string;
if (!chunk->ToString(context).ToLocal(&string))
return -1;
Local<Value> next_chunk;
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
return -1;
enum encoding encoding = ParseEncoding(isolate, next_chunk);
str_size = StringBytes::Write(isolate,
str_storage,
str_size,
Expand Down Expand Up @@ -207,9 +222,11 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).Check();
if (req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).IsNothing()) {
return -1;
}
}

StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
Expand All @@ -236,12 +253,12 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// For UTF8 strings that are very long, go ahead and take the hit for
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (enc == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(isolate, string, enc).To(&storage_size))
return 0;
else if (!StringBytes::StorageSize(isolate, string, enc)
.To(&storage_size))
return 0;
if ((enc == UTF8 &&
string->Length() > 65535 &&
!StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
!StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
return -1;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
Expand Down Expand Up @@ -312,9 +329,11 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).Check();
if (req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).IsNothing()) {
return -1;
}
}

StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
Expand Down
6 changes: 6 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class StreamReq {
virtual AsyncWrap* GetAsyncWrap() = 0;
inline v8::Local<v8::Object> object();

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
inline void Done(int status, const char* error_str = nullptr);
inline void Dispose();

Expand Down Expand Up @@ -326,13 +328,17 @@ class StreamBase : public StreamResource {
// subclasses are also `BaseObject`s.
Environment* stream_env() const { return env_; }

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
// Shut down the current stream. This request can use an existing
// ShutdownWrap object (that was created in JS), or a new one will be created.
// Returns 1 in case of a synchronous completion, 0 in case of asynchronous
// completion, and a libuv error case in case of synchronous failure.
inline int Shutdown(
v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
// Write data to the current stream. This request can use an existing
// WriteWrap object (that was created in JS), or a new one will be created.
// This will first try to write synchronously using `DoTryWrite()`, then
Expand Down
31 changes: 22 additions & 9 deletions src/stream_pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,26 @@ StreamPipe::StreamPipe(StreamBase* source,
// In particular, this makes sure that they are garbage collected as a group,
// if that applies to the given streams (for example, Http2Streams use
// weak references).
obj->Set(env()->context(), env()->source_string(), source->GetObject())
.Check();
source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
.Check();
obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
.Check();
sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
.Check();
if (obj->Set(env()->context(),
env()->source_string(),
source->GetObject()).IsNothing()) {
return;
}
if (source->GetObject()->Set(env()->context(),
env()->pipe_target_string(),
obj).IsNothing()) {
return;
}
if (obj->Set(env()->context(),
env()->sink_string(),
sink->GetObject()).IsNothing()) {
return;
}
if (sink->GetObject()->Set(env()->context(),
env()->pipe_source_string(),
obj).IsNothing()) {
return;
}
addaleax marked this conversation as resolved.
Show resolved Hide resolved
}

StreamPipe::~StreamPipe() {
Expand Down Expand Up @@ -172,7 +184,8 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
Environment* env = pipe->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
return;
stream()->RemoveStreamListener(this);
}
return;
Expand Down
7 changes: 6 additions & 1 deletion src/stream_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ namespace node {

class StreamPipe : public AsyncWrap {
public:
StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
~StreamPipe() override;

void Unpipe(bool is_in_deletion = false);

// TODO(RaisinTen): Just like MessagePort, add the following overload:
// static StreamPipe* New(StreamBase* source, StreamBase* sink,
// v8::Local<v8::Object> obj);
// so that we can indicate if there is a pending exception/termination.
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -25,6 +28,8 @@ class StreamPipe : public AsyncWrap {
SET_SELF_SIZE(StreamPipe)

private:
StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);

inline StreamBase* source();
inline StreamBase* sink();

Expand Down
12 changes: 6 additions & 6 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
CHECK_EQ(type, UV_UNKNOWN_HANDLE);
}

if (!pending_obj.IsEmpty()) {
object()
->Set(env()->context(),
env()->pending_handle_string(),
pending_obj.ToLocalChecked())
.Check();
Local<Object> local_pending_obj;
if (pending_obj.ToLocal(&local_pending_obj) &&
object()->Set(env()->context(),
env()->pending_handle_string(),
local_pending_obj).IsNothing()) {
return;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {

// Callbacks for libuv
void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
void OnUvRead(ssize_t nread, const uv_buf_t* buf);

static void AfterUvWrite(uv_write_t* req, int status);
Expand Down