diff --git a/CHANGELOG.md b/CHANGELOG.md index 7930b34d1..8cc87979d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Error type support in MessagePack (#209) - Event subscription support (#119) - Session settings support (#215) +- Support graceful shutdown (#214) ### Changed diff --git a/connection.go b/connection.go index b657dc2ae..781e99502 100644 --- a/connection.go +++ b/connection.go @@ -33,6 +33,13 @@ const ( connTransportSsl = "ssl" ) +const ( + shutdownNotInProgress = 0 + shutdownInProgress = 1 +) + +const shutdownEventKey = "box.shutdown" + type ConnEventKind int type ConnLogKind int @@ -134,6 +141,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac // always returns array of array (array of tuples for space related methods). // For Eval* and Call* Tarantool always returns array, but does not forces // array of arrays. +// +// If connected to Tarantool 2.10 or newer and WatchersFeature is required, +// connection supports server graceful shutdown. In this case, server will +// wait until all client requests will be finished and client disconnects +// before going down (server also may go down by timeout). Client reconnect will +// happen if connection options enable reconnect. +// +// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ type Connection struct { addr string c net.Conn @@ -162,6 +177,13 @@ type Connection struct { serverProtocolInfo ProtocolInfo // watchMap is a map of key -> chan watchState. watchMap sync.Map + + // shutdownInProgress defined whether shutdown now in progress. + shutdownInProgress uint32 + // shutdownWatcher is the "box.shutdown" event watcher. + shutdownWatcher Watcher + // shutdownWg is the wait group to finish all tasks on shutdown. + shutdownWg sync.WaitGroup } var _ = Connector(&Connection{}) // Check compatibility with connector interface. @@ -373,6 +395,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { } } } + conn.shutdownInProgress = shutdownNotInProgress if conn.opts.RateLimit > 0 { conn.rlimit = make(chan struct{}, conn.opts.RateLimit) @@ -421,6 +444,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { } } + // Subscribe shutdown event to process graceful shutdown. + if conn.isWatchersRequired() { + watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback) + if werr != nil { + conn.closeConnection(werr, true) + return nil, werr + } + conn.shutdownWatcher = watcher + } + return conn, err } @@ -1086,6 +1119,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { if fut.ready == nil { return fut } + if req.Ctx() != nil { select { case <-req.Ctx().Done(): @@ -1094,13 +1128,31 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { default: } } + + if atomic.LoadUint32(&(conn.shutdownInProgress)) == shutdownInProgress { + conn.cancelFuture(fut, fmt.Errorf("server shutdown in progress")) + return fut + } + conn.putFuture(fut, req, streamId) + if req.Ctx() != nil { go conn.contextWatchdog(fut, req.Ctx()) } + + if conn.shutdownWatcher != nil { + go conn.gracefulWait(fut) + } + return fut } +func (conn *Connection) gracefulWait(fut *Future) { + conn.shutdownWg.Add(1) + <-fut.done + conn.shutdownWg.Done() +} + func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] @@ -1458,6 +1510,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error return st, nil } +func (conn *Connection) isWatchersRequired() bool { + for _, feature := range conn.opts.RequiredProtocolInfo.Features { + if feature == WatchersFeature { + return true + } + } + return false +} + // NewWatcher creates a new Watcher object for the connection. // // You need to require WatchersFeature to use watchers, see examples for the @@ -1496,15 +1557,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, // asynchronous. We do not expect any response from a Tarantool instance // That's why we can't just check the Tarantool response for an unsupported // request error. - watchersRequired := false - for _, feature := range conn.opts.RequiredProtocolInfo.Features { - if feature == WatchersFeature { - watchersRequired = true - break - } - } - - if !watchersRequired { + if !conn.isWatchersRequired() { err := fmt.Errorf("the feature %s must be required by connection "+ "options to create a watcher", WatchersFeature) return nil, err @@ -1666,3 +1719,52 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo { func (conn *Connection) ClientProtocolInfo() ProtocolInfo { return clientProtocolInfo.Clone() } + +func shutdownEventCallback(event WatchEvent) { + // Receives "true" on server shutdown. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 2. + val, ok := event.Value.(bool) + if ok && val { + // defer cause otherwise we'll block ourselves on ack wait. + defer event.Conn.processShutdown() + } +} + +func (conn *Connection) processShutdown() { + // Forbid starting new tasks. + atomic.StoreUint32(&(conn.shutdownInProgress), shutdownInProgress) + + // After finish, allow starting new tasks, they will fail with + // "not connected" instead. + defer atomic.StoreUint32(&(conn.shutdownInProgress), shutdownNotInProgress) + + // Wait for tasks to finish. + conn.shutdownWg.Wait() + + // Do not unregister task explicitly since connection teardown + // has the same effect. + + if !conn.ClosedNow() { + // Forbid other connection interactions. + conn.mutex.Lock() + + err := ClientError{ + ErrConnectionClosed, + "connection closed after server shutdown", + } + + // Close the connection since server waits until all subscribed + // connections are terminated. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 3. + conn.closeConnection(err, false) + + // Allow other connection interactions. + // Unlock not with defer since reconnect may block the mutex. + conn.mutex.Unlock() + + // Start to reconnect based on common rules, same as in net.box. + defer conn.reconnect(err, conn.c) + } +} diff --git a/shutdown_test.go b/shutdown_test.go new file mode 100644 index 000000000..c03cf02f9 --- /dev/null +++ b/shutdown_test.go @@ -0,0 +1,300 @@ +//go:build linux || (darwin && !cgo) +// +build linux darwin,!cgo + +// Use OS build flags since signals are system-dependent. + +package tarantool_test + +import ( + "fmt" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + . "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +var shtdnServer = "127.0.0.1:3014" +var shtdnClntOpts = Opts{ + User: opts.User, + Pass: opts.Pass, + Timeout: 20 * time.Second, + Reconnect: 200 * time.Millisecond, + MaxReconnects: 10, + RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}, +} +var shtdnSrvOpts = test_helpers.StartOpts{ + InitScript: "config.lua", + Listen: shtdnServer, + User: shtdnClntOpts.User, + Pass: shtdnClntOpts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, +} + +var evalMsg = "got enough sleep" +var evalBody = ` + local fiber = require('fiber') + local time, msg = ... + fiber.sleep(time) + return msg +` + +func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) { + var resp *Response + var err error + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 1 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that we can't send new requests after shutdown starts. + // Retry helps to wait a bit until server starts to shutdown + // and send us the shutdown event. + shutdownWaitRetries := 5 + shutdownWaitTimeout := 100 * time.Millisecond + + err = test_helpers.Retry(func(interface{}) error { + _, err = conn.Do(NewPingRequest()).Get() + if err == nil { + return fmt.Errorf("expected error for requests sent on shutdown") + } + + if err.Error() != "server shutdown in progress" { + return err + } + + return nil + }, nil, shutdownWaitRetries, shutdownWaitTimeout) + require.Nil(t, err) + + // Check that requests started before the shutdown finish successfully. + resp, err = fut.Get() + require.Nil(t, err) + require.NotNil(t, resp) + require.Equal(t, resp.Data, []interface{}{evalMsg}) + + // Wait until server go down. + // Server will go down only when it process all requests from our connection + // (or on timeout). + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is unavailable when server is down. + require.Equal(t, false, conn.ConnectedNow()) +} + +func TestGracefulShutdown(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) +} + +func TestGracefulShutdownWithReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) + + err = test_helpers.RestartTarantool(&inst) + require.Nilf(t, err, "Failed to restart tarantool") + + connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect) + require.Truef(t, connected, "Reconnect success") + + testGracefulShutdown(t, conn, &inst) +} + +func TestNoGracefulShutdown(t *testing.T) { + // No watchers = no graceful shutdown. + noShtdnClntOpts := shtdnClntOpts.Clone() + noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{} + + var inst test_helpers.TarantoolInstance + var conn *Connection + var isLess210 bool + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts) + defer conn.Close() + + evalSleep := 10 // in seconds + serverShutdownTimeout := 60 // in seconds + require.Less(t, evalSleep, serverShutdownTimeout) + + // Set a big timeout so it would be easy to find out if server went down on timeout. + // There is no graceful shutdown for pre-2.10 instances so it is not possible + // for them to wait for it. + isLess210, err = test_helpers.IsTarantoolVersionLess(2, 10, 0) + require.Nil(t, err) + if !isLess210 { + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + } + + // Send request with sleep. + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. + if !isLess210 { + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + } +} + +func TestGracefulShutdownRespectsClose(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 10 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Close the connection. + conn.Close() + + // Connection is closed. + require.Equal(t, true, conn.ClosedNow()) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is still closed. + require.Equal(t, true, conn.ClosedNow()) +}