From c8192e1f34a0007ee34997c3c76d74cc80e02686 Mon Sep 17 00:00:00 2001 From: Russell Jones Date: Mon, 19 Feb 2018 17:59:08 -0800 Subject: [PATCH] Migrate to structured logging in session handling code. --- constants.go | 3 + lib/srv/forward/sshserver.go | 5 +- lib/srv/regular/sshserver.go | 5 +- lib/srv/sess.go | 453 +++++++++++++++++++---------------- 4 files changed, 252 insertions(+), 214 deletions(-) diff --git a/constants.go b/constants.go index e52d8889818de..fa07701f0f127 100644 --- a/constants.go +++ b/constants.go @@ -118,6 +118,9 @@ const ( // ComponentKeyGen is the public/private keypair generator. ComponentKeyGen = "keygen" + // ComponentSession is an active session. + ComponentSession = "session" + // DebugEnvVar tells tests to use verbose debug output DebugEnvVar = "DEBUG" diff --git a/lib/srv/forward/sshserver.go b/lib/srv/forward/sshserver.go index 7ad54b8232494..7f269583adb6c 100644 --- a/lib/srv/forward/sshserver.go +++ b/lib/srv/forward/sshserver.go @@ -186,7 +186,10 @@ func New(c ServerConfig) (*Server, error) { sessionServer: c.AuthClient, } - s.sessionRegistry = srv.NewSessionRegistry(s) + s.sessionRegistry, err = srv.NewSessionRegistry(s) + if err != nil { + return nil, trace.Wrap(err) + } // common auth handlers s.authHandlers = &srv.AuthHandlers{ diff --git a/lib/srv/regular/sshserver.go b/lib/srv/regular/sshserver.go index ae1f3c6f0a027..cc40eb37337e8 100644 --- a/lib/srv/regular/sshserver.go +++ b/lib/srv/regular/sshserver.go @@ -353,7 +353,10 @@ func New(addr utils.NetAddr, component = teleport.ComponentNode } - s.reg = srv.NewSessionRegistry(s) + s.reg, err = srv.NewSessionRegistry(s) + if err != nil { + return nil, trace.Wrap(err) + } // add in common auth handlers s.authHandlers = &srv.AuthHandlers{ diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 1e962dce681a6..f046e9b855e97 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "golang.org/x/crypto/ssh" + "github.com/gravitational/teleport" 
"github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" @@ -32,9 +34,9 @@ import ( "github.com/gravitational/teleport/lib/state" "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" - "golang.org/x/crypto/ssh" + "github.com/sirupsen/logrus" ) const ( @@ -67,8 +69,28 @@ func init() { // SSH server type SessionRegistry struct { sync.Mutex + + // log holds the structured logger + log *logrus.Entry + + // sessions holds a map between session ID and the session object. sessions map[rsession.ID]*session - srv Server + + // srv refers to the upon which this session registry is created. + srv Server +} + +func NewSessionRegistry(srv Server) (*SessionRegistry, error) { + if srv.GetSessionServer() == nil { + return nil, trace.BadParameter("session server is required") + } + return &SessionRegistry{ + log: logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentSession, srv.Component()), + }), + srv: srv, + sessions: make(map[rsession.ID]*session), + }, nil } func (s *SessionRegistry) addSession(sess *session) { @@ -77,13 +99,15 @@ func (s *SessionRegistry) addSession(sess *session) { s.sessions[sess.id] = sess } -func (r *SessionRegistry) Close() { - r.Lock() - defer r.Unlock() - for _, s := range r.sessions { - s.Close() +func (s *SessionRegistry) Close() { + s.Lock() + defer s.Unlock() + + for _, se := range s.sessions { + se.Close() } - log.Debugf("SessionRegistry.Close()") + + s.log.Debugf("Closing Session Registry.") } // joinShell either joins an existing session or starts a new shell @@ -99,7 +123,7 @@ func (s *SessionRegistry) OpenSession(ch ssh.Channel, req *ssh.Request, ctx *Ser events.RemoteAddr: ctx.Conn.RemoteAddr().String(), events.SessionServerID: ctx.srv.ID(), }) - ctx.Infof("[SESSION] joining session: %v", ctx.session.id) + ctx.Infof("Joining existing session %v.", ctx.session.id) _, err := ctx.session.join(ch, req, ctx) 
return trace.Wrap(err) } @@ -117,7 +141,7 @@ func (s *SessionRegistry) OpenSession(ch ssh.Channel, req *ssh.Request, ctx *Ser } ctx.session = sess s.addSession(sess) - ctx.Infof("[SESSION] new session %v", sid) + ctx.Infof("Creating session %v.", sid) if err := sess.start(ch, ctx); err != nil { sess.Close() @@ -156,10 +180,10 @@ func (s *SessionRegistry) leaveSession(party *party) error { // not lingering anymore? someone reconnected? cool then... no need // to die... if !sess.isLingering() { - log.Infof("[session.registry] session %v becomes active again", sess.id) + s.log.Infof("Session %v has become active again.", sess.id) return } - log.Infof("[session.registry] session %v to be garbage collected", sess.id) + s.log.Infof("Session %v will be garbage collected.", sess.id) // no more people left? Need to end the session! s.Lock() @@ -178,7 +202,7 @@ func (s *SessionRegistry) leaveSession(party *party) error { sess.recorder.Close() if err := sess.Close(); err != nil { - log.Error(err) + s.log.Errorf("Unable to close session %v: %v", sess.id, err) } // mark it as inactive in the DB @@ -215,7 +239,7 @@ func (s *SessionRegistry) getParties(ctx *ServerContext) (parties []*party) { // us that the terminal size has changed func (s *SessionRegistry) NotifyWinChange(params rsession.TerminalParams, ctx *ServerContext) error { if ctx.session == nil { - log.Debugf("notifyWinChange(): no session found!") + s.log.Debugf("Unable to update window size, no session found in context.") return nil } sid := ctx.session.id @@ -242,7 +266,7 @@ func (s *SessionRegistry) NotifyWinChange(params rsession.TerminalParams, ctx *S err := s.srv.GetSessionServer().UpdateSession( rsession.UpdateRequest{ID: sid, TerminalParams: ¶ms, Namespace: s.srv.GetNamespace()}) if err != nil { - log.Error(err) + s.log.Errorf("Unable to update session %v: %v", sid, err) } }() return nil @@ -265,14 +289,129 @@ func (s *SessionRegistry) findSession(id rsession.ID) (*session, bool) { return sess, found } -func 
NewSessionRegistry(srv Server) *SessionRegistry { - if srv.GetSessionServer() == nil { - panic("need a session server") +func (r *SessionRegistry) PushTermSizeToParty(sconn *ssh.ServerConn, ch ssh.Channel) error { + // the party may not be immediately available for this connection, + // keep asking for a full second: + for i := 0; i < 10; i++ { + party := r.partyForConnection(sconn) + if party == nil { + time.Sleep(time.Millisecond * 100) + continue + } + + // this starts a loop which will keep updating the terminal + // size for every SSH write back to this connection + party.termSizePusher(ch) + return nil } - return &SessionRegistry{ - srv: srv, - sessions: make(map[rsession.ID]*session), + + return trace.Errorf("unable to push term size to party") +} + +// partyForConnection finds an existing party which owns the given connection +func (r *SessionRegistry) partyForConnection(sconn *ssh.ServerConn) *party { + r.Lock() + defer r.Unlock() + + for _, session := range r.sessions { + session.Lock() + defer session.Unlock() + parties := session.parties + for _, party := range parties { + if party.sconn == sconn { + return party + } + } + } + return nil +} + +// sessionRecorder implements io.Writer to be plugged into the multi-writer +// associated with every session. 
It forwards session stream to the audit log +type sessionRecorder struct { + // log holds the structured logger + log *logrus.Entry + + // alog is the audit log to store session chunks + alog events.IAuditLog + + // sid defines the session to record + sid rsession.ID + + // namespace is session namespace + namespace string +} + +func newSessionRecorder(alog events.IAuditLog, ctx *ServerContext, sid rsession.ID) (*sessionRecorder, error) { + var auditLog events.IAuditLog + var err error + if alog == nil { + auditLog = &events.DiscardAuditLog{} + } else { + auditLog, err = state.NewCachingAuditLog(state.CachingAuditLogConfig{ + Namespace: ctx.srv.GetNamespace(), + SessionID: string(sid), + Server: alog, + }) + if err != nil { + return nil, trace.Wrap(err) + } + } + sr := &sessionRecorder{ + log: logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentSession, ctx.srv.Component()), + }), + alog: auditLog, + sid: sid, + namespace: ctx.srv.GetNamespace(), + } + return sr, nil +} + +// Write takes a chunk and writes it into the audit log +func (r *sessionRecorder) Write(data []byte) (int, error) { + // we are copying buffer to prevent data corruption: + // io.Copy allocates single buffer and calls multiple writes in a loop + // our PostSessionChunk is async and sends reader wrapping buffer + // to the channel. This can lead to cases when the buffer is re-used + // and data is corrupted unless we copy the data buffer in the first place + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + // post the chunk of bytes to the audit log: + chunk := &events.SessionChunk{ + Data: dataCopy, + Time: time.Now().UTC().UnixNano(), + } + if err := r.alog.PostSessionSlice(events.SessionSlice{ + Namespace: r.namespace, + SessionID: string(r.sid), + Chunks: []*events.SessionChunk{chunk}, + }); err != nil { + r.log.Error(trace.DebugReport(err)) + } + return len(data), nil +} + +// Close closes audit log caching forwarder. 
+func (r *sessionRecorder) Close() error { + var errors []error + err := r.alog.Close() + errors = append(errors, err) + + // wait until all events from recorder get flushed, it is important + // to do so before we send SessionEndEvent to advise the audit log + // to release resources associated with this session. + // not doing so will not result in memory leak, but could result + // in missing playback events + context, cancel := context.WithTimeout(context.TODO(), defaults.ReadHeadersTimeout) + defer cancel() // releases resources if slowOperation completes before timeout elapses + err = r.alog.WaitForDelivery(context) + if err != nil { + errors = append(errors, err) + r.log.Warnf("Timeout waiting for session to flush events: %v", trace.DebugReport(err)) } + + return trace.NewAggregate(errors...) } // session struct describes an active (in progress) SSH session. These sessions @@ -280,6 +419,9 @@ func NewSessionRegistry(srv Server) *SessionRegistry { type session struct { sync.Mutex + // log holds the structured logger + log *logrus.Entry + // session ID. unique GUID, this is what people use to "join" sessions id rsession.ID @@ -364,10 +506,13 @@ func newSession(id rsession.ID, r *SessionRegistry, ctx *ServerContext) (*sessio // return nil, trace.Wrap(err) // No need to abort. Perhaps the auth server is down? 
// Log the error and continue: - log.Errorf("failed logging new session: %v", err) + r.log.Errorf("Failed to create new session: %v.", err) } sess := &session{ + log: logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentSession, r.srv.Component()), + }), id: id, registry: r, parties: make(map[rsession.ID]*party), @@ -379,71 +524,6 @@ func newSession(id rsession.ID, r *SessionRegistry, ctx *ServerContext) (*sessio return sess, nil } -func (r *SessionRegistry) PushTermSizeToParty(sconn *ssh.ServerConn, ch ssh.Channel) error { - // the party may not be immediately available for this connection, - // keep asking for a full second: - for i := 0; i < 10; i++ { - party := r.partyForConnection(sconn) - if party == nil { - time.Sleep(time.Millisecond * 100) - continue - } - - // this starts a loop which will keep updating the terminal - // size for every SSH write back to this connection - party.termSizePusher(ch) - return nil - } - - return trace.Errorf("unable to push term size to party") -} - -// PartyForConnection finds an existing party which owns the given connection -func (r *SessionRegistry) partyForConnection(sconn *ssh.ServerConn) *party { - r.Lock() - defer r.Unlock() - - for _, session := range r.sessions { - session.Lock() - defer session.Unlock() - parties := session.parties - for _, party := range parties { - if party.sconn == sconn { - return party - } - } - } - return nil -} - -// This goroutine pushes terminal resize events directly into a connected web client -func (p *party) termSizePusher(ch ssh.Channel) { - var ( - err error - n int - ) - defer func() { - if err != nil { - log.Error(err) - } - }() - - for err == nil { - select { - case newSize := <-p.termSizeC: - n, err = ch.Write(newSize) - if err == io.EOF { - continue - } - if err != nil || n == 0 { - return - } - case <-p.closeC: - return - } - } -} - // isLingering returns 'true' if every party has left this session func (s *session) isLingering() bool { 
s.Lock() @@ -459,7 +539,7 @@ func (s *session) Close() error { // (session writer) will try to close this session, causing a deadlock // because of closeOnce go func() { - log.Infof("session.Close(%v)", s.id) + s.log.Infof("Closing session %v", s.id) if s.term != nil { s.term.Close() } @@ -469,7 +549,7 @@ func (s *session) Close() error { s.writer.Lock() defer s.writer.Unlock() for writerName, writer := range s.writer.writers { - log.Infof("session.close(writer=%v)", writerName) + s.log.Infof("Closing session writer: %v", writerName) closer, ok := io.Writer(writer).(io.WriteCloser) if ok { closer.Close() @@ -480,86 +560,6 @@ func (s *session) Close() error { return nil } -// sessionRecorder implements io.Writer to be plugged into the multi-writer -// associated with every session. It forwards session stream to the audit log -type sessionRecorder struct { - // alog is the audit log to store session chunks - alog events.IAuditLog - // sid defines the session to record - sid rsession.ID - // namespace is session namespace - namespace string -} - -func newSessionRecorder(alog events.IAuditLog, namespace string, sid rsession.ID) (*sessionRecorder, error) { - var auditLog events.IAuditLog - var err error - if alog == nil { - auditLog = &events.DiscardAuditLog{} - } else { - auditLog, err = state.NewCachingAuditLog(state.CachingAuditLogConfig{ - Namespace: namespace, - SessionID: string(sid), - Server: alog, - }) - if err != nil { - return nil, trace.Wrap(err) - } - } - sr := &sessionRecorder{ - alog: auditLog, - sid: sid, - namespace: namespace, - } - return sr, nil -} - -// Write takes a chunk and writes it into the audit log -func (r *sessionRecorder) Write(data []byte) (int, error) { - // we are copying buffer to prevent data corruption: - // io.Copy allocates single buffer and calls multiple writes in a loop - // our PostSessionChunk is async and sends reader wrapping buffer - // to the channel. 
This can lead to cases when the buffer is re-used - // and data is corrupted unless we copy the data buffer in the first place - dataCopy := make([]byte, len(data)) - copy(dataCopy, data) - // post the chunk of bytes to the audit log: - chunk := &events.SessionChunk{ - Data: dataCopy, - Time: time.Now().UTC().UnixNano(), - } - if err := r.alog.PostSessionSlice(events.SessionSlice{ - Namespace: r.namespace, - SessionID: string(r.sid), - Chunks: []*events.SessionChunk{chunk}, - }); err != nil { - log.Error(trace.DebugReport(err)) - } - return len(data), nil -} - -// Close() closes audit log caching forwarder -func (r *sessionRecorder) Close() error { - var errors []error - err := r.alog.Close() - errors = append(errors, err) - - // wait until all events from recorder get flushed, it is important - // to do so before we send SessionEndEvent to advise the audit log - // to release resources associated with this session. - // not doing so will not result in memory leak, but could result - // in missing playback events - context, cancel := context.WithTimeout(context.TODO(), defaults.ReadHeadersTimeout) - defer cancel() // releases resources if slowOperation completes before timeout elapses - err = r.alog.WaitForDelivery(context) - if err != nil { - errors = append(errors, err) - log.Warningf("timeout waiting for session to flush events: %v", trace.DebugReport(err)) - } - - return trace.NewAggregate(errors...) 
-} - // start starts a new interactive process (or a shell) in the current session func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { var err error @@ -574,13 +574,13 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { ctx.SetTerm(nil) } else { if s.term, err = NewTerminal(ctx); err != nil { - ctx.Infof("handleShell failed to create term: %v", err) + ctx.Infof("Unable to allocate new terminal: %v", err) return trace.Wrap(err) } } if err := s.term.Run(); err != nil { - ctx.Errorf("shell command (%v) failed: %v", ctx.ExecRequest.GetCommand(), err) + ctx.Errorf("Unable to run shell command (%v): %v", ctx.ExecRequest.GetCommand(), err) return trace.ConvertSystemError(err) } if err := s.addParty(p); err != nil { @@ -593,7 +593,7 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { // be a discard audit log if the proxy is in recording mode and a teleport // node so we don't create double recordings. auditLog := s.registry.srv.GetAuditLog() - s.recorder, err = newSessionRecorder(auditLog, ctx.srv.GetNamespace(), s.id) + s.recorder, err = newSessionRecorder(auditLog, ctx, s.id) if err != nil { return trace.Wrap(err) } @@ -626,7 +626,7 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { defer s.term.AddParty(-1) _, err := io.Copy(s.writer, s.term.PTY()) - log.Debugf("Copying from PTY to writer completed with error %v.", err) + s.log.Debugf("Copying from PTY to writer completed with error %v.", err) // once everything has been copied, notify the goroutine below. if this code // is running in a teleport node, when the exec.Cmd is done it will close @@ -647,7 +647,7 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { // closed already. 
select { case <-time.After(defaults.WaitCopyTimeout): - log.Errorf("Timed out waiting for PTY copy to finish, session data for %v may be missing.", s.id) + s.log.Errorf("Timed out waiting for PTY copy to finish, session data for %v may be missing.", s.id) case <-doneCh: } @@ -655,7 +655,7 @@ func (s *session) start(ch ssh.Channel, ctx *ServerContext) error { s.registry.broadcastResult(s.id, *result) } if err != nil { - log.Errorf("shell exited with error: %v", err) + s.log.Errorf("Shell exited with error: %v", err) } else { // no error? this means the command exited cleanly: no need // for this session to "linger" after this. @@ -685,7 +685,7 @@ func (s *session) String() string { // 1. from in-memory dictionary inside of this session // 2. from sessin server's storage func (s *session) removeParty(p *party) error { - p.ctx.Infof("session.removeParty(%v)", p) + p.ctx.Infof("Removing party %v from session %v", p, s.id) ns := s.getNamespace() @@ -702,7 +702,7 @@ func (s *session) removeParty(p *party) error { storageRemove := func(db rsession.Service) { dbSession, err := db.GetSession(ns, s.id) if err != nil { - log.Error(err) + s.log.Errorf("Unable to get session %v: %v", s.id, err) return } if dbSession != nil && dbSession.RemoveParty(p.id) { @@ -741,22 +741,21 @@ func (s *session) getNamespace() string { // the proxy, then this function does nothing as it's counterpart in the proxy // will do this work. func (s *session) pollAndSync() { - log.Debugf("[session.registry] start pollAndSync()\b") - defer log.Debugf("[session.registry] end pollAndSync()\n") - // If sessions are being recorded at the proxy, an identical version of this // goroutine is running in the proxy, which means it does not need to run here. 
clusterConfig, err := s.registry.srv.GetAccessPoint().GetClusterConfig() if err != nil { - log.Errorf("Unable to sync terminal size: %v.", err) + s.log.Errorf("Unable to sync terminal size: %v.", err) return } - if clusterConfig.GetSessionRecording() == services.RecordAtProxy { - if s.registry.srv.Component() == teleport.ComponentNode { - return - } + if clusterConfig.GetSessionRecording() == services.RecordAtProxy && + s.registry.srv.Component() == teleport.ComponentNode { + return } + s.log.Debugf("Starting poll and sync of terminal size to all parties.") + defer s.log.Debugf("Stopping poll and sync of terminal size to all parties.") + ns := s.getNamespace() sessionServer := s.registry.srv.GetSessionServer() @@ -783,7 +782,7 @@ func (s *session) pollAndSync() { termSizeChanged := (int(winSize.Width) != sess.TerminalParams.W || int(winSize.Height) != sess.TerminalParams.H) if termSizeChanged { - log.Debugf("terminal has changed from: %v to %v", sess.TerminalParams, winSize) + s.log.Debugf("Terminal changed from: %v to %v", sess.TerminalParams, winSize) err = s.term.SetWinSize(sess.TerminalParams) } return err @@ -793,7 +792,7 @@ func (s *session) pollAndSync() { defer tick.Stop() for { if err := sync(); err != nil { - log.Infof("sync term error: %v", err) + s.log.Infof("Unable to sync terminal: %v", err) errCount++ // if the error count keeps going up, this means we're stuck in // a bad state: end this goroutine to avoid leaks @@ -805,7 +804,6 @@ func (s *session) pollAndSync() { } select { case <-s.closeC: - log.Infof("[SSH] terminal sync stopped") return case <-tick.C: } @@ -844,10 +842,9 @@ func (s *session) addParty(p *party) error { storageUpdate := func(db rsession.Service) { dbSession, err := db.GetSession(s.getNamespace(), s.id) if err != nil { - log.Error(err) + s.log.Errorf("Unable to get session %v: %v", s.id, err) return } - log.Infof("PARTY: %v %v", dbSession, err) dbSession.Parties = append(dbSession.Parties, rsession.Party{ ID: p.id, User: p.user, 
@@ -865,16 +862,16 @@ func (s *session) addParty(p *party) error { go storageUpdate(s.registry.srv.GetSessionServer()) } - p.ctx.Infof("[SESSION] new party joined: %v", p.String()) + s.log.Infof("New party %v joined session: %v", p.String(), s.id) // this goroutine keeps pumping party's input into the session go func() { defer s.term.AddParty(-1) _, err := io.Copy(s.term.PTY(), p) - p.ctx.Infof("party.io.copy(%v) closed", p.id) if err != nil { - log.Error(err) + s.log.Errorf("Party member %v left session %v due to an error: %v", p.id, s.id, err) } + s.log.Infof("Party member %v left session %v.", p.id, s.id) }() return nil } @@ -960,25 +957,10 @@ func (m *multiWriter) Write(p []byte) (n int, err error) { return len(p), nil } -func newParty(s *session, ch ssh.Channel, ctx *ServerContext) *party { - return &party{ - user: ctx.Identity.TeleportUser, - login: ctx.Identity.Login, - serverID: s.registry.srv.ID(), - site: ctx.Conn.RemoteAddr().String(), - id: rsession.NewID(), - ch: ch, - ctx: ctx, - s: s, - sconn: ctx.Conn, - termSizeC: make(chan []byte, 5), - closeC: make(chan bool), - } -} - type party struct { sync.Mutex + log *logrus.Entry login string user string serverID string @@ -994,8 +976,27 @@ type party struct { closeOnce sync.Once } +func newParty(s *session, ch ssh.Channel, ctx *ServerContext) *party { + return &party{ + log: logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentSession, ctx.srv.Component()), + }), + user: ctx.Identity.TeleportUser, + login: ctx.Identity.Login, + serverID: s.registry.srv.ID(), + site: ctx.Conn.RemoteAddr().String(), + id: rsession.NewID(), + ch: ch, + ctx: ctx, + s: s, + sconn: ctx.Conn, + termSizeC: make(chan []byte, 5), + closeC: make(chan bool), + } +} + func (p *party) onWindowChanged(params *rsession.TerminalParams) { - log.Debugf("party(%s).onWindowChanged(%v)", p.id, params.Serialize()) + p.log.Debugf("Window size changed to %v in party: %v", params.Serialize(), p.id) p.Lock() defer 
p.Unlock() @@ -1008,6 +1009,34 @@ func (p *party) onWindowChanged(params *rsession.TerminalParams) { } } +// This goroutine pushes terminal resize events directly into a connected web client +func (p *party) termSizePusher(ch ssh.Channel) { + var ( + err error + n int + ) + defer func() { + if err != nil { + p.log.Error(err) + } + }() + + for err == nil { + select { + case newSize := <-p.termSizeC: + n, err = ch.Write(newSize) + if err == io.EOF { + continue + } + if err != nil || n == 0 { + return + } + case <-p.closeC: + return + } + } +} + func (p *party) updateActivity() { p.Lock() defer p.Unlock() @@ -1035,7 +1064,7 @@ func (p *party) String() string { func (p *party) Close() (err error) { p.closeOnce.Do(func() { - p.ctx.Infof("party[%v].Close()", p.id) + p.log.Infof("Closing party %v", p.id) if err = p.s.registry.leaveSession(p); err != nil { p.ctx.Error(err) }