Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window Change #1928

Merged
merged 3 commits into from
May 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,9 @@ const (
// SharedDirMode is a mode for a directory shared with group
SharedDirMode = 0750
)

const (
// SessionEvent is sent by servers to clients when an audit event occurs on
// the session.
SessionEvent = "x-teleport-event"
)
114 changes: 114 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,6 +2525,120 @@ func runAndMatch(tc *client.TeleportClient, attempts int, command []string, patt
return err
}

// TestWindowChange checks if custom Teleport window change requests are sent
// when the server side PTY changes its size.
func (s *IntSuite) TestWindowChange(c *check.C) {
t := s.newTeleport(c, nil, true)
defer t.Stop(true)

site := t.GetSiteAPI(Site)
c.Assert(site, check.NotNil)

personA := NewTerminal(250)
personB := NewTerminal(250)

// openSession will open a new session on a server.
openSession := func() {
cl, err := t.NewClient(ClientConfig{
Login: s.me.Username,
Cluster: Site,
Host: Host,
Port: t.GetPortSSHInt(),
})
c.Assert(err, check.IsNil)

cl.Stdout = &personA
cl.Stdin = &personA

err = cl.SSH(context.TODO(), []string{}, false)
c.Assert(err, check.IsNil)
}

// joinSession will join the existing session on a server.
joinSession := func() {
// Find the existing session in the backend.
var sessionID string
for {
time.Sleep(time.Millisecond)
sessions, _ := site.GetSessions(defaults.Namespace)
if len(sessions) == 0 {
continue
}
sessionID = string(sessions[0].ID)
break
}

cl, err := t.NewClient(ClientConfig{
Login: s.me.Username,
Cluster: Site,
Host: Host,
Port: t.GetPortSSHInt(),
})
c.Assert(err, check.IsNil)

cl.Stdout = &personB
cl.Stdin = &personB

// Change the size of the window immediately after it is created.
cl.OnShellCreated = func(s *ssh.Session, c *ssh.Client, terminal io.ReadWriteCloser) (exit bool, err error) {
err = s.WindowChange(48, 160)
if err != nil {
return true, trace.Wrap(err)
}
return false, nil
}

for i := 0; i < 10; i++ {
err = cl.Join(context.TODO(), defaults.Namespace, session.ID(sessionID), &personB)
if err == nil {
break
}
}
c.Assert(err, check.IsNil)
}

// waitForOutput checks the output of the passed in terminal of a string until
// some timeout has occured.
waitForOutput := func(t Terminal, s string) error {
tickerCh := time.Tick(500 * time.Millisecond)
timeoutCh := time.After(30 * time.Second)
for {
select {
case <-tickerCh:
if strings.Contains(t.Output(500), s) {
return nil
}
case <-timeoutCh:
return trace.BadParameter("timed out waiting for output")
}
}

}

// Open session, the initial size will be 80x24.
go openSession()

// Use the "printf" command to print the terminal size on the screen and
// make sure it is 80x25.
personA.Type("\aprintf '%s %s\n' $(tput cols) $(tput lines)\n\r\a")
err := waitForOutput(personA, "80 25")
c.Assert(err, check.IsNil)

// As soon as person B joins the session, the terminal is resized to 160x48.
// Have another user join the session. As soon as the second shell is
// created, the window is resized to 160x48 (see joinSession implementation).
go joinSession()

// Use the "printf" command to print the window size again and make sure it's
// 160x48.
personA.Type("\aprintf '%s %s\n' $(tput cols) $(tput lines)\n\r\a")
err = waitForOutput(personA, "160 48")
c.Assert(err, check.IsNil)

// Close the session.
personA.Type("\aexit\r\n\a")
}

// runCommand is a shortcut for running SSH command, it creates a client
// connected to proxy of the passed in instance, runs the command, and returns
// the result. If multiple attempts are requested, a 250 millisecond delay is
Expand Down
30 changes: 29 additions & 1 deletion lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ type TeleportClient struct {
localAgent *LocalKeyAgent

// OnShellCreated gets called when the shell is created. It's
// safe to keep it nil
// safe to keep it nil.
OnShellCreated ShellCreatedCallback

// eventsCh is a channel used to inform clients about events have that
// occured during the session.
eventsCh chan events.EventFields
}

// ShellCreatedCallback can be supplied for every teleport client. It will
Expand Down Expand Up @@ -568,6 +572,12 @@ func NewClient(c *Config) (tc *TeleportClient, err error) {
tc.Stdin = os.Stdin
}

