Skip to content

Commit

Permalink
feature: support graceful shutdown
Browse files Browse the repository at this point in the history
If connected to Tarantool 2.10 or newer, after this patch a connection
supports server graceful shutdown [1]. In this case, server will wait
until all client requests will be finished and client disconnects before
going down (server also may go down by timeout). Client reconnect will
happen if connection options enable reconnect. Beware that graceful
shutdown event initialization is asynchronous.

1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

Closes #214
  • Loading branch information
DifferentialOrange committed Dec 25, 2022
1 parent 2faaa7d commit fe07889
Show file tree
Hide file tree
Showing 4 changed files with 690 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Error type support in MessagePack (#209)
- Event subscription support (#119)
- Session settings support (#215)
- Support graceful shutdown (#214)

### Changed

Expand Down
151 changes: 137 additions & 14 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ const ignoreStreamId = 0
const (
connDisconnected = 0
connConnected = 1
connClosed = 2
connShutdown = 2
connClosed = 3
)

const (
connTransportNone = ""
connTransportSsl = "ssl"
)

const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

Expand All @@ -45,6 +48,8 @@ const (
ReconnectFailed
// Either reconnect attempts exhausted, or explicit Close is called.
Closed
// Shutdown signals that shutdown callback is processing.
Shutdown

// LogReconnectFailed is logged when reconnect attempt failed.
LogReconnectFailed ConnLogKind = iota + 1
Expand Down Expand Up @@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not forces
// array of arrays.
//
// If connected to Tarantool 2.10 or newer, connection supports server graceful
// shutdown. In this case, server will wait until all client requests will be
// finished and client disconnects before going down (server also may go down
// by timeout). Client reconnect will happen if connection options enable
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
//
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c net.Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// requestId contains the last request ID for requests with nil context.
Expand All @@ -162,6 +176,11 @@ type Connection struct {
serverProtocolInfo ProtocolInfo
// watchMap is a map of key -> chan watchState.
watchMap sync.Map

// shutdownWatcher is the "box.shutdown" event watcher.
shutdownWatcher Watcher
// requestCnt is a counter of active requests.
requestCnt int64
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
conn.opts.Logger = defaultLogger{}
}

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
Expand Down Expand Up @@ -589,10 +610,20 @@ func (conn *Connection) dial() (err error) {
conn.lockShards()
conn.c = connection
atomic.StoreUint32(&conn.state, connConnected)
conn.cond.Broadcast()
conn.unlockShards()
go conn.writer(w, connection)
go conn.reader(r, connection)

// Subscribe shutdown event to process graceful shutdown.
if conn.shutdownWatcher == nil && conn.isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) {
watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
if werr != nil {
return werr
}
conn.shutdownWatcher = watcher
}

return
}

Expand Down Expand Up @@ -762,10 +793,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
if conn.state != connClosed {
close(conn.control)
atomic.StoreUint32(&conn.state, connClosed)
conn.cond.Broadcast()
// Free the resources.
if conn.shutdownWatcher != nil {
go conn.shutdownWatcher.Unregister()
conn.shutdownWatcher = nil
}
conn.notify(Closed)
}
} else {
atomic.StoreUint32(&conn.state, connDisconnected)
conn.cond.Broadcast()
conn.notify(Disconnected)
}
if conn.c != nil {
Expand All @@ -784,9 +822,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
return
}

func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
Expand All @@ -799,6 +835,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
}
}

func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.reconnectImpl(neterr, c)
conn.cond.Broadcast()
}

func (conn *Connection) lockShards() {
for i := range conn.shard {
conn.shard[i].rmut.Lock()
Expand Down Expand Up @@ -1026,6 +1069,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut.done = nil
shard.rmut.Unlock()
return
case connShutdown:
fut.err = ClientError{
ErrConnectionShutdown,
"server shutdown in progress",
}
fut.ready = nil
fut.done = nil
shard.rmut.Unlock()
return
}
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
if ctx != nil {
Expand Down Expand Up @@ -1082,22 +1134,32 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
}

func (conn *Connection) send(req Request, streamId uint64) *Future {
atomic.AddInt64(&conn.requestCnt, int64(1))

fut := conn.newFuture(req.Ctx())
if fut.ready == nil {
atomic.AddInt64(&conn.requestCnt, int64(-1))
return fut
}

if req.Ctx() != nil {
select {
case <-req.Ctx().Done():
conn.cancelFuture(fut, fmt.Errorf("context is done"))
// future here does not belong to any shard yet,
// so cancelFuture don't call markDone.
atomic.AddInt64(&conn.requestCnt, int64(-1))
return fut
default:
}
}

conn.putFuture(fut, req, streamId)

if req.Ctx() != nil {
go conn.contextWatchdog(fut, req.Ctx())
}

return fut
}

Expand Down Expand Up @@ -1164,6 +1226,10 @@ func (conn *Connection) markDone(fut *Future) {
if conn.rlimit != nil {
<-conn.rlimit
}

if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
conn.cond.Broadcast()
}
}

func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
Expand Down Expand Up @@ -1458,6 +1524,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
return st, nil
}

func (conn *Connection) isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool {
for _, actual := range actualSlice {
if expected == actual {
return true
}
}
return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require WatchersFeature to use watchers, see examples for the
Expand Down Expand Up @@ -1496,20 +1571,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// asynchronous. We do not expect any response from a Tarantool instance
// That's why we can't just check the Tarantool response for an unsupported
// request error.
watchersRequired := false
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
if feature == WatchersFeature {
watchersRequired = true
break
}
}

if !watchersRequired {
if !conn.isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", WatchersFeature)
return nil, err
}

return conn.newWatcherImpl(key, callback)
}

func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
st, err := subscribeWatchChannel(conn, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1563,7 +1634,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,

if state.cnt == 0 {
// The last one sends IPROTO_UNWATCH.
conn.Do(newUnwatchRequest(key)).Get()
if !conn.ClosedNow() {
// conn.ClosedNow() check is a workaround for calling
// Unregister from connectionClose().
conn.Do(newUnwatchRequest(key)).Get()
}
conn.watchMap.Delete(key)
close(state.unready)
}
Expand Down Expand Up @@ -1666,3 +1741,51 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
return clientProtocolInfo.Clone()
}

func shutdownEventCallback(event WatchEvent) {
// Receives "true" on server shutdown.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
// step 2.
val, ok := event.Value.(bool)
if ok && val {
go event.Conn.shutdown()
}
}

func (conn *Connection) shutdown() {
// Forbid state changes.
conn.mutex.Lock()
defer conn.mutex.Unlock()

if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
return
}
conn.cond.Broadcast()
conn.notify(Shutdown)

c := conn.c
for {
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
return
}
if atomic.LoadInt64(&conn.requestCnt) == 0 {
break
}
// Use cond var on conn.mutex since request execution may
// call reconnect(). It is ok if state changes as part of
// reconnect since Tarantool server won't allow to reconnect
// in the middle of shutting down.
conn.cond.Wait()
}

// Start to reconnect based on common rules, same as in net.box.
// Reconnect also closes the connection: server waits until all
// subscribed connections are terminated.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
// step 3.
conn.reconnectImpl(
ClientError{
ErrConnectionClosed,
"connection closed after server shutdown",
}, conn.c)
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ErrProtocolError = 0x4000 + iota
ErrTimeouted = 0x4000 + iota
ErrRateLimited = 0x4000 + iota
ErrConnectionShutdown = 0x4000 + iota
)

// Tarantool server error codes.
Expand Down
Loading

0 comments on commit fe07889

Please sign in to comment.