From 78d67b515577395e005d37ec30cdfb9aab46b213 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Fri, 16 Dec 2022 14:53:59 +0300 Subject: [PATCH] feature: support graceful shutdown If connected to Tarantool 2.10 or newer and WatchersFeature is required, after this patch connection supports server graceful shutdown [1]. In this case, server will wait until all client requests will be finished and client disconnects before going down (server also may go down by timeout). Client reconnect will happen if connection options enable reconnect. 1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ Closes #214 --- CHANGELOG.md | 1 + connection.go | 120 +++++++++++++++++-- shutdown_test.go | 300 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 412 insertions(+), 9 deletions(-) create mode 100644 shutdown_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7930b34d1..8cc87979d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Error type support in MessagePack (#209) - Event subscription support (#119) - Session settings support (#215) +- Support graceful shutdown (#214) ### Changed diff --git a/connection.go b/connection.go index b657dc2ae..781e99502 100644 --- a/connection.go +++ b/connection.go @@ -33,6 +33,13 @@ const ( connTransportSsl = "ssl" ) +const ( + shutdownNotInProgress = 0 + shutdownInProgress = 1 +) + +const shutdownEventKey = "box.shutdown" + type ConnEventKind int type ConnLogKind int @@ -134,6 +141,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac // always returns array of array (array of tuples for space related methods). // For Eval* and Call* Tarantool always returns array, but does not forces // array of arrays. +// +// If connected to Tarantool 2.10 or newer and WatchersFeature is required, +// connection supports server graceful shutdown. 
In this case, server will +// wait until all client requests are finished and the client disconnects +// before going down (the server may also go down by timeout). Client reconnect will +// happen if connection options enable reconnect. +// +// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ type Connection struct { addr string c net.Conn @@ -162,6 +177,13 @@ type Connection struct { serverProtocolInfo ProtocolInfo // watchMap is a map of key -> chan watchState. watchMap sync.Map + + // shutdownInProgress defines whether a shutdown is in progress now. + shutdownInProgress uint32 + // shutdownWatcher is the "box.shutdown" event watcher. + shutdownWatcher Watcher + // shutdownWg is the wait group to finish all tasks on shutdown. + shutdownWg sync.WaitGroup } var _ = Connector(&Connection{}) // Check compatibility with connector interface. @@ -373,6 +395,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { } } } + conn.shutdownInProgress = shutdownNotInProgress if conn.opts.RateLimit > 0 { conn.rlimit = make(chan struct{}, conn.opts.RateLimit) @@ -421,6 +444,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { } } + // Subscribe to the shutdown event to process graceful shutdown. 
+ if conn.isWatchersRequired() { + watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback) + if werr != nil { + conn.closeConnection(werr, true) + return nil, werr + } + conn.shutdownWatcher = watcher + } + return conn, err } @@ -1086,6 +1119,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { if fut.ready == nil { return fut } + if req.Ctx() != nil { select { case <-req.Ctx().Done(): @@ -1094,13 +1128,31 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { default: } } + + if atomic.LoadUint32(&(conn.shutdownInProgress)) == shutdownInProgress { + conn.cancelFuture(fut, fmt.Errorf("server shutdown in progress")) + return fut + } + conn.putFuture(fut, req, streamId) + if req.Ctx() != nil { go conn.contextWatchdog(fut, req.Ctx()) } + + if conn.shutdownWatcher != nil { + conn.shutdownWg.Add(1) // Register before "go": Add must not race with shutdownWg.Wait. + go conn.gracefulWait(fut) + } + return fut } +func (conn *Connection) gracefulWait(fut *Future) { + <-fut.done // Block until the future is done. + conn.shutdownWg.Done() // Pairs with the Add(1) made by send. +} + func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { shardn := fut.requestId & (conn.opts.Concurrency - 1) shard := &conn.shard[shardn] @@ -1458,6 +1510,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error return st, nil } +func (conn *Connection) isWatchersRequired() bool { + for _, feature := range conn.opts.RequiredProtocolInfo.Features { + if feature == WatchersFeature { + return true + } + } + return false +} + // NewWatcher creates a new Watcher object for the connection. // // You need to require WatchersFeature to use watchers, see examples for the @@ -1496,15 +1557,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, // asynchronous. We do not expect any response from a Tarantool instance // That's why we can't just check the Tarantool response for an unsupported // request error. 
- watchersRequired := false - for _, feature := range conn.opts.RequiredProtocolInfo.Features { - if feature == WatchersFeature { - watchersRequired = true - break - } - } - - if !watchersRequired { + if !conn.isWatchersRequired() { err := fmt.Errorf("the feature %s must be required by connection "+ "options to create a watcher", WatchersFeature) return nil, err @@ -1666,3 +1719,52 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo { func (conn *Connection) ClientProtocolInfo() ProtocolInfo { return clientProtocolInfo.Clone() } + +func shutdownEventCallback(event WatchEvent) { + // Receives "true" on server shutdown. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 2. + val, ok := event.Value.(bool) + if ok && val { + // Deferred because otherwise we'll block ourselves on ack wait. NOTE(review): the deferred call still runs before this callback returns - confirm this is enough. + defer event.Conn.processShutdown() + } +} + +func (conn *Connection) processShutdown() { + // Forbid starting new tasks. + atomic.StoreUint32(&(conn.shutdownInProgress), shutdownInProgress) + + // After finishing, allow starting new tasks; they will fail with + // "not connected" instead. + defer atomic.StoreUint32(&(conn.shutdownInProgress), shutdownNotInProgress) + + // Wait for tasks to finish. + conn.shutdownWg.Wait() + + // Do not unregister task explicitly since connection teardown + // has the same effect. + + if !conn.ClosedNow() { + // Forbid other connection interactions. + conn.mutex.Lock() + + err := ClientError{ + ErrConnectionClosed, + "connection closed after server shutdown", + } + + // Close the connection since the server waits until all subscribed + // connections are terminated. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 3. + conn.closeConnection(err, false) + + // Allow other connection interactions. + // Unlock explicitly rather than with defer since reconnect may block on the mutex. + conn.mutex.Unlock() + + // Start to reconnect based on common rules, same as in net.box. 
+ defer conn.reconnect(err, conn.c) + } +} diff --git a/shutdown_test.go b/shutdown_test.go new file mode 100644 index 000000000..c03cf02f9 --- /dev/null +++ b/shutdown_test.go @@ -0,0 +1,300 @@ +//go:build linux || (darwin && !cgo) +// +build linux darwin,!cgo + +// Use OS build flags since signals are system-dependent. + +package tarantool_test + +import ( + "fmt" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + . "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +var shtdnServer = "127.0.0.1:3014" +var shtdnClntOpts = Opts{ + User: opts.User, + Pass: opts.Pass, + Timeout: 20 * time.Second, + Reconnect: 200 * time.Millisecond, + MaxReconnects: 10, + RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}, +} +var shtdnSrvOpts = test_helpers.StartOpts{ + InitScript: "config.lua", + Listen: shtdnServer, + User: shtdnClntOpts.User, + Pass: shtdnClntOpts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, +} + +var evalMsg = "got enough sleep" +var evalBody = ` + local fiber = require('fiber') + local time, msg = ... + fiber.sleep(time) + return msg +` + +func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) { + var resp *Response + var err error + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 1 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. 
+ shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that we can't send new requests after shutdown starts. + // Retry helps to wait a bit until the server starts to shut down + // and sends us the shutdown event. + shutdownWaitRetries := 5 + shutdownWaitTimeout := 100 * time.Millisecond + + err = test_helpers.Retry(func(interface{}) error { + _, err = conn.Do(NewPingRequest()).Get() + if err == nil { + return fmt.Errorf("expected error for requests sent on shutdown") + } + + if err.Error() != "server shutdown in progress" { + return err + } + + return nil + }, nil, shutdownWaitRetries, shutdownWaitTimeout) + require.Nil(t, err) + + // Check that requests started before the shutdown finish successfully. + resp, err = fut.Get() + require.Nil(t, err) + require.NotNil(t, resp) + require.Equal(t, []interface{}{evalMsg}, resp.Data) + + // Wait until the server goes down. + // Server will go down only when it has processed all requests from our connection + // (or on timeout). + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is unavailable when server is down. 
+ require.Equal(t, false, conn.ConnectedNow()) +} + +func TestGracefulShutdown(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) +} + +func TestGracefulShutdownWithReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) + + err = test_helpers.RestartTarantool(&inst) + require.Nilf(t, err, "Failed to restart tarantool") + + connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect) + require.Truef(t, connected, "Reconnect success") + + testGracefulShutdown(t, conn, &inst) +} + +func TestNoGracefulShutdown(t *testing.T) { + // No watchers = no graceful shutdown. + noShtdnClntOpts := shtdnClntOpts.Clone() + noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{} + + var inst test_helpers.TarantoolInstance + var conn *Connection + var isLess210 bool + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts) + defer conn.Close() + + evalSleep := 10 // in seconds + serverShutdownTimeout := 60 // in seconds + require.Less(t, evalSleep, serverShutdownTimeout) + + // Set a big timeout so it would be easy to find out if server went down on timeout. 
+ // There is no graceful shutdown for pre-2.10 instances so it is not possible + // for them to wait for it. + isLess210, err = test_helpers.IsTarantoolVersionLess(2, 10, 0) + require.Nil(t, err) + if !isLess210 { + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + } + + // Send request with sleep. + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. 
+ if !isLess210 { + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + } +} + +func TestGracefulShutdownRespectsClose(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 10 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Close the connection. + conn.Close() + + // Connection is closed. + require.Equal(t, true, conn.ClosedNow()) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Mark that process is already down for test helpers. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. 
+ require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is still closed. + require.Equal(t, true, conn.ClosedNow()) +}