// Create a buffered channel to hold events that occured during this session.
// This channel must be buffered because the SSH connection directly feeds
// into it. Delays in pulling messages off the global SSH request channel
// could lead to the connection hanging.
tc.eventsCh = make(chan events.EventFields, 1024)

// sometimes we need to use external auth without using local auth
// methods, e.g. in automation daemons
if c.SkipLocalAuth {
Expand Down Expand Up @@ -1500,6 +1510,24 @@ func (tc *TeleportClient) u2fLogin(pub []byte) (*auth.SSHLoginResponse, error) {
return response, trace.Wrap(err)
}

// SendEvent adds a events.EventFields to the channel.
func (tc *TeleportClient) SendEvent(ctx context.Context, e events.EventFields) error {
// Try and send the event to the eventsCh. If blocking, keep blocking until
// the passed in context in canceled.
select {
case tc.eventsCh <- e:
return nil
case <-ctx.Done():
return trace.Wrap(ctx.Err())
}
}

// EventsChannel returns a channel that can be used to listen for events that
// occur for this session.
func (tc *TeleportClient) EventsChannel() <-chan events.EventFields {
return tc.eventsCh
}

// loopbackPool reads trusted CAs if it finds it in a predefined location
// and will work only if target proxy address is loopback
func loopbackPool(proxyAddr string) *x509.CertPool {
Expand Down
69 changes: 62 additions & 7 deletions lib/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/sshutils"
"github.com/gravitational/teleport/lib/sshutils/scp"
Expand Down Expand Up @@ -63,6 +64,7 @@ type NodeClient struct {
Namespace string
Client *ssh.Client
Proxy *ProxyClient
TC *TeleportClient
}

// GetSites returns list of the "sites" (AKA teleport clusters) connected to the proxy
Expand Down Expand Up @@ -420,9 +422,62 @@ func (proxy *ProxyClient) ConnectToNode(ctx context.Context, nodeAddress string,
return nil, trace.Wrap(err)
}

client := ssh.NewClient(conn, chans, reqs)
// We pass an empty channel which we close right away to ssh.NewClient
// because the client need to handle requests itself.
emptyCh := make(chan *ssh.Request)
close(emptyCh)

return &NodeClient{Client: client, Proxy: proxy, Namespace: defaults.Namespace}, nil
client := ssh.NewClient(conn, chans, emptyCh)

nc := &NodeClient{
Client: client,
Proxy: proxy,
Namespace: defaults.Namespace,
TC: proxy.teleportClient,
}

// Start a goroutine that will run for the duration of the client to process
// global requests from the client. Teleport clients will use this to update
// terminal sizes when the remote PTY size has changed.
go nc.handleGlobalRequests(ctx, reqs)

return nc, nil
}

func (c *NodeClient) handleGlobalRequests(ctx context.Context, requestCh <-chan *ssh.Request) {
for {
select {
case r := <-requestCh:
// When the channel is closing, nil is returned.
if r == nil {
return
}

switch r.Type {
case teleport.SessionEvent:
// Parse event and create events.EventFields that can be consumed directly
// by caller.
var e events.EventFields
err := json.Unmarshal(r.Payload, &e)
if err != nil {
log.Warnf("Unable to parse event: %v: %v.", string(r.Payload), err)
continue
}

// Send event to event channel.
err = c.TC.SendEvent(ctx, e)
if err != nil {
log.Warnf("Unable to send event %v: %v.", string(r.Payload), err)
continue
}
default:
// This handles keepalive messages and matches the behaviour of OpenSSH.
r.Reply(false, nil)
}
case <-ctx.Done():
return
}
}
}

// newClientConn is a wrapper around ssh.NewClientConn
Expand Down Expand Up @@ -504,18 +559,18 @@ func (client *NodeClient) Download(remoteSourcePath, localDestinationPath string
// scp runs remote scp command(shellCmd) on the remote server and
// runs local scp handler using scpConf
func (client *NodeClient) scp(scpCommand scp.Command, shellCmd string, errWriter io.Writer) error {
session, err := client.Client.NewSession()
s, err := client.Client.NewSession()
if err != nil {
return trace.Wrap(err)
}
defer session.Close()
defer s.Close()

stdin, err := session.StdinPipe()
stdin, err := s.StdinPipe()
if err != nil {
return trace.Wrap(err)
}

stdout, err := session.StdoutPipe()
stdout, err := s.StdoutPipe()
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -537,7 +592,7 @@ func (client *NodeClient) scp(scpCommand scp.Command, shellCmd string, errWriter
close(closeC)
}()

runErr := session.Run(shellCmd)
runErr := s.Run(shellCmd)
if runErr != nil && err == nil {
err = runErr
}
Expand Down
Loading