Skip to content

Commit

Permalink
fix: Resolve force closed connections on receiver not being cleaned u…
Browse files Browse the repository at this point in the history
…p fully, preventing shutdown
  • Loading branch information
driskell committed Feb 26, 2024
1 parent bdc463c commit b6689de
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
11 changes: 8 additions & 3 deletions lc-lib/receiver/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,19 @@ ReceiverLoop:
receiver := eventImpl.Context().Value(transports.ContextReceiver).(transports.Receiver)
receiver.ShutdownConnection(eventImpl.Context())
// Receive side is closed, and we're just sending, so it'll close automatically once flushed, so clear all scheduled timeouts
r.apiConnections.RemoveEntry(r.connectionStatus[connection].remote)
delete(r.connectionStatus, connection)
r.scheduler.Remove(connection)
} else {
// Add to the scheduler a nil progress to signal that when we finish ack everything - this connection can close
r.connectionStatus[connection].progress = append(r.connectionStatus[connection].progress, nil)
}
r.connectionLock.Unlock()
case *transports.DisconnectEvent:
// Connection disconnected
r.connectionLock.Lock()
connection := eventImpl.Context().Value(transports.ContextConnection)
r.apiConnections.RemoveEntry(r.connectionStatus[connection].remote)
delete(r.connectionStatus, connection)
r.connectionLock.Unlock()
case *transports.StatusEvent:
if eventImpl.StatusChange() == transports.Finished {
// Remove the receiver from our list and exit if all receivers are finished
Expand All @@ -230,7 +235,7 @@ ReceiverLoop:
if status.active {
delete(r.receiversByListen, status.listen)
}
// If shutting down, have all acknowledgemente been handled, and all receivers closed?
// If shutting down, have all acknowledgements been handled, and all receivers closed?
if shutdownChan == nil && len(r.receivers) == 0 && len(r.connectionStatus) == 0 {
break ReceiverLoop
}
Expand Down
35 changes: 34 additions & 1 deletion lc-lib/transports/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (e *StatusEvent) Err() error {
return e.err
}

// ConnectEvent marks the start of a new connection on a reciver
// ConnectEvent marks the start of a new connection on a receiver
type ConnectEvent struct {
context context.Context
remote string
Expand Down Expand Up @@ -145,6 +145,39 @@ func (e *ConnectEvent) Desc() string {
return e.desc
}

// DisconnectEvent marks the end of a connection on a receiver
type DisconnectEvent struct {
context context.Context
remote string
desc string
}

var _ Event = (*DisconnectEvent)(nil)

// NewDisconnectEvent generates a new DisconnectEvent for the given Endpoint
func NewDisconnectEvent(context context.Context, remote string, desc string) *DisconnectEvent {
return &DisconnectEvent{
context: context,
remote: remote,
desc: desc,
}
}

// Context returns the endpoint associated with this event
func (e *DisconnectEvent) Context() context.Context {
return e.context
}

// Remote returns the identity of the remote side
func (e *DisconnectEvent) Remote() string {
return e.remote
}

// Desc returns a description for the remote
func (e *DisconnectEvent) Desc() string {
return e.desc
}

// EndEvent marks the end of a stream of events from an endpoint
type EndEvent struct {
context context.Context
Expand Down
2 changes: 2 additions & 0 deletions lc-lib/transports/tcp/receivertcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func (t *receiverTCP) connectionRoutine(socket net.Conn, conn *connection) {
log.Noticef("[R %s - %s] Client closed", socket.LocalAddr().String(), socket.RemoteAddr().String())
}

conn.sendEvent(transports.NewDisconnectEvent(conn.ctx, socket.RemoteAddr().String(), conn.socket.Desc()))

t.connMutex.Lock()
delete(t.connections, conn)
t.connMutex.Unlock()
Expand Down

0 comments on commit b6689de

Please sign in to comment.