From cd918fbfd8faee186aaee2332878bb9797acfc7b Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Fri, 16 Dec 2022 14:53:59 +0300
Subject: [PATCH] feature: support graceful shutdown

If connected to Tarantool 2.10 or newer and WatchersFeature is
required, the connection now supports server graceful shutdown [1]. In
this case, the server waits until all client requests are finished and
the client disconnects before going down (the server may also go down
by timeout). The client reconnects if the connection options enable
reconnect.

1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

Closes #214
---
 CHANGELOG.md     |   1 +
 connection.go    | 152 +++++++++++++++--
 errors.go        |   1 +
 shutdown_test.go | 426 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 566 insertions(+), 14 deletions(-)
 create mode 100644 shutdown_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7930b34d1..8cc87979d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - Error type support in MessagePack (#209)
 - Event subscription support (#119)
 - Session settings support (#215)
+- Support graceful shutdown (#214)
 
 ### Changed
 
diff --git a/connection.go b/connection.go
index b657dc2ae..9197cf56d 100644
--- a/connection.go
+++ b/connection.go
@@ -25,7 +25,8 @@ const ignoreStreamId = 0
 const (
 	connDisconnected = 0
 	connConnected    = 1
-	connClosed       = 2
+	connShutdown     = 2
+	connClosed       = 3
 )
 
 const (
@@ -33,6 +34,8 @@ const (
 	connTransportSsl  = "ssl"
 )
 
+const shutdownEventKey = "box.shutdown"
+
 type ConnEventKind int
 type ConnLogKind int
 
@@ -45,6 +48,8 @@ const (
 	ReconnectFailed
 	// Either reconnect attempts exhausted, or explicit Close is called.
 	Closed
+	// Shutdown signals that a server graceful shutdown is in progress.
+	Shutdown
 
 	// LogReconnectFailed is logged when reconnect attempt failed.
 	LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
 // always returns array of array (array of tuples for space related methods).
 // For Eval* and Call* Tarantool always returns array, but does not forces
 // array of arrays.
+//
+// If connected to Tarantool 2.10 or newer and WatchersFeature is required,
+// the connection supports server graceful shutdown. In this case, the
+// server waits until all client requests are finished and the client
+// disconnects before going down (the server may also go down by timeout).
+// The client reconnects if the connection options enable reconnect.
+//
+// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
 type Connection struct {
 	addr  string
 	c     net.Conn
 	mutex sync.Mutex
+	cond  *sync.Cond
 	// Schema contains schema loaded on connection.
 	Schema *Schema
 	// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
 	serverProtocolInfo ProtocolInfo
 	// watchMap is a map of key -> chan watchState.
 	watchMap sync.Map
+
+	// shutdownWatcher is the "box.shutdown" event watcher.
+	shutdownWatcher Watcher
+	// requestCnt is a counter of active requests.
+	requestCnt uint32
 }
 
 var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
 		conn.opts.Logger = defaultLogger{}
 	}
 
+	conn.cond = sync.NewCond(&conn.mutex)
+
 	if err = conn.createConnection(false); err != nil {
 		ter, ok := err.(Error)
 		if conn.opts.Reconnect <= 0 {
@@ -421,6 +442,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
 		}
 	}
 
+	// Subscribe to the shutdown event to handle graceful shutdown.
+	if conn.isWatchersRequired() {
+		watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback)
+		if werr != nil {
+			conn.closeConnection(werr, true)
+			return nil, werr
+		}
+		conn.shutdownWatcher = watcher
+	}
+
 	return conn, err
 }
 
@@ -589,6 +620,7 @@ func (conn *Connection) dial() (err error) {
 	conn.lockShards()
 	conn.c = connection
 	atomic.StoreUint32(&conn.state, connConnected)
+	conn.cond.Broadcast()
 	conn.unlockShards()
 	go conn.writer(w, connection)
 	go conn.reader(r, connection)
@@ -762,10 +794,19 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
 		if conn.state != connClosed {
 			close(conn.control)
 			atomic.StoreUint32(&conn.state, connClosed)
+			conn.cond.Broadcast()
+			// Free the resources.
+			if conn.shutdownWatcher != nil {
+				conn.mutex.Unlock()
+				conn.shutdownWatcher.Unregister()
+				conn.shutdownWatcher = nil
+				conn.mutex.Lock()
+			}
 			conn.notify(Closed)
 		}
 	} else {
 		atomic.StoreUint32(&conn.state, connDisconnected)
+		conn.cond.Broadcast()
 		conn.notify(Disconnected)
 	}
 	if conn.c != nil {
@@ -784,9 +825,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
 	return
 }
 
-func (conn *Connection) reconnect(neterr error, c net.Conn) {
-	conn.mutex.Lock()
-	defer conn.mutex.Unlock()
+func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
 	if conn.opts.Reconnect > 0 {
 		if c == conn.c {
 			conn.closeConnection(neterr, false)
@@ -799,6 +838,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
 	}
 }
 
+func (conn *Connection) reconnect(neterr error, c net.Conn) {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+	conn.reconnectImpl(neterr, c)
+	conn.cond.Broadcast()
+}
+
 func (conn *Connection) lockShards() {
 	for i := range conn.shard {
 		conn.shard[i].rmut.Lock()
@@ -1026,6 +1072,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
 		fut.done = nil
 		shard.rmut.Unlock()
 		return
+	case connShutdown:
+		fut.err = ClientError{
+			ErrConnectionShutdown,
+			"server shutdown in progress",
+		}
+		fut.ready = nil
+		fut.done = nil
+		shard.rmut.Unlock()
+		return
 	}
 	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
 	if ctx != nil {
@@ -1086,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
 	if fut.ready == nil {
 		return fut
 	}
+
 	if req.Ctx() != nil {
 		select {
 		case <-req.Ctx().Done():
@@ -1094,10 +1150,17 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
 		default:
 		}
 	}
+
+	if conn.shutdownWatcher != nil {
+		atomic.AddUint32(&(conn.requestCnt), uint32(1))
+	}
+
 	conn.putFuture(fut, req, streamId)
+
 	if req.Ctx() != nil {
 		go conn.contextWatchdog(fut, req.Ctx())
 	}
+
 	return fut
 }
 
@@ -1164,6 +1227,15 @@ func (conn *Connection) markDone(fut *Future) {
 	if conn.rlimit != nil {
 		<-conn.rlimit
 	}
+
+	if conn.shutdownWatcher != nil {
+		// Decrementing the counter this way follows the advice from
+		// the Go documentation for decrementing an atomic uint32:
+		// https://pkg.go.dev/sync/atomic#AddUint32
+		if atomic.AddUint32(&(conn.requestCnt), ^uint32(0)) == 0 {
+			conn.cond.Broadcast()
+		}
+	}
 }
 
 func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
@@ -1458,6 +1530,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
 	return st, nil
 }
 
+func (conn *Connection) isWatchersRequired() bool {
+	for _, feature := range conn.opts.RequiredProtocolInfo.Features {
+		if feature == WatchersFeature {
+			return true
+		}
+	}
+	return false
+}
+
 // NewWatcher creates a new Watcher object for the connection.
 //
 // You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1577,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
 	// asynchronous. We do not expect any response from a Tarantool instance
 	// That's why we can't just check the Tarantool response for an unsupported
 	// request error.
-	watchersRequired := false
-	for _, feature := range conn.opts.RequiredProtocolInfo.Features {
-		if feature == WatchersFeature {
-			watchersRequired = true
-			break
-		}
-	}
-
-	if !watchersRequired {
+	if !conn.isWatchersRequired() {
 		err := fmt.Errorf("the feature %s must be required by connection "+
 			"options to create a watcher", WatchersFeature)
 		return nil, err
@@ -1563,7 +1636,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
 	if state.cnt == 0 {
 		// The last one sends IPROTO_UNWATCH.
-		conn.Do(newUnwatchRequest(key)).Get()
+		if !conn.ClosedNow() {
+			// conn.ClosedNow() check is a workaround for calling
+			// Unregister from closeConnection().
+			conn.Do(newUnwatchRequest(key)).Get()
+		}
 		conn.watchMap.Delete(key)
 		close(state.unready)
 	}
 
@@ -1666,3 +1743,50 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
 func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
 	return clientProtocolInfo.Clone()
 }
+
+func shutdownEventCallback(event WatchEvent) {
+	// Receives "true" on server shutdown.
+	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
+	// step 2.
+	val, ok := event.Value.(bool)
+	if ok && val {
+		go event.Conn.processShutdown()
+	}
+}
+
+func (conn *Connection) processShutdown() {
+	// Forbid state changes.
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+
+	atomic.StoreUint32(&(conn.state), connShutdown)
+	conn.notify(Shutdown)
+
+	c := conn.c
+	for (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
+		(atomic.LoadUint32(&(conn.requestCnt)) != 0) &&
+		(c == conn.c) {
+		// Use cond var on conn.mutex since request execution may
+		// call reconnect(). It is ok if the state changes as part of
+		// reconnect since the Tarantool server won't allow reconnecting
+		// in the middle of shutting down.
+		conn.cond.Wait()
+	}
+	// Do not unregister the watcher explicitly here since connection
+	// teardown has the same effect. To clean up connection resources,
+	// unregister on full close.
+
+	if (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
+		(c == conn.c) {
+		// Start to reconnect based on common rules, the same as in net.box.
+		// Reconnect also closes the connection: the server waits until all
+		// subscribed connections are terminated.
+		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
+		// step 3.
+		conn.reconnectImpl(
+			ClientError{
+				ErrConnectionClosed,
+				"connection closed after server shutdown",
+			}, conn.c)
+	}
+}
diff --git a/errors.go b/errors.go
index 02e4635bb..906184c24 100644
--- a/errors.go
+++ b/errors.go
@@ -55,6 +55,7 @@ const (
 	ErrProtocolError = 0x4000 + iota
 	ErrTimeouted     = 0x4000 + iota
 	ErrRateLimited   = 0x4000 + iota
+	ErrConnectionShutdown = 0x4000 + iota
 )
 
 // Tarantool server error codes.
diff --git a/shutdown_test.go b/shutdown_test.go
new file mode 100644
index 000000000..26f3f9b9b
--- /dev/null
+++ b/shutdown_test.go
@@ -0,0 +1,426 @@
+//go:build linux || (darwin && !cgo)
+// +build linux darwin,!cgo
+
+// Use OS build flags since signals are system-dependent.
+
+package tarantool_test
+
+import (
+	"fmt"
+	"sync"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	. "github.com/tarantool/go-tarantool"
+	"github.com/tarantool/go-tarantool/test_helpers"
+)
+
+var shtdnServer = "127.0.0.1:3014"
+var shtdnClntOpts = Opts{
+	User:                 opts.User,
+	Pass:                 opts.Pass,
+	Timeout:              20 * time.Second,
+	Reconnect:            200 * time.Millisecond,
+	MaxReconnects:        10,
+	RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}},
+}
+var shtdnSrvOpts = test_helpers.StartOpts{
+	InitScript:   "config.lua",
+	Listen:       shtdnServer,
+	User:         shtdnClntOpts.User,
+	Pass:         shtdnClntOpts.Pass,
+	WaitStart:    100 * time.Millisecond,
+	ConnectRetry: 3,
+	RetryTimeout: 500 * time.Millisecond,
+}
+
+var evalMsg = "got enough sleep"
+var evalBody = `
+	local fiber = require('fiber')
+	local time, msg = ...
+	fiber.sleep(time)
+	return msg
+`
+
+func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) {
+	var resp *Response
+	var err error
+
+	// Set a big timeout so it is easy to tell whether the server went
+	// down on timeout or after all connections were terminated.
+	serverShutdownTimeout := 60 // in seconds
+	_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
+	require.Nil(t, err)
+
+	// Send a request with sleep.
+	evalSleep := 1 // in seconds
+	require.Lessf(t,
+		time.Duration(evalSleep)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request won't fail by timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+
+	fut := conn.Do(req)
+
+	// SIGTERM the server.
+	shutdownStart := time.Now()
+	require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
+
+	// Check that we can't send new requests after shutdown starts.
+	// Retry helps to wait a bit until the server starts to shut down
+	// and sends us the shutdown event.
+	shutdownWaitRetries := 5
+	shutdownWaitTimeout := 100 * time.Millisecond
+
+	err = test_helpers.Retry(func(interface{}) error {
+		_, err = conn.Do(NewPingRequest()).Get()
+		if err == nil {
+			return fmt.Errorf("expected error for requests sent on shutdown")
+		}
+
+		if err.Error() != "server shutdown in progress (0x4005)" {
+			return err
+		}
+
+		return nil
+	}, nil, shutdownWaitRetries, shutdownWaitTimeout)
+	require.Nil(t, err)
+
+	// Check that the request started before the shutdown finishes successfully.
+	resp, err = fut.Get()
+	require.Nil(t, err)
+	require.NotNil(t, resp)
+	require.Equal(t, resp.Data, []interface{}{evalMsg})
+
+	// Wait until the server goes down.
+	// The server will go down only when it processes all requests from our
+	// connection (or on timeout).
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+	shutdownFinish := time.Now()
+	shutdownTime := shutdownFinish.Sub(shutdownStart)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+
+	// Check that it wasn't a timeout.
+	require.Lessf(t,
+		shutdownTime,
+		time.Duration(serverShutdownTimeout/2)*time.Second,
+		"server went down not by timeout")
+
+	// Connection is unavailable when server is down.
+	require.Equal(t, false, conn.ConnectedNow())
+}
+
+func TestGracefulShutdown(t *testing.T) {
+	test_helpers.SkipIfWatchersUnsupported(t)
+
+	var inst test_helpers.TarantoolInstance
+	var conn *Connection
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
+	defer conn.Close()
+
+	testGracefulShutdown(t, conn, &inst)
+}
+
+func TestGracefulShutdownWithReconnect(t *testing.T) {
+	test_helpers.SkipIfWatchersUnsupported(t)
+
+	var inst test_helpers.TarantoolInstance
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
+	defer conn.Close()
+
+	testGracefulShutdown(t, conn, &inst)
+
+	err = test_helpers.RestartTarantool(&inst)
+	require.Nilf(t, err, "Failed to restart tarantool")
+
+	connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect)
+	require.Truef(t, connected, "Reconnect success")
+
+	testGracefulShutdown(t, conn, &inst)
+}
+
+func TestNoGracefulShutdown(t *testing.T) {
+	// No watchers = no graceful shutdown.
+	noShtdnClntOpts := shtdnClntOpts.Clone()
+	noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{}
+
+	var inst test_helpers.TarantoolInstance
+	var conn *Connection
+	var isLess210 bool
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts)
+	defer conn.Close()
+
+	evalSleep := 10             // in seconds
+	serverShutdownTimeout := 60 // in seconds
+	require.Less(t, evalSleep, serverShutdownTimeout)
+
+	// Set a big timeout so it is easy to find out whether the server
+	// went down on timeout. There is no graceful shutdown for pre-2.10
+	// instances, so it is not possible for them to wait for it.
+	isLess210, err = test_helpers.IsTarantoolVersionLess(2, 10, 0)
+	require.Nil(t, err)
+	if !isLess210 {
+		_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
+		require.Nil(t, err)
+	}
+
+	// Send a request with sleep.
+	require.Lessf(t,
+		time.Duration(evalSleep)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request won't fail by timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+
+	fut := conn.Do(req)
+
+	// SIGTERM the server.
+	shutdownStart := time.Now()
+	require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
+
+	// Check that the request was interrupted.
+	_, err = fut.Get()
+	require.NotNilf(t, err, "sleep request error")
+
+	// Wait until the server goes down.
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+	shutdownFinish := time.Now()
+	shutdownTime := shutdownFinish.Sub(shutdownStart)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+
+	// Check that the server finished without waiting for eval to finish.
+	require.Lessf(t,
+		shutdownTime,
+		time.Duration(evalSleep/2)*time.Second,
+		"server went down without any additional waiting")
+
+	// Check that it wasn't a timeout.
+	if !isLess210 {
+		require.Lessf(t,
+			shutdownTime,
+			time.Duration(serverShutdownTimeout/2)*time.Second,
+			"server went down not by timeout")
+	}
+}
+
+func TestGracefulShutdownRespectsClose(t *testing.T) {
+	test_helpers.SkipIfWatchersUnsupported(t)
+
+	var inst test_helpers.TarantoolInstance
+	var conn *Connection
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
+	defer conn.Close()
+
+	// Set a big timeout so it is easy to tell whether the server went
+	// down on timeout or after all connections were terminated.
+	serverShutdownTimeout := 60 // in seconds
+	_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
+	require.Nil(t, err)
+
+	// Send a request with sleep.
+	evalSleep := 10 // in seconds
+	require.Lessf(t,
+		time.Duration(evalSleep)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request won't fail by timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+
+	fut := conn.Do(req)
+
+	// SIGTERM the server.
+	shutdownStart := time.Now()
+	require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
+
+	// Close the connection.
+	conn.Close()
+
+	// Connection is closed.
+	require.Equal(t, true, conn.ClosedNow())
+
+	// Check that the request was interrupted.
+	_, err = fut.Get()
+	require.NotNilf(t, err, "sleep request error")
+
+	// Wait until the server goes down.
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+	shutdownFinish := time.Now()
+	shutdownTime := shutdownFinish.Sub(shutdownStart)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+
+	// Check that the server finished without waiting for eval to finish.
+	require.Lessf(t,
+		shutdownTime,
+		time.Duration(evalSleep/2)*time.Second,
+		"server went down without any additional waiting")
+
+	// Check that it wasn't a timeout.
+	require.Lessf(t,
+		shutdownTime,
+		time.Duration(serverShutdownTimeout/2)*time.Second,
+		"server went down not by timeout")
+
+	// Connection is still closed.
+	require.Equal(t, true, conn.ClosedNow())
+}
+
+func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) {
+	test_helpers.SkipIfWatchersUnsupported(t)
+
+	var inst test_helpers.TarantoolInstance
+	var conn *Connection
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
+	defer conn.Close()
+
+	// Set a small timeout so the server shuts down before the request finishes.
+	serverShutdownTimeout := 1 // in seconds
+	_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
+	require.Nil(t, err)
+
+	// Send a request with sleep.
+	evalSleep := 5 // in seconds
+	require.Lessf(t,
+		serverShutdownTimeout,
+		evalSleep,
+		"test request will fail by timeout")
+	require.Lessf(t,
+		time.Duration(serverShutdownTimeout)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request will fail by timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+
+	evalStart := time.Now()
+	fut := conn.Do(req)
+
+	// SIGTERM the server.
+	require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
+
+	// Wait until the server goes down.
+	// The server is expected to go down on timeout.
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+
+	// Check that the request failed by server disconnect, not by a client timeout.
+	_, err = fut.Get()
+	require.NotNil(t, err)
+	require.NotContains(t, err.Error(), "client timeout for request")
+
+	evalFinish := time.Now()
+	evalTime := evalFinish.Sub(evalStart)
+
+	// Check that it wasn't a client timeout.
+	require.Lessf(t,
+		evalTime,
+		shtdnClntOpts.Timeout,
+		"server went down not by timeout")
+}
+
+func TestGracefulShutdownConcurrent(t *testing.T) {
+	test_helpers.SkipIfWatchersUnsupported(t)
+
+	var inst test_helpers.TarantoolInstance
+	var err error
+
+	inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
+	require.Nil(t, err)
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	const testConcurrency = 50
+
+	var caseWg, srvToStop, srvStop sync.WaitGroup
+	caseWg.Add(testConcurrency)
+	srvToStop.Add(testConcurrency)
+	srvStop.Add(1)
+
+	// Create many connections.
+	var ret error
+	for i := 0; i < testConcurrency; i++ {
+		go func(i int) {
+			defer caseWg.Done()
+
+			conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
+			defer conn.Close()
+
+			evalSleep := 1 // in seconds
+			req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+			fut := conn.Do(req)
+
+			// Wait till all connections have started sleeping.
+			srvToStop.Done()
+			srvStop.Wait()
+
+			_, gerr := fut.Get()
+			if gerr != nil {
+				ret = gerr
+			}
+		}(i)
+	}
+
+	var sret error
+	go func(inst *test_helpers.TarantoolInstance) {
+		srvToStop.Wait()
+		cerr := inst.Cmd.Process.Signal(syscall.SIGTERM)
+		if cerr != nil {
+			sret = cerr
+		}
+		srvStop.Done()
+	}(&inst)
+
+	srvStop.Wait()
+	require.Nil(t, sret, "No errors on server SIGTERM")
+
+	caseWg.Wait()
+	require.Nil(t, ret, "No errors on concurrent wait")
+
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+}
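
Usage note (not part of the patch): below is a minimal sketch of how a client might opt in to graceful shutdown by requiring WatchersFeature, based on the options used in shutdown_test.go. The address, user, and password are placeholders for illustration; with these options the connection also reports the new Shutdown event and rejects requests sent after the shutdown starts with ErrConnectionShutdown.

package main

import (
	"fmt"
	"time"

	tarantool "github.com/tarantool/go-tarantool"
)

func main() {
	opts := tarantool.Opts{
		User:          "test",
		Pass:          "test",
		Timeout:       5 * time.Second,
		Reconnect:     500 * time.Millisecond,
		MaxReconnects: 10,
		// Requiring WatchersFeature makes Connect subscribe to the
		// "box.shutdown" event, which enables graceful shutdown support.
		RequiredProtocolInfo: tarantool.ProtocolInfo{
			Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature},
		},
	}

	conn, err := tarantool.Connect("127.0.0.1:3013", opts)
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer conn.Close()

	// Requests sent before the server starts shutting down are allowed to
	// finish; requests sent after that fail with "server shutdown in progress".
	if _, err := conn.Do(tarantool.NewPingRequest()).Get(); err != nil {
		fmt.Println("ping failed:", err)
	}
}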