Skip to content

Commit

Permalink
tsfn: Implement copy constructor
Browse files Browse the repository at this point in the history
* tsfn: Implement copy constructor

Refs: #524

PR-URL:  #546
Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
  • Loading branch information
KevinEady authored and mhdawson committed Nov 1, 2019
1 parent 650562c commit 2e71842
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 34 deletions.
39 changes: 13 additions & 26 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4025,29 +4025,16 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _tsfn(new napi_threadsafe_function(nullptr), _d) {
: _tsfn() {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_threadsafe_function tsfn)
: _tsfn(new napi_threadsafe_function(tsfn), _d) {
: _tsfn(tsfn) {
}

inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
: _tsfn(std::move(other._tsfn)) {
other._tsfn.reset();
}

inline ThreadSafeFunction& ThreadSafeFunction::operator =(
ThreadSafeFunction&& other) {
if (*_tsfn != nullptr) {
Error::Fatal("ThreadSafeFunction::operator =",
"You cannot assign a new TSFN because existing one is still alive.");
return *this;
}
_tsfn = std::move(other._tsfn);
other._tsfn.reset();
return *this;
inline ThreadSafeFunction::operator napi_threadsafe_function() const {
return _tsfn;
}

inline napi_status ThreadSafeFunction::BlockingCall() const {
Expand Down Expand Up @@ -4090,34 +4077,34 @@ inline napi_status ThreadSafeFunction::NonBlockingCall(

inline void ThreadSafeFunction::Ref(napi_env env) const {
if (_tsfn != nullptr) {
napi_status status = napi_ref_threadsafe_function(env, *_tsfn);
napi_status status = napi_ref_threadsafe_function(env, _tsfn);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
}

inline void ThreadSafeFunction::Unref(napi_env env) const {
if (_tsfn != nullptr) {
napi_status status = napi_unref_threadsafe_function(env, *_tsfn);
napi_status status = napi_unref_threadsafe_function(env, _tsfn);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
}

inline napi_status ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(*_tsfn);
return napi_acquire_threadsafe_function(_tsfn);
}

inline napi_status ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_release);
}

inline napi_status ThreadSafeFunction::Abort() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort);
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(*_tsfn, &context);
napi_get_threadsafe_function_context(_tsfn, &context);
return ConvertibleContext({ context });
}

Expand All @@ -4140,10 +4127,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,

ThreadSafeFunction tsfn;
auto* finalizeData = new details::ThreadSafeFinalize<ContextType, Finalizer,
FinalizerDataType>({ data, finalizeCallback, tsfn._tsfn.get() });
FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
finalizeData, wrapper, context, CallJS, tsfn._tsfn.get());
finalizeData, wrapper, context, CallJS, &tsfn._tsfn);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
Expand All @@ -4156,7 +4143,7 @@ inline napi_status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
napi_status status = napi_call_threadsafe_function(
*_tsfn, callbackWrapper, mode);
_tsfn, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}
Expand Down
10 changes: 2 additions & 8 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -2009,8 +2009,7 @@ namespace Napi {
ThreadSafeFunction();
ThreadSafeFunction(napi_threadsafe_function tsFunctionValue);

ThreadSafeFunction(ThreadSafeFunction&& other);
ThreadSafeFunction& operator=(ThreadSafeFunction&& other);
operator napi_threadsafe_function() const;

// This API may be called from any thread.
napi_status BlockingCall() const;
Expand Down Expand Up @@ -2082,13 +2081,8 @@ namespace Napi {
napi_value jsCallback,
void* context,
void* data);
struct Deleter {
// napi_threadsafe_function is managed by Node.js, leave it alone.
void operator()(napi_threadsafe_function*) const {};
};

std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
Deleter _d;
napi_threadsafe_function _tsfn;
};

