-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathmassh.go
213 lines (179 loc) · 6.01 KB
/
massh.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Package massh provides tools for running distributed shell commands via SSH.
//
// A Massh config should be configured with the minimum of a Job, Hosts, and an SSHConfig, followed by a call to
// either Run() or Stream(), depending on your requirements.
package massh
import (
"fmt"
"github.com/mitchellh/go-homedir"
"golang.org/x/crypto/ssh"
"io/ioutil"
"path/filepath"
"strings"
)
// Config is a collection of parameters for running distributed SSH commands. A new config should always be generated
// using NewConfig.
type Config struct {
Hosts map[string]struct{}
SSHConfig *ssh.ClientConfig
// Jobs to execute, config will error if both are set
Job *Job
JobStack *[]Job
// Number of concurrent workers
WorkerPool int
BastionHost string
// BastionHost's SSH config. If nil, Bastion will use SSHConfig instead.
BastionHostSSHConfig *ssh.ClientConfig
// Stream-only
SlowTimeout int // Timeout for declaring that a host is slow.
CancelSlowHosts bool // Not implemented. Automatically cancel hosts that are flagged as slow.
Stop chan struct{}
}
// NewConfig initialises a new Config.
func NewConfig() *Config {
return &Config{
Hosts: map[string]struct{}{},
SSHConfig: &ssh.ClientConfig{},
BastionHostSSHConfig: &ssh.ClientConfig{},
Stop: make(chan struct{}, 1),
}
}
// Run returns a slice of results once every host has completed it's work, successful or otherwise.
//
// This is a rudimentary function designed for small, simple jobs that require low complexity. As such, it's
// execution is not affected by SlowTimeout or CancelSlowHosts. The Results returned using this method
// always have an IsSlow value of false.
func (c *Config) Run() ([]Result, error) {
if err := checkJobs(c); err != nil {
return nil, err
}
return run(c), nil
}
/*
Stream populates rs as commands are initiated, and writes each host's output concurrently. It allows
real-time output processing and premature cancellation.
Stdout and Stderr can be read from StdOutStream and StdErrStream respectively.
Example for reading each result in the channel:
```
resultChan := make(chan *Result)
cfg.Stream(resultChan)
for {
result := <-resultChan
go func() {
// do something with the result
}()
}
```
More complete examples can be found in test files or in _examples.
*/
func (c *Config) Stream(rs chan *Result) error {
if err := checkJobs(c); err != nil {
return err
}
if rs == nil {
return fmt.Errorf("stream channel cannot be nil")
}
runStream(c, rs)
return nil
}
// SetPrivateKeyAuth takes the private key file provided, reads it, and adds the key signature to the config.
func (c *Config) SetPrivateKeyAuth(PrivateKeyFile string, PrivateKeyPassphrase string) error {
if strings.HasPrefix(PrivateKeyFile, "~/") {
home, _ := homedir.Dir()
PrivateKeyFile = filepath.Join(home, PrivateKeyFile[2:])
}
key, err := ioutil.ReadFile(PrivateKeyFile)
if err != nil {
return fmt.Errorf("unable to read private key file: %s", err)
}
// Create the Signer for this private key.
var signer ssh.Signer
if PrivateKeyPassphrase == "" {
var err error
signer, err = ssh.ParsePrivateKey(key)
if err != nil {
return fmt.Errorf("unable to parse private key: %s", err)
}
} else {
var err error
signer, err = ssh.ParsePrivateKeyWithPassphrase(key, []byte(PrivateKeyPassphrase))
if err != nil {
return fmt.Errorf("unable to parse private key with passphrase: %s", err)
}
}
c.SSHConfig.Auth = append(c.SSHConfig.Auth, ssh.PublicKeys(signer))
return nil
}
// SetPasswordAuth sets ssh password from provided byte slice (read from terminal)
func (c *Config) SetPasswordAuth(username string, password string) {
c.SSHConfig.User = username
c.SSHConfig.Auth = append(c.SSHConfig.Auth, ssh.Password(password))
}
// SetSlowTimeout sets the SlowTimeout value for config.
func (c *Config) SetSlowTimeout(timeout int) {
c.SlowTimeout = timeout
}
// SetHosts adds a slice of strings as hosts to config. It will filter out duplicate hosts.
func (c *Config) SetHosts(hosts []string) {
for i := range hosts {
c.Hosts[hosts[i]] = struct{}{}
}
}
// SetBastionHost sets the bastion host for config.
func (c *Config) SetBastionHost(host string) {
c.BastionHost = host
}
// SetBastionHostConfig sets the bastion hosts's SSH client config. If value is left nil, SSHConfig will be used instead.
func (c *Config) SetBastionHostConfig(s *ssh.ClientConfig) {
c.BastionHostSSHConfig = s
}
// SetSSHConfig set the SSHConfig for config.
func (c *Config) SetSSHConfig(s *ssh.ClientConfig) {
c.SSHConfig = s
}
// SetJob sets Job for config.
func (c *Config) SetJob(job *Job) {
c.Job = job
}
// SetWorkerPool specifies the number of concurrent workers for config. If numWorkers is less than 1, it will be
// set to 1 instead.
func (c *Config) SetWorkerPool(numWorkers int) {
if numWorkers < 1 {
c.WorkerPool = 1
return
}
c.WorkerPool = numWorkers
}
// SetSSHAuthSock uses SSH_AUTH_SOCK environment variable to populate auth method in the SSHConfig.
//
// This is useful when using keys, and `AgentForwarding` is enabled in the client's SSH config.
func (c *Config) SetSSHAuthSock() error {
// SSH_AUTH_SOCK contains the path of the unix socket that the agent uses for communication with other processes.
SSHAuthSock, err := sshAuthSock()
if err != nil {
return err
}
c.SSHConfig.Auth = append(c.SSHConfig.Auth, SSHAuthSock)
return nil
}
// SetSSHHostKeyCallback sets the HostKeyCallback for the Config's SSHConfig value.
//
// This value should not be set to ssh.InsecureIgnoreHostKey() in production!
func (c *Config) SetSSHHostKeyCallback(callback ssh.HostKeyCallback) {
c.SSHConfig.HostKeyCallback = callback
}
// StopAllSessions stops all active streaming jobs.
func (c *Config) StopAllSessions() {
c.Stop <- struct{}{}
}
// CheckSanity ensures config is valid.
func (c *Config) CheckSanity() error {
if err := checkConfigSanity(c); err != nil {
return err
}
return nil
}
// AutoCancelSlowHosts will cancel/terminate slow host sessions.
func (c *Config) AutoCancelSlowHosts() {
c.CancelSlowHosts = true
}