diff --git a/gotty-client.go b/gotty-client.go index e8c904f8..35e599d9 100644 --- a/gotty-client.go +++ b/gotty-client.go @@ -13,7 +13,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "github.com/Sirupsen/logrus" @@ -79,8 +78,7 @@ type Client struct { URL string WriteMutex *sync.Mutex Output io.Writer - QuitChan chan struct{} - QuitChanClosed int32 // atomic value + poison chan bool SkipTLSVerify bool UseProxyFromEnv bool Connected bool @@ -210,12 +208,11 @@ func (c *Client) Close() { c.Conn.Close() } -// ExitLoop will kill all goroutine +// ExitLoop will kill all goroutines launched by c.Loop() // ExitLoop() -> wait Loop() -> Close() func (c *Client) ExitLoop() { - if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) { - close(c.QuitChan) - } + fname := "ExitLoop" + openPoison(fname, c.poison) } // Loop will look indefinitely for new messages @@ -227,23 +224,21 @@ func (c *Client) Loop() error { } } - var wg sync.WaitGroup - done := make(chan bool) + wg := &sync.WaitGroup{} wg.Add(1) - go c.termsizeLoop(&wg) + go c.termsizeLoop(wg) + wg.Add(1) - go c.readLoop(done, &wg) + go c.readLoop(wg) + wg.Add(1) - go c.writeLoop(done, &wg) - select { - case <-done: - if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) { - close(c.QuitChan) - } - case <-c.QuitChan: - } + go c.writeLoop(wg) + + /* Wait for all of the above goroutines to finish */ wg.Wait() + + logrus.Debug("Client.Loop() exiting") return nil } @@ -255,8 +250,47 @@ type winsize struct { y uint16 } -func (c *Client) termsizeLoop(wg *sync.WaitGroup) { +type posionReason int + +const ( + committedSuicide = iota + killed +) + +func openPoison(fname string, poison chan bool) posionReason { + logrus.Debug(fname + " suicide") + + /* + * The close() may raise panic if multiple goroutines commit suicide at the + * same time. Prevent that panic from bubbling up. + */ + defer func() { + if r := recover(); r != nil { + logrus.Debug("Prevented panic() of simultaneous suicides", r) + } + }() + + /* Signal others to die */ + close(poison) + return committedSuicide +} + +func die(fname string, poison chan bool) posionReason { + logrus.Debug(fname + " died") + + wasOpen := <-poison + if wasOpen { + logrus.Error("ERROR: The channel was open when it wasn't suppoed to be") + } + + return killed +} + +func (c *Client) termsizeLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "termsizeLoop" + ch := make(chan os.Signal, 1) notifySignalSIGWINCH(ch) defer resetSignalSIGWINCH() @@ -270,8 +304,9 @@ func (c *Client) termsizeLoop(wg *sync.WaitGroup) { } } select { - case <-c.QuitChan: - return + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) case <-ch: } } @@ -281,8 +316,10 @@ type exposeFd interface { Fd() uintptr } -func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { +func (c *Client) writeLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "writeLoop" buff := make([]byte, 128) oldState, err := terminal.MakeRaw(0) @@ -293,37 +330,56 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { rdfs := &goselect.FDSet{} reader := io.Reader(os.Stdin) for { + select { + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) + default: + } + rdfs.Zero() rdfs.Set(reader.(exposeFd).Fd()) err := goselect.Select(1, rdfs, nil, nil, 50*time.Millisecond) if err != nil { - done <- true - return + return openPoison(fname, c.poison) } if rdfs.IsSet(reader.(exposeFd).Fd()) { size, err := reader.Read(buff) - if size <= 0 || err != nil { - done <- true - return + + if err != nil { + if err == io.EOF { + // Send EOF to GoTTY + + // Send 'Input' marker, as defined in GoTTY::client_context.go, + // followed by EOT (a translation of Ctrl-D for terminals) + err = c.write(append([]byte("0"), byte(4))) + + if err != nil { + return openPoison(fname, c.poison) + } + continue + } else { + return openPoison(fname, c.poison) + } } + + if size <= 0 { + continue + } + data := buff[:size] err = c.write(append([]byte("0"), data...)) if err != nil { - done <- true - return + return openPoison(fname, c.poison) } } - select { - case <-c.QuitChan: - return - default: - break - } } } -func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) { +func (c *Client) readLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "readLoop" type MessageNonBlocking struct { Data []byte @@ -338,20 +394,21 @@ func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) { }() select { - case <-c.QuitChan: - return + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) case msg := <-msgChan: if msg.Err != nil { - done <- true + if _, ok := msg.Err.(*websocket.CloseError); !ok { logrus.Warnf("c.Conn.ReadMessage: %v", msg.Err) } - return + return openPoison(fname, c.poison) } if len(msg.Data) == 0 { - done <- true + logrus.Warnf("An error has occured") - return + return openPoison(fname, c.poison) } switch msg.Data[0] { case '0': // data @@ -407,6 +464,6 @@ func NewClient(inputURL string) (*Client, error) { URL: url, WriteMutex: &sync.Mutex{}, Output: os.Stdout, - QuitChan: make(chan struct{}), + poison: make(chan bool), }, nil }