Skip to content

Commit

Permalink
win, pipe: avoid synchronous pipe deadlocks
Browse files Browse the repository at this point in the history
Add a thread that will interrupt uv_pipe_zero_readdile_thread_proc every two
and half second. This allows other processes to access the pipe without
deadlocking

Ref: nodejs/node#10836
  • Loading branch information
bzoz committed Aug 18, 2017
1 parent c8ee8be commit c4d887e
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 1 deletion.
16 changes: 16 additions & 0 deletions docs/src/pipe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,20 @@ API
a handle of the given `type`, returned by :c:func:`uv_pipe_pending_type`
and call ``uv_accept(pipe, handle)``.
.. c:function:: void uv_pipe_enable_interrupter(uv_pipe_t* handle)
Enables automatic interrupts of synchronous pipes reads.
For synchronous pipes :c:func:`uv_read_start` will cause all other WinAPI
calls for that pipe to wait until read completes, which can cause
deadlocks. With this setting, an internal thread will interrupt the read
every 2.5s allowing other API calls to complete. This interrupt is
transparent for the user - callback will not be called, and user does not
have to restart the read.
.. note::
This setting applies to synchronous pipes on Windows only.
.. versionadded:: 1.14.0
.. seealso:: The :c:type:`uv_stream_t` API functions also apply.
1 change: 1 addition & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ UV_EXTERN int uv_pipe_getpeername(const uv_pipe_t* handle,
UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count);
UV_EXTERN int uv_pipe_pending_count(uv_pipe_t* handle);
UV_EXTERN uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle);
UV_EXTERN void uv_pipe_enable_interrupter(uv_pipe_t* handle);


struct uv_poll_s {
Expand Down
3 changes: 3 additions & 0 deletions src/unix/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,6 @@ uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
else
return uv__handle_type(handle->accepted_fd);
}

void uv_pipe_enable_interrupter(uv_pipe_t* handle) {
}
1 change: 1 addition & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x01000000
#define UV_HANDLE_PIPESERVER 0x02000000
#define UV_HANDLE_PIPE_READ_CANCELABLE 0x04000000
#define UV_HANDLE_PIPE_INTERRUPT_SYNC_READ 0x08000000

/* Only used by uv_tty_t handles. */
#define UV_HANDLE_TTY_READABLE 0x01000000
Expand Down
108 changes: 107 additions & 1 deletion src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ struct uv__ipc_queue_item_s {
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* To prevent deadlocks we will interrupt synchronous pipe read every 2.5 s. */
#define PIPE_ZERO_READ_INTERRUPT_INTERVAL 2500

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

Expand Down Expand Up @@ -76,6 +79,13 @@ typedef struct {
uv__ipc_socket_info_ex socket_info_ex;
} uv_ipc_frame_uv_stream;

/* Parameter for uv__pipe_readfile_interrupter */
typedef struct {
volatile HANDLE thread;
HANDLE thread_mutex;
HANDLE completed_event;
} uv__readfile_interrupter_t;

static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
Expand Down Expand Up @@ -942,20 +952,101 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
}


static DWORD WINAPI uv__readfile_interrupter_thread(void* param) {
uv__readfile_interrupter_t* interrupter;
int r;

interrupter = param;
for (;;) {
r = WaitForSingleObject(interrupter->completed_event,
PIPE_ZERO_READ_INTERRUPT_INTERVAL);
if (r == WAIT_TIMEOUT) {
/* ReadFile thread is active - interrupt ReadFile to give other */
/* processes a chance to act upon pipe */
WaitForSingleObject(interrupter->thread_mutex, INFINITE);
if (interrupter->thread != NULL) {
pCancelSynchronousIo(interrupter->thread);
SwitchToThread();
}
ReleaseMutex(interrupter->thread_mutex);
} else if (r == WAIT_OBJECT_0) {
/* ReadFile thread has terminated - clean up the handles and exit */
CloseHandle(interrupter->completed_event);
CloseHandle(interrupter->thread_mutex);
uv__free(interrupter);
return 0;
}
}
}

static uv__readfile_interrupter_t* uv__start_readfile_interrupter(void) {
HANDLE thread_handle;
uv__readfile_interrupter_t* interrupter;

interrupter = uv__malloc(sizeof(*interrupter));
if (interrupter == NULL)
return NULL;

if (!DuplicateHandle(GetCurrentProcess(),
GetCurrentThread(),
GetCurrentProcess(),
&thread_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS))
goto failed_duplicate;

interrupter->thread = thread_handle;
interrupter->thread_mutex = CreateMutex(NULL, FALSE, NULL);
if (interrupter->thread_mutex == NULL)
goto failed_mutex;

interrupter->completed_event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (interrupter->completed_event == NULL)
goto failed_event;

if (!QueueUserWorkItem(uv__readfile_interrupter_thread,
interrupter,
WT_EXECUTEINLONGTHREAD))
goto failed_queue;

return interrupter;

failed_queue:
CloseHandle(interrupter->completed_event);
failed_event:
CloseHandle(interrupter->thread_mutex);
failed_mutex:
CloseHandle(interrupter->thread);
failed_duplicate:
uv__free(interrupter);
return NULL;
}

