From 15ac57b58cf948c2ed3f5f91e078e03d3fbdb904 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Tue, 15 Jun 2021 13:11:32 +0200 Subject: [PATCH 1/3] implement UserIdleTimeout Implement a new method SetUserIdleTimeout() that allows to set a timeout for idle sessions, but contrarty to the SetIdleTimeout(), this doesn't take into account SPDY ping frames. This allows consumers to use SPDY ping frames to keep the TCP and SPDY connections alive, but also to detect and close the connection if it is not being used, i.e. no data is sent by the applications through the connection. Signed-off-by: Antonio Ojea --- connection.go | 42 +++++++++---- spdy_test.go | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 10 deletions(-) diff --git a/connection.go b/connection.go index d906bb0..3606d3b 100644 --- a/connection.go +++ b/connection.go @@ -45,13 +45,14 @@ type StreamHandler func(stream *Stream) type AuthHandler func(header http.Header, slot uint8, parent uint32) bool type idleAwareFramer struct { - f *spdy.Framer - conn *Connection - writeLock sync.Mutex - resetChan chan struct{} - setTimeoutLock sync.Mutex - setTimeoutChan chan time.Duration - timeout time.Duration + f *spdy.Framer + conn *Connection + writeLock sync.Mutex + resetChan chan struct{} + setTimeoutLock sync.Mutex + setTimeoutChan chan time.Duration + timeout time.Duration + ignorePingFrames bool } func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer { @@ -158,6 +159,13 @@ func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error { return err } + if i.ignorePingFrames { + _, ok := frame.(*spdy.PingFrame) + if ok { + return nil + } + } + i.resetChan <- struct{}{} return nil @@ -169,19 +177,27 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) { return nil, err } + if i.ignorePingFrames { + _, ok := frame.(*spdy.PingFrame) + if ok { + return frame, nil + } + } + // resetChan should never be closed since it is only closed // when the connection has closed its closeChan. This closure // only occurs after all Reads have finished // TODO (dmcgowan): refactor relationship into connection i.resetChan <- struct{}{} - return frame, nil } -func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) { +func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration, ignorePingFrames bool) { i.setTimeoutLock.Lock() defer i.setTimeoutLock.Unlock() + i.ignorePingFrames = ignorePingFrames + if i.setTimeoutChan == nil { return } @@ -834,7 +850,13 @@ func (s *Connection) SetCloseTimeout(timeout time.Duration) { // SetIdleTimeout sets the amount of time the connection may sit idle before // it is forcefully terminated. func (s *Connection) SetIdleTimeout(timeout time.Duration) { - s.framer.setIdleTimeout(timeout) + s.framer.setIdleTimeout(timeout, false) +} + +// SetUserIdleTimeout sets the amount of time the connection may sit idle, +// not taking into account SPDY Ping frames, before it is forcefully terminated +func (s *Connection) SetUserIdleTimeout(timeout time.Duration) { + s.framer.setIdleTimeout(timeout, true) } func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error { diff --git a/spdy_test.go b/spdy_test.go index 312a950..9a4ea58 100644 --- a/spdy_test.go +++ b/spdy_test.go @@ -544,6 +544,34 @@ func TestIdleNoData(t *testing.T) { wg.Wait() } +func TestUserIdleNoData(t *testing.T) { + var wg sync.WaitGroup + server, listen, serverErr := runServer(&wg) + if serverErr != nil { + t.Fatalf("Error initializing server: %s", serverErr) + } + + conn, dialErr := net.Dial("tcp", listen) + if dialErr != nil { + t.Fatalf("Error dialing server: %s", dialErr) + } + + spdyConn, spdyErr := NewConnection(conn, false) + if spdyErr != nil { + t.Fatalf("Error creating spdy connection: %s", spdyErr) + } + go spdyConn.Serve(NoOpStreamHandler) + + spdyConn.SetUserIdleTimeout(10 * time.Millisecond) + <-spdyConn.CloseChan() + + closeErr := server.Close() + if closeErr != nil { + t.Fatalf("Error shutting down server: %s", closeErr) + } + wg.Wait() +} + func TestIdleWithData(t *testing.T) { var wg sync.WaitGroup server, listen, serverErr := runServer(&wg) @@ -606,6 +634,140 @@ Loop: wg.Wait() } +func TestIdleWithPing(t *testing.T) { + var wg sync.WaitGroup + server, listen, serverErr := runServer(&wg) + if serverErr != nil { + t.Fatalf("Error initializing server: %s", serverErr) + } + + conn, dialErr := net.Dial("tcp", listen) + if dialErr != nil { + t.Fatalf("Error dialing server: %s", dialErr) + } + + spdyConn, spdyErr := NewConnection(conn, false) + if spdyErr != nil { + t.Fatalf("Error creating spdy connection: %s", spdyErr) + } + go spdyConn.Serve(NoOpStreamHandler) + + spdyConn.SetIdleTimeout(25 * time.Millisecond) + + authenticated = true + _, err := spdyConn.CreateStream(http.Header{}, nil, false) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + writeCh := make(chan struct{}) + + go func() { + for i := 0; i < 10; i++ { + pingTime, pingErr := spdyConn.Ping() + if pingErr != nil { + t.Errorf("Error pinging server: %s", pingErr) + } + + if pingTime == time.Duration(0) { + t.Errorf("Expecting non-zero ping time") + } + time.Sleep(10 * time.Millisecond) + } + close(writeCh) + }() + + writesFinished := false + +Loop: + for { + select { + case <-writeCh: + writesFinished = true + case <-spdyConn.CloseChan(): + if !writesFinished { + t.Fatal("Connection closed before all writes finished") + } + break Loop + } + } + + closeErr := server.Close() + if closeErr != nil { + t.Fatalf("Error shutting down server: %s", closeErr) + } + wg.Wait() +} + +func TestUserIdleWithPing(t *testing.T) { + var wg sync.WaitGroup + server, listen, serverErr := runServer(&wg) + if serverErr != nil { + t.Fatalf("Error initializing server: %s", serverErr) + } + + conn, dialErr := net.Dial("tcp", listen) + if dialErr != nil { + t.Fatalf("Error dialing server: %s", dialErr) + } + + spdyConn, spdyErr := NewConnection(conn, false) + if spdyErr != nil { + t.Fatalf("Error creating spdy connection: %s", spdyErr) + } + go spdyConn.Serve(NoOpStreamHandler) + + spdyConn.SetUserIdleTimeout(25 * time.Millisecond) + + authenticated = true + _, err := spdyConn.CreateStream(http.Header{}, nil, false) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + writeCh := make(chan struct{}) + + go func() { + for i := 0; i < 10; i++ { + select { + case <-spdyConn.CloseChan(): + default: + pingTime, pingErr := spdyConn.Ping() + if pingErr != nil { + t.Errorf("Error pinging server: %s", pingErr) + } + + if pingTime == time.Duration(0) { + t.Errorf("Expecting non-zero ping time") + } + time.Sleep(10 * time.Millisecond) + } + } + close(writeCh) + }() + + writesFinished := false + +Loop: + for { + select { + case <-writeCh: + writesFinished = true + case <-spdyConn.CloseChan(): + if writesFinished { + t.Fatal("Connection closed after all writes finished") + } + break Loop + } + } + + closeErr := server.Close() + if closeErr != nil { + t.Fatalf("Error shutting down server: %s", closeErr) + } + wg.Wait() +} + func TestIdleRace(t *testing.T) { var wg sync.WaitGroup server, listen, serverErr := runServer(&wg) From 96c6c1ce4a40077ae3882ea6e609217a9ee69492 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Wed, 16 Jun 2021 10:45:43 +0200 Subject: [PATCH 2/3] fix data race on test running the tests with -race flag shows that there is a race trying to access the global variable "authenticated" during the tests. Using a lock to access the variable fixes it. Signed-off-by: Antonio Ojea --- spdy_test.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/spdy_test.go b/spdy_test.go index 9a4ea58..582fb5a 100644 --- a/spdy_test.go +++ b/spdy_test.go @@ -50,7 +50,9 @@ func TestSpdyStreams(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, streamErr := spdyConn.CreateStream(http.Header{}, nil, false) if streamErr != nil { t.Fatalf("Error creating stream: %s", streamErr) @@ -144,7 +146,9 @@ func TestSpdyStreams(t *testing.T) { t.Fatalf("Error reseting stream: %s", streamResetErr) } + authMu.Lock() authenticated = false + authMu.Unlock() badStream, badStreamErr := spdyConn.CreateStream(http.Header{}, nil, false) if badStreamErr != nil { t.Fatalf("Error creating stream: %s", badStreamErr) @@ -225,7 +229,9 @@ func TestHalfClose(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, streamErr := spdyConn.CreateStream(http.Header{}, nil, false) if streamErr != nil { t.Fatalf("Error creating stream: %s", streamErr) @@ -311,7 +317,9 @@ func TestUnexpectedRemoteConnectionClosed(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, streamErr := spdyConn.CreateStream(http.Header{}, nil, false) if streamErr != nil { t.Fatalf("Error creating stream: %s", streamErr) @@ -427,7 +435,9 @@ func TestIdleShutdownRace(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, err := spdyConn.CreateStream(http.Header{}, nil, false) if err != nil { t.Fatalf("Error creating stream: %v", err) @@ -592,7 +602,9 @@ func TestIdleWithData(t *testing.T) { spdyConn.SetIdleTimeout(25 * time.Millisecond) + authMu.Lock() authenticated = true + authMu.Unlock() stream, err := spdyConn.CreateStream(http.Header{}, nil, false) if err != nil { t.Fatalf("Error creating stream: %v", err) @@ -654,7 +666,9 @@ func TestIdleWithPing(t *testing.T) { spdyConn.SetIdleTimeout(25 * time.Millisecond) + authMu.Lock() authenticated = true + authMu.Unlock() _, err := spdyConn.CreateStream(http.Header{}, nil, false) if err != nil { t.Fatalf("Error creating stream: %v", err) @@ -719,7 +733,9 @@ func TestUserIdleWithPing(t *testing.T) { spdyConn.SetUserIdleTimeout(25 * time.Millisecond) + authMu.Lock() authenticated = true + authMu.Unlock() _, err := spdyConn.CreateStream(http.Header{}, nil, false) if err != nil { t.Fatalf("Error creating stream: %v", err) @@ -788,8 +804,9 @@ func TestIdleRace(t *testing.T) { spdyConn.SetIdleTimeout(10 * time.Millisecond) + authMu.Lock() authenticated = true - + authMu.Unlock() for i := 0; i < 10; i++ { _, err := spdyConn.CreateStream(http.Header{}, nil, false) if err != nil { @@ -873,7 +890,9 @@ func TestStreamReset(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, streamErr := spdyConn.CreateStream(http.Header{}, nil, false) if streamErr != nil { t.Fatalf("Error creating stream: %s", streamErr) @@ -921,7 +940,9 @@ func TestStreamResetWithDataRemaining(t *testing.T) { } go spdyConn.Serve(NoOpStreamHandler) + authMu.Lock() authenticated = true + authMu.Unlock() stream, streamErr := spdyConn.CreateStream(http.Header{}, nil, false) if streamErr != nil { t.Fatalf("Error creating stream: %s", streamErr) @@ -1316,12 +1337,16 @@ func TestStreamReadUnblocksAfterCloseThenReset(t *testing.T) { } var authenticated bool +var authMu sync.Mutex func authStreamHandler(stream *Stream) { + authMu.Lock() if !authenticated { + authMu.Unlock() stream.Refuse() return } + authMu.Unlock() MirrorStreamHandler(stream) } From 83610fa6fc69ad17491d9e488ed3c415d0debebb Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Fri, 18 Jun 2021 12:25:44 +0200 Subject: [PATCH 3/3] don't race on ignore ping frames Instead of using locking, that will cause read and write frames to wait for each others and introduce a risk of deadlocking, use atomic operations to check the new option to ignore ping SPDY frames for idling. Signed-off-by: Antonio Ojea --- connection.go | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/connection.go b/connection.go index 3606d3b..eaeaa2d 100644 --- a/connection.go +++ b/connection.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/moby/spdystream/spdy" @@ -40,6 +41,25 @@ const ( QUEUE_SIZE = 50 ) +// atomicBool uses load/store operations on an int32 to simulate an atomic boolean. +type atomicBool struct { + v int32 +} + +// set sets the int32 to the given boolean. +func (a *atomicBool) set(value bool) { + if value { + atomic.StoreInt32(&a.v, 1) + return + } + atomic.StoreInt32(&a.v, 0) +} + +// get returns true if the int32 == 1 +func (a *atomicBool) get() bool { + return atomic.LoadInt32(&a.v) == 1 +} + type StreamHandler func(stream *Stream) type AuthHandler func(header http.Header, slot uint8, parent uint32) bool @@ -52,7 +72,7 @@ type idleAwareFramer struct { setTimeoutLock sync.Mutex setTimeoutChan chan time.Duration timeout time.Duration - ignorePingFrames bool + ignorePingFrames *atomicBool } func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer { @@ -61,7 +81,8 @@ func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer { resetChan: make(chan struct{}, 2), // setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about // the same time the connection is being closed - setTimeoutChan: make(chan time.Duration, 1), + setTimeoutChan: make(chan time.Duration, 1), + ignorePingFrames: &atomicBool{0}, } return iaf } @@ -159,7 +180,7 @@ func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error { return err } - if i.ignorePingFrames { + if i.ignorePingFrames.get() { _, ok := frame.(*spdy.PingFrame) if ok { return nil @@ -177,7 +198,7 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) { return nil, err } - if i.ignorePingFrames { + if i.ignorePingFrames.get() { _, ok := frame.(*spdy.PingFrame) if ok { return frame, nil @@ -193,11 +214,11 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) { } func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration, ignorePingFrames bool) { + i.ignorePingFrames.set(ignorePingFrames) + i.setTimeoutLock.Lock() defer i.setTimeoutLock.Unlock() - i.ignorePingFrames = ignorePingFrames - if i.setTimeoutChan == nil { return }