Skip to content

Commit

Permalink
Merge 248c90f into 792e0db
Browse files Browse the repository at this point in the history
  • Loading branch information
jdelapla authored Jan 22, 2024
2 parents 792e0db + 248c90f commit c3fb240
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 63 deletions.
189 changes: 133 additions & 56 deletions src/utils/src/Threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

// Per-thread state handed to each threadpool actor thread. The actor may
// outlive the threadpool itself, so this struct carries everything the
// actor needs to detect teardown safely.
typedef struct __ThreadData {
    // Set to TRUE during teardown; actors poll this flag to know when to
    // stop dequeuing work and exit.
    volatile ATOMIC_BOOL terminate;
    // The threadpool we belong to (may have been deleted)
    PThreadpool pThreadpool;
    // Must be locked before changing terminate; as a result, holding this
    // lock ensures the threadpool has not been deleted while we hold it.
    MUTEX dataMutex;
} ThreadData, *PThreadData;

Expand All @@ -12,6 +16,26 @@ typedef struct TaskData {
PVOID customData;
} TaskData, *PTaskData;

// Payload for the sentinel tasks queued during threadpool teardown. The
// destructor enqueues these to park actor threads on a mutex it holds,
// forcing them to release their own locks; see threadpoolTermination().
typedef struct TerminationTask {
    // Held by the destructor while teardown bookkeeping is in progress;
    // each termination task blocks on it until the destructor is ready.
    MUTEX mutex;
    // Incremented (under mutex) once per completed termination task;
    // presumably points into the destructor's stack frame -- TODO confirm.
    PSIZE_T pCount;
    // Released once per executed termination task so the destructor can
    // wait for all of them to finish.
    SEMAPHORE_HANDLE semaphore;
} TerminationTask, *PTerminationTask;

// Sentinel task body queued by the threadpool destructor during teardown.
// The calling actor thread parks on the destructor-held mutex (releasing
// any threadpool locks the destructor needs), then signals completion by
// releasing the semaphore and bumping the shared counter under the mutex.
//
// data: a PTerminationTask supplied by the destructor; ignored if NULL.
// Returns NULL always (thread-task return value is unused).
PVOID threadpoolTermination(PVOID data)
{
    PTerminationTask pTerminateTask = (PTerminationTask) data;
    PSIZE_T pFinishedCount = NULL;

    if (pTerminateTask == NULL) {
        // Nothing to signal; exit quietly.
        return NULL;
    }

    pFinishedCount = pTerminateTask->pCount;

    // Block here until the destructor releases the mutex.
    MUTEX_LOCK(pTerminateTask->mutex);
    // Wake the destructor and record one more finished termination task.
    semaphoreRelease(pTerminateTask->semaphore);
    (*pFinishedCount)++;
    MUTEX_UNLOCK(pTerminateTask->mutex);

    return NULL;
}

