From 3e895cf209a6e492663e182f6a3ba30b54856ac1 Mon Sep 17 00:00:00 2001 From: Gurjeet Singh Date: Mon, 7 Nov 2016 15:12:02 -0800 Subject: [PATCH 1/2] Do not exit on EOF Forward the EOF to GoTTY, and let the server-side decide if it wants to terminate the connection. The server closes the connection, and in response we terminate the readLoop which in turn signals writeLoop to terminate via the QuitChan. This allows for the user to pipe commands to gotty-client, and capture all the result sent by the server. For eg. when gotty is launched as `gotty -w bash`, the following command would now wait to capture all output from the server. for (( i = 0 ; i < 2; ++i )); do echo echo $i; echo sleep 2; done | ./gotty-client https://gotty.example.com Before this patch, gotty-client used to exit on encountering EOF from the left side of the pipe. --- gotty-client.go | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/gotty-client.go b/gotty-client.go index e8c904f8..f376a572 100644 --- a/gotty-client.go +++ b/gotty-client.go @@ -293,6 +293,12 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { rdfs := &goselect.FDSet{} reader := io.Reader(os.Stdin) for { + select { + case <-c.QuitChan: + return + default: + } + rdfs.Zero() rdfs.Set(reader.(exposeFd).Fd()) err := goselect.Select(1, rdfs, nil, nil, 50*time.Millisecond) @@ -302,10 +308,30 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { } if rdfs.IsSet(reader.(exposeFd).Fd()) { size, err := reader.Read(buff) - if size <= 0 || err != nil { - done <- true - return + + if err != nil { + if err == io.EOF { + // Send EOF to GoTTY + + // Send 'Input' marker, as defined in GoTTY::client_context.go, + // followed by EOT (a translation of Ctrl-D for terminals) + err = c.write(append([]byte("0"), byte(4))) + + if err != nil { + done <- true + return + } + continue + } else { + done <- true + return + } + } + + if size <= 0 { + continue } + data := buff[:size] err = c.write(append([]byte("0"), data...)) if err != nil { @@ -313,12 +339,6 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { return } } - select { - case <-c.QuitChan: - return - default: - break - } } } From e153cff9c8ce509bea662fe4985df0701ecfba53 Mon Sep 17 00:00:00 2001 From: Gurjeet Singh Date: Sat, 17 Dec 2016 11:53:17 -0800 Subject: [PATCH 2/2] Implement poison-pill method to exit infinite loops All cooperating goroutines regularly try to read from the shared "poison" channel. If the read succeeds, they exit by calling die(), assuming somebody else cracked open the poison pill. When any of these goroutines is done with its job, it signals other goroutines to exit by calling open_poison() on the shared channel. This approach takes advantage of the fact that reads from a closed channel always succeed. The driving goroutine (Client.Loop() in this case), is called from the "main" goroutine. And because when the "main" goroutine exits, the whole program exit (using os.Exit()) irrepective of liveness of other goroutines, we could not use the same "poison" channel to wait in the driving goroutine. Instead, we use sync.WaitGroup to wait for spawned goroutines, because we want the spawned goroutines to cleanup and exit cleanly. --- gotty-client.go | 119 +++++++++++++++++++++++++++++++----------------- 1 file changed, 78 insertions(+), 41 deletions(-) diff --git a/gotty-client.go b/gotty-client.go index f376a572..35e599d9 100644 --- a/gotty-client.go +++ b/gotty-client.go @@ -13,7 +13,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "github.com/Sirupsen/logrus" @@ -79,8 +78,7 @@ type Client struct { URL string WriteMutex *sync.Mutex Output io.Writer - QuitChan chan struct{} - QuitChanClosed int32 // atomic value + poison chan bool SkipTLSVerify bool UseProxyFromEnv bool Connected bool @@ -210,12 +208,11 @@ func (c *Client) Close() { c.Conn.Close() } -// ExitLoop will kill all goroutine +// ExitLoop will kill all goroutines launched by c.Loop() // ExitLoop() -> wait Loop() -> Close() func (c *Client) ExitLoop() { - if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) { - close(c.QuitChan) - } + fname := "ExitLoop" + openPoison(fname, c.poison) } // Loop will look indefinitely for new messages @@ -227,23 +224,21 @@ func (c *Client) Loop() error { } } - var wg sync.WaitGroup - done := make(chan bool) + wg := &sync.WaitGroup{} wg.Add(1) - go c.termsizeLoop(&wg) + go c.termsizeLoop(wg) + wg.Add(1) - go c.readLoop(done, &wg) + go c.readLoop(wg) + wg.Add(1) - go c.writeLoop(done, &wg) - select { - case <-done: - if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) { - close(c.QuitChan) - } - case <-c.QuitChan: - } + go c.writeLoop(wg) + + /* Wait for all of the above goroutines to finish */ wg.Wait() + + logrus.Debug("Client.Loop() exiting") return nil } @@ -255,8 +250,47 @@ type winsize struct { y uint16 } -func (c *Client) termsizeLoop(wg *sync.WaitGroup) { +type posionReason int + +const ( + committedSuicide = iota + killed +) + +func openPoison(fname string, poison chan bool) posionReason { + logrus.Debug(fname + " suicide") + + /* + * The close() may raise panic if multiple goroutines commit suicide at the + * same time. Prevent that panic from bubbling up. + */ + defer func() { + if r := recover(); r != nil { + logrus.Debug("Prevented panic() of simultaneous suicides", r) + } + }() + + /* Signal others to die */ + close(poison) + return committedSuicide +} + +func die(fname string, poison chan bool) posionReason { + logrus.Debug(fname + " died") + + wasOpen := <-poison + if wasOpen { + logrus.Error("ERROR: The channel was open when it wasn't suppoed to be") + } + + return killed +} + +func (c *Client) termsizeLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "termsizeLoop" + ch := make(chan os.Signal, 1) notifySignalSIGWINCH(ch) defer resetSignalSIGWINCH() @@ -270,8 +304,9 @@ func (c *Client) termsizeLoop(wg *sync.WaitGroup) { } } select { - case <-c.QuitChan: - return + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) case <-ch: } } @@ -281,8 +316,10 @@ type exposeFd interface { Fd() uintptr } -func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { +func (c *Client) writeLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "writeLoop" buff := make([]byte, 128) oldState, err := terminal.MakeRaw(0) @@ -294,8 +331,9 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { reader := io.Reader(os.Stdin) for { select { - case <-c.QuitChan: - return + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) default: } @@ -303,8 +341,7 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { rdfs.Set(reader.(exposeFd).Fd()) err := goselect.Select(1, rdfs, nil, nil, 50*time.Millisecond) if err != nil { - done <- true - return + return openPoison(fname, c.poison) } if rdfs.IsSet(reader.(exposeFd).Fd()) { size, err := reader.Read(buff) @@ -318,13 +355,11 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { err = c.write(append([]byte("0"), byte(4))) if err != nil { - done <- true - return + return openPoison(fname, c.poison) } continue } else { - done <- true - return + return openPoison(fname, c.poison) } } @@ -335,15 +370,16 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) { data := buff[:size] err = c.write(append([]byte("0"), data...)) if err != nil { - done <- true - return + return openPoison(fname, c.poison) } } } } -func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) { +func (c *Client) readLoop(wg *sync.WaitGroup) posionReason { + defer wg.Done() + fname := "readLoop" type MessageNonBlocking struct { Data []byte @@ -358,20 +394,21 @@ func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) { }() select { - case <-c.QuitChan: - return + case <-c.poison: + /* Somebody poisoned the well; die */ + return die(fname, c.poison) case msg := <-msgChan: if msg.Err != nil { - done <- true + if _, ok := msg.Err.(*websocket.CloseError); !ok { logrus.Warnf("c.Conn.ReadMessage: %v", msg.Err) } - return + return openPoison(fname, c.poison) } if len(msg.Data) == 0 { - done <- true + logrus.Warnf("An error has occured") - return + return openPoison(fname, c.poison) } switch msg.Data[0] { case '0': // data @@ -427,6 +464,6 @@ func NewClient(inputURL string) (*Client, error) { URL: url, WriteMutex: &sync.Mutex{}, Output: os.Stdout, - QuitChan: make(chan struct{}), + poison: make(chan bool), }, nil }