Skip to content

Commit

Permalink
test: fix intermittent TSFN crashes
Browse files Browse the repository at this point in the history
PR-URL: nodejs#974
Reviewed-By: Michael Dawson <midawson@redhat.com>
  • Loading branch information
KevinEady authored and Deepak Rajamohan committed Oct 15, 2021
1 parent cc8534f commit 2529469
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Object Init(Env env, Object exports) {
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
exports.Set("threadsafe_function", InitTypedThreadSafeFunction(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
exports.Set("typed_threadsafe_function_ctx",
InitTypedThreadSafeFunctionCtx(env));
exports.Set("typed_threadsafe_function_existing_tsfn",
Expand Down
24 changes: 18 additions & 6 deletions test/threadsafe_function/threadsafe_function.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "napi.h"

Expand All @@ -22,6 +24,9 @@ struct ThreadSafeFunctionInfo {
bool startSecondary;
FunctionReference jsFinalizeCallback;
uint32_t maxQueueSize;
bool closeCalledFromJs;
std::mutex protect;
std::condition_variable signal;
} tsfnInfo;

// Thread data to transmit to JS
Expand Down Expand Up @@ -65,12 +70,13 @@ static void DataSourceThread() {
break;
}

if (info->maxQueueSize == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
auto start = std::chrono::high_resolution_clock::now();
constexpr auto MS_200 = std::chrono::milliseconds(200);
for (; std::chrono::high_resolution_clock::now() - start < MS_200;);
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
// Let's make this thread really busy to give the main thread a chance to
// abort / close.
std::unique_lock<std::mutex> lk(info->protect);
while (!info->closeCalledFromJs) {
info->signal.wait(lk);
}
}

switch (status) {
Expand Down Expand Up @@ -112,6 +118,11 @@ static Value StopThread(const CallbackInfo& info) {
} else {
tsfn.Release();
}
{
std::lock_guard<std::mutex> _(tsfnInfo.protect);
tsfnInfo.closeCalledFromJs = true;
tsfnInfo.signal.notify_one();
}
return Value();
}

Expand All @@ -134,6 +145,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
tsfnInfo.abort = info[1].As<Boolean>();
tsfnInfo.startSecondary = info[2].As<Boolean>();
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
tsfnInfo.closeCalledFromJs = false;

tsfn = ThreadSafeFunction::New(info.Env(), info[0].As<Function>(),
"Test", tsfnInfo.maxQueueSize, 2, &tsfnInfo, JoinTheThreads, threads);
Expand Down
42 changes: 28 additions & 14 deletions test/typed_threadsafe_function/typed_threadsafe_function.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "napi.h"

Expand All @@ -17,6 +19,9 @@ static struct ThreadSafeFunctionInfo {
bool startSecondary;
FunctionReference jsFinalizeCallback;
uint32_t maxQueueSize;
bool closeCalledFromJs;
std::mutex protect;
std::condition_variable signal;
} tsfnInfo;

static void TSFNCallJS(Env env,
Expand All @@ -42,7 +47,7 @@ static int ints[ARRAY_LENGTH];

static void SecondaryThread() {
if (tsfn.Release() != napi_ok) {
Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed");
Error::Fatal("TypedSecondaryThread", "ThreadSafeFunction.Release() failed");
}
}

Expand All @@ -52,7 +57,8 @@ static void DataSourceThread() {

if (info->startSecondary) {
if (tsfn.Acquire() != napi_ok) {
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.Acquire() failed");
}

threads[1] = std::thread(SecondaryThread);
Expand All @@ -75,13 +81,13 @@ static void DataSourceThread() {
break;
}

if (info->maxQueueSize == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
auto start = std::chrono::high_resolution_clock::now();
constexpr auto MS_200 = std::chrono::milliseconds(200);
for (; std::chrono::high_resolution_clock::now() - start < MS_200;)
;
if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) {
// Let's make this thread really busy to give the main thread a chance to
// abort / close.
std::unique_lock<std::mutex> lk(info->protect);
while (!info->closeCalledFromJs) {
info->signal.wait(lk);
}
}

switch (status) {
Expand All @@ -98,20 +104,22 @@ static void DataSourceThread() {
break;

default:
Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.*Call() failed");
}
}

if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) {
Error::Fatal("DataSourceThread", "Queue was never full");
Error::Fatal("TypedDataSourceThread", "Queue was never full");
}

if (info->abort && !queueWasClosing) {
Error::Fatal("DataSourceThread", "Queue was never closing");
Error::Fatal("TypedDataSourceThread", "Queue was never closing");
}

if (!queueWasClosing && tsfn.Release() != napi_ok) {
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed");
Error::Fatal("TypedDataSourceThread",
"ThreadSafeFunction.Release() failed");
}
}

Expand All @@ -123,6 +131,11 @@ static Value StopThread(const CallbackInfo& info) {
} else {
tsfn.Release();
}
{
std::lock_guard<std::mutex> _(tsfnInfo.protect);
tsfnInfo.closeCalledFromJs = true;
tsfnInfo.signal.notify_one();
}
return Value();
}

Expand All @@ -145,6 +158,7 @@ static Value StartThreadInternal(const CallbackInfo& info,
tsfnInfo.abort = info[1].As<Boolean>();
tsfnInfo.startSecondary = info[2].As<Boolean>();
tsfnInfo.maxQueueSize = info[3].As<Number>().Uint32Value();
tsfnInfo.closeCalledFromJs = false;

tsfn = TSFN::New(info.Env(),
info[0].As<Function>(),
Expand All @@ -163,7 +177,7 @@ static Value StartThreadInternal(const CallbackInfo& info,

static Value Release(const CallbackInfo& /* info */) {
if (tsfn.Release() != napi_ok) {
Error::Fatal("Release", "ThreadSafeFunction.Release() failed");
Error::Fatal("Release", "TypedThreadSafeFunction.Release() failed");
}
return Value();
}
Expand Down

0 comments on commit 2529469

Please sign in to comment.