Skip to content

Commit

Permalink
feature: support graceful shutdown
Browse files Browse the repository at this point in the history
If connected to Tarantool 2.10 or newer and WatchersFeature is required,
after this patch the connection supports server graceful shutdown [1]. In
this case, the server waits until all client requests are finished and
the client disconnects before going down (the server may also go down by
timeout). The client will reconnect if the connection options enable
reconnect.

1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

Closes #214
  • Loading branch information
DifferentialOrange committed Dec 21, 2022
1 parent 03a1cc1 commit cd918fb
Show file tree
Hide file tree
Showing 4 changed files with 566 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Error type support in MessagePack (#209)
- Event subscription support (#119)
- Session settings support (#215)
- Support graceful shutdown (#214)

### Changed

Expand Down
152 changes: 138 additions & 14 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ const ignoreStreamId = 0
const (
connDisconnected = 0
connConnected = 1
connClosed = 2
connShutdown = 2
connClosed = 3
)

const (
connTransportNone = ""
connTransportSsl = "ssl"
)

// shutdownEventKey is the watcher key that Connect subscribes to (when
// WatchersFeature is required) in order to process server graceful shutdown.
const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

Expand All @@ -45,6 +48,8 @@ const (
ReconnectFailed
// Either reconnect attempts exhausted, or explicit Close is called.
Closed
// Shutdown signals that the shutdown callback is being processed.
Shutdown

// LogReconnectFailed is logged when reconnect attempt failed.
LogReconnectFailed ConnLogKind = iota + 1
Expand Down Expand Up @@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not forces
// array of arrays.
//
// If connected to Tarantool 2.10 or newer and WatchersFeature is required,
// the connection supports server graceful shutdown. In this case, the server
// waits until all client requests are finished and the client disconnects
// before going down (the server may also go down by timeout). The client
// will reconnect if the connection options enable reconnect.
//
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c net.Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// requestId contains the last request ID for requests with nil context.
Expand All @@ -162,6 +176,11 @@ type Connection struct {
serverProtocolInfo ProtocolInfo
// watchMap is a map of key -> chan watchState.
watchMap sync.Map

// shutdownWatcher is the "box.shutdown" event watcher.
shutdownWatcher Watcher
// requestCnt is a counter of active requests.
requestCnt uint32
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
conn.opts.Logger = defaultLogger{}
}

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
Expand Down Expand Up @@ -421,6 +442,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

// Subscribe shutdown event to process graceful shutdown.
if conn.isWatchersRequired() {
watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback)
if werr != nil {
conn.closeConnection(werr, true)
return nil, werr
}
conn.shutdownWatcher = watcher
}

return conn, err
}

Expand Down Expand Up @@ -589,6 +620,7 @@ func (conn *Connection) dial() (err error) {
conn.lockShards()
conn.c = connection
atomic.StoreUint32(&conn.state, connConnected)
conn.cond.Broadcast()
conn.unlockShards()
go conn.writer(w, connection)
go conn.reader(r, connection)
Expand Down Expand Up @@ -762,10 +794,19 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
if conn.state != connClosed {
close(conn.control)
atomic.StoreUint32(&conn.state, connClosed)
conn.cond.Broadcast()
// Free the resources.
if conn.shutdownWatcher != nil {
conn.mutex.Unlock()
conn.shutdownWatcher.Unregister()
conn.shutdownWatcher = nil
conn.mutex.Lock()
}
conn.notify(Closed)
}
} else {
atomic.StoreUint32(&conn.state, connDisconnected)
conn.cond.Broadcast()
conn.notify(Disconnected)
}
if conn.c != nil {
Expand All @@ -784,9 +825,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
return
}

func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
Expand All @@ -799,6 +838,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
}
}

// reconnect is the locking entry point for reconnectImpl: it takes
// conn.mutex, delegates to reconnectImpl and then broadcasts on conn.cond
// so that goroutines waiting on it (see processShutdown) re-check the
// connection state.
func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.reconnectImpl(neterr, c)
// Broadcast while conn.mutex is still held; the deferred Unlock runs after.
conn.cond.Broadcast()
}

