Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not exit on EOF #45

Merged
merged 2 commits into from
Jan 5, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 101 additions & 44 deletions gotty-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -79,8 +78,7 @@ type Client struct {
URL string
WriteMutex *sync.Mutex
Output io.Writer
QuitChan chan struct{}
QuitChanClosed int32 // atomic value
poison chan bool
SkipTLSVerify bool
UseProxyFromEnv bool
Connected bool
Expand Down Expand Up @@ -210,12 +208,11 @@ func (c *Client) Close() {
c.Conn.Close()
}

// ExitLoop will kill all goroutine
// ExitLoop will kill all goroutines launched by c.Loop()
// ExitLoop() -> wait Loop() -> Close()
func (c *Client) ExitLoop() {
if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) {
close(c.QuitChan)
}
fname := "ExitLoop"
openPoison(fname, c.poison)
}

// Loop will look indefinitely for new messages
Expand All @@ -227,23 +224,21 @@ func (c *Client) Loop() error {
}
}

var wg sync.WaitGroup
done := make(chan bool)
wg := &sync.WaitGroup{}

wg.Add(1)
go c.termsizeLoop(&wg)
go c.termsizeLoop(wg)

wg.Add(1)
go c.readLoop(done, &wg)
go c.readLoop(wg)

wg.Add(1)
go c.writeLoop(done, &wg)
select {
case <-done:
if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) {
close(c.QuitChan)
}
case <-c.QuitChan:
}
go c.writeLoop(wg)

/* Wait for all of the above goroutines to finish */
wg.Wait()

logrus.Debug("Client.Loop() exiting")
return nil
}

Expand All @@ -255,8 +250,47 @@ type winsize struct {
y uint16
}

func (c *Client) termsizeLoop(wg *sync.WaitGroup) {
type posionReason int

const (
committedSuicide = iota
killed
)

func openPoison(fname string, poison chan bool) posionReason {
logrus.Debug(fname + " suicide")

/*
* The close() may raise panic if multiple goroutines commit suicide at the
* same time. Prevent that panic from bubbling up.
*/
defer func() {
if r := recover(); r != nil {
logrus.Debug("Prevented panic() of simultaneous suicides", r)
}
}()

/* Signal others to die */
close(poison)
return committedSuicide
}

func die(fname string, poison chan bool) posionReason {
logrus.Debug(fname + " died")

wasOpen := <-poison
if wasOpen {
logrus.Error("ERROR: The channel was open when it wasn't suppoed to be")
}

return killed
}

func (c *Client) termsizeLoop(wg *sync.WaitGroup) posionReason {

defer wg.Done()
fname := "termsizeLoop"

ch := make(chan os.Signal, 1)
notifySignalSIGWINCH(ch)
defer resetSignalSIGWINCH()
Expand All @@ -270,8 +304,9 @@ func (c *Client) termsizeLoop(wg *sync.WaitGroup) {
}
}
select {
case <-c.QuitChan:
return
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
case <-ch:
}
}
Expand All @@ -281,8 +316,10 @@ type exposeFd interface {
Fd() uintptr
}

func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
func (c *Client) writeLoop(wg *sync.WaitGroup) posionReason {

defer wg.Done()
fname := "writeLoop"

buff := make([]byte, 128)
oldState, err := terminal.MakeRaw(0)
Expand All @@ -293,37 +330,56 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
rdfs := &goselect.FDSet{}
reader := io.Reader(os.Stdin)
for {
select {
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
default:
}

rdfs.Zero()
rdfs.Set(reader.(exposeFd).Fd())
err := goselect.Select(1, rdfs, nil, nil, 50*time.Millisecond)
if err != nil {
done <- true
return
return openPoison(fname, c.poison)
}
if rdfs.IsSet(reader.(exposeFd).Fd()) {
size, err := reader.Read(buff)
if size <= 0 || err != nil {
done <- true
return

if err != nil {
if err == io.EOF {
// Send EOF to GoTTY

// Send 'Input' marker, as defined in GoTTY::client_context.go,
// followed by EOT (a translation of Ctrl-D for terminals)
err = c.write(append([]byte("0"), byte(4)))

if err != nil {
return openPoison(fname, c.poison)
}
continue
} else {
return openPoison(fname, c.poison)
}
}

if size <= 0 {
continue
}

data := buff[:size]
err = c.write(append([]byte("0"), data...))
if err != nil {
done <- true
return
return openPoison(fname, c.poison)
}
}
select {
case <-c.QuitChan:
return
default:
break
}
}
}

func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) {
func (c *Client) readLoop(wg *sync.WaitGroup) posionReason {

defer wg.Done()
fname := "readLoop"

type MessageNonBlocking struct {
Data []byte
Expand All @@ -338,20 +394,21 @@ func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) {
}()

select {
case <-c.QuitChan:
return
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
case msg := <-msgChan:
if msg.Err != nil {
done <- true

if _, ok := msg.Err.(*websocket.CloseError); !ok {
logrus.Warnf("c.Conn.ReadMessage: %v", msg.Err)
}
return
return openPoison(fname, c.poison)
}
if len(msg.Data) == 0 {
done <- true

logrus.Warnf("An error has occured")
return
return openPoison(fname, c.poison)
}
switch msg.Data[0] {
case '0': // data
Expand Down Expand Up @@ -407,6 +464,6 @@ func NewClient(inputURL string) (*Client, error) {
URL: url,
WriteMutex: &sync.Mutex{},
Output: os.Stdout,
QuitChan: make(chan struct{}),
poison: make(chan bool),
}, nil
}