-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathofpmsgtunnel.go
188 lines (172 loc) · 4.4 KB
/
ofpmsgtunnel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package goof
import (
"bytes"
"encoding/binary"
"fmt"
"net"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/kopwei/goof/protocols/ofp10"
"github.com/kopwei/goof/protocols/ofp11"
"github.com/kopwei/goof/protocols/ofp12"
"github.com/kopwei/goof/protocols/ofp13"
"github.com/kopwei/goof/protocols/ofp14"
"github.com/kopwei/goof/protocols/ofp15"
"github.com/kopwei/goof/protocols/ofpgeneral"
)
const (
defaultBufferSize = 50
)
// ofpBufferPool is the message buffer pool
type ofpBufferPool struct {
empty chan *bytes.Buffer
full chan *bytes.Buffer
}
// newBufferPool creates the message buffer pool
func newBufferPool(size int) *ofpBufferPool {
m := &ofpBufferPool{}
m.empty = make(chan *bytes.Buffer, size)
m.full = make(chan *bytes.Buffer, size)
for i := 0; i < size; i++ {
m.empty <- bytes.NewBuffer(make([]byte, 0, 2048))
}
return m
}
// MessageParser is the interface for message parser
type MessageParser interface {
ParseMsg(b []byte) (ofpgeneral.OfpMessage, error)
}
// OfpMessageTunnel is the tunnel of messages in one tcp connection
// between the openflow controller and datapath
type OfpMessageTunnel struct {
conn *net.TCPConn
pool *ofpBufferPool
// Openflow Version
Version uint8
Incomming chan ofpgeneral.OfpMessage
Outgoing chan ofpgeneral.OfpMessage
MsgParser MessageParser
// Channel on which to receive a shutdown command
Shutdown chan bool
// Channel on which to publish connection errors
Error chan error
}
// NewOfpMsgTunnel return the message stream
func NewOfpMsgTunnel(con *net.TCPConn) *OfpMessageTunnel {
msgTunnel := &OfpMessageTunnel{conn: con}
msgTunnel.Incomming = make(chan ofpgeneral.OfpMessage)
msgTunnel.Outgoing = make(chan ofpgeneral.OfpMessage)
msgTunnel.pool = newBufferPool(defaultBufferSize)
msgTunnel.MsgParser = nil
go msgTunnel.sendMessage()
go msgTunnel.receiveMessage()
for i := 0; i < defaultBufferSize/2; i++ {
go msgTunnel.parseWorker()
}
return msgTunnel
}
// SendFeatureRequest is used to send the feature request message to datapath
func (mt *OfpMessageTunnel) SendFeatureRequest() {
header := ofpgeneral.NewOfpHeader(mt.Version)
switch mt.Version {
case ofp10.Version:
header.Type = ofp10.OfpTypeFeaturesRequest
case ofp13.Version:
header.Type = ofp13.OfpTypeFeaturesRequest
}
mt.Outgoing <- header
}
func (mt *OfpMessageTunnel) sendMessage() {
for {
msg := <-mt.Outgoing
data, _ := msg.MarshalBinary()
if _, err := mt.conn.Write(data); err != nil {
log.Printf("Error in sending messages %s", err.Error())
}
}
}
func (mt *OfpMessageTunnel) receiveMessage() {
msg := 0
hdr := 0
hdrBuf := make([]byte, 4)
tmp := make([]byte, 2048)
buf := <-mt.pool.empty
for {
n, err := mt.conn.Read(tmp)
if err != nil {
// Handle explicitly disconnecting by closing connection
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
log.Warnln("InboundError", err)
mt.Error <- err
mt.Shutdown <- true
return
}
for i := 0; i < n; i++ {
if hdr < 4 {
hdrBuf[hdr] = tmp[i]
buf.WriteByte(tmp[i])
hdr++
if hdr >= 4 {
msg = int(binary.BigEndian.Uint16(hdrBuf[2:])) - 4
}
continue
}
if msg > 0 {
buf.WriteByte(tmp[i])
msg = msg - 1
if msg == 0 {
hdr = 0
mt.pool.full <- buf
buf = <-mt.pool.empty
}
continue
}
}
}
}
func (mt *OfpMessageTunnel) parseWorker() {
var err error
for {
msgBufBytes := <-mt.pool.full
if mt.MsgParser == nil {
mt.MsgParser, err = genMsgParser(msgBufBytes.Bytes())
if err != nil {
log.Printf("Message parser generation error %s", err.Error())
}
}
msg, err := mt.MsgParser.ParseMsg(msgBufBytes.Bytes())
if err != nil {
log.Printf("Message parsing error %s", err.Error())
}
mt.Incomming <- msg
msgBufBytes.Reset()
mt.pool.empty <- msgBufBytes
}
}
func genMsgParser(msg []byte) (MessageParser, error) {
version, err := ofpgeneral.GetMessageVersion(msg)
if err != nil {
return nil, err
}
var parser MessageParser
switch version {
case ofp10.Version:
parser = &ofp10.OfpMessageParser{}
case ofp11.Version:
parser = &ofp11.OfpMessageParser{}
case ofp12.Version:
parser = &ofp12.OfpMessageParser{}
case ofp13.Version:
parser = &ofp13.OfpMessageParser{}
case ofp14.Version:
parser = &ofp14.OfpMessageParser{}
case ofp15.Version:
parser = &ofp15.OfpMessageParser{}
default:
parser = nil
err = fmt.Errorf("Unsupported version %d", version)
}
return parser, err
}