static void uv__stop_readfile_interrupter(uv__readfile_interrupter_t* interrupter) {
WaitForSingleObject(interrupter->thread_mutex, INFINITE);
CloseHandle(interrupter->thread);
interrupter->thread = NULL;
ReleaseMutex(interrupter->thread_mutex);
SetEvent(interrupter->completed_event);
}

static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_read_t* req = (uv_read_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
HANDLE hThread = NULL;
uv__readfile_interrupter_t* interrupter;
DWORD err;
uv_mutex_t *m = &handle->pipe.conn.readfile_mutex;

assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);

interrupter = NULL;
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
Expand All @@ -965,7 +1056,10 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
} else {
hThread = NULL;
}
uv_mutex_unlock(m);
uv_mutex_unlock(m);
if (handle->flags & UV_HANDLE_PIPE_INTERRUPT_SYNC_READ) {
interrupter = uv__start_readfile_interrupter();
}
}
restart_readfile:
if (handle->flags & UV_HANDLE_READING) {
Expand All @@ -985,6 +1079,9 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
uv_mutex_lock(m);
handle->pipe.conn.readfile_thread = hThread;
uv_mutex_unlock(m);
/* Give some time for other processes to wake before restarting. */
if (interrupter != NULL)
Sleep(1);
goto restart_readfile;
} else {
result = 1; /* successfully stopped reading */
Expand All @@ -1010,6 +1107,9 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
}

POST_COMPLETION_FOR_REQ(loop, req);
if (interrupter != NULL) {
uv__stop_readfile_interrupter(interrupter);
}
return 0;
}

Expand Down Expand Up @@ -2130,3 +2230,9 @@ uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
else
return UV_TCP;
}

void uv_pipe_enable_interrupter(uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
handle->flags |= UV_HANDLE_PIPE_INTERRUPT_SYNC_READ;
}
}
6 changes: 6 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ TEST_DECLARE (pipe_ref4)
TEST_DECLARE (pipe_close_stdout_read_stdin)
#endif
TEST_DECLARE (pipe_set_non_blocking)
#ifdef _WIN32
TEST_DECLARE (pipe_open_read_pipe)
#endif
TEST_DECLARE (process_ref)
TEST_DECLARE (has_ref)
TEST_DECLARE (active)
Expand Down Expand Up @@ -436,6 +439,9 @@ TASK_LIST_START
TEST_ENTRY (pipe_close_stdout_read_stdin)
#endif
TEST_ENTRY (pipe_set_non_blocking)
#ifdef _WIN32
TEST_ENTRY_CUSTOM (pipe_open_read_pipe, 0, 0, 10000)
#endif
TEST_ENTRY (tty)
#ifdef _WIN32
TEST_ENTRY (tty_raw)
Expand Down
81 changes: 81 additions & 0 deletions test/test-pipe-open-read-pipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "task.h"

#ifndef _WIN32
TEST_IMPL(pipe_open_read_pipe) {
RETURN_SKIP("Test only for Windows.");
}
#else

void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}

void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
}

void pipe_read_thread_proc(void* arg) {
uv_pipe_t* pipe;
pipe = arg;
uv_read_start((uv_stream_t*) pipe, alloc_cb, read_cb);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
FATAL("loop should not exit");
}

TEST_IMPL(pipe_open_read_pipe) {
int r, pipe_fd;
uv_thread_t pipe_read_thread;
uv_pipe_t uv_pipe, uv_reopen_pipe;
uv_loop_t test_loop;
HANDLE stdin_read_pipe, stdin_write_pipe;
SECURITY_ATTRIBUTES sa_attr;

sa_attr.nLength = sizeof(sa_attr);
sa_attr.bInheritHandle = TRUE;
sa_attr.lpSecurityDescriptor = NULL;
r = CreatePipe(&stdin_read_pipe, &stdin_write_pipe, &sa_attr, 0);
ASSERT(r != 0);

r = uv_pipe_init(uv_default_loop(), &uv_pipe, 0);
ASSERT(r == 0);
pipe_fd = _open_osfhandle((intptr_t) stdin_read_pipe, 0);
r = uv_pipe_open(&uv_pipe, pipe_fd);
ASSERT(r == 0);
uv_pipe_enable_interrupter(&uv_pipe);

r = uv_thread_create(&pipe_read_thread, pipe_read_thread_proc, &uv_pipe);
ASSERT(r == 0);

/* Give uv_run some time to start */
uv_sleep(250);
/* Try to access the pipe again, in different loop */
r = uv_loop_init(&test_loop);
ASSERT(r == 0);
r = uv_pipe_init(&test_loop, &uv_reopen_pipe, 0);
ASSERT(r == 0);
r = uv_pipe_open(&uv_reopen_pipe, pipe_fd);
return TEST_OK;
}
#endif
1 change: 1 addition & 0 deletions uv.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@
'test/test-pipe-server-close.c',
'test/test-pipe-close-stdout-read-stdin.c',
'test/test-pipe-set-non-blocking.c',
'test/test-pipe-open-read-pipe.c',
'test/test-platform-output.c',
'test/test-poll.c',
'test/test-poll-close.c',
Expand Down

0 comments on commit c4d887e

Please sign in to comment.