-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathpipe.go
126 lines (110 loc) · 2.95 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"fmt"
)
// Pipe holds the information for a single pipe
type Pipe struct {
// a list of receivers that are listening on a pipe
// allow receivers to be added an removed dynamically
receivers map[RecieveWriter]bool
senders int
bytes int
written WriteCompleteHandler
// a list of channels that want to be notified of new receivers
receiverAdded map[chan bool]bool
}
// AddReceiver adds a new receiver listening on the pipe
func (p *Pipe) AddReceiver(w RecieveWriter) {
p.receivers[w] = true
p.Write(Message{
fromID: w.ID(),
fromUser: w.Username(),
buffer: []byte("connected\n"),
system: true})
p.ReceiverAddedNotify()
}
// RemoveReceiver removes a previously added receiver
func (p *Pipe) RemoveReceiver(w RecieveWriter) {
delete(p.receivers, w)
p.Write(Message{
fromID: w.ID(),
fromUser: w.Username(),
buffer: []byte("disconnected\n"),
system: true})
}
// ReceiverCount returns the number of receivers on the pipe
func (p Pipe) ReceiverCount() int {
return len(p.receivers)
}
// ReceiverAddedSubscribe listens for new receivers
func (p *Pipe) ReceiverAddedSubscribe() chan bool {
channel := make(chan bool)
p.receiverAdded[channel] = true
return channel
}
// ReceiverAddedUnSubscribe stops listening for new receivers
func (p *Pipe) ReceiverAddedUnSubscribe(channel chan bool) {
delete(p.receiverAdded, channel)
}
// ReceiverAddedNotify notifies all listeners that a receiver was added
func (p *Pipe) ReceiverAddedNotify() {
for channel := range p.receiverAdded {
// non-blocking
select {
case channel <- true:
default:
}
}
}
// AddSender adds a new sender connected to send data on the pipe (informational)
func (p *Pipe) AddSender() {
p.senders++
}
// RemoveSender removes a sender connected to the pipe (informational)
func (p *Pipe) RemoveSender() {
p.senders--
}
// SenderCount returns the number of senders on the pipe
func (p Pipe) SenderCount() int {
return p.senders
}
// BytesSent returns the number of bytes sent through the pipe
func (p Pipe) BytesSent() int {
return p.bytes
}
// Write the buffer to all registered receivers
func (p *Pipe) Write(m Message) (int, error) {
for receiver := range p.receivers {
receiver.Write(m.Format(receiver))
}
bytes := len(m.buffer)
if !m.system {
p.bytes += bytes
p.written.WriteCompleted(bytes)
}
return bytes, nil
}
// Close all of the registered receivers
func (p *Pipe) Close() error {
for receiver := range p.receivers {
// errors from one of the receivers shouldn't affect any others
receiver.Close()
}
return nil
}
func (p Pipe) String() string {
return fmt.Sprintf("%d receivers | %d senders | %d bytes\n",
p.ReceiverCount(),
p.SenderCount(),
p.BytesSent())
}
// MakePipe creates the struct for a pipe
func MakePipe(written WriteCompleteHandler) *Pipe {
return &Pipe{
receivers: make(map[RecieveWriter]bool),
senders: 0,
bytes: 0,
written: written,
receiverAdded: make(map[chan bool]bool),
}
}