-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsnd.go
299 lines (252 loc) · 8.82 KB
/
snd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package tomtp
import (
"context"
"errors"
"sync"
)
// packetKey identifies a byte range of a stream and is used as the key of the
// in-flight map. It is a 10-byte value: the first 8 bytes carry the range's
// offset and the last 2 bytes carry its length.
type packetKey [10]byte
// offset decodes the range offset stored in the first eight bytes of the key.
func (p packetKey) offset() uint64 {
	head := p[:8]
	return Uint64(head)
}
// length decodes the range length stored in the last two bytes of the key.
func (p packetKey) length() uint16 {
	tail := p[8:]
	return Uint16(tail)
}
// less reports whether p orders strictly before other when the two keys are
// compared byte by byte starting at index 0. Equal keys are not less.
func (p packetKey) less(other packetKey) bool {
	for i := range p {
		// The first differing byte decides the ordering.
		if p[i] != other[i] {
			return p[i] < other[i]
		}
	}
	return false
}
// createPacketKey builds the key for the range starting at offset and spanning
// length bytes: the offset is written into bytes 0-7 and the length into
// bytes 8-9.
func createPacketKey(offset uint64, length uint16) packetKey {
	key := packetKey{}
	PutUint64(key[:8], offset)
	PutUint16(key[8:], length)
	return key
}
// StreamBuffer holds the outgoing bytes of a single stream together with the
// bookkeeping needed to send, retransmit and acknowledge them.
type StreamBuffer struct {
	// dataToSend is the stream's outgoing byte buffer. New data is appended at
	// the end; once data is acknowledged it is cut off the front. The slice
	// index of a byte is therefore its stream offset minus bias, and
	// len(dataToSend) cannot be used as a stream offset.
	dataToSend []byte
	// unsentOffset is the stream offset just past the last byte appended to
	// dataToSend; it grows by the chunk size on every insert. Bytes in
	// [sentOffset, unsentOffset) have not been handed out for sending yet.
	unsentOffset uint64
	// sentOffset is the stream offset just past the last byte that has been
	// handed out for sending.
	sentOffset uint64
	// bias is the stream offset of dataToSend[0]. When acknowledged data is
	// removed from the front of dataToSend, bias is advanced instead of
	// rewriting every stored offset.
	// TODO: check what happens on a 64-bit rollover
	bias uint64
	// dataInFlightMap tracks ranges that were sent and are not acknowledged
	// yet. The key packs the range's 64-bit offset and 16-bit length into a
	// packetKey; the node's value is the time the range was sent.
	// If the MTU changes for in-flight packets that need to be resent, the
	// range is split. Example:
	// offset: 500, len/mtu: 50 -> 1 range: 500/50,time
	// retransmit with mtu:20 -> 3 ranges: 500/20,time; 520/20,time; 540/10,time
	dataInFlightMap *linkedHashMap[packetKey, *node[packetKey, uint64]]
}
// SendBuffer collects the outgoing data of all streams and bounds the total
// number of buffered bytes by capacity.
type SendBuffer struct {
	// streams maps a stream id to that stream's buffer.
	streams *linkedHashMap[uint32, *StreamBuffer]
	// lastReadToSendStream is the id of the stream that most recently handed
	// out fresh data; intended for round-robin, to continue where we left off.
	lastReadToSendStream uint32
	// lastReadToRetransmitStream is the id of the stream that most recently
	// handed out data for retransmission.
	lastReadToRetransmitStream uint32
	// capacity is the upper bound for totalSize: the sum of len(dataToSend)
	// over all streams must not grow beyond it.
	capacity int
	// totalSize is the current sum of len(dataToSend) over all streams.
	totalSize int
	// capacityAvailable is signalled when acknowledged data has been released,
	// so that a blocked insert can re-check the remaining capacity.
	capacityAvailable chan struct{}
	// mu guards the fields above and the per-stream buffers.
	mu *sync.Mutex
}
// NewStreamBuffer returns a StreamBuffer with an empty, non-nil data slice and
// an empty in-flight map. All offsets and the bias start at zero.
func NewStreamBuffer() *StreamBuffer {
	stream := new(StreamBuffer)
	stream.dataToSend = []byte{}
	stream.dataInFlightMap = newLinkedHashMap[packetKey, *node[packetKey, uint64]]()
	return stream
}
// NewSendBuffer returns a SendBuffer without any streams that accepts at most
// capacity buffered bytes across all streams.
func NewSendBuffer(capacity int) *SendBuffer {
	sb := new(SendBuffer)
	sb.streams = newLinkedHashMap[uint32, *StreamBuffer]()
	sb.capacity = capacity
	// One buffered slot lets a release be signalled without blocking the
	// signalling side.
	sb.capacityAvailable = make(chan struct{}, 1)
	sb.mu = new(sync.Mutex)
	return sb
}
// InsertBlocking appends data to the buffer of stream streamId; nothing is
// sent yet. The stream buffer is created on first use.
//
// Data is accepted in chunks that fit into the remaining capacity. While the
// buffer is full the call waits until capacity is signalled as available or
// until ctx is done. It returns the number of bytes accepted so far and, when
// ctx ended the wait, ctx.Err().
func (sb *SendBuffer) InsertBlocking(ctx context.Context, streamId uint32, data []byte) (int, error) {
	accepted := 0
	for pending := data; len(pending) > 0; {
		sb.mu.Lock()

		free := sb.capacity - sb.totalSize
		if free <= 0 {
			// Never wait while holding the lock; re-check the capacity after
			// every wake-up because the signal may be stale.
			sb.mu.Unlock()
			select {
			case <-sb.capacityAvailable:
				continue
			case <-ctx.Done():
				return accepted, ctx.Err()
			}
		}

		// Take as much as fits into the free space.
		n := len(pending)
		if free < n {
			n = free
		}

		entry := sb.streams.Get(streamId)
		if entry == nil {
			entry = sb.streams.Put(streamId, NewStreamBuffer())
		}
		buf := entry.value

		buf.dataToSend = append(buf.dataToSend, pending[:n]...)
		buf.unsentOffset += uint64(n)
		sb.totalSize += n

		pending = pending[n:]
		accepted += n
		sb.mu.Unlock()
	}
	return accepted, nil
}
// ReadyToSend hands out the next not-yet-sent bytes of stream streamId and
// records them as in flight.
//
// At most maxData bytes are returned, starting at the stream's sentOffset. The
// range is stored in dataInFlightMap with nowMillis as its sent time and
// sentOffset is advanced past it. The returned slice aliases the stream's
// internal buffer and must not be modified by the caller.
//
// It returns nil when the stream is unknown, when there is no unsent data, or
// when maxData is 0.
//
// It panics if the computed range is already tracked as in flight, which
// indicates broken bookkeeping.
func (sb *SendBuffer) ReadyToSend(streamId uint32, maxData uint16, nowMillis uint64) (splitData []byte) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	// A zero budget cannot carry any payload. Without this guard a zero-length
	// range would be recorded as in flight without advancing sentOffset, and
	// the next call would find that same key and panic.
	if maxData == 0 {
		return nil
	}

	streamPair := sb.streams.Get(streamId)
	if streamPair == nil {
		return nil
	}
	stream := streamPair.value

	// Nothing to do unless data was appended beyond what was already sent.
	if stream.unsentOffset <= stream.sentOffset {
		return nil
	}

	// Send as much as allowed, but never more than what is pending.
	length := uint64(maxData)
	if pending := stream.unsentOffset - stream.sentOffset; pending < length {
		length = pending
	}

	// Pack offset and length into the in-flight key.
	key := createPacketKey(stream.sentOffset, uint16(length))
	if stream.dataInFlightMap.Get(key) != nil {
		panic(errors.New("stream range already sent? should not happen"))
	}

	// Translate the stream offset into an index of the trimmed buffer.
	start := stream.sentOffset - stream.bias
	splitData = stream.dataToSend[start : start+length]

	stream.dataInFlightMap.Put(key, newNode(key, nowMillis))
	stream.sentOffset += length
	sb.lastReadToSendStream = streamPair.key
	return splitData
}
// ReadyToRetransmit returns data of stream streamId that has to be resent
// because it has been in flight for longer than rto.
//
// Only the entry returned by dataInFlightMap.Oldest() is examined. It is
// resent when it is not a shadow node and nowMillis minus its sent time
// exceeds rto. If the range fits into maxData, the whole range is returned and
// re-registered with nowMillis as its sent time. Otherwise the range is split:
// the first maxData bytes are returned and the in-flight bookkeeping is
// updated for the left and right part.
//
// It returns nil when the stream is unknown, nothing is in flight, or the
// oldest entry is not due. The returned slice aliases the stream's internal
// buffer.
func (sb *SendBuffer) ReadyToRetransmit(streamId uint32, maxData uint16, rto uint64, nowMillis uint64) (data []byte) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	if sb.streams.Size() == 0 {
		return nil
	}
	streamPair := sb.streams.Get(streamId)
	if streamPair == nil {
		return nil
	}
	stream := streamPair.value
	streamId = streamPair.key
	// Check the oldest range first; no other entry is considered.
	dataInFlight := stream.dataInFlightMap.Oldest()
	if dataInFlight != nil {
		sentTime := dataInFlight.value.value
		// Unsigned subtraction: if nowMillis were smaller than sentTime the
		// difference would wrap around and compare as larger than rto.
		if !dataInFlight.value.IsShadow() && nowMillis-uint64(sentTime) > rto {
			// Extract offset and length from the key
			rangeOffset := dataInFlight.key.offset()
			rangeLen := dataInFlight.key.length()
			// Translate the stream offset into an index of dataToSend using bias
			dataOffset := rangeOffset - stream.bias
			data = stream.dataToSend[dataOffset : dataOffset+uint64(rangeLen)]
			sb.lastReadToRetransmitStream = streamId
			if rangeLen <= maxData {
				// Same or larger MTU - resend the entire range: remove the old
				// entry and insert it again with the new sent time.
				stream.dataInFlightMap.Remove(dataInFlight.key)
				stream.dataInFlightMap.Put(dataInFlight.key, newNode(dataInFlight.key, nowMillis))
				return data
			} else {
				// Split the range because it no longer fits into maxData.
				// NOTE(review): maxData == 0 also ends up here and yields a
				// zero-length left key - confirm callers never pass 0.
				leftKey := createPacketKey(rangeOffset, maxData)
				// The remainder starts right after the left part
				remainingOffset := rangeOffset + uint64(maxData)
				remainingLen := rangeLen - maxData
				rightKey := createPacketKey(remainingOffset, remainingLen)
				// The left part gets the new sent time, the right part keeps
				// the old one. NOTE(review): the exact effect of Split on the
				// parent node is not visible here - verify against node.Split.
				l, r := dataInFlight.value.Split(leftKey, nowMillis, rightKey, dataInFlight.value.value)
				oldParentKey := dataInFlight.key
				oldParentValue := dataInFlight.value.value
				n := newNode(r.key, oldParentValue)
				// We return the left part, so it is inserted with the new sent
				// time. The existing entry is replaced by the right part, which
				// keeps the old sent time.
				dataInFlight.Replace(r.key, n)
				stream.dataInFlightMap.Put(l.key, newNode(l.key, nowMillis))
				// NOTE(review): the original (unsplit) key is inserted again
				// with the new sent time - presumably so an ack for the whole
				// range can still be matched; confirm the intent.
				stream.dataInFlightMap.Put(oldParentKey, newNode(oldParentKey, nowMillis))
				return data[:maxData]
			}
		}
	}
	return nil
}
// AcknowledgeRange processes the acknowledgment of the range offset/length of
// stream streamId.
//
// The range is removed from the in-flight map together with every key reported
// by the node's Remove method. If the acknowledged range starts at the front of
// the buffered data (offset == bias), the acknowledged prefix of dataToSend is
// released, bias and totalSize are adjusted by the number of released bytes,
// and waiting inserters are signalled that capacity is available.
//
// It returns the sent time of the acknowledged range, or the smallest sent time
// among it and the additionally removed entries. It returns 0 when the stream
// or the range is unknown.
func (sb *SendBuffer) AcknowledgeRange(streamId uint32, offset uint64, length uint16) (sentTimeMillis uint64) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	streamPair := sb.streams.Get(streamId)
	if streamPair == nil {
		return 0
	}
	stream := streamPair.value

	// Remove the acknowledged range from the in-flight map.
	rangePair := stream.dataInFlightMap.Remove(createPacketKey(offset, length))
	if rangePair == nil {
		return 0
	}

	// Drop related entries as well and report the earliest sent time.
	sentTimeMillis = rangePair.value.value
	for _, delKey := range rangePair.value.Remove() {
		deletePair := stream.dataInFlightMap.Remove(delKey)
		if deletePair == nil {
			continue
		}
		if t := deletePair.value.value; t < sentTimeMillis {
			sentTimeMillis = t
		}
	}

	// Data can only be released when the ack covers the front of the buffer.
	if offset != stream.bias {
		return sentTimeMillis
	}

	// Release everything up to the next range still in flight, or up to
	// sentOffset when nothing is in flight any more.
	// NOTE(review): this takes Oldest() as the in-flight range with the lowest
	// offset; confirm that holds after retransmissions re-insert ranges.
	releaseUpTo := stream.sentOffset
	if nextRange := stream.dataInFlightMap.Oldest(); nextRange != nil {
		releaseUpTo = nextRange.key.offset()
	}

	// Both bias and totalSize move by the number of released bytes, which is
	// the distance from the current bias and not the absolute offset.
	if releaseUpTo > stream.bias {
		released := releaseUpTo - stream.bias
		stream.dataToSend = stream.dataToSend[released:]
		stream.bias = releaseUpTo
		sb.totalSize -= int(released)
	}

	// Signal capacity without blocking; if a signal is already pending, a
	// waiter will re-check the capacity anyway.
	select {
	case sb.capacityAvailable <- struct{}{}:
	default:
	}
	return sentTimeMillis
}