template<class T>
Expand Down
2 changes: 2 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Object InitObjectDeprecated(Env env);
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunctionPtr(Env env);
Object InitThreadSafeFunctionSum(Env env);
Object InitThreadSafeFunctionUnref(Env env);
Object InitThreadSafeFunction(Env env);
#endif
Expand Down Expand Up @@ -90,6 +91,7 @@ Object Init(Env env, Object exports) {
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function_ptr.cc',
'threadsafe_function/threadsafe_function_sum.cc',
'threadsafe_function/threadsafe_function_unref.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
Expand Down
2 changes: 2 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ let testModules = [
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function_ptr',
'threadsafe_function/threadsafe_function_sum',
'threadsafe_function/threadsafe_function_unref',
'threadsafe_function/threadsafe_function',
'typedarray',
Expand Down Expand Up @@ -71,6 +72,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
(process.env.npm_config_NAPI_VERSION < 4)) {
testModules.splice(testModules.indexOf('asyncprogressworker'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_unref'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1);
}
Expand Down
199 changes: 199 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#include "napi.h"
#include <thread>
#include <cstdlib>
#include <condition_variable>
#include <mutex>

#if (NAPI_VERSION > 3)

using namespace Napi;

namespace {

struct TestData {

TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)) {};

// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

void FinalizerCallback(Napi::Env env, TestData* finalizeData){
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env,true));
delete finalizeData;
}

/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/

void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestWithTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));

ThreadSafeFunction tsfn = ThreadSafeFunction::New(
info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

for (int i = 0; i < threadCount; ++i) {
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back( std::thread(entryWithTSFN, tsfn, i) );
}

return testData->deferred.Promise();
}

// Task instance created for each new std::thread
class DelayedTSFNTask {
public:
// Each instance has its own tsfn
ThreadSafeFunction tsfn;

// Thread-safety
std::mutex mtx;
std::condition_variable cv;

// Entry point for std::thread
void entryDelayedTSFN(int threadId) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk);
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
};
};

struct TestDataDelayed {

TestDataDelayed(Promise::Deferred &&deferred)
: deferred(std::move(deferred)){};
~TestDataDelayed() { taskInsts.clear(); };
// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};

// List of DelayedTSFNThread instances
std::vector<std::unique_ptr<DelayedTSFNTask>> taskInsts = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed *finalizeData) {
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}

static Value TestDelayedTSFN(const CallbackInfo &info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

TestDataDelayed *testData =
new TestDataDelayed(Promise::Deferred::New(info.Env()));

testData->tsfn =
ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallbackDelayed)>(
FinalizerCallbackDelayed),
testData);

for (int i = 0; i < threadCount; ++i) {
testData->taskInsts.push_back(
std::unique_ptr<DelayedTSFNTask>(new DelayedTSFNTask()));
testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN,
testData->taskInsts.back().get(),
i));
}
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));

for (auto &task : testData->taskInsts) {
std::lock_guard<std::mutex> lk(task->mtx);
task->tsfn = testData->tsfn;
task->cv.notify_all();
}

return testData->deferred.Promise();
}

void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
tsfn.Acquire();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value CreateThread(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
int threadId = testData->threads.size();
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back( std::thread(entryAcquire, tsfn, threadId) );
return Number::New(info.Env(), threadId);
}

static Value StopThreads(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
tsfn.Release();
return info.Env().Undefined();
}

static Value TestAcquire(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Napi::Env env = info.Env();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));

testData->tsfn = ThreadSafeFunction::New(
env, cb, "Test", 0, 1,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

Object result = Object::New(env);
result["createThread"] = Function::New( env, CreateThread, "createThread", testData);
result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData);
result["promise"] = testData->deferred.Promise();

return result;
}
}

Object InitThreadSafeFunctionSum(Env env) {
Object exports = Object::New(env);
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
exports["testAcquire"] = Function::New(env, TestAcquire);
return exports;
}

#endif
Loading

0 comments on commit 2e71842

Please sign in to comment.