This repository has been archived by the owner on Nov 18, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomms.go
151 lines (132 loc) · 4.1 KB
/
comms.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Based on github.com/go-mangos/mangos/examples/bus/bus.go
package main
import (
"encoding/csv"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"time"
docopt "github.com/docopt/docopt-go"
"github.com/go-mangos/mangos"
"github.com/go-mangos/mangos/protocol/rep"
"github.com/go-mangos/mangos/protocol/req"
"github.com/go-mangos/mangos/transport/ipc"
"github.com/go-mangos/mangos/transport/tcp"
)
// node describes one backup machine: the SSH port and the SSH host used to
// open a tunnel to it.
type node struct {
	port, host string
}
// die renders format and v printf-style, writes the result plus a trailing
// newline to standard error, and terminates the process with exit status 1.
// It never returns.
func die(format string, v ...interface{}) {
	message := fmt.Sprintf(format, v...)
	fmt.Fprintf(os.Stderr, "%s\n", message)
	os.Exit(1)
}
// core runs the core node. It opens one SSH tunnel per entry in backups,
// assigning local tunnel ports lPort, lPort+1, ... in the order the backups
// are given, then dials the tunnels over a REQ socket and exchanges a test
// message with the backup every three seconds, printing each request and
// reply.
//
// lPort must be a decimal port number; it is both the base of the local
// tunnel ports and the port forwarded to on each backup. Any failure, or an
// empty backups list, terminates the process through die. The function does
// not return.
func core(backups []node, lPort string) {
	var sock mangos.Socket // the socket
	var err error          // catch errors
	var recv []byte        // the received bytes

	// With nothing to dial, the loops below would spin forever without ever
	// sleeping, so fail loudly instead.
	if len(backups) == 0 {
		die("core: no backups configured")
	}

	cmds := make(map[string]*exec.Cmd, len(backups)) // the ssh commands that were started, keyed by local port
	ports := make([]string, 0, len(backups))         // the local tunnel ports, in the order of backups
	msg := []byte("hello from core")                 // the test message to send from the core node to the backups
	prefixURL := "tcp://localhost:"                  // the URL prefix
	bPortBase, err := strconv.Atoi(lPort)            // first backup port
	if err != nil {
		die("strconv.Atoi: %s", err.Error())
	}

	if sock, err = req.NewSocket(); err != nil {
		die("req.NewSocket: %s", err.Error())
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	for index, element := range backups {
		currBPort := strconv.Itoa(bPortBase + index)                            // current backup port
		cmds[currBPort] = openSSH(element.port, element.host, currBPort, lPort) // open the SSH tunnel
		ports = append(ports, currBPort)
	}

	for {
		// Walk the ordered slice rather than the cmds map: map iteration
		// order is not defined, which would make the backup that gets
		// dialed first a random one.
		for _, currBPort := range ports {
			if err = sock.Dial(prefixURL + currBPort); err != nil {
				die("socket.Dial: %s", err.Error())
			}
			time.Sleep(3 * time.Second) // wait for the dial to complete

			for { // for now, just do it indefinitely
				// eventually, if the wrong reply is received (or no reply) move onto next backup
				fmt.Printf("SENDING REQUEST %s\n", string(msg))
				if err = sock.Send(msg); err != nil { // send message to the backup
					die("sock.Send: %s", err.Error())
				}
				if recv, err = sock.Recv(); err != nil {
					die("sock.Recv: %s", err.Error())
				}
				fmt.Printf("RECEIVED REPLY %s\n", string(recv))
				time.Sleep(3 * time.Second)
			}
		}
	}
}
func backup(lPort string) {
var sock mangos.Socket // the socket
var err error // catch errors
var recv []byte
msg := []byte("hello from backup") // the test message to send from the backup to the core
lURL := "tcp://localhost:" + lPort // the local URL to talk on
if sock, err = rep.NewSocket(); err != nil {
die("rep.NewSocket: %s", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(lURL); err != nil {
die("sock.Listen: %s", err.Error())
}
for {
recv, err = sock.Recv()
if err != nil {
die("sock.Recv: %s", err.Error())
}
fmt.Printf("RECEIVED REQUEST %s\n", string(recv))
fmt.Printf("SENDING REPLY %s\n", string(msg))
err = sock.Send(msg)
if err != nil {
die("can't send reply: %s", err.Error())
}
}
}
// openSSH starts an ssh process that forwards local port bPort to
// localhost:lPort on the remote side, connecting to rHost on SSH port rPort
// with the ~/.ssh/id_rsa identity. The process is started but not waited for;
// the returned *exec.Cmd is the handle to it. If the process cannot be
// started, the program terminates through die.
func openSSH(rPort string, rHost string, bPort string, lPort string) *exec.Cmd {
	// exec.Command runs no shell, so "~" is never expanded for us and each
	// flag value has to be its own argument. Resolve the home directory here;
	// if $HOME is unset the tilde form is passed through unchanged.
	// NOTE(review): that fallback assumes ssh expands a leading "~" itself - confirm.
	identity := "~/.ssh/id_rsa"
	if home := os.Getenv("HOME"); home != "" {
		identity = home + "/.ssh/id_rsa"
	}

	cmd := exec.Command("ssh",
		"-N", // port forward without opening an SSH session
		"-L", bPort+":localhost:"+lPort,
		"-i", identity,
		"-p", rPort,
		rHost,
	)
	if err := cmd.Start(); err != nil {
		die("cmd.Start: %s", err.Error())
	}
	return cmd
}
func main() {
usage := `Ark backup node communication.
Usage:
comms [<config.csv>]`
arguments, _ := docopt.ParseDoc(usage)
lPort := "5124" // local port
if arguments["<config.csv>"] == nil {
backup(lPort) // backup node
}
file, err := os.Open(arguments["<config.csv>"].(string)) // the config file for the core node
if err != nil {
die("os.Open: %s", err.Error())
}
defer file.Close()
var backups []node
reader := csv.NewReader(file)
for {
tokens, err := reader.Read() // read the line and break it into tokens using the comma delim
if err == io.EOF {
break
} else if err != nil {
die("reader.Read: %s", err.Error())
}
if len(tokens) != 2 {
die("reader.Read: result not of length 2")
}
n := node{port: tokens[0], host: tokens[1]}
backups = append(backups, n)
}
core(backups, lPort) // core node
}