forked from xbbdjj/grinnodes
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnect.go
223 lines (196 loc) · 5.57 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package p2p
import (
"encoding/binary"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/xbbdjj/grinnodes/log"
"github.com/xbbdjj/grinnodes/p2p/message"
"github.com/xbbdjj/grinnodes/storage"
"go.uber.org/zap"
)
type node struct {
addr net.TCPAddr
lastSeen int64
}
var peerAll sync.Map
var peerConnected sync.Map
//Start ... start peer connect
func Start() {
res, err := storage.P2PToConnect()
if err == nil {
for _, p := range res {
n := node{addr: p, lastSeen: time.Now().Unix()}
peerAll.Store(fmt.Sprintf("%s:%d", n.addr.IP, n.addr.Port), n)
}
}
for {
peerAll.Range(func(key, value interface{}) bool {
time.Sleep(time.Second)
n, ok := value.(node)
if ok {
if n.lastSeen < time.Now().Unix()-86400 {
peerAll.Delete(fmt.Sprintf("%s:%d", n.addr.IP, n.addr.Port))
} else {
go work(n.addr)
}
}
return true
})
}
}
//one goroutine to handle one p2p conenct
func work(addr net.TCPAddr) {
_, ok := peerConnected.LoadOrStore(addr.IP.String(), time.Now().Unix)
if ok {
return
}
defer func() {
peerConnected.Delete(addr.IP.String())
}()
conn, err := net.DialTCP("tcp", nil, &addr)
if err != nil {
//msg := err.Error()
//"operation timed out"
//"connection refused"
//"no route to host"
//"network is unreachable"
//"can't assign requested address"
return
}
defer conn.Close()
storage.P2PConnected(addr)
log.Logger.Info("p2p connected", zap.String("ip", addr.IP.String()))
msg := message.NewMainnetMessage(message.MsgTypeHand)
hand := message.Hand{
Version: 3,
Capabilities: 15,
Nonce: uint64(time.Now().Unix()),
TotalDifficulty: 0,
UserAgent: "",
Genesis: "40adad0aec27797b48840aa9e00472015c21baea118ce7a2ff1a82c0f8f5bf82",
}
msg.Payload = hand.Payload()
_, err = conn.Write(msg.Bytes())
if err != nil {
log.Logger.Error("p2p write hand message", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
tPing := time.NewTicker(time.Second * 30)
defer tPing.Stop()
tPeer := time.NewTicker(time.Minute * 10)
defer tPeer.Stop()
for {
time.Sleep(time.Second * 10)
select {
case _ = <-tPing.C:
ping := message.Ping{
TotalDifficulty: 0,
Height: 0,
}
m := message.NewMainnetMessage(message.MsgTypePing)
m.Payload = ping.Playload()
_, err = conn.Write(m.Bytes())
if err != nil {
log.Logger.Error("p2p write ping message", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
case _ = <-tPeer.C:
mask := [3]uint32{0x01, 0x02, 0x04}
r := rand.New(rand.NewSource(time.Now().Unix()))
get := message.GetPeerAddrs{
Capabilities: mask[r.Intn(len(mask))],
}
msg := message.NewMainnetMessage(message.MsgTypeGetPeerAddrs)
msg.Payload = get.Playload()
_, err := conn.Write(msg.Bytes())
if err != nil {
log.Logger.Error("p2p write getPeerAddrs message", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
default:
var magic1 uint8
var magic2 uint8
err := binary.Read(conn, binary.BigEndian, &magic1)
if err != nil {
log.Logger.Error("p2p read magic1", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
if magic1 != 97 {
continue
}
err = binary.Read(conn, binary.BigEndian, &magic2)
if err != nil {
log.Logger.Error("p2p read magic2", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
if magic2 != 61 {
continue
}
var msg uint8
err = binary.Read(conn, binary.BigEndian, &msg)
if err != nil {
log.Logger.Error("p2p read msgType", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
var length uint64
err = binary.Read(conn, binary.BigEndian, &length)
if err != nil {
log.Logger.Error("p2p read length", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
b := make([]byte, length, length)
err = binary.Read(conn, binary.BigEndian, &b)
if err != nil {
log.Logger.Error("p2p read palyload", zap.String("ip", addr.IP.String()), zap.Error(err))
return
}
storage.P2PConnecting(addr)
if msg == 2 {
shake, err := message.NewShake(b)
// fmt.Printf("%#v\n", shake)
if err != nil {
log.Logger.Error("p2p decode shake", zap.String("ip", addr.IP.String()), zap.Error(err))
} else {
log.Logger.Debug("p2p receive shake", zap.String("ip", addr.IP.String()))
storage.ReceiveShake(addr, shake)
}
} else if msg == 4 {
pong, err := message.NewPong(b)
// fmt.Printf("%#v\n", pong)
if err != nil {
log.Logger.Error("p2p decode pong", zap.String("ip", addr.IP.String()), zap.Error(err))
} else {
log.Logger.Debug("p2p receive pong", zap.String("ip", addr.IP.String()))
storage.ReceivePong(addr, pong)
}
} else if msg == 6 {
peers, err := message.NewPeers(b)
// fmt.Printf("%#v\n", p)
if err != nil {
log.Logger.Error("p2p decode peers", zap.String("ip", addr.IP.String()), zap.Error(err))
} else {
log.Logger.Debug("p2p receive peers", zap.String("ip", addr.IP.String()))
for _, p := range peers {
if storage.IsPublicIP(p.IP) && p.Port > 0 {
n := node{addr: p, lastSeen: time.Now().Unix()}
peerAll.Store(fmt.Sprintf("%s:%d", p.IP, p.Port), n)
storage.AddPeer(p)
}
}
}
} else if msg == 8 {
header, err := message.NewHeader(b)
// fmt.Printf("%#v\n", header)
if err != nil {
log.Logger.Error("p2p decode header", zap.String("ip", addr.IP.String()), zap.Error(err))
} else {
log.Logger.Debug("p2p receive header", zap.String("ip", addr.IP.String()))
storage.ReceiveHeader(addr, header)
}
}
}
}
}