Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tbocek committed Feb 9, 2025
1 parent 1c2f367 commit 4ed1bca
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 135 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Go

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
2 changes: 1 addition & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *Stream) encode(b []byte) (enc []byte, offset int, err error) {
p := &Payload{
CloseOp: GetCloseOp(s.state == StreamEnding, s.conn.state == ConnectionEnding),
IsSender: s.conn.sender,
RcvWndSize: uint64(s.rbRcv.Size()),
RcvWndSize: s.rcvWndSize - uint64(s.rbRcv.Size()),
Acks: s.rbRcv.GetAcks(),
StreamId: s.streamId,
StreamOffset: s.streamOffsetNext,
Expand Down
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tomtp

import (
"context"
"crypto/ecdh"
"log/slog"
"net/netip"
Expand Down Expand Up @@ -120,12 +121,15 @@ func (c *Connection) GetOrNewStreamRcv(streamId uint32) (*Stream, bool) {
}

if stream, ok := c.streams[streamId]; !ok {
ctx, cancel := context.WithCancel(context.Background())
s := &Stream{
streamId: streamId,
streamOffsetNext: 0,
state: StreamStarting,
conn: c,
rbRcv: NewReceiveBuffer(rcvBufferCapacity),
closeCtx: ctx,
closeCancelFn: cancel,
mu: sync.Mutex{},
}
c.streams[streamId] = s
Expand Down
16 changes: 7 additions & 9 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,23 @@ func TestConnection_GetOrNewStreamRcv(t *testing.T) {
{
name: "new stream",
streamID: 1,
setup: false,
setup: true,
},
{
name: "existing stream",
streamID: 2,
setup: true,
streamID: 1,
setup: false,
},
}

conn := &Connection{
streams: make(map[uint32]*Stream),
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn := &Connection{
streams: make(map[uint32]*Stream),
}

stream, isNew := conn.GetOrNewStreamRcv(tt.streamID)
assert.NotNil(t, stream)
assert.Equal(t, tt.streamID, stream.streamId)
assert.Equal(t, !tt.setup, isNew)
assert.Equal(t, tt.setup, isNew)
})
}
}
Expand Down
27 changes: 12 additions & 15 deletions end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func relayData(connSrc, connDest *inMemoryNetworkConn, maxBytes int) (int, error
return int(bytesWritten), nil
}

func createConnectedStreams(
func createTwoStreams(
nConnA *inMemoryNetworkConn,
nConnB *inMemoryNetworkConn,
prvKeyA *ecdh.PrivateKey,
Expand All @@ -184,7 +184,7 @@ func createConnectedStreams(
return nil, nil, errors.New("failed to create listener B: " + err.Error())
}

connA, err := listenerA.DialString(nConnB.LocalAddr().String(), hexPublicKey2)
connA, err := listenerA.DialString(nConnB.LocalAddr().String(), hexPubKey2)
if err != nil {
listenerA.Close() // clean up everything here!
listenerB.Close()
Expand All @@ -203,26 +203,23 @@ func createConnectedStreams(

func TestEndToEndInMemory(t *testing.T) {
nConnA, nConnB, err := setupInMemoryPair()
if err != nil {
t.Fatalf("failed to setup in-memory connections: %v", err)
}
assert.Nil(t, err)
defer nConnA.Close()
defer nConnB.Close()

var streamB *Stream
acceptB := func(s *Stream) {
slog.Info("A: accept connection")
streamB = s
}

streamA, listenerB, err := createConnectedStreams(nConnA, nConnB, testPrivateKey1, testPrivateKey2, acceptB)
streamA, listenerB, err := createTwoStreams(nConnA, nConnB, testPrvKey1, testPrvKey2, func(s *Stream) { streamB = s })
assert.Nil(t, err)

a := []byte("hallo")
streamA.Write(a)
streamA.conn.listener.Update(0)
relayData(nConnA, nConnB, startMtu)
listenerB.Update(0)
_, err = streamA.Write(a)
assert.Nil(t, err)
err = streamA.conn.listener.Update(0)
assert.Nil(t, err)
_, err = relayData(nConnA, nConnB, startMtu)
assert.Nil(t, err)
err = listenerB.Update(0)
assert.Nil(t, err)
b, err := streamB.ReadBytes()
assert.Nil(t, err)
assert.Equal(t, a, b)
Expand Down
34 changes: 17 additions & 17 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (
)

var (
testPrivateSeed1 = [32]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
testPrivateSeed2 = [32]byte{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
testPrivateKey1, _ = ecdh.X25519().NewPrivateKey(testPrivateSeed1[:])
testPrivateKey2, _ = ecdh.X25519().NewPrivateKey(testPrivateSeed2[:])
testPrvSeed1 = [32]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
testPrvSeed2 = [32]byte{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
testPrvKey1, _ = ecdh.X25519().NewPrivateKey(testPrvSeed1[:])
testPrvKey2, _ = ecdh.X25519().NewPrivateKey(testPrvSeed2[:])

hexPublicKey1 = fmt.Sprintf("0x%x", testPrivateKey1.PublicKey().Bytes())
hexPublicKey2 = fmt.Sprintf("0x%x", testPrivateKey2.PublicKey().Bytes())
hexPubKey1 = fmt.Sprintf("0x%x", testPrvKey1.PublicKey().Bytes())
hexPubKey2 = fmt.Sprintf("0x%x", testPrvKey2.PublicKey().Bytes())
)

func TestNewListener(t *testing.T) {
// Test case 1: Create a new listener with a valid address
addr := "127.0.0.1:8080"
listener, err := ListenString(addr, func(s *Stream) {}, WithSeed(testPrivateSeed1))
listener, err := ListenString(addr, func(s *Stream) {}, WithSeed(testPrvSeed1))
defer listener.Close()
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
Expand All @@ -34,25 +34,25 @@ func TestNewListener(t *testing.T) {

// Test case 2: Create a new listener with an invalid address
invalidAddr := "127.0.0.1:99999"
_, err = ListenString(invalidAddr, func(s *Stream) {}, WithSeed(testPrivateSeed1))
_, err = ListenString(invalidAddr, func(s *Stream) {}, WithSeed(testPrvSeed1))
if err == nil {
t.Errorf("Expected an error, but got nil")
}
}

func TestNewStream(t *testing.T) {
// Test case 1: Create a new multi-stream with a valid remote address
listener, err := ListenString("127.0.0.1:9080", func(s *Stream) {}, WithSeed(testPrivateSeed1))
listener, err := ListenString("127.0.0.1:9080", func(s *Stream) {}, WithSeed(testPrvSeed1))
defer listener.Close()
assert.Nil(t, err)
conn, err := listener.DialString("127.0.0.1:9081", hexPublicKey1)
conn, err := listener.DialString("127.0.0.1:9081", hexPubKey1)
assert.Nil(t, err)
if conn == nil {
t.Errorf("Expected a multi-stream, but got nil")
}

// Test case 2: Create a new multi-stream with an invalid remote address
conn, err = listener.DialString("127.0.0.1:99999", hexPublicKey1)
conn, err = listener.DialString("127.0.0.1:99999", hexPubKey1)
if conn != nil {
t.Errorf("Expected nil, but got a multi-stream")
}
Expand All @@ -61,10 +61,10 @@ func TestNewStream(t *testing.T) {

func TestClose(t *testing.T) {
// Test case 1: Close a listener with no multi-streams
listener, err := ListenString("127.0.0.1:9080", func(s *Stream) {}, WithSeed(testPrivateSeed1))
listener, err := ListenString("127.0.0.1:9080", func(s *Stream) {}, WithSeed(testPrvSeed1))
assert.NoError(t, err)
// Test case 2: Close a listener with multi-streams
listener.DialString("127.0.0.1:9081", hexPublicKey1)
listener.DialString("127.0.0.1:9081", hexPubKey1)
err = listener.Close()
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
Expand All @@ -78,7 +78,7 @@ func TestListenerUpdate_NoActivity(t *testing.T) {
acceptFn := func(s *Stream) {
acceptCalled = true
}
listener, err := ListenString("127.0.0.1:9080", acceptFn, WithSeed(testPrivateSeed1))
listener, err := ListenString("127.0.0.1:9080", acceptFn, WithSeed(testPrvSeed1))
assert.NoError(t, err)
defer listener.Close()

Expand All @@ -103,16 +103,16 @@ func TestListenerUpdate_ReceiveData(t *testing.T) {
acceptFn := func(s *Stream) {
acceptCalled = true
}
listenerSnd, err := ListenString(":8881", func(stream *Stream) {}, WithSeed(testPrivateSeed1))
listenerSnd, err := ListenString(":8881", func(stream *Stream) {}, WithSeed(testPrvSeed1))
assert.NoError(t, err)
defer listenerSnd.Close()

connectionSnd, err := listenerSnd.DialString("127.0.0.1:8882", hexPublicKey2)
connectionSnd, err := listenerSnd.DialString("127.0.0.1:8882", hexPubKey2)
assert.NoError(t, err)

streamSnd, _ := connectionSnd.GetOrNewStreamRcv(0)

listenerRcv, err := ListenString(":8882", acceptFn, WithSeed(testPrivateSeed2))
listenerRcv, err := ListenString(":8882", acceptFn, WithSeed(testPrvSeed2))

// Sender setup
streamSnd.Write([]byte("hello"))
Expand Down
91 changes: 64 additions & 27 deletions rcv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tomtp

import (
"context"
"sync"
)

Expand All @@ -18,19 +19,21 @@ type RcvSegment struct {
}

type ReceiveBuffer struct {
segments *skipList[packetKey, *RcvSegment] // Store out-of-order segments
nextOffset uint64 // Next expected offset
capacity int // Max buffer size
size int // Current size
mu *sync.Mutex
acks []Ack
segments *skipList[packetKey, *RcvSegment] // Store out-of-order segments
nextOffset uint64 // Next expected offset
capacity int // Max buffer size
size int // Current size
mu *sync.Mutex
acks []Ack
dataAvailable chan struct{} // Signal that data is available
}

func NewReceiveBuffer(capacity int) *ReceiveBuffer {
return &ReceiveBuffer{
segments: newSortedHashMap[packetKey, *RcvSegment](func(a, b packetKey) bool { return a.less(b) }),
capacity: capacity,
mu: &sync.Mutex{},
segments: newSortedHashMap[packetKey, *RcvSegment](func(a, b packetKey) bool { return a.less(b) }),
capacity: capacity,
mu: &sync.Mutex{},
dataAvailable: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -60,31 +63,65 @@ func (rb *ReceiveBuffer) Insert(segment *RcvSegment) RcvInsertStatus {

rb.size += dataLen

// Signal that data is available (non-blocking send)
select {
case rb.dataAvailable <- struct{}{}:
default: // Non-blocking to prevent deadlocks if someone is already waiting
}

return RcvInsertOk
}

func (rb *ReceiveBuffer) RemoveOldestInOrder() *RcvSegment {
func (rb *ReceiveBuffer) RemoveOldestInOrder(ctx context.Context) (*RcvSegment, error) {
rb.mu.Lock()
defer rb.mu.Unlock()

// Get the Oldest segment, check if we have data in order
oldest := rb.segments.Min()
if oldest == nil || oldest.value.offset > rb.nextOffset {
return nil
for {
// Check if there is any data at all
oldest := rb.segments.Min()
if oldest == nil {
// No segments available, so wait
rb.mu.Unlock()
select {
case <-rb.dataAvailable: // Wait for new segment signal
rb.mu.Lock()
continue // Recheck segments size
case <-ctx.Done():
rb.mu.Lock()
return nil, ctx.Err() // Context cancelled
}
}

if oldest.value.offset == rb.nextOffset {
rb.segments.Remove(oldest.key)
rb.size -= int(oldest.key.length())

segment := oldest.value
if segment.offset < rb.nextOffset {
diff := rb.nextOffset - segment.offset
segment.data = segment.data[diff:]
segment.offset = rb.nextOffset
}

rb.nextOffset = segment.offset + uint64(len(segment.data))
return segment, nil
} else if oldest.value.offset > rb.nextOffset {
// Out of order; wait until segment offset available, signal that
rb.mu.Unlock()
select {
case <-rb.dataAvailable:
rb.mu.Lock() //get new data signal, re-lock to ensure no one modifies
continue // Recheck segments size after getting the data
case <-ctx.Done():
rb.mu.Lock()
return nil, ctx.Err()
}
} else {
rb.segments.Remove(oldest.key)
rb.size -= int(oldest.key.length())
// Dupe data, loop to get more data if exist
}
}

rb.segments.Remove(oldest.key)
rb.size -= int(oldest.key.length())

segment := oldest.value
if segment.offset < rb.nextOffset {
diff := rb.nextOffset - segment.offset
segment.data = segment.data[diff:]
segment.offset = rb.nextOffset
}

rb.nextOffset = segment.offset + uint64(len(segment.data))
return segment
}

func (rb *ReceiveBuffer) Size() int {
Expand Down
Loading

0 comments on commit 4ed1bca

Please sign in to comment.