From 35c6b1dfade51780cab1ab5ad97925e415af7d50 Mon Sep 17 00:00:00 2001 From: Gabriel Schulhof Date: Fri, 10 Aug 2018 23:22:34 -0400 Subject: [PATCH] n-api: clean up thread-safe function * Move class `TsFn` to name space `v8impl` and rename it to `ThreadSafeFunction` * Remove `NAPI_EXTERN` from API declarations, because it's only needed in the header file. PR-URL: https://github.com/nodejs/node/pull/22259 Reviewed-By: Anna Henningsen Reviewed-By: Kyle Farnung Reviewed-By: Michael Dawson --- src/node_api.cc | 1179 ++++++++++++++++++++++++----------------------- 1 file changed, 592 insertions(+), 587 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index 89a51162d0ff88..a4408b70f24c10 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -856,180 +856,513 @@ napi_status ConcludeDeferred(napi_env env, return GET_RETURN_STATUS(env); } -} // end of namespace v8impl - -// Intercepts the Node-V8 module registration callback. Converts parameters -// to NAPI equivalents and then calls the registration callback specified -// by the NAPI module. -void napi_module_register_cb(v8::Local exports, - v8::Local module, - v8::Local context, - void* priv) { - napi_module* mod = static_cast(priv); +class ThreadSafeFunction : public node::AsyncResource { + public: + ThreadSafeFunction(v8::Local func, + v8::Local resource, + v8::Local name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } - if (mod->nm_register_func == nullptr) { - node::Environment::GetCurrent(context)->ThrowError( - "Module has no declared entry point."); - return; + ~ThreadSafeFunction() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + if (ref.IsEmpty()) + return; + ref.ClearWeak(); + ref.Reset(); } - // Create a new napi_env for this module or reference one if a pre-existing - // one is found. - napi_env env = v8impl::GetEnv(context); + // These methods can be called from any thread. - napi_value _exports; - NAPI_CALL_INTO_MODULE_THROW(env, - _exports = mod->nm_register_func(env, - v8impl::JsValueFromV8LocalValue(exports))); + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); - // If register function returned a non-null exports object different from - // the exports object we passed it, set that as the "exports" property of - // the module. - if (_exports != nullptr && - _exports != v8impl::JsValueFromV8LocalValue(exports)) { - napi_value _module = v8impl::JsValueFromV8LocalValue(module); - napi_set_named_property(env, _module, "exports", _exports); + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } } -} -} // end of anonymous namespace + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); -// Registers a NAPI module. -void napi_module_register(napi_module* mod) { - node::node_module* nm = new node::node_module { - -1, - mod->nm_flags, - nullptr, - mod->nm_filename, - nullptr, - napi_module_register_cb, - mod->nm_modname, - mod, // priv - nullptr, - }; - node::node_module_register(nm); -} + if (is_closing) { + return napi_closing; + } -napi_status napi_add_env_cleanup_hook(napi_env env, - void (*fun)(void* arg), - void* arg) { - CHECK_ENV(env); - CHECK_ARG(env, fun); + thread_count++; - node::AddEnvironmentCleanupHook(env->isolate, fun, arg); + return napi_ok; + } - return napi_ok; -} + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); -napi_status napi_remove_env_cleanup_hook(napi_env env, - void (*fun)(void* arg), - void* arg) { - CHECK_ENV(env); - CHECK_ARG(env, fun); + if (thread_count == 0) { + return napi_invalid_arg; + } - node::RemoveEnvironmentCleanupHook(env->isolate, fun, arg); + thread_count--; - return napi_ok; -} + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing && max_queue_size > 0) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } -// Warning: Keep in-sync with napi_status enum -static -const char* error_messages[] = {nullptr, - "Invalid argument", - "An object was expected", - "A string was expected", - "A string or symbol was expected", - "A function was expected", - "A number was expected", - "A boolean was expected", - "An array was expected", - "Unknown failure", - "An exception is pending", - "The async work item was cancelled", - "napi_escape_handle already called on scope", - "Invalid handle scope usage", - "Invalid callback scope usage", - "Thread-safe function queue is full", - "Thread-safe function handle is closing" -}; + return napi_ok; + } -static inline napi_status napi_clear_last_error(napi_env env) { - env->last_error.error_code = napi_ok; + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } - // TODO(boingoing): Should this be a callback? - env->last_error.engine_error_code = 0; - env->last_error.engine_reserved = nullptr; - return napi_ok; -} + // These methods must only be called from the loop thread. -static inline -napi_status napi_set_last_error(napi_env env, napi_status error_code, - uint32_t engine_error_code, - void* engine_reserved) { - env->last_error.error_code = error_code; - env->last_error.engine_error_code = engine_error_code; - env->last_error.engine_reserved = engine_reserved; - return error_code; -} + napi_status Init() { + ThreadSafeFunction* ts_fn = this; -napi_status napi_get_last_error_info(napi_env env, - const napi_extended_error_info** result) { - CHECK_ENV(env); - CHECK_ARG(env, result); + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } - // you must update this assert to reference the last message - // in the napi_status enum each time a new error message is added. - // We don't have a napi_status_last as this would result in an ABI - // change each time a message was added. - static_assert( - node::arraysize(error_messages) == napi_closing + 1, - "Count of error messages must match count of error values"); - CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); + uv_close(reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, + reinterpret_cast(handle)); + delete ts_fn; + }); - // Wait until someone requests the last error information to fetch the error - // message string - env->last_error.error_message = - error_messages[env->last_error.error_code]; + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } - *result = &(env->last_error); - return napi_ok; -} + delete ts_fn; -napi_status napi_fatal_exception(napi_env env, napi_value err) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, err); + return napi_generic_failure; + } - v8::Local local_err = v8impl::V8LocalValueFromJsValue(err); - v8impl::trigger_fatal_exception(env, local_err); + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); + uv_unref(reinterpret_cast(&idle)); - return napi_clear_last_error(env); -} + return napi_ok; + } -NAPI_NO_RETURN void napi_fatal_error(const char* location, - size_t location_len, - const char* message, - size_t message_len) { - std::string location_string; - std::string message_string; + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); + uv_ref(reinterpret_cast(&idle)); - if (location_len != NAPI_AUTO_LENGTH) { - location_string.assign( - const_cast(location), location_len); - } else { - location_string.assign( - const_cast(location), strlen(location)); + return napi_ok; } - if (message_len != NAPI_AUTO_LENGTH) { - message_string.assign( - const_cast(message), message_len); - } else { - message_string.assign( - const_cast(message), strlen(message)); - } + void DispatchOne() { + void* data = nullptr; + bool popped_value = false; + bool idle_stop_failed = false; - node::FatalError(location_string.c_str(), message_string.c_str()); -} + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + if (max_queue_size > 0) { + cond->Signal(lock); + } + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local js_cb = + v8::Local::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + if (max_queue_size > 0) { + cond->Signal(lock); + } + } + if (handles_closing) { + return; + } + handles_closing = true; + uv_close( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, + reinterpret_cast(handle)); + uv_close( + reinterpret_cast(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::idle, + reinterpret_cast(handle)); + ts_fn->Finalize(); + }); + }); + } + + // Default way of calling into JavaScript. Used when ThreadSafeFunction is + // constructed without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (!(env == nullptr || cb == nullptr)) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast(data) + ->CloseHandlesAndMaybeDelete(true); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + v8::Persistent ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +} // end of namespace v8impl + +// Intercepts the Node-V8 module registration callback. Converts parameters +// to NAPI equivalents and then calls the registration callback specified +// by the NAPI module. +void napi_module_register_cb(v8::Local exports, + v8::Local module, + v8::Local context, + void* priv) { + napi_module* mod = static_cast(priv); + + if (mod->nm_register_func == nullptr) { + node::Environment::GetCurrent(context)->ThrowError( + "Module has no declared entry point."); + return; + } + + // Create a new napi_env for this module or reference one if a pre-existing + // one is found. + napi_env env = v8impl::GetEnv(context); + + napi_value _exports; + NAPI_CALL_INTO_MODULE_THROW(env, + _exports = mod->nm_register_func(env, + v8impl::JsValueFromV8LocalValue(exports))); + + // If register function returned a non-null exports object different from + // the exports object we passed it, set that as the "exports" property of + // the module. + if (_exports != nullptr && + _exports != v8impl::JsValueFromV8LocalValue(exports)) { + napi_value _module = v8impl::JsValueFromV8LocalValue(module); + napi_set_named_property(env, _module, "exports", _exports); + } +} + +} // end of anonymous namespace + +// Registers a NAPI module. +void napi_module_register(napi_module* mod) { + node::node_module* nm = new node::node_module { + -1, + mod->nm_flags, + nullptr, + mod->nm_filename, + nullptr, + napi_module_register_cb, + mod->nm_modname, + mod, // priv + nullptr, + }; + node::node_module_register(nm); +} + +napi_status napi_add_env_cleanup_hook(napi_env env, + void (*fun)(void* arg), + void* arg) { + CHECK_ENV(env); + CHECK_ARG(env, fun); + + node::AddEnvironmentCleanupHook(env->isolate, fun, arg); + + return napi_ok; +} + +napi_status napi_remove_env_cleanup_hook(napi_env env, + void (*fun)(void* arg), + void* arg) { + CHECK_ENV(env); + CHECK_ARG(env, fun); + + node::RemoveEnvironmentCleanupHook(env->isolate, fun, arg); + + return napi_ok; +} + +// Warning: Keep in-sync with napi_status enum +static +const char* error_messages[] = {nullptr, + "Invalid argument", + "An object was expected", + "A string was expected", + "A string or symbol was expected", + "A function was expected", + "A number was expected", + "A boolean was expected", + "An array was expected", + "Unknown failure", + "An exception is pending", + "The async work item was cancelled", + "napi_escape_handle already called on scope", + "Invalid handle scope usage", + "Invalid callback scope usage", + "Thread-safe function queue is full", + "Thread-safe function handle is closing" +}; + +static inline napi_status napi_clear_last_error(napi_env env) { + env->last_error.error_code = napi_ok; + + // TODO(boingoing): Should this be a callback? + env->last_error.engine_error_code = 0; + env->last_error.engine_reserved = nullptr; + return napi_ok; +} + +static inline +napi_status napi_set_last_error(napi_env env, napi_status error_code, + uint32_t engine_error_code, + void* engine_reserved) { + env->last_error.error_code = error_code; + env->last_error.engine_error_code = engine_error_code; + env->last_error.engine_reserved = engine_reserved; + return error_code; +} + +napi_status napi_get_last_error_info(napi_env env, + const napi_extended_error_info** result) { + CHECK_ENV(env); + CHECK_ARG(env, result); + + // you must update this assert to reference the last message + // in the napi_status enum each time a new error message is added. + // We don't have a napi_status_last as this would result in an ABI + // change each time a message was added. + static_assert( + node::arraysize(error_messages) == napi_closing + 1, + "Count of error messages must match count of error values"); + CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); + + // Wait until someone requests the last error information to fetch the error + // message string + env->last_error.error_message = + error_messages[env->last_error.error_code]; + + *result = &(env->last_error); + return napi_ok; +} + +napi_status napi_fatal_exception(napi_env env, napi_value err) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, err); + + v8::Local local_err = v8impl::V8LocalValueFromJsValue(err); + v8impl::trigger_fatal_exception(env, local_err); + + return napi_clear_last_error(env); +} + +NAPI_NO_RETURN void napi_fatal_error(const char* location, + size_t location_len, + const char* message, + size_t message_len) { + std::string location_string; + std::string message_string; + + if (location_len != NAPI_AUTO_LENGTH) { + location_string.assign( + const_cast(location), location_len); + } else { + location_string.assign( + const_cast(location), strlen(location)); + } + + if (message_len != NAPI_AUTO_LENGTH) { + message_string.assign( + const_cast(message), message_len); + } else { + message_string.assign( + const_cast(message), strlen(message)); + } + + node::FatalError(location_string.c_str(), message_string.c_str()); +} napi_status napi_create_function(napi_env env, const char* utf8name, @@ -3473,456 +3806,126 @@ napi_status napi_create_async_work(napi_env env, *result = reinterpret_cast(work); return napi_clear_last_error(env); -} - -napi_status napi_delete_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - uvimpl::Work::Delete(reinterpret_cast(work)); - - return napi_clear_last_error(env); -} - -napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { - CHECK_ENV(env); - CHECK_ARG(env, loop); - *loop = env->loop; - return napi_clear_last_error(env); -} - -napi_status napi_queue_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - napi_status status; - uv_loop_t* event_loop = nullptr; - status = napi_get_uv_event_loop(env, &event_loop); - if (status != napi_ok) - return napi_set_last_error(env, status); - - uvimpl::Work* w = reinterpret_cast(work); - - CALL_UV(env, uv_queue_work(event_loop, - w->Request(), - uvimpl::Work::ExecuteCallback, - uvimpl::Work::CompleteCallback)); - - return napi_clear_last_error(env); -} - -napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { - CHECK_ENV(env); - CHECK_ARG(env, work); - - uvimpl::Work* w = reinterpret_cast(work); - - CALL_UV(env, uv_cancel(reinterpret_cast(w->Request()))); - - return napi_clear_last_error(env); -} - -napi_status napi_create_promise(napi_env env, - napi_deferred* deferred, - napi_value* promise) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, deferred); - CHECK_ARG(env, promise); - - auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); - CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); - - auto v8_resolver = maybe.ToLocalChecked(); - auto v8_deferred = new v8::Persistent(); - v8_deferred->Reset(env->isolate, v8_resolver); - - *deferred = v8impl::JsDeferredFromV8Persistent(v8_deferred); - *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); - return GET_RETURN_STATUS(env); -} - -napi_status napi_resolve_deferred(napi_env env, - napi_deferred deferred, - napi_value resolution) { - return v8impl::ConcludeDeferred(env, deferred, resolution, true); -} - -napi_status napi_reject_deferred(napi_env env, - napi_deferred deferred, - napi_value resolution) { - return v8impl::ConcludeDeferred(env, deferred, resolution, false); -} - -napi_status napi_is_promise(napi_env env, - napi_value promise, - bool* is_promise) { - CHECK_ENV(env); - CHECK_ARG(env, promise); - CHECK_ARG(env, is_promise); - - *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); - - return napi_clear_last_error(env); -} - -napi_status napi_run_script(napi_env env, - napi_value script, - napi_value* result) { - NAPI_PREAMBLE(env); - CHECK_ARG(env, script); - CHECK_ARG(env, result); - - v8::Local v8_script = v8impl::V8LocalValueFromJsValue(script); - - if (!v8_script->IsString()) { - return napi_set_last_error(env, napi_string_expected); - } - - v8::Local context = env->isolate->GetCurrentContext(); - - auto maybe_script = v8::Script::Compile(context, - v8::Local::Cast(v8_script)); - CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); - - auto script_result = - maybe_script.ToLocalChecked()->Run(context); - CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); - - *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); - return GET_RETURN_STATUS(env); -} - -class TsFn: public node::AsyncResource { - public: - TsFn(v8::Local func, - v8::Local resource, - v8::Local name, - size_t thread_count_, - void* context_, - size_t max_queue_size_, - napi_env env_, - void* finalize_data_, - napi_finalize finalize_cb_, - napi_threadsafe_function_call_js call_js_cb_): - AsyncResource(env_->isolate, - resource, - *v8::String::Utf8Value(env_->isolate, name)), - thread_count(thread_count_), - is_closing(false), - context(context_), - max_queue_size(max_queue_size_), - env(env_), - finalize_data(finalize_data_), - finalize_cb(finalize_cb_), - call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), - handles_closing(false) { - ref.Reset(env->isolate, func); - node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); - } - - ~TsFn() { - node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); - if (ref.IsEmpty()) - return; - ref.ClearWeak(); - ref.Reset(); - } - - // These methods can be called from any thread. - - napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { - node::Mutex::ScopedLock lock(this->mutex); - - while (queue.size() >= max_queue_size && - max_queue_size > 0 && - !is_closing) { - if (mode == napi_tsfn_nonblocking) { - return napi_queue_full; - } - cond->Wait(lock); - } - - if (is_closing) { - if (thread_count == 0) { - return napi_invalid_arg; - } else { - thread_count--; - return napi_closing; - } - } else { - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } - queue.push(data); - return napi_ok; - } - } - - napi_status Acquire() { - node::Mutex::ScopedLock lock(this->mutex); - - if (is_closing) { - return napi_closing; - } - - thread_count++; - - return napi_ok; - } - - napi_status Release(napi_threadsafe_function_release_mode mode) { - node::Mutex::ScopedLock lock(this->mutex); - - if (thread_count == 0) { - return napi_invalid_arg; - } - - thread_count--; - - if (thread_count == 0 || mode == napi_tsfn_abort) { - if (!is_closing) { - is_closing = (mode == napi_tsfn_abort); - if (is_closing && max_queue_size > 0) { - cond->Signal(lock); - } - if (uv_async_send(&async) != 0) { - return napi_generic_failure; - } - } - } - - return napi_ok; - } - - void EmptyQueueAndDelete() { - for (; !queue.empty() ; queue.pop()) { - call_js_cb(nullptr, nullptr, context, queue.front()); - } - delete this; - } - - // These methods must only be called from the loop thread. - - napi_status Init() { - TsFn* ts_fn = this; - - if (uv_async_init(env->loop, &async, AsyncCb) == 0) { - if (max_queue_size > 0) { - cond.reset(new node::ConditionVariable); - } - if ((max_queue_size == 0 || cond.get() != nullptr) && - uv_idle_init(env->loop, &idle) == 0) { - return napi_ok; - } +} - uv_close(reinterpret_cast(&async), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = - node::ContainerOf(&TsFn::async, - reinterpret_cast(handle)); - delete ts_fn; - }); +napi_status napi_delete_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - // Prevent the thread-safe function from being deleted here, because - // the callback above will delete it. - ts_fn = nullptr; - } + uvimpl::Work::Delete(reinterpret_cast(work)); - delete ts_fn; + return napi_clear_last_error(env); +} - return napi_generic_failure; - } +napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { + CHECK_ENV(env); + CHECK_ARG(env, loop); + *loop = env->loop; + return napi_clear_last_error(env); +} - napi_status Unref() { - uv_unref(reinterpret_cast(&async)); - uv_unref(reinterpret_cast(&idle)); +napi_status napi_queue_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - return napi_ok; - } + napi_status status; + uv_loop_t* event_loop = nullptr; + status = napi_get_uv_event_loop(env, &event_loop); + if (status != napi_ok) + return napi_set_last_error(env, status); - napi_status Ref() { - uv_ref(reinterpret_cast(&async)); - uv_ref(reinterpret_cast(&idle)); + uvimpl::Work* w = reinterpret_cast(work); - return napi_ok; - } + CALL_UV(env, uv_queue_work(event_loop, + w->Request(), + uvimpl::Work::ExecuteCallback, + uvimpl::Work::CompleteCallback)); - void DispatchOne() { - void* data = nullptr; - bool popped_value = false; - bool idle_stop_failed = false; + return napi_clear_last_error(env); +} - { - node::Mutex::ScopedLock lock(this->mutex); - if (is_closing) { - CloseHandlesAndMaybeDelete(); - } else { - size_t size = queue.size(); - if (size > 0) { - data = queue.front(); - queue.pop(); - popped_value = true; - if (size == max_queue_size && max_queue_size > 0) { - cond->Signal(lock); - } - size--; - } +napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { + CHECK_ENV(env); + CHECK_ARG(env, work); - if (size == 0) { - if (thread_count == 0) { - is_closing = true; - if (max_queue_size > 0) { - cond->Signal(lock); - } - CloseHandlesAndMaybeDelete(); - } else { - if (uv_idle_stop(&idle) != 0) { - idle_stop_failed = true; - } - } - } - } - } + uvimpl::Work* w = reinterpret_cast(work); - if (popped_value || idle_stop_failed) { - v8::HandleScope scope(env->isolate); - CallbackScope cb_scope(this); + CALL_UV(env, uv_cancel(reinterpret_cast(w->Request()))); - if (idle_stop_failed) { - CHECK(napi_throw_error(env, - "ERR_NAPI_TSFN_STOP_IDLE_LOOP", - "Failed to stop the idle loop") == napi_ok); - } else { - v8::Local js_cb = - v8::Local::New(env->isolate, ref); - call_js_cb(env, - v8impl::JsValueFromV8LocalValue(js_cb), - context, - data); - } - } - } + return napi_clear_last_error(env); +} - node::Environment* NodeEnv() { - // For some reason grabbing the Node.js environment requires a handle scope. - v8::HandleScope scope(env->isolate); - return node::Environment::GetCurrent(env->isolate); - } +napi_status napi_create_promise(napi_env env, + napi_deferred* deferred, + napi_value* promise) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, deferred); + CHECK_ARG(env, promise); - void MaybeStartIdle() { - if (uv_idle_start(&idle, IdleCb) != 0) { - v8::HandleScope scope(env->isolate); - CallbackScope cb_scope(this); - CHECK(napi_throw_error(env, - "ERR_NAPI_TSFN_START_IDLE_LOOP", - "Failed to start the idle loop") == napi_ok); - } - } + auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); + CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); - void Finalize() { - v8::HandleScope scope(env->isolate); - if (finalize_cb) { - CallbackScope cb_scope(this); - finalize_cb(env, finalize_data, context); - } - EmptyQueueAndDelete(); - } + auto v8_resolver = maybe.ToLocalChecked(); + auto v8_deferred = new v8::Persistent(); + v8_deferred->Reset(env->isolate, v8_resolver); - inline void* Context() { - return context; - } + *deferred = v8impl::JsDeferredFromV8Persistent(v8_deferred); + *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); + return GET_RETURN_STATUS(env); +} - void CloseHandlesAndMaybeDelete(bool set_closing = false) { - if (set_closing) { - node::Mutex::ScopedLock lock(this->mutex); - is_closing = true; - if (max_queue_size > 0) { - cond->Signal(lock); - } - } - if (handles_closing) { - return; - } - handles_closing = true; - uv_close( - reinterpret_cast(&async), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = node::ContainerOf(&TsFn::async, - reinterpret_cast(handle)); - uv_close( - reinterpret_cast(&ts_fn->idle), - [] (uv_handle_t* handle) -> void { - TsFn* ts_fn = node::ContainerOf(&TsFn::idle, - reinterpret_cast(handle)); - ts_fn->Finalize(); - }); - }); - } +napi_status napi_resolve_deferred(napi_env env, + napi_deferred deferred, + napi_value resolution) { + return v8impl::ConcludeDeferred(env, deferred, resolution, true); +} - // Default way of calling into JavaScript. Used when TsFn is constructed - // without a call_js_cb_. - static void CallJs(napi_env env, napi_value cb, void* context, void* data) { - if (!(env == nullptr || cb == nullptr)) { - napi_value recv; - napi_status status; +napi_status napi_reject_deferred(napi_env env, + napi_deferred deferred, + napi_value resolution) { + return v8impl::ConcludeDeferred(env, deferred, resolution, false); +} - status = napi_get_undefined(env, &recv); - if (status != napi_ok) { - napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", - "Failed to retrieve undefined value"); - return; - } +napi_status napi_is_promise(napi_env env, + napi_value promise, + bool* is_promise) { + CHECK_ENV(env); + CHECK_ARG(env, promise); + CHECK_ARG(env, is_promise); - status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); - if (status != napi_ok && status != napi_pending_exception) { - napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", - "Failed to call JS callback"); - return; - } - } - } + *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); - static void IdleCb(uv_idle_t* idle) { - TsFn* ts_fn = - node::ContainerOf(&TsFn::idle, idle); - ts_fn->DispatchOne(); - } + return napi_clear_last_error(env); +} - static void AsyncCb(uv_async_t* async) { - TsFn* ts_fn = - node::ContainerOf(&TsFn::async, async); - ts_fn->MaybeStartIdle(); - } +napi_status napi_run_script(napi_env env, + napi_value script, + napi_value* result) { + NAPI_PREAMBLE(env); + CHECK_ARG(env, script); + CHECK_ARG(env, result); - static void Cleanup(void* data) { - reinterpret_cast(data)->CloseHandlesAndMaybeDelete(true); + v8::Local v8_script = v8impl::V8LocalValueFromJsValue(script); + + if (!v8_script->IsString()) { + return napi_set_last_error(env, napi_string_expected); } - private: - // These are variables protected by the mutex. - node::Mutex mutex; - std::unique_ptr cond; - std::queue queue; - uv_async_t async; - uv_idle_t idle; - size_t thread_count; - bool is_closing; + v8::Local context = env->isolate->GetCurrentContext(); - // These are variables set once, upon creation, and then never again, which - // means we don't need the mutex to read them. - void* context; - size_t max_queue_size; + auto maybe_script = v8::Script::Compile(context, + v8::Local::Cast(v8_script)); + CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); - // These are variables accessed only from the loop thread. - v8::Persistent ref; - napi_env env; - void* finalize_data; - napi_finalize finalize_cb; - napi_threadsafe_function_call_js call_js_cb; - bool handles_closing; -}; + auto script_result = + maybe_script.ToLocalChecked()->Run(context); + CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); + + *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); + return GET_RETURN_STATUS(env); +} -NAPI_EXTERN napi_status +napi_status napi_create_threadsafe_function(napi_env env, napi_value func, napi_value async_resource, @@ -3957,16 +3960,17 @@ napi_create_threadsafe_function(napi_env env, v8::Local v8_name; CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); - TsFn* ts_fn = new TsFn(v8_func, - v8_resource, - v8_name, - initial_thread_count, - context, - max_queue_size, - env, - thread_finalize_data, - thread_finalize_cb, - call_js_cb); + v8impl::ThreadSafeFunction* ts_fn = + new v8impl::ThreadSafeFunction(v8_func, + v8_resource, + v8_name, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); if (ts_fn == nullptr) { status = napi_generic_failure; @@ -3981,45 +3985,46 @@ napi_create_threadsafe_function(napi_env env, return napi_set_last_error(env, status); } -NAPI_EXTERN napi_status +napi_status napi_get_threadsafe_function_context(napi_threadsafe_function func, void** result) { CHECK(func != nullptr); CHECK(result != nullptr); - *result = reinterpret_cast(func)->Context(); + *result = reinterpret_cast(func)->Context(); return napi_ok; } -NAPI_EXTERN napi_status +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, void* data, napi_threadsafe_function_call_mode is_blocking) { CHECK(func != nullptr); - return reinterpret_cast(func)->Push(data, is_blocking); + return reinterpret_cast(func)->Push(data, + is_blocking); } -NAPI_EXTERN napi_status +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Acquire(); + return reinterpret_cast(func)->Acquire(); } -NAPI_EXTERN napi_status +napi_status napi_release_threadsafe_function(napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) { CHECK(func != nullptr); - return reinterpret_cast(func)->Release(mode); + return reinterpret_cast(func)->Release(mode); } -NAPI_EXTERN napi_status +napi_status napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Unref(); + return reinterpret_cast(func)->Unref(); } -NAPI_EXTERN napi_status +napi_status napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { CHECK(func != nullptr); - return reinterpret_cast(func)->Ref(); + return reinterpret_cast(func)->Ref(); }