Skip to content

Commit

Permalink
Atomically delete object queue only when empty
Browse files Browse the repository at this point in the history
Once a thread's object queue is empty, other threads may explicitly
merge reference counts immediately. It's no longer safe for the original
thread to do RC operations because of races with accesses to ob_ref_local.

This ensures that the queue is empty when it's removed from the hashtable
mapping thread-id to queue. There are still other RC operations that can
happen during thread destruction, so the call to _Py_queue_destroy will
probably need to be moved.

See #50
  • Loading branch information
colesbury committed Feb 24, 2020
1 parent 605f207 commit 34ecec9
Showing 1 changed file with 20 additions and 31 deletions.
51 changes: 20 additions & 31 deletions Python/pyrefcnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,6 @@ _Py_RefcntQueue_Push(PyThreadState *tstate, PyObject *ob)
}
}

// Pop one object from this thread's object queue, or return NULL if the
// queue is empty.  The queue is an intrusive singly-linked list threaded
// through ob_tid (while an object is queued, its ob_tid field is reused
// as the "next" pointer — see the matching load in _Py_queue_process).
//
// NOTE: ABA problem — between loading `head` and the compare-exchange,
// another thread could pop `head`, push other nodes, and re-push `head`;
// the CAS would then succeed with a stale `next` and corrupt the list.
// This is the classic hazard of a CAS-based pop on a linked stack.
static PyObject *
_Py_RefcntQueue_Pop(PyThreadState *tstate)
{
    PyObject **object_queue = &tstate->object_queue;
    for (;;) {
        // Racy snapshot of the head; validated by the CAS below.
        PyObject *head = _Py_atomic_load_ptr_relaxed(object_queue);
        if (!head) {
            return NULL;  // queue is empty
        }
        // ob_tid holds the next-link while the object is queued.
        PyObject *next = _Py_atomic_load_ptr_relaxed(&head->ob_tid);
        // Detach head only if no other thread changed the queue meanwhile
        // (subject to the ABA caveat above).
        if (_Py_atomic_compare_exchange_ptr(object_queue, head, next)) {
            return head;
        }
    }
}

void
_Py_queue_object(PyObject *ob)
{
Expand Down Expand Up @@ -68,26 +51,25 @@ _Py_queue_object(PyObject *ob)
return;
}

PyThreadState *target_tstate = NULL;

int err;
if ((err = pthread_rwlock_rdlock(&interp->object_queues_lk)) != 0) {
Py_FatalError("_Py_queue_object: unable to lock");
return;
}

PyThreadState *target_tstate = NULL;
_Py_hashtable_entry_t *entry = _Py_HASHTABLE_GET_ENTRY(ht, ob_tid);
if (entry) {
_Py_HASHTABLE_ENTRY_READ_DATA(ht, entry, target_tstate);
}

pthread_rwlock_unlock(&interp->object_queues_lk);

if (target_tstate) {
_Py_RefcntQueue_Push(target_tstate, ob);
}
else {
// printf("NO queue for %ld\n", ob_tid);

pthread_rwlock_unlock(&interp->object_queues_lk);

if (!target_tstate) {
_Py_ExplicitMergeRefcount(ob);
}

Expand All @@ -98,9 +80,11 @@ _Py_queue_process(PyThreadState *tstate)
{
assert(tstate);

PyObject *ob;
while ((ob = _Py_RefcntQueue_Pop(tstate)) != NULL) {
_Py_ExplicitMergeRefcount(ob);
PyObject *head = _Py_atomic_exchange_ptr(&tstate->object_queue, NULL);
while (head) {
PyObject *next = (PyObject *)head->ob_tid;
_Py_ExplicitMergeRefcount(head);
head = next;
}
}

Expand Down Expand Up @@ -131,14 +115,21 @@ _Py_queue_destroy(PyThreadState *tstate)
PyInterpreterState *interp = tstate->interp;
assert(interp);

_Py_hashtable_t *ht = interp->object_queues;
uint64_t tid = tstate->fast_thread_id;

retry:
_Py_queue_process(tstate);

if (pthread_rwlock_wrlock(&interp->object_queues_lk) != 0) {
Py_FatalError("_Py_queue_destroy: unable to lock");
return;
}

_Py_hashtable_t *ht = interp->object_queues;
uint64_t tid = tstate->fast_thread_id;
// printf("destroying queue for %ld\n", tid);
if (tstate->object_queue) {
pthread_rwlock_unlock(&interp->object_queues_lk);
goto retry;
}

PyThreadState *value = NULL;
if (!_Py_HASHTABLE_POP(ht, tid, value)) {
Expand All @@ -147,6 +138,4 @@ _Py_queue_destroy(PyThreadState *tstate)
assert(value == tstate);

pthread_rwlock_unlock(&interp->object_queues_lk);

_Py_queue_process(tstate);
}

0 comments on commit 34ecec9

Please sign in to comment.