-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstream.go
144 lines (112 loc) · 2.76 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tomtp
import (
"context"
"errors"
"io"
"log/slog"
"sync"
)
type StreamState uint8
const (
StreamStarting StreamState = iota
StreamOpen
StreamEnding
StreamEnded
)
var (
ErrStreamClosed = errors.New("stream closed")
ErrWriteTooLarge = errors.New("write exceeds maximum Size")
ErrTimeout = errors.New("operation timed out")
)
type Stream struct {
// Connection info
streamId uint32
conn *Connection
state StreamState
// Reliable delivery buffers
//rbRcv *ReceiveBuffer // Receive buffer for incoming dataToSend
// Statistics
bytesRead uint64
lastWindowUpdate uint64
// Stream state
lastActive uint64 // Unix timestamp of last activity
closeTimeout uint64 // Unix timestamp for close timeout
closeInitiated bool
closePending bool
closeCtx context.Context
closeCancelFn context.CancelFunc
mu sync.Mutex
}
func (s *Stream) Write(b []byte) (nTot int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
slog.Debug("Write", debugGoroutineID(), s.debug(), slog.String("b...", string(b[:min(10, len(b))])))
for len(b) > 0 {
var n int
n, err = s.conn.rbSnd.InsertBlocking(s.closeCtx, s.streamId, b)
if err != nil {
return nTot, err
}
nTot += n
// Signal the listener that there is dataToSend to send
err = s.conn.listener.localConn.CancelRead()
if err != nil {
return nTot, err
}
b = b[n:]
}
return nTot, nil
}
func (s *Stream) Read(b []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
slog.Debug("read dataToSend start", debugGoroutineID(), s.debug())
_, data, err := s.conn.rbRcv.RemoveOldestInOrderBlocking(s.closeCtx, s.streamId)
if err != nil {
return 0, err
}
if data == nil {
if s.state >= StreamEnded {
return 0, io.EOF
}
return 0, nil
}
n = copy(b, data)
slog.Debug("read Data done", debugGoroutineID(), s.debug(), slog.String("b...", string(b[:min(10, n)])))
s.bytesRead += uint64(n)
return n, nil
}
func (s *Stream) ReadBytes() (b []byte, err error) {
s.mu.Lock()
defer s.mu.Unlock()
slog.Debug("read dataToSend start", debugGoroutineID(), s.debug())
_, data, err := s.conn.rbRcv.RemoveOldestInOrderBlocking(s.closeCtx, s.streamId)
if err != nil {
return nil, err
}
s.bytesRead += uint64(len(data))
return data, nil
}
func (s *Stream) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.state >= StreamEnding {
return nil
}
s.state = StreamEnding
s.closeCancelFn()
return nil
}
func (s *Stream) debug() slog.Attr {
return s.conn.listener.debug(s.conn.remoteAddr)
}
func (s *Stream) receive(offset uint64, decodedData []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if len(decodedData) > 0 {
s.conn.rbRcv.Insert(s.streamId, offset, decodedData)
}
}
func (s *Stream) calcLen(mtu int, ackLen int) uint16 {
return uint16(mtu - s.Overhead(ackLen))
}