Skip to content

Commit

Permalink
feature: support graceful shutdown
Browse files Browse the repository at this point in the history
If connected to Tarantool 2.10 or newer and WatchersFeature is required,
after this patch connection supports server graceful shutdown [1]. In
this case, server will wait until all client requests will be finished
and client disconnects before going down (server also may go down by
timeout). Client reconnect will happen if connection options enable
reconnect.

1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

Closes #214
  • Loading branch information
DifferentialOrange committed Dec 16, 2022
1 parent 609b5b1 commit 78d67b5
Show file tree
Hide file tree
Showing 3 changed files with 412 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Error type support in MessagePack (#209)
- Event subscription support (#119)
- Session settings support (#215)
- Support graceful shutdown (#214)

### Changed

Expand Down
120 changes: 111 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ const (
connTransportSsl = "ssl"
)

const (
shutdownNotInProgress = 0
shutdownInProgress = 1
)

const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

Expand Down Expand Up @@ -134,6 +141,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not forces
// array of arrays.
//
// If connected to Tarantool 2.10 or newer and WatchersFeature is required,
// connection supports server graceful shutdown. In this case, server will
// wait until all client requests will be finished and client disconnects
// before going down (server also may go down by timeout). Client reconnect will
// happen if connection options enable reconnect.
//
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c net.Conn
Expand Down Expand Up @@ -162,6 +177,13 @@ type Connection struct {
serverProtocolInfo ProtocolInfo
// watchMap is a map of key -> chan watchState.
watchMap sync.Map

// shutdownInProgress defined whether shutdown now in progress.
shutdownInProgress uint32
// shutdownWatcher is the "box.shutdown" event watcher.
shutdownWatcher Watcher
// shutdownWg is the wait group to finish all tasks on shutdown.
shutdownWg sync.WaitGroup
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -373,6 +395,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}
}
conn.shutdownInProgress = shutdownNotInProgress

if conn.opts.RateLimit > 0 {
conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
Expand Down Expand Up @@ -421,6 +444,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

// Subscribe shutdown event to process graceful shutdown.
if conn.isWatchersRequired() {
watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback)
if werr != nil {
conn.closeConnection(werr, true)
return nil, werr
}
conn.shutdownWatcher = watcher
}

return conn, err
}

Expand Down Expand Up @@ -1086,6 +1119,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
if fut.ready == nil {
return fut
}

if req.Ctx() != nil {
select {
case <-req.Ctx().Done():
Expand All @@ -1094,13 +1128,31 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
default:
}
}

if atomic.LoadUint32(&(conn.shutdownInProgress)) == shutdownInProgress {
conn.cancelFuture(fut, fmt.Errorf("server shutdown in progress"))
return fut
}

conn.putFuture(fut, req, streamId)

if req.Ctx() != nil {
go conn.contextWatchdog(fut, req.Ctx())
}

if conn.shutdownWatcher != nil {
go conn.gracefulWait(fut)
}

return fut
}

func (conn *Connection) gracefulWait(fut *Future) {
conn.shutdownWg.Add(1)
<-fut.done
conn.shutdownWg.Done()
}

func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
shardn := fut.requestId & (conn.opts.Concurrency - 1)
shard := &conn.shard[shardn]
Expand Down Expand Up @@ -1458,6 +1510,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
return st, nil
}

func (conn *Connection) isWatchersRequired() bool {
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
if feature == WatchersFeature {
return true
}
}
return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require WatchersFeature to use watchers, see examples for the
Expand Down Expand Up @@ -1496,15 +1557,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// asynchronous. We do not expect any response from a Tarantool instance
// That's why we can't just check the Tarantool response for an unsupported
// request error.
watchersRequired := false
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
if feature == WatchersFeature {
watchersRequired = true
break
}
}

if !watchersRequired {
if !conn.isWatchersRequired() {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", WatchersFeature)
return nil, err
Expand Down Expand Up @@ -1666,3 +1719,52 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
return clientProtocolInfo.Clone()
}

func shutdownEventCallback(event WatchEvent) {
// Receives "true" on server shutdown.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
// step 2.
val, ok := event.Value.(bool)
if ok && val {
// defer cause otherwise we'll block ourselves on ack wait.
defer event.Conn.processShutdown()
}
}

func (conn *Connection) processShutdown() {
// Forbid starting new tasks.
atomic.StoreUint32(&(conn.shutdownInProgress), shutdownInProgress)

// After finish, allow starting new tasks, they will fail with
// "not connected" instead.
defer atomic.StoreUint32(&(conn.shutdownInProgress), shutdownNotInProgress)

// Wait for tasks to finish.
conn.shutdownWg.Wait()

// Do not unregister task explicitly since connection teardown
// has the same effect.

if !conn.ClosedNow() {
// Forbid other connection interactions.
conn.mutex.Lock()

err := ClientError{
ErrConnectionClosed,
"connection closed after server shutdown",
}

// Close the connection since server waits until all subscribed
// connections are terminated.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
// step 3.
conn.closeConnection(err, false)

// Allow other connection interactions.
// Unlock not with defer since reconnect may block the mutex.
conn.mutex.Unlock()

// Start to reconnect based on common rules, same as in net.box.
defer conn.reconnect(err, conn.c)
}
}
Loading

0 comments on commit 78d67b5

Please sign in to comment.