func (conn *Connection) lockShards() {
for i := range conn.shard {
conn.shard[i].rmut.Lock()
Expand Down Expand Up @@ -1026,6 +1072,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut.done = nil
shard.rmut.Unlock()
return
case connShutdown:
fut.err = ClientError{
ErrConnectionShutdown,
"server shutdown in progress",
}
fut.ready = nil
fut.done = nil
shard.rmut.Unlock()
return
}
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
if ctx != nil {
Expand Down Expand Up @@ -1086,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
if fut.ready == nil {
return fut
}

if req.Ctx() != nil {
select {
case <-req.Ctx().Done():
Expand All @@ -1094,10 +1150,17 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
default:
}
}

if conn.shutdownWatcher != nil {
atomic.AddUint32(&(conn.requestCnt), uint32(1))
}

conn.putFuture(fut, req, streamId)

if req.Ctx() != nil {
go conn.contextWatchdog(fut, req.Ctx())
}

return fut
}

Expand Down Expand Up @@ -1164,6 +1227,15 @@ func (conn *Connection) markDone(fut *Future) {
if conn.rlimit != nil {
<-conn.rlimit
}

if conn.shutdownWatcher != nil {
// This is the approach recommended by the Go documentation
// for decrementing an atomic uint32.
// https://pkg.go.dev/sync/atomic#AddUint32
if atomic.AddUint32(&(conn.requestCnt), ^uint32(0)) == 0 {
conn.cond.Broadcast()
}
}
}

func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
Expand Down Expand Up @@ -1458,6 +1530,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
return st, nil
}

// isWatchersRequired reports whether WatchersFeature is listed in
// conn.opts.RequiredProtocolInfo.Features, i.e. whether the connection
// options require the watchers feature.
func (conn *Connection) isWatchersRequired() bool {
	required := conn.opts.RequiredProtocolInfo.Features
	for i := 0; i < len(required); i++ {
		if required[i] == WatchersFeature {
			return true
		}
	}
	return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require WatchersFeature to use watchers, see examples for the
Expand Down Expand Up @@ -1496,15 +1577,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// asynchronous. We do not expect any response from a Tarantool instance
// That's why we can't just check the Tarantool response for an unsupported
// request error.
watchersRequired := false
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
if feature == WatchersFeature {
watchersRequired = true
break
}
}

if !watchersRequired {
if !conn.isWatchersRequired() {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", WatchersFeature)
return nil, err
Expand Down Expand Up @@ -1563,7 +1636,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,

if state.cnt == 0 {
// The last one sends IPROTO_UNWATCH.
conn.Do(newUnwatchRequest(key)).Get()
if !conn.ClosedNow() {
// The conn.ClosedNow() check is a workaround for calling
// Unregister from closeConnection().
conn.Do(newUnwatchRequest(key)).Get()
}
conn.watchMap.Delete(key)
close(state.unready)
}
Expand Down Expand Up @@ -1666,3 +1743,50 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
// Hand out the result of Clone() rather than the package-level
// clientProtocolInfo value itself.
return clientProtocolInfo.Clone()
}

// shutdownEventCallback is the watch callback for the "box.shutdown" key.
// When the event value is the boolean true, it starts processShutdown for
// the event's connection in a separate goroutine; any other value is ignored.
func shutdownEventCallback(event WatchEvent) {
	// The server publishes "true" on shutdown.
	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
	// step 2.
	shuttingDown, isBool := event.Value.(bool)
	if !isBool || !shuttingDown {
		return
	}
	go event.Conn.processShutdown()
}

// processShutdown performs the client side of the graceful shutdown
// procedure once the server has published the "box.shutdown" event.
//
// It switches a connected connection to connShutdown (in that state
// newFuture rejects new requests with ErrConnectionShutdown), waits on
// conn.cond until the active request counter drops to zero or the connection
// changes under it, and then tears the connection down through
// reconnectImpl so that the usual reconnect rules apply.
//
// If the connection is not in the connConnected state when this runs, it
// returns without touching the state.
func (conn *Connection) processShutdown() {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	// Only a live connection may enter the shutdown state. This function is
	// started in its own goroutine by the watch callback, so by the time the
	// mutex is acquired the connection may already be disconnected, closed,
	// or in shutdown because of an earlier event. Storing connShutdown
	// unconditionally would overwrite that state (e.g. turn connClosed back
	// into connShutdown and trigger a reconnect on a closed connection).
	if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
		return
	}
	conn.notify(Shutdown)

	c := conn.c
	for (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
		(atomic.LoadUint32(&(conn.requestCnt)) != 0) &&
		(c == conn.c) {
		// Use cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if state changes as part of
		// reconnect since Tarantool server won't allow to reconnect
		// in the middle of shutting down.
		conn.cond.Wait()
	}
	// Do not unregister the watcher explicitly here since connection
	// teardown has the same effect. To clean up connection resources,
	// unregister on full close.

	if (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
		(c == conn.c) {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(
			ClientError{
				ErrConnectionClosed,
				"connection closed after server shutdown",
			}, conn.c)
	}
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ErrProtocolError = 0x4000 + iota
ErrTimeouted = 0x4000 + iota
ErrRateLimited = 0x4000 + iota
ErrConnectionShutdown = 0x4000 + iota
)

// Tarantool server error codes.
Expand Down
Loading

0 comments on commit cd918fb

Please sign in to comment.