-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclosable-chan.go
181 lines (160 loc) · 5.82 KB
/
closable-chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
© 2022–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"sync/atomic"
"github.com/haraldrudell/parl/perrors"
)
// ClosableChan wraps a channel with thread-safe idempotent panic-free observable close.
//   - ClosableChan is initialization-free: a zero-value ClosableChan
//     creates its channel on first access
//   - Close is deferrable
//   - IsClosed provides whether the channel is closed
//
// Usage:
//
//	var errCh parl.ClosableChan[error]
//	go thread(&errCh)
//	err, ok := <-errCh.Ch()
//	if errCh.IsClosed() { // can be inspected
//	…
//
//	func thread(errCh *parl.ClosableChan[error]) {
//	  var err error
//	  …
//	  defer errCh.Close(&err) // will not terminate the process
//	  errCh.Ch() <- err
type ClosableChan[T any] struct {
	// ch0 is the channel object as provided by the constructor, or nil
	//   - design requirements for the channel object:
	//   - — ability to initialize it in the constructor
	//   - — ability to provide it after creation
	//   - — thread-safe access and update
	//   - ch0 covers the constructor case: within this file it is
	//     assigned only by NewClosableChan and only read afterwards
	//   - the post-creation case is covered by the atomic pointer chp
	ch0 chan T
	// chp holds the channel created post-constructor, used when ch0 is nil
	//   - nil until getCh publishes a channel using compare-and-swap
	chp atomic.Pointer[chan T]
	// isCloseInvoked indicates the channel is about to close or is closed
	//   - set to true at the beginning of Close
	//   - because the channel may transfer data, it cannot be inspected for being closed
	isCloseInvoked atomic.Bool
	// closeOnce guards the close operation.
	// [parl.Once] is an observable sync.Once
	//   - caches close result
	//   - provides atomic-performance done-flag
	//   - ensures no return prior to channel close complete
	//   - ensures exactly one close invocation
	closeOnce Once
}
// NewClosableChan returns a wrapper providing idempotent panic-free
// observable close for a channel
//   - ch: an optional channel that has not been closed.
//     Only the first provided value is used
//   - when ch is omitted or nil, an unbuffered channel is created
//     on first access to the channel
//   - cannot use lock in new function
//   - for an unbuffered channel, NewClosableChan is not required:
//     a zero-value ClosableChan can be used directly
func NewClosableChan[T any](ch ...chan T) (closable *ClosableChan[T]) {
	closable = &ClosableChan[T]{}
	// a caller-supplied channel takes the constructor slot
	if len(ch) != 0 {
		closable.ch0 = ch[0]
	}
	return closable
}
// Ch returns the wrapped channel with both send and receive capability. Thread-safe
//   - the return value is never nil: a missing channel is created on first access
//   - the returned channel may already be closed, IsClosed tells
//   - the channel should only be closed by using the Close method
//   - per Go channel close, closing the channel while another thread
//     is blocked in channel send is a data race
//   - a thread-safe approach is to first set a separate close-requested
//     indicator and then read the channel, which releases the sending thread
func (c *ClosableChan[T]) Ch() (ch chan T) {
	ch = c.getCh()
	return ch
}
// ReceiveCh returns the wrapped channel restricted to receive operations. Thread-safe
//   - the return value is never nil: a missing channel is created on first access
//   - the returned channel may already be closed
func (c *ClosableChan[T]) ReceiveCh() (ch <-chan T) {
	// the bi-directional channel converts implicitly to receive-only
	ch = c.getCh()
	return ch
}
// SendCh returns the wrapped channel restricted to send operations. Thread-safe
//   - the return value is never nil: a missing channel is created on first access
//   - the returned channel may already be closed
//   - the channel should only be closed by using the Close method
//   - per Go channel close, closing the channel while another thread
//     is blocked in channel send is a data race
//   - a thread-safe approach is to first set a separate close-requested
//     indicator and then read the channel, which releases the sending thread
func (c *ClosableChan[T]) SendCh() (ch chan<- T) {
	// the bi-directional channel converts implicitly to send-only
	ch = c.getCh()
	return ch
}
// IsClosed reports whether the channel is closed. Thread-safe
//   - includePending missing or false: the result is whether closeOnce
//     indicates that close is complete
//   - includePending true: the result is whether Close has been invoked,
//     which includes a channel that is about to close
//   - includePending exists because there is a small amount of time between
//     a thread discovering the channel closed and closeOnce indicating
//     close complete
func (c *ClosableChan[T]) IsClosed(includePending ...bool) (isClosed bool) {
	// only the first includePending value is considered
	wantPending := len(includePending) > 0 && includePending[0]
	if !wantPending {
		return c.closeOnce.IsDone()
	}
	return c.isCloseInvoked.Load()
}
// Close ensures the channel is closed
//   - Close does not return until the channel is closed
//   - err: the close result, the same for all invocations
//   - didClose: true if this invocation was the one closing the channel
//   - errp: optional. If the first errp value is non-nil, the close result
//     is appended to the error it points to using perrors.AppendError
//   - per Go channel close, closing the channel while another thread
//     is blocked in channel send is a data race
//   - thread-safe, panic-free, deferrable, idempotent, observable
//   - Close does not feature deferred close indication
//   - — caller must ensure no channel send is in progress
//   - — channel send after Close will fail
//   - — a buffered channel can be read to empty after Close
func (c *ClosableChan[T]) Close(errp ...*error) (didClose bool, err error) {

	// mark close as invoked up front so that IsClosed(true)
	// reflects a close that is about to happen
	c.isCloseInvoked.CompareAndSwap(false, true)

	// if close already completed, its cached result is
	// obtained with atomic performance
	//   - otherwise the first invocation closes the channel while
	//     other invocations await close complete and get its result
	if _, isComplete, closeErr := c.closeOnce.Result(); isComplete {
		err = closeErr
	} else {
		didClose, _, err = c.closeOnce.DoErr(c.doClose)
	}

	// without a usable error pointer there is nothing more to do
	if len(errp) == 0 || errp[0] == nil {
		return didClose, err
	}

	// merge the close result into the caller's error value
	*errp[0] = perrors.AppendError(*errp[0], err)

	return didClose, err
}
// getCh returns the channel object, creating it if it does not yet exist
//   - a non-nil [ClosableChan.ch0] from the constructor is always returned as is
//   - otherwise the channel held by [ClosableChan.chp] is returned
//   - if chp is empty, an unbuffered channel is created and published
//     using compare-and-swap, so that all callers obtain the same channel
//   - the return value is never nil
func (c *ClosableChan[T]) getCh() (ch chan T) {

	// channel from constructor
	if fromConstructor := c.ch0; fromConstructor != nil {
		return fromConstructor
	}

	// candidate is this invocation's channel to publish
	//   - created at most once, and only if chp was seen empty
	var candidate chan T
	for {

		// a channel was already published, by this or another thread
		if published := c.chp.Load(); published != nil {
			return *published
		}

		// attempt to publish the candidate
		//   - on failure another thread won: the next iteration loads its channel
		if candidate == nil {
			candidate = make(chan T)
		}
		if c.chp.CompareAndSwap(nil, &candidate) {
			return candidate
		}
	}
}
// doClose closes the channel and returns the outcome in err
//   - doClose is the function Close provides to [ClosableChan.closeOnce]
//     and is therefore intended to be invoked only once
//   - a separate method because it is passed as a function value to Once
func (c *ClosableChan[T]) doClose() (err error) {
	// getCh ensures a channel exists, even if it was never accessed
	ch := c.getCh()
	// Closer carries out the close, storing any error in err
	Closer(ch, &err)
	return err
}