diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7930b34d1..8cc87979d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - Error type support in MessagePack (#209)
 - Event subscription support (#119)
 - Session settings support (#215)
+- Support graceful shutdown (#214)
 
 ### Changed
 
diff --git a/connection.go b/connection.go
index b657dc2ae..d5979ea37 100644
--- a/connection.go
+++ b/connection.go
@@ -25,7 +25,8 @@ const ignoreStreamId = 0
 const (
 	connDisconnected = 0
 	connConnected = 1
-	connClosed = 2
+	connShutdown = 2
+	connClosed = 3
 )
 
 const (
@@ -33,6 +34,8 @@ const (
 	connTransportSsl = "ssl"
 )
 
+const shutdownEventKey = "box.shutdown"
+
 type ConnEventKind int
 type ConnLogKind int
 const (
@@ -45,6 +48,8 @@ const (
 	ReconnectFailed
 	// Either reconnect attempts exhausted, or explicit Close is called.
 	Closed
+	// Shutdown signals that a graceful shutdown of the server is in progress.
+	Shutdown
 
 	// LogReconnectFailed is logged when reconnect attempt failed.
 	LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
 // always returns array of array (array of tuples for space related methods).
 // For Eval* and Call* Tarantool always returns array, but does not forces
 // array of arrays.
+//
+// If connected to Tarantool 2.10 or newer and WatchersFeature is required,
+// the connection supports graceful server shutdown. In this case, the server
+// waits until all client requests are finished and the client disconnects
+// before going down (the server may also go down by timeout). The client
+// reconnects if the connection options enable reconnection.
+//
+// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
 type Connection struct {
 	addr  string
 	c     net.Conn
 	mutex sync.Mutex
+	cond  *sync.Cond
 	// Schema contains schema loaded on connection.
 	Schema *Schema
 	// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
 	serverProtocolInfo ProtocolInfo
 	// watchMap is a map of key -> chan watchState.
 	watchMap sync.Map
+
+	// shutdownWatcher is the "box.shutdown" event watcher.
+	shutdownWatcher Watcher
+	// requestCnt is a counter of active requests.
+	requestCnt int64
 }
 
 var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
 		conn.opts.Logger = defaultLogger{}
 	}
 
+	conn.cond = sync.NewCond(&conn.mutex)
+
 	if err = conn.createConnection(false); err != nil {
 		ter, ok := err.(Error)
 		if conn.opts.Reconnect <= 0 {
@@ -421,6 +442,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
 		}
 	}
 
+	// Subscribe to the shutdown event to process graceful shutdown.
+	if conn.isWatchersRequired() {
+		watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback)
+		if werr != nil {
+			conn.closeConnection(werr, true)
+			return nil, werr
+		}
+		conn.shutdownWatcher = watcher
+	}
+
 	return conn, err
 }
 
@@ -589,6 +620,7 @@ func (conn *Connection) dial() (err error) {
 	conn.lockShards()
 	conn.c = connection
 	atomic.StoreUint32(&conn.state, connConnected)
+	conn.cond.Broadcast()
 	conn.unlockShards()
 	go conn.writer(w, connection)
 	go conn.reader(r, connection)
@@ -762,10 +794,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
 		if conn.state != connClosed {
 			close(conn.control)
 			atomic.StoreUint32(&conn.state, connClosed)
+			conn.cond.Broadcast()
+			// Free the resources.
+			if conn.shutdownWatcher != nil {
+				go conn.shutdownWatcher.Unregister()
+				conn.shutdownWatcher = nil
+			}
 			conn.notify(Closed)
 		}
 	} else {
 		atomic.StoreUint32(&conn.state, connDisconnected)
+		conn.cond.Broadcast()
 		conn.notify(Disconnected)
 	}
 	if conn.c != nil {
@@ -784,9 +823,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
 	return
 }
 
-func (conn *Connection) reconnect(neterr error, c net.Conn) {
-	conn.mutex.Lock()
-	defer conn.mutex.Unlock()
+func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
 	if conn.opts.Reconnect > 0 {
 		if c == conn.c {
 			conn.closeConnection(neterr, false)
@@ -799,6 +836,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
 	}
 }
 
+func (conn *Connection) reconnect(neterr error, c net.Conn) {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+	conn.reconnectImpl(neterr, c)
+	conn.cond.Broadcast()
+}
+
 func (conn *Connection) lockShards() {
 	for i := range conn.shard {
 		conn.shard[i].rmut.Lock()
@@ -1026,6 +1070,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
 		fut.done = nil
 		shard.rmut.Unlock()
 		return
+	case connShutdown:
+		fut.err = ClientError{
+			ErrConnectionShutdown,
+			"server shutdown in progress",
+		}
+		fut.ready = nil
+		fut.done = nil
+		shard.rmut.Unlock()
+		return
 	}
 	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
 	if ctx != nil {
@@ -1086,6 +1139,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
 	if fut.ready == nil {
 		return fut
 	}
+
 	if req.Ctx() != nil {
 		select {
 		case <-req.Ctx().Done():
@@ -1094,10 +1148,17 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
 		default:
 		}
 	}
+
+	if conn.shutdownWatcher != nil {
+		atomic.AddInt64(&(conn.requestCnt), int64(1))
+	}
+
 	conn.putFuture(fut, req, streamId)
+
 	if req.Ctx() != nil {
 		go conn.contextWatchdog(fut, req.Ctx())
 	}
+
 	return fut
 }
 
@@ -1164,6 +1225,12 @@ func (conn *Connection) markDone(fut *Future) {
 	if conn.rlimit != nil {
 		<-conn.rlimit
 	}
+
+	if conn.shutdownWatcher != nil {
+		if atomic.AddInt64(&(conn.requestCnt), int64(-1)) == 0 {
+			conn.cond.Broadcast()
+		}
+	}
 }
 
 func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
@@ -1458,6 +1525,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
 	return st, nil
 }
 
+func (conn *Connection) isWatchersRequired() bool {
+	for _, feature := range conn.opts.RequiredProtocolInfo.Features {
+		if feature == WatchersFeature {
+			return true
+		}
+	}
+	return false
+}
+
 // NewWatcher creates a new Watcher object for the connection.
 //
 // You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1572,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
 	// asynchronous. We do not expect any response from a Tarantool instance
 	// That's why we can't just check the Tarantool response for an unsupported
 	// request error.
-	watchersRequired := false
-	for _, feature := range conn.opts.RequiredProtocolInfo.Features {
-		if feature == WatchersFeature {
-			watchersRequired = true
-			break
-		}
-	}
-
-	if !watchersRequired {
+	if !conn.isWatchersRequired() {
 		err := fmt.Errorf("the feature %s must be required by connection "+
 			"options to create a watcher", WatchersFeature)
 		return nil, err
@@ -1563,7 +1631,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
 
 			if state.cnt == 0 {
 				// The last one sends IPROTO_UNWATCH.
-				conn.Do(newUnwatchRequest(key)).Get()
+				if !conn.ClosedNow() {
+					// conn.ClosedNow() check is a workaround for calling
+					// Unregister from closeConnection().
+					conn.Do(newUnwatchRequest(key)).Get()
+				}
 				conn.watchMap.Delete(key)
 				close(state.unready)
 			}
@@ -1666,3 +1738,52 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
 func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
 	return clientProtocolInfo.Clone()
 }
+
+func shutdownEventCallback(event WatchEvent) {
+	// Receives "true" on server shutdown.
+	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
+	// step 2.
+	val, ok := event.Value.(bool)
+	if ok && val {
+		go event.Conn.processShutdown()
+	}
+}
+
+func (conn *Connection) processShutdown() {
+	// Forbid state changes.
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+
+	if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
+		return
+	}
+	conn.notify(Shutdown)
+
+	c := conn.c
+	for (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
+		(atomic.LoadInt64(&(conn.requestCnt)) != 0) &&
+		(c == conn.c) {
+		// Use cond var on conn.mutex since request execution may
+		// call reconnect(). It is ok if the state changes as part of
+		// reconnect since the Tarantool server won't allow reconnecting
+		// in the middle of shutting down.
+		conn.cond.Wait()
+	}
+	// Do not unregister the watcher explicitly here since connection
+	// teardown has the same effect. To clean up connection resources,
+	// unregister on full close.
+
+	if (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
+		(c == conn.c) {
+		// Start to reconnect based on common rules, same as in net.box.
+		// Reconnect also closes the connection: the server waits until all
+		// subscribed connections are terminated.
+		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
+		// step 3.
+		conn.reconnectImpl(
+			ClientError{
+				ErrConnectionClosed,
+				"connection closed after server shutdown",
+			}, conn.c)
+	}
+}
diff --git a/errors.go b/errors.go
index 02e4635bb..906184c24 100644
--- a/errors.go
+++ b/errors.go
@@ -55,6 +55,7 @@ const (
 	ErrProtocolError = 0x4000 + iota
 	ErrTimeouted = 0x4000 + iota
 	ErrRateLimited = 0x4000 + iota
+	ErrConnectionShutdown = 0x4000 + iota
 )
 
 // Tarantool server error codes.
diff --git a/shutdown_test.go b/shutdown_test.go
new file mode 100644
index 000000000..f47a6d1c1
--- /dev/null
+++ b/shutdown_test.go
@@ -0,0 +1,517 @@
+//go:build linux || (darwin && !cgo)
+// +build linux darwin,!cgo
+
+// Use OS build flags since signals are system-dependent.
+
+package tarantool_test
+
+import (
+	"fmt"
+	"sync"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	. "github.com/tarantool/go-tarantool"
+	"github.com/tarantool/go-tarantool/test_helpers"
+)
+
+var shtdnServer = "127.0.0.1:3014"
+var shtdnClntOpts = Opts{
+	User: opts.User,
+	Pass: opts.Pass,
+	Timeout: 20 * time.Second,
+	Reconnect: 200 * time.Millisecond,
+	MaxReconnects: 10,
+	RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}},
+}
+var shtdnSrvOpts = test_helpers.StartOpts{
+	InitScript: "config.lua",
+	Listen: shtdnServer,
+	User: shtdnClntOpts.User,
+	Pass: shtdnClntOpts.Pass,
+	WaitStart: 100 * time.Millisecond,
+	ConnectRetry: 3,
+	RetryTimeout: 500 * time.Millisecond,
+}
+
+var evalMsg = "got enough sleep"
+var evalBody = `
+	local fiber = require('fiber')
+	local time, msg = ...
+	fiber.sleep(time)
+	return msg
+`
+
+func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) {
+	var resp *Response
+	var err error
+
+	// Set a big timeout so it would be easy to tell
+	// whether the server went down on timeout or after all connections were terminated.
+	serverShutdownTimeout := 60 // in seconds
+	_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
+	require.Nil(t, err)
+
+	// Send request with sleep.
+	evalSleep := 1 // in seconds
+	require.Lessf(t,
+		time.Duration(evalSleep)*time.Second,
+		shtdnClntOpts.Timeout,
+		"test request won't be failed by timeout")
+
+	req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
+
+	fut := conn.Do(req)
+
+	// SIGTERM the server.
+	shutdownStart := time.Now()
+	require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
+
+	// Check that we can't send new requests after shutdown starts.
+	// Retry helps to wait a bit until the server starts to shut down
+	// and sends us the shutdown event.
+	shutdownWaitRetries := 5
+	shutdownWaitTimeout := 100 * time.Millisecond
+
+	err = test_helpers.Retry(func(interface{}) error {
+		_, err = conn.Do(NewPingRequest()).Get()
+		if err == nil {
+			return fmt.Errorf("expected error for requests sent on shutdown")
+		}
+
+		if err.Error() != "server shutdown in progress (0x4005)" {
+			return err
+		}
+
+		return nil
+	}, nil, shutdownWaitRetries, shutdownWaitTimeout)
+	require.Nil(t, err)
+
+	// Check that requests started before the shutdown finish successfully.
+	resp, err = fut.Get()
+	require.Nil(t, err)
+	require.NotNil(t, resp)
+	require.Equal(t, resp.Data, []interface{}{evalMsg})
+
+	// Wait until the server goes down.
+	// The server will go down only when it processes all requests from our connection
+	// (or on timeout).
+	_, err = inst.Cmd.Process.Wait()
+	require.Nil(t, err)
+	shutdownFinish := time.Now()
+	shutdownTime := shutdownFinish.Sub(shutdownStart)
+
+	// Help test helpers to properly clean up.
+	inst.Cmd.Process = nil
+
+	// Check that it wasn't a timeout.
+	require.Lessf(t,
+		shutdownTime,
+		time.Duration(serverShutdownTimeout/2)*time.Second,
+		"server went down not by timeout")
+
+	// Connection is unavailable when the server is down.
+ require.Equal(t, false, conn.ConnectedNow()) +} + +func TestGracefulShutdown(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) +} + +func TestGracefulShutdownWithReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) + + err = test_helpers.RestartTarantool(&inst) + require.Nilf(t, err, "Failed to restart tarantool") + + connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect) + require.Truef(t, connected, "Reconnect success") + + testGracefulShutdown(t, conn, &inst) +} + +func TestNoGracefulShutdown(t *testing.T) { + // No watchers = no graceful shutdown. + noShtdnClntOpts := shtdnClntOpts.Clone() + noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{} + + var inst test_helpers.TarantoolInstance + var conn *Connection + var isLess210 bool + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts) + defer conn.Close() + + evalSleep := 10 // in seconds + serverShutdownTimeout := 60 // in seconds + require.Less(t, evalSleep, serverShutdownTimeout) + + // Set a big timeout so it would be easy to find out if server went down on timeout. + // There is no graceful shutdown for pre-2.10 instances so it is not possible + // for them to wait for it. + isLess210, err = test_helpers.IsTarantoolVersionLess(2, 10, 0) + require.Nil(t, err) + if !isLess210 { + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + } + + // Send request with sleep. + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. 
+ if !isLess210 { + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + } +} + +func TestGracefulShutdownRespectsClose(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 10 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Close the connection. + conn.Close() + + // Connection is closed. + require.Equal(t, true, conn.ClosedNow()) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is still closed. + require.Equal(t, true, conn.ClosedNow()) +} + +func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a small timeout so server will shutdown before requesst finishes. + serverShutdownTimeout := 1 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 5 // in seconds + require.Lessf(t, + serverShutdownTimeout, + evalSleep, + "test request will be failed by timeout") + require.Lessf(t, + time.Duration(serverShutdownTimeout)*time.Second, + shtdnClntOpts.Timeout, + "test request will be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + evalStart := time.Now() + fut := conn.Do(req) + + // SIGTERM the server. + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Wait until server go down. + // Server is expected to go down on timeout. 
+ _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that request failed by server disconnect, not a client timeout. + _, err = fut.Get() + require.NotNil(t, err) + require.NotContains(t, err.Error(), "client timeout for request") + + evalFinish := time.Now() + evalTime := evalFinish.Sub(evalStart) + + // Check that it wasn't a client timeout. + require.Lessf(t, + evalTime, + shtdnClntOpts.Timeout, + "server went down not by timeout") +} + +func TestGracefulShutdownCloseConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Wait till all connections created. + srvToStop.Done() + srvStop.Wait() + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +} + +func TestGracefulShutdownConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. 
+ var ret error + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + evalSleep := 1 // in seconds + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + fut := conn.Do(req) + + // Wait till all connections had started sleeping. + srvToStop.Done() + srvStop.Wait() + + _, gerr := fut.Get() + if gerr != nil { + ret = gerr + } + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + caseWg.Wait() + require.Nil(t, ret, "No errors on concurrent wait") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +}
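
Usage sketch (editorial addition, not part of the diff above): the graceful shutdown support is opt-in and is enabled by requiring WatchersFeature in the connection options, mirroring shtdnClntOpts from shutdown_test.go. The address, credentials, and timeouts below are placeholder assumptions; the Notify channel is the pre-existing ConnEvent mechanism, used here only to log the new Shutdown event kind.

// Sketch: opting in to server graceful shutdown from the client side.
// Assumes a local Tarantool 2.10+ instance; address and credentials are placeholders.
package main

import (
	"fmt"
	"time"

	"github.com/tarantool/go-tarantool"
)

func main() {
	// Channel to observe connection events, including the new Shutdown kind.
	events := make(chan tarantool.ConnEvent, 16)

	opts := tarantool.Opts{
		User:      "guest",
		Timeout:   5 * time.Second,
		Reconnect: 500 * time.Millisecond,
		// Requiring WatchersFeature is what makes Connect() subscribe to
		// the "box.shutdown" event.
		RequiredProtocolInfo: tarantool.ProtocolInfo{
			Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature},
		},
		Notify: events,
	}

	conn, err := tarantool.Connect("127.0.0.1:3301", opts)
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer conn.Close()

	// Log the shutdown notification; in-flight requests are still allowed to
	// finish, while new ones fail until the reconnect happens.
	go func() {
		for ev := range events {
			if ev.Kind == tarantool.Shutdown {
				fmt.Println("server graceful shutdown in progress")
			}
		}
	}()

	if _, err := conn.Do(tarantool.NewPingRequest()).Get(); err != nil {
		fmt.Println("ping failed:", err)
	}
}

With such options, a SIGTERM on the server behaves as exercised by the tests: the connection reports Shutdown, lets in-flight requests drain, rejects new requests with ErrConnectionShutdown (0x4005), and then reconnects if Reconnect is non-zero.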