From fe07889b3c0c9d2573ebeb91eeda94ecc7cc225d Mon Sep 17 00:00:00 2001
From: Georgy Moiseev 
Date: Fri, 16 Dec 2022 14:53:59 +0300
Subject: [PATCH] feature: support graceful shutdown

If connected to Tarantool 2.10 or newer, a connection now supports
server graceful shutdown [1]. In this case, the server waits until all
client requests are finished and the client disconnects before going
down (the server may also go down on timeout). The client reconnects
if the connection options enable reconnection. Beware that the graceful
shutdown event subscription is set up asynchronously.

1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

Closes #214
---
 CHANGELOG.md     |   1 +
 connection.go    | 151 +++++++++++--
 errors.go        |   1 +
 shutdown_test.go | 551 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 690 insertions(+), 14 deletions(-)
 create mode 100644 shutdown_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7930b34d1..8cc87979d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - Error type support in MessagePack (#209)
 - Event subscription support (#119)
 - Session settings support (#215)
+- Support graceful shutdown (#214)
 
 ### Changed
 
diff --git a/connection.go b/connection.go
index b657dc2ae..a43a62263 100644
--- a/connection.go
+++ b/connection.go
@@ -25,7 +25,8 @@ const ignoreStreamId = 0
 const (
 	connDisconnected = 0
 	connConnected    = 1
-	connClosed       = 2
+	connShutdown     = 2
+	connClosed       = 3
 )
 
 const (
@@ -33,6 +34,8 @@ const (
 	connTransportSsl  = "ssl"
 )
 
+const shutdownEventKey = "box.shutdown"
+
 type ConnEventKind int
 type ConnLogKind int
 
@@ -45,6 +48,8 @@ const (
 	ReconnectFailed
 	// Either reconnect attempts exhausted, or explicit Close is called.
 	Closed
+	// Shutdown signals that the server graceful shutdown is in progress.
+	Shutdown
 
 	// LogReconnectFailed is logged when reconnect attempt failed.
 	LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
 // always returns array of array (array of tuples for space related methods).
 // For Eval* and Call* Tarantool always returns array, but does not forces
 // array of arrays.
+//
+// If connected to Tarantool 2.10 or newer, the connection supports server
+// graceful shutdown. In this case, the server waits until all client requests
+// are finished and the client disconnects before going down (the server may
+// also go down on timeout). The client reconnects if the connection options
+// enable reconnection. Beware that the shutdown event subscription is asynchronous.
+//
+// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
 type Connection struct {
 	addr  string
 	c     net.Conn
 	mutex sync.Mutex
+	cond  *sync.Cond // Signals waiters on state changes and request completion.
 	// Schema contains schema loaded on connection.
 	Schema *Schema
 	// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
 	serverProtocolInfo ProtocolInfo
 	// watchMap is a map of key -> chan watchState.
 	watchMap sync.Map
+
+	// shutdownWatcher is the "box.shutdown" event watcher.
+	shutdownWatcher Watcher
+	// requestCnt is a counter of active requests.
+	requestCnt int64
 }
 
 var _ = Connector(&Connection{}) // Check compatibility with connector interface.
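
For reference, here is a minimal usage sketch of the new Shutdown connection
event, observed through the connector's existing Opts.Notify channel. This is
illustrative only and not part of the patch; the address, credentials, and
timing values are placeholders.

package main

import (
	"log"
	"time"

	"github.com/tarantool/go-tarantool"
)

func main() {
	// Buffered channel for connection state events (Connected, Disconnected,
	// Shutdown, ...); the connector delivers events here.
	events := make(chan tarantool.ConnEvent, 16)

	opts := tarantool.Opts{
		User:          "test", // placeholder credentials
		Pass:          "test",
		Timeout:       5 * time.Second,
		Reconnect:     500 * time.Millisecond, // reconnect enabled, so the client
		MaxReconnects: 10,                     // comes back after the server restarts
		Notify:        events,
	}

	conn, err := tarantool.Connect("127.0.0.1:3301", opts) // placeholder address
	if err != nil {
		log.Fatalf("connect failed: %s", err)
	}
	defer conn.Close()

	go func() {
		for ev := range events {
			if ev.Kind == tarantool.Shutdown {
				// The server announced a graceful shutdown: in-flight requests
				// may still finish, new ones fail with ErrConnectionShutdown.
				log.Println("server is shutting down gracefully")
			}
		}
	}()

	if _, err := conn.Do(tarantool.NewPingRequest()).Get(); err != nil {
		log.Printf("ping: %s", err)
	}
}

Reconnection is enabled here so that, once the server finishes shutting down
and restarts, the connection is re-established automatically, matching the
behavior described in the doc comment above.
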
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { conn.opts.Logger = defaultLogger{} } + conn.cond = sync.NewCond(&conn.mutex) + if err = conn.createConnection(false); err != nil { ter, ok := err.(Error) if conn.opts.Reconnect <= 0 { @@ -589,10 +610,20 @@ func (conn *Connection) dial() (err error) { conn.lockShards() conn.c = connection atomic.StoreUint32(&conn.state, connConnected) + conn.cond.Broadcast() conn.unlockShards() go conn.writer(w, connection) go conn.reader(r, connection) + // Subscribe shutdown event to process graceful shutdown. + if conn.shutdownWatcher == nil && conn.isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) { + watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback) + if werr != nil { + return werr + } + conn.shutdownWatcher = watcher + } + return } @@ -762,10 +793,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error) if conn.state != connClosed { close(conn.control) atomic.StoreUint32(&conn.state, connClosed) + conn.cond.Broadcast() + // Free the resources. + if conn.shutdownWatcher != nil { + go conn.shutdownWatcher.Unregister() + conn.shutdownWatcher = nil + } conn.notify(Closed) } } else { atomic.StoreUint32(&conn.state, connDisconnected) + conn.cond.Broadcast() conn.notify(Disconnected) } if conn.c != nil { @@ -784,9 +822,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error) return } -func (conn *Connection) reconnect(neterr error, c net.Conn) { - conn.mutex.Lock() - defer conn.mutex.Unlock() +func (conn *Connection) reconnectImpl(neterr error, c net.Conn) { if conn.opts.Reconnect > 0 { if c == conn.c { conn.closeConnection(neterr, false) @@ -799,6 +835,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) { } } +func (conn *Connection) reconnect(neterr error, c net.Conn) { + conn.mutex.Lock() + defer conn.mutex.Unlock() + conn.reconnectImpl(neterr, c) + conn.cond.Broadcast() +} + func (conn *Connection) lockShards() { for i := range conn.shard { conn.shard[i].rmut.Lock() @@ -1026,6 +1069,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) { fut.done = nil shard.rmut.Unlock() return + case connShutdown: + fut.err = ClientError{ + ErrConnectionShutdown, + "server shutdown in progress", + } + fut.ready = nil + fut.done = nil + shard.rmut.Unlock() + return } pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1) if ctx != nil { @@ -1082,22 +1134,32 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { } func (conn *Connection) send(req Request, streamId uint64) *Future { + atomic.AddInt64(&conn.requestCnt, int64(1)) + fut := conn.newFuture(req.Ctx()) if fut.ready == nil { + atomic.AddInt64(&conn.requestCnt, int64(-1)) return fut } + if req.Ctx() != nil { select { case <-req.Ctx().Done(): conn.cancelFuture(fut, fmt.Errorf("context is done")) + // future here does not belong to any shard yet, + // so cancelFuture don't call markDone. 
+ atomic.AddInt64(&conn.requestCnt, int64(-1)) return fut default: } } + conn.putFuture(fut, req, streamId) + if req.Ctx() != nil { go conn.contextWatchdog(fut, req.Ctx()) } + return fut } @@ -1164,6 +1226,10 @@ func (conn *Connection) markDone(fut *Future) { if conn.rlimit != nil { <-conn.rlimit } + + if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 { + conn.cond.Broadcast() + } } func (conn *Connection) peekFuture(reqid uint32) (fut *Future) { @@ -1458,6 +1524,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error return st, nil } +func (conn *Connection) isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool { + for _, actual := range actualSlice { + if expected == actual { + return true + } + } + return false +} + // NewWatcher creates a new Watcher object for the connection. // // You need to require WatchersFeature to use watchers, see examples for the @@ -1496,20 +1571,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, // asynchronous. We do not expect any response from a Tarantool instance // That's why we can't just check the Tarantool response for an unsupported // request error. - watchersRequired := false - for _, feature := range conn.opts.RequiredProtocolInfo.Features { - if feature == WatchersFeature { - watchersRequired = true - break - } - } - - if !watchersRequired { + if !conn.isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) { err := fmt.Errorf("the feature %s must be required by connection "+ "options to create a watcher", WatchersFeature) return nil, err } + return conn.newWatcherImpl(key, callback) +} + +func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) { st, err := subscribeWatchChannel(conn, key) if err != nil { return nil, err @@ -1563,7 +1634,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, if state.cnt == 0 { // The last one sends IPROTO_UNWATCH. - conn.Do(newUnwatchRequest(key)).Get() + if !conn.ClosedNow() { + // conn.ClosedNow() check is a workaround for calling + // Unregister from connectionClose(). + conn.Do(newUnwatchRequest(key)).Get() + } conn.watchMap.Delete(key) close(state.unready) } @@ -1666,3 +1741,51 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo { func (conn *Connection) ClientProtocolInfo() ProtocolInfo { return clientProtocolInfo.Clone() } + +func shutdownEventCallback(event WatchEvent) { + // Receives "true" on server shutdown. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 2. + val, ok := event.Value.(bool) + if ok && val { + go event.Conn.shutdown() + } +} + +func (conn *Connection) shutdown() { + // Forbid state changes. + conn.mutex.Lock() + defer conn.mutex.Unlock() + + if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) { + return + } + conn.cond.Broadcast() + conn.notify(Shutdown) + + c := conn.c + for { + if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) { + return + } + if atomic.LoadInt64(&conn.requestCnt) == 0 { + break + } + // Use cond var on conn.mutex since request execution may + // call reconnect(). It is ok if state changes as part of + // reconnect since Tarantool server won't allow to reconnect + // in the middle of shutting down. + conn.cond.Wait() + } + + // Start to reconnect based on common rules, same as in net.box. 
+ // Reconnect also closes the connection: server waits until all + // subscribed connections are terminated. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 3. + conn.reconnectImpl( + ClientError{ + ErrConnectionClosed, + "connection closed after server shutdown", + }, conn.c) +} diff --git a/errors.go b/errors.go index 02e4635bb..906184c24 100644 --- a/errors.go +++ b/errors.go @@ -55,6 +55,7 @@ const ( ErrProtocolError = 0x4000 + iota ErrTimeouted = 0x4000 + iota ErrRateLimited = 0x4000 + iota + ErrConnectionShutdown = 0x4000 + iota ) // Tarantool server error codes. diff --git a/shutdown_test.go b/shutdown_test.go new file mode 100644 index 000000000..d9b1db111 --- /dev/null +++ b/shutdown_test.go @@ -0,0 +1,551 @@ +//go:build linux || (darwin && !cgo) +// +build linux darwin,!cgo + +// Use OS build flags since signals are system-dependent. + +package tarantool_test + +import ( + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + . "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +var shtdnServer = "127.0.0.1:3014" +var shtdnClntOpts = Opts{ + User: opts.User, + Pass: opts.Pass, + Timeout: 20 * time.Second, + Reconnect: 200 * time.Millisecond, + MaxReconnects: 10, + RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}, +} +var shtdnSrvOpts = test_helpers.StartOpts{ + InitScript: "config.lua", + Listen: shtdnServer, + User: shtdnClntOpts.User, + Pass: shtdnClntOpts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, +} + +var evalMsg = "got enough sleep" +var evalBody = ` + local fiber = require('fiber') + local time, msg = ... + fiber.sleep(time) + return msg +` + +func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) { + var resp *Response + var err error + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 1 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that we can't send new requests after shutdown starts. + // Retry helps to wait a bit until server starts to shutdown + // and send us the shutdown event. 
+ shutdownWaitRetries := 5 + shutdownWaitTimeout := 100 * time.Millisecond + + err = test_helpers.Retry(func(interface{}) error { + _, err = conn.Do(NewPingRequest()).Get() + if err == nil { + return fmt.Errorf("expected error for requests sent on shutdown") + } + + if err.Error() != "server shutdown in progress (0x4005)" { + return err + } + + return nil + }, nil, shutdownWaitRetries, shutdownWaitTimeout) + require.Nil(t, err) + + // Check that requests started before the shutdown finish successfully. + resp, err = fut.Get() + require.Nil(t, err) + require.NotNil(t, resp) + require.Equal(t, resp.Data, []interface{}{evalMsg}) + + // Wait until server go down. + // Server will go down only when it process all requests from our connection + // (or on timeout). + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is unavailable when server is down. + require.Equal(t, false, conn.ConnectedNow()) +} + +func TestGracefulShutdown(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) +} + +func TestGracefulShutdownWithReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) + + err = test_helpers.RestartTarantool(&inst) + require.Nilf(t, err, "Failed to restart tarantool") + + connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect) + require.Truef(t, connected, "Reconnect success") + + testGracefulShutdown(t, conn, &inst) +} + +func TestNoGracefulShutdown(t *testing.T) { + // No watchers = no graceful shutdown. + noShtdnClntOpts := shtdnClntOpts.Clone() + noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{} + test_helpers.SkipIfWatchersSupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts) + defer conn.Close() + + evalSleep := 10 // in seconds + serverShutdownTimeout := 60 // in seconds + require.Less(t, evalSleep, serverShutdownTimeout) + + // Send request with sleep. + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that request was interrupted. 
+ _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") +} + +func TestGracefulShutdownRespectsClose(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 10 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Close the connection. + conn.Close() + + // Connection is closed. + require.Equal(t, true, conn.ClosedNow()) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is still closed. + require.Equal(t, true, conn.ClosedNow()) +} + +func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. 
+ helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a small timeout so server will shutdown before requesst finishes. + serverShutdownTimeout := 1 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 5 // in seconds + require.Lessf(t, + serverShutdownTimeout, + evalSleep, + "test request will be failed by timeout") + require.Lessf(t, + time.Duration(serverShutdownTimeout)*time.Second, + shtdnClntOpts.Timeout, + "test request will be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + evalStart := time.Now() + fut := conn.Do(req) + + // SIGTERM the server. + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Wait until server go down. + // Server is expected to go down on timeout. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that request failed by server disconnect, not a client timeout. + _, err = fut.Get() + require.NotNil(t, err) + require.NotContains(t, err.Error(), "client timeout for request") + + evalFinish := time.Now() + evalTime := evalFinish.Sub(evalStart) + + // Check that it wasn't a client timeout. + require.Lessf(t, + evalTime, + shtdnClntOpts.Timeout, + "server went down not by timeout") +} + +func TestGracefulShutdownCloseConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + // Do not wait till Tarantool register out watcher, + // test everything is ok even on async. + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Wait till all connections created. 
+ srvToStop.Done() + srvStop.Wait() + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +} + +func TestGracefulShutdownConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. + var ret error + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, _ := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + defer helperW.Unregister() + <-helperCh + + evalSleep := 1 // in seconds + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + fut := conn.Do(req) + + // Wait till all connections had started sleeping. + srvToStop.Done() + srvStop.Wait() + + _, gerr := fut.Get() + if gerr != nil { + ret = gerr + } + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + caseWg.Wait() + require.Nil(t, ret, "No errors on concurrent wait") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +}
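
Usage note (illustrative, not part of the patch): requests issued after the
shutdown event fail with the new ErrConnectionShutdown client error, so
application code may want to wait for the reconnect and retry. A minimal
sketch follows; the helper name, retry count, and sleep interval are arbitrary
assumptions.

package main

import (
	"errors"
	"log"
	"time"

	"github.com/tarantool/go-tarantool"
)

// doWithShutdownRetry is a hypothetical helper: if a request fails because
// the server shutdown is in progress, it waits for the connector to
// reconnect (reconnect must be enabled in Opts) and tries again.
func doWithShutdownRetry(conn *tarantool.Connection, req tarantool.Request) (*tarantool.Response, error) {
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := conn.Do(req).Get()
		if err == nil {
			return resp, nil
		}
		cerr, ok := err.(tarantool.ClientError)
		if !ok || cerr.Code != tarantool.ErrConnectionShutdown {
			return nil, err // not a shutdown-related failure
		}
		// Give the connector time to reconnect to the restarted instance.
		time.Sleep(time.Second)
	}
	return nil, errors.New("request kept failing during server shutdown")
}

func main() {
	opts := tarantool.Opts{
		User:      "test", // placeholder credentials
		Pass:      "test",
		Reconnect: 500 * time.Millisecond,
	}
	conn, err := tarantool.Connect("127.0.0.1:3301", opts) // placeholder address
	if err != nil {
		log.Fatalf("connect failed: %s", err)
	}
	defer conn.Close()

	resp, err := doWithShutdownRetry(conn, tarantool.NewPingRequest())
	if err != nil {
		log.Fatalf("ping failed: %s", err)
	}
	log.Printf("ping ok, code %d", resp.Code)
}

The type assertion on tarantool.ClientError works because the connector
stores the error by value, as seen in newFuture() in the patch above.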