Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mtx_do_lock on exit or reload context #170

Merged
merged 2 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions includes/NSFW.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class NSFW : public Napi::ObjectWrap<NSFW> {
std::vector<std::string> mExcludedPaths;
void updateExcludedPaths();

static void cleanup(void* arg);

class StartWorker: public Napi::AsyncWorker {
public:
StartWorker(Napi::Env env, NSFW *nsfw);
Expand Down
92 changes: 92 additions & 0 deletions js/spec/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1128,4 +1128,96 @@ describe('Node Sentinel File Watcher', function() {
}
});
});

// Worker tests
let Worker;
try {
Worker = require('worker_threads').Worker;
} catch (e) { /* do nothing */ }

if (Worker) {
describe('Worker tests', function() {
const local = path.join.bind(path, __dirname);

beforeEach(async function() {
try {
await fse.remove(workDir);
} catch (e) {/* we don't care about this failure */}

await fse.mkdir(workDir);
});

afterEach(function() {
return fse.remove(workDir);
});

it('can kill worker thread while watcher is working', (done) => {
const workerPath = local('./worker.js');
const inPath = path.resolve(workDir);
const worker = new Worker(workerPath, {
workerData: {
workDir: inPath,
test: 1
}
});
worker.on('message', (message) => {
switch (message) {
case 'init':
break;
case 'success':
setTimeout(() => { worker.terminate(); }, 500);
break;
case 'failure':
assert.fail();
break;
}
});
worker.on('error', () => assert.fail());
worker.on('exit', (code) => {
if (code === 1) {
if (nsfw.getAllocatedInstanceCount() > 0) {
assert.fail();
} else {
done();
}
} else {
assert.fail();
}
});
});

it('can kill worker thread while 2 watcher is working and 1 watcher is stopped and freed', (done) => {
const workerPath = local('./worker.js');
const inPath = path.resolve(workDir);
const worker = new Worker(workerPath, {
workerData: {
workDir: inPath,
test: 2
}
});
worker.on('message', (message) => {
switch (message) {
case 'init':
break;
case 'success':
setTimeout(() => {
worker.terminate();
}, 500);
break;
case 'failure':
assert.fail();
break;
}
});
worker.on('error', () => assert.fail());
worker.on('exit', (code) => {
if (code === 1) {
done();
} else {
assert.fail();
}
});
});
});
}
});
78 changes: 78 additions & 0 deletions js/spec/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
const {
isMainThread,
parentPort,
workerData
} = require('worker_threads');

const nsfw = require('../src');

if (isMainThread) {
throw new Error('Must be run via worker thread');
}


const { workDir, test } = workerData;

// aggressively collects garbage until we fail to improve terminatingIteration times.
function garbageCollect() {
const terminatingIterations = 3;
let usedBeforeGC = Number.MAX_VALUE;
let nondecreasingIterations = 0;
for (; ;) {
global.gc();
const usedAfterGC = process.memoryUsage().heapUsed;
if (usedAfterGC >= usedBeforeGC) {
nondecreasingIterations++;
if (nondecreasingIterations >= terminatingIterations) {
break;
}
}
usedBeforeGC = usedAfterGC;
}
}


parentPort.postMessage('init');

const runTest1 = async () => {
const watch = await nsfw(
workDir,
() => {}
);
await watch.start();
parentPort.postMessage('success');
};


const runTest2 = async () => {
const watch1 = await nsfw(
workDir,
() => {}
);
await watch1.start();

const watch2 = await nsfw(
workDir,
() => {}
);
await watch2.start();

let watch3 = await nsfw(
workDir,
() => {}
);
await watch3.start();
await watch3.stop();
watch3 = null;
garbageCollect();
parentPort.postMessage('success');
};

switch(test) {
case 1:
runTest1().then(() => {});
break;
case 2:
runTest2().then(() => {});
break;
}
34 changes: 17 additions & 17 deletions src/NSFW.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ NSFW::NSFW(const Napi::CallbackInfo &info):
mInterface(nullptr),
mQueue(std::make_shared<EventQueue>()),
mPath(""),
mRunning(false),
mFinalizing(false)
mRunning(false)
{
Napi::Env env = info.Env();
env.AddCleanupHook(cleanup, this);

if (info.Length() < 1 || !info[0].IsString()) {
throw Napi::TypeError::New(env, "Must pass a string path as the first argument to NSFW.");
}
Expand Down Expand Up @@ -98,21 +97,18 @@ NSFW::NSFW(const Napi::CallbackInfo &info):
}
}

void NSFW::cleanup(void* arg) {
NSFW *nsfw = (NSFW *)arg;
if (nsfw->mRunning) {
nsfw->mFinalizing = true;
NSFW::~NSFW() {
if (mRunning) {
mFinalizing = true;
{
std::lock_guard<std::mutex> lock(nsfw->mRunningLock);
nsfw->mRunning = false;
std::lock_guard<std::mutex> lock(mRunningLock);
mRunning = false;
}
nsfw->Unref();
nsfw->mWaitPoolEvents.notify_one();
nsfw->mPollThread.join();
mWaitPoolEvents.notify_one();
}
if (mPollThread.joinable()) {
mPollThread.join();
}
}

NSFW::~NSFW() {
if (gcEnabled) {
instanceCount--;
}
Expand Down Expand Up @@ -448,8 +444,12 @@ void NSFW::pollForEvents() {
);
}

mErrorCallback.Release();
mEventCallback.Release();
// If we are destroying NFSW object (destructor) we cannot release the thread safe functions at this point
// or we get a segfault
if (!mFinalizing) {
mErrorCallback.Release();
mEventCallback.Release();
}
}

Napi::Value NSFW::ExcludedPaths() {
Expand Down
2 changes: 1 addition & 1 deletion src/win32/Watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ HANDLE Watcher::openDirectory(const std::wstring &path) {
}

void Watcher::reopenWathedFolder() {
std::lock_guard<std::mutex> lock(mHandleMutex);
{
std::lock_guard<std::mutex> lock(mHandleMutex);
CancelIo(mDirectoryHandle);
CloseHandle(mDirectoryHandle);
mDirectoryHandle = openDirectory(mPath);
Expand Down