PVOID threadpoolActor(PVOID data)
{
PThreadData pThreadData = (PThreadData) data;
Expand All @@ -31,14 +55,18 @@ PVOID threadpoolActor(PVOID data)
// attempt to acquire thread mutex, if we cannot it means the threadpool has already been
// destroyed. Quickly exit
if (MUTEX_TRYLOCK(pThreadData->dataMutex)) {
pThreadpool = pThreadData->pThreadpool;
if (!ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
pThreadpool = pThreadData->pThreadpool;

if (pThreadpool == NULL) {
DLOGE("Threadpool actor unable to start, threadpool is NULL");
return NULL;
}
if (pThreadpool == NULL) {
DLOGE("Threadpool actor unable to start, threadpool is NULL");
return NULL;
}

pQueue = pThreadpool->taskQueue;
pQueue = pThreadpool->taskQueue;
} else {
finished = TRUE;
}
MUTEX_UNLOCK(pThreadData->dataMutex);
} else {
finished = TRUE;
Expand All @@ -58,68 +86,57 @@ PVOID threadpoolActor(PVOID data)
// This way the thread actors can avoid accessing the Threadpool after termination.
if (!ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
ATOMIC_INCREMENT(&pThreadData->pThreadpool->availableThreads);
MUTEX_UNLOCK(pThreadData->dataMutex);
if (safeBlockingQueueDequeue(pQueue, &item) == STATUS_SUCCESS) {
pTask = (PTaskData) item;
ATOMIC_DECREMENT(&pThreadData->pThreadpool->availableThreads);
MUTEX_UNLOCK(pThreadData->dataMutex);
if (pTask != NULL) {
pTask->function(pTask->customData);
SAFE_MEMFREE(pTask);
}
// got an error, but not terminating, so fixup available thread count
} else if (!ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
} else {
ATOMIC_DECREMENT(&pThreadData->pThreadpool->availableThreads);
MUTEX_UNLOCK(pThreadData->dataMutex);
}
} else {
finished = TRUE;
MUTEX_UNLOCK(pThreadData->dataMutex);
break;
}

// We use trylock to avoid a potential deadlock with the destructor. The destructor needs
// to lock listMutex and then dataMutex, but we're locking dataMutex and then listMutex.
//
// The destructor also uses trylock for the same reason.
if (MUTEX_TRYLOCK(pThreadData->dataMutex)) {
if (ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
MUTEX_LOCK(pThreadData->dataMutex);
if (ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
MUTEX_UNLOCK(pThreadData->dataMutex);
} else {
// Threadpool is active - lock its mutex
MUTEX_LOCK(pThreadpool->listMutex);

// if threadpool is in teardown, release this mutex and go to queue.
// We don't want to be the one to remove this actor from the list in the event
// of teardown.
if (ATOMIC_LOAD_BOOL(&pThreadpool->terminate)) {
MUTEX_UNLOCK(pThreadpool->listMutex);
MUTEX_UNLOCK(pThreadData->dataMutex);
} else {
// Threadpool is active - lock its mutex
MUTEX_LOCK(pThreadpool->listMutex);

// Check that there aren't any pending tasks.
if (safeBlockingQueueIsEmpty(pQueue, &taskQueueEmpty) == STATUS_SUCCESS) {
if (taskQueueEmpty) {
// Check if this thread is needed to maintain minimum thread count
// otherwise exit loop and remove it.
if (stackQueueGetCount(pThreadpool->threadList, &count) == STATUS_SUCCESS) {
if (count > pThreadpool->minThreads) {
finished = TRUE;
if (stackQueueRemoveItem(pThreadpool->threadList, (UINT64) pThreadData) != STATUS_SUCCESS) {
DLOGE("Failed to remove thread data from threadpool");
}
continue;
}

// Check that there aren't any pending tasks.
if (safeBlockingQueueIsEmpty(pQueue, &taskQueueEmpty) == STATUS_SUCCESS) {
if (taskQueueEmpty) {
// Check if this thread is needed to maintain minimum thread count
// otherwise exit loop and remove it.
if (stackQueueGetCount(pThreadpool->threadList, &count) == STATUS_SUCCESS) {
if (count > pThreadpool->minThreads) {
finished = TRUE;
if (stackQueueRemoveItem(pThreadpool->threadList, (UINT64) pThreadData) != STATUS_SUCCESS) {
DLOGE("Failed to remove thread data from threadpool");
}
}
}
}
// this is a redundant safety check. To get here the threadpool must be being
// actively being destroyed, but it was unable to acquire our lock so entered a
// sleep spin check to allow this thread to finish. However somehow the task queue
// was not empty, so we ended up here. This check forces us to still exit gracefully
// in the event somehow the queue is not empty.
else if (ATOMIC_LOAD_BOOL(&pThreadData->terminate)) {
finished = TRUE;
if (stackQueueRemoveItem(pThreadpool->threadList, (UINT64) pThreadData) != STATUS_SUCCESS) {
DLOGE("Failed to remove thread data from threadpool");
}
}
MUTEX_UNLOCK(pThreadpool->listMutex);
MUTEX_UNLOCK(pThreadData->dataMutex);
}
} else {
// couldn't lock our mutex, which means Threadpool locked this mutex to indicate
// Threadpool has been deleted.
finished = TRUE;
MUTEX_UNLOCK(pThreadpool->listMutex);
MUTEX_UNLOCK(pThreadData->dataMutex);
}
}
// now that we've released the listMutex, we can do an actual MUTEX_LOCK to ensure the
Expand Down Expand Up @@ -179,7 +196,7 @@ STATUS threadpoolInternalCreateThread(PThreadpool pThreadpool)
{
STATUS retStatus = STATUS_SUCCESS;
PThreadData data = NULL;
BOOL locked = FALSE, dataCreated = FALSE;
BOOL locked = FALSE, dataCreated = FALSE, mutexCreated = FALSE;
TID thread;
CHK(pThreadpool != NULL, STATUS_NULL_ARG);

Expand All @@ -193,6 +210,7 @@ STATUS threadpoolInternalCreateThread(PThreadpool pThreadpool)
dataCreated = TRUE;

data->dataMutex = MUTEX_CREATE(FALSE);
mutexCreated = TRUE;
data->pThreadpool = pThreadpool;
ATOMIC_STORE_BOOL(&data->terminate, FALSE);

Expand All @@ -211,8 +229,13 @@ STATUS threadpoolInternalCreateThread(PThreadpool pThreadpool)

// If logic changes such that it's possible successfully enqueue data but not create the thread
// We may attempt a double free. Right now it's fine.
if (STATUS_FAILED(retStatus) && dataCreated) {
SAFE_MEMFREE(data);
if (STATUS_FAILED(retStatus)) {
if (mutexCreated) {
MUTEX_FREE(data->dataMutex);
}
if (dataCreated) {
SAFE_MEMFREE(data);
}
}

return retStatus;
Expand Down Expand Up @@ -283,7 +306,13 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
StackQueueIterator iterator;
PThreadData item = NULL;
UINT64 data;
BOOL finished = FALSE, taskQueueEmpty = FALSE, listMutexLocked = FALSE;
BOOL finished = FALSE, taskQueueEmpty = FALSE, listMutexLocked = FALSE, tempMutexLocked = FALSE;
;
SIZE_T threadCount = 0, i = 0, sentTerminationTasks = 0, finishedTerminationTasks = 0;
MUTEX tempMutex;
SEMAPHORE_HANDLE tempSemaphore;
TerminationTask terminateTask;
PTaskData pTask = NULL;
CHK(pThreadpool != NULL, STATUS_NULL_ARG);

// Threads are not forced to finish their tasks. If the user has assigned
Expand All @@ -295,6 +324,17 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
// set terminate flag of pool -- no new threads/items can be added now
ATOMIC_STORE_BOOL(&pThreadpool->terminate, TRUE);

// This is used to block threadpool actors on a task so that they release all
// mutexes the destructor will need to acquire to teardown gracefully.
tempMutex = MUTEX_CREATE(FALSE);
MUTEX_LOCK(tempMutex);
CHK_STATUS(semaphoreEmptyCreate(pThreadpool->maxThreads, &tempSemaphore));
tempMutexLocked = TRUE;

terminateTask.mutex = tempMutex;
terminateTask.semaphore = tempSemaphore;
terminateTask.pCount = &finishedTerminationTasks;

CHK_STATUS(safeBlockingQueueIsEmpty(pThreadpool->taskQueue, &taskQueueEmpty));
if (!taskQueueEmpty) {
CHK_STATUS(safeBlockingQueueClear(pThreadpool->taskQueue, TRUE));
Expand All @@ -321,7 +361,8 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
if (stackQueueRemoveItem(pThreadpool->threadList, data) != STATUS_SUCCESS) {
DLOGE("Failed to remove NULL thread data from threadpool");
}
// attempt to lock mutex of item
// We use trylock to avoid a potential deadlock with the actor. The destructor needs
// to lock listMutex and then dataMutex, but the actor locks dataMutex and then listMutex.
} else if (MUTEX_TRYLOCK(item->dataMutex)) {
// set terminate flag of item
ATOMIC_STORE_BOOL(&item->terminate, TRUE);
Expand All @@ -332,10 +373,14 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
}
MUTEX_UNLOCK(item->dataMutex);
} else {
// if the mutex is taken, unlock list mutex, sleep 10 ms and 'start over'
// if the mutex is taken, give each thread a waiting task, unlock list mutex, sleep 10 ms and 'start over'
//
// The reasoning here is that the threadActors only acquire their mutex to check
// for termination, after acquiring their mutex they need the list mutex to evaluate
// The reasoning here is that the threadActors acquire their mutex during 2 operations:
//
// 1. While waiting on the queue, so we need to publish a sleep task to the queue to get the actors
// to release the mutex.
//
// 2. To check for termination; after acquiring their mutex they need the list mutex to evaluate
// the current count and determine if they should exit or wait on the taskQueue.
//
// Therefore if we currently have the list mutex, but cannot acquire the item mutex they
Expand All @@ -344,6 +389,11 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
// the termination flag we set earlier and will exit.
//
// When we unlock and sleep we give them a chance to acquire the list mutex,
// observe the termination flag we set earlier, and exit.
CHK_STATUS(stackQueueGetCount(pThreadpool->threadList, &threadCount));
for (i = 0; i < threadCount; i++) {
CHK_STATUS(threadpoolInternalCreateTask(pThreadpool, threadpoolTermination, &terminateTask));
sentTerminationTasks++;
}
break;
}
} while (1);
Expand All @@ -352,17 +402,44 @@ STATUS threadpoolFree(PThreadpool pThreadpool)
listMutexLocked = FALSE;
if (!finished) {
// the aforementioned sleep
THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
THREAD_SLEEP(5 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
}
}

// in the event that we sent termination tasks, we now need to clean up all those
// tasks so we can safely free the associated mutex.
while (finishedTerminationTasks < sentTerminationTasks) {
MUTEX_UNLOCK(tempMutex);
tempMutexLocked = FALSE;

// if there are still items in the queue, then we need to clear them
CHK_STATUS(safeBlockingQueueGetCount(pThreadpool->taskQueue, &i));

if (i > 0 && safeBlockingQueueDequeue(pThreadpool->taskQueue, &data) == STATUS_SUCCESS) {
pTask = (PTaskData) data;
if (pTask != NULL) {
pTask->function(pTask->customData);
SAFE_MEMFREE(pTask);
}
}
semaphoreAcquire(tempSemaphore, INFINITE_TIME_VALUE);
MUTEX_LOCK(tempMutex);
tempMutexLocked = TRUE;
}

CleanUp:

if (tempMutexLocked) {
MUTEX_UNLOCK(tempMutex);
}

if (listMutexLocked) {
MUTEX_UNLOCK(pThreadpool->listMutex);
}

// now free all the memory
MUTEX_FREE(tempMutex);
semaphoreFree(&tempSemaphore);
MUTEX_FREE(pThreadpool->listMutex);
stackQueueFree(pThreadpool->threadList);

Expand Down
Loading

0 comments on commit c3fb240

Please sign in to comment.