Skip to content

Commit

Permalink
sync: make Cond MT-safe
Browse files Browse the repository at this point in the history
This actually simplifies the code and avoids a heap allocation in the
call to Wait. Instead, it uses the Data field of the task to store
information on whether a task was signalled early.
  • Loading branch information
aykevl authored and deadprogram committed Dec 6, 2024
1 parent edb2f2a commit 6faf36f
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions src/sync/cond.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package sync

import "internal/task"
import (
"internal/task"
"unsafe"
)

// Condition variable.
// A goroutine that called Wait() can be in one of a few states depending on the
// Task.Data field:
// - When entering Wait, and before going to sleep, the data field is 0.
// - When the goroutine that calls Wait changes its data value from 0 to 1, it
// is going to sleep. It has not been awoken early.
// - When instead a call to Signal or Broadcast can change the data field from 0
// to 1, it will _not_ go to sleep but be signalled early.
// This can happen when a concurrent call to Signal happens, or the Unlock
// function calls Signal for some reason.

type Cond struct {
L Locker

unlocking *earlySignal
blocked task.Stack
}

// earlySignal is a type used to implement a stack for signalling waiters while they are unlocking.
type earlySignal struct {
next *earlySignal

signaled bool
blocked task.Stack
lock task.PMutex
}

func NewCond(l Locker) *Cond {
Expand All @@ -24,14 +31,14 @@ func (c *Cond) trySignal() bool {
// Pop a blocked task off of the stack, and schedule it if applicable.
t := c.blocked.Pop()
if t != nil {
scheduleTask(t)
return true
}

// If there any tasks which are currently unlocking, signal one.
if c.unlocking != nil {
c.unlocking.signaled = true
c.unlocking = c.unlocking.next
dataPtr := (*task.Uint32)(unsafe.Pointer(&t.Data))

// The data value is 0 when the task is not yet sleeping, and 1 when it is.
if dataPtr.Swap(1) != 0 {
// The value was already 1, so the task went to sleep (or is about to go
// to sleep). Schedule the task to be resumed.
scheduleTask(t)
}
return true
}

Expand All @@ -40,21 +47,29 @@ func (c *Cond) trySignal() bool {
}

func (c *Cond) Signal() {
c.lock.Lock()
c.trySignal()
c.lock.Unlock()
}

func (c *Cond) Broadcast() {
// Signal everything.
c.lock.Lock()
for c.trySignal() {
}
c.lock.Unlock()
}

func (c *Cond) Wait() {
// Add an earlySignal frame to the stack so we can be signalled while unlocking.
early := earlySignal{
next: c.unlocking,
}
c.unlocking = &early
// Mark us as not yet signalled or sleeping.
t := task.Current()
dataPtr := (*task.Uint32)(unsafe.Pointer(&t.Data))
dataPtr.Store(0)

// Add us to the list of waiting goroutines.
c.lock.Lock()
c.blocked.Push(t)
c.lock.Unlock()

// Temporarily unlock L.
c.L.Unlock()
Expand All @@ -63,22 +78,14 @@ func (c *Cond) Wait() {
defer c.L.Lock()

// If we were signaled while unlocking, immediately complete.
if early.signaled {
if dataPtr.Swap(1) != 0 {
// The data value was already 1, so we got a signal already (and weren't
// scheduled because trySignal was the first to change the value).
return
}

// Remove the earlySignal frame.
prev := c.unlocking
for prev != nil && prev.next != &early {
prev = prev.next
}
if prev != nil {
prev.next = early.next
} else {
c.unlocking = early.next
}

// Wait for a signal.
c.blocked.Push(task.Current())
// We were the first to change the value from 0 to 1, meaning we did not get
// a signal during the call to Unlock(). So we wait until we do get a
// signal.
task.Pause()
}

0 comments on commit 6faf36f

Please sign in to comment.