Skip to content

Commit

Permalink
Add websocket support to API v2
Browse files Browse the repository at this point in the history
New emails are broadcast through both the API v1 event stream and API v2
websocket.
  • Loading branch information
GREsau committed Apr 22, 2016
1 parent 0fbf593 commit e7f979c
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 10 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ deps:
go get github.com/ian-kent/goose
go get github.com/ian-kent/linkio
go get github.com/jteeuwen/go-bindata/...
go get github.com/gorilla/websocket
go get labix.org/v2/mgo

test-deps:
Expand Down
23 changes: 23 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package api

import (
gohttp "net/http"

"github.com/gorilla/pat"
"github.com/mailhog/MailHog-Server/config"
)

func CreateAPI(conf *config.Config, r gohttp.Handler) {
apiv1 := createAPIv1(conf, r.(*pat.Router))
apiv2 := createAPIv2(conf, r.(*pat.Router))

go func() {
for {
select {
case msg := <-conf.MessageChan:
apiv1.messageChan <- msg
apiv2.messageChan <- msg
}
}
}()
}
11 changes: 7 additions & 4 deletions api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gorilla/pat"
"github.com/ian-kent/go-log/log"
"github.com/mailhog/MailHog-Server/config"
"github.com/mailhog/data"
"github.com/mailhog/storage"

"github.com/ian-kent/goose"
Expand All @@ -24,7 +25,8 @@ import (
//
// Any changes/additions should be added in APIv2.
type APIv1 struct {
config *config.Config
config *config.Config
messageChan chan *data.Message
}

// FIXME should probably move this into APIv1 struct
Expand All @@ -33,10 +35,11 @@ var stream *goose.EventStream
// ReleaseConfig is an alias to preserve go package API
type ReleaseConfig config.OutgoingSMTP

func CreateAPIv1(conf *config.Config, r *pat.Router) *APIv1 {
func createAPIv1(conf *config.Config, r *pat.Router) *APIv1 {
log.Println("Creating API v1 with WebPath: " + conf.WebPath)
apiv1 := &APIv1{
config: conf,
config: conf,
messageChan: make(chan *data.Message),
}

stream = goose.NewEventStream()
Expand Down Expand Up @@ -65,7 +68,7 @@ func CreateAPIv1(conf *config.Config, r *pat.Router) *APIv1 {
keepaliveTicker := time.Tick(time.Minute)
for {
select {
case msg := <-apiv1.config.MessageChan:
case msg := <-apiv1.messageChan:
log.Println("Got message in APIv1 event stream")
bytes, _ := json.MarshalIndent(msg, "", " ")
json := string(bytes)
Expand Down
35 changes: 32 additions & 3 deletions api/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ian-kent/go-log/log"
"github.com/mailhog/MailHog-Server/config"
"github.com/mailhog/MailHog-Server/monkey"
"github.com/mailhog/MailHog-Server/websockets"
"github.com/mailhog/data"
)

Expand All @@ -17,13 +18,17 @@ import (
// It is currently experimental and may change in future releases.
// Use APIv1 for guaranteed compatibility.
type APIv2 struct {
config *config.Config
config *config.Config
messageChan chan *data.Message
wsHub *websockets.Hub
}

func CreateAPIv2(conf *config.Config, r *pat.Router) *APIv2 {
func createAPIv2(conf *config.Config, r *pat.Router) *APIv2 {
log.Println("Creating API v2 with WebPath: " + conf.WebPath)
apiv2 := &APIv2{
config: conf,
config: conf,
messageChan: make(chan *data.Message),
wsHub: websockets.NewHub(),
}

r.Path(conf.WebPath + "/api/v2/messages").Methods("GET").HandlerFunc(apiv2.messages)
Expand All @@ -41,6 +46,18 @@ func CreateAPIv2(conf *config.Config, r *pat.Router) *APIv2 {
r.Path(conf.WebPath + "/api/v2/outgoing-smtp").Methods("GET").HandlerFunc(apiv2.listOutgoingSMTP)
r.Path(conf.WebPath + "/api/v2/outgoing-smtp").Methods("OPTIONS").HandlerFunc(apiv2.defaultOptions)

r.Path(conf.WebPath + "/api/v2/websocket").Methods("GET").HandlerFunc(apiv2.websocket)

go func() {
for {
select {
case msg := <-apiv2.messageChan:
log.Println("Got message in APIv2 websocket channel")
apiv2.broadcast(msg)
}
}
}()

return apiv2
}

Expand Down Expand Up @@ -227,3 +244,15 @@ func (apiv2 *APIv2) listOutgoingSMTP(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.Write(b)
}

func (apiv2 *APIv2) websocket(w http.ResponseWriter, req *http.Request) {
log.Println("[APIv2] GET /api/v2/websocket")

apiv2.wsHub.Serve(w, req)
}

func (apiv2 *APIv2) broadcast(msg *data.Message) {
log.Println("[APIv2] BROADCAST /api/v2/websocket")

apiv2.wsHub.Broadcast(msg)
}
4 changes: 1 addition & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

gohttp "net/http"

"github.com/gorilla/pat"
"github.com/ian-kent/go-log/log"
"github.com/mailhog/MailHog-Server/api"
"github.com/mailhog/MailHog-Server/config"
Expand Down Expand Up @@ -37,8 +36,7 @@ func main() {

exitCh = make(chan int)
cb := func(r gohttp.Handler) {
api.CreateAPIv1(conf, r.(*pat.Router))
api.CreateAPIv2(conf, r.(*pat.Router))
api.CreateAPI(conf, r)
}
go http.Listen(conf.APIBindAddr, assets.Asset, exitCh, cb)
go smtp.Listen(conf, exitCh)
Expand Down
73 changes: 73 additions & 0 deletions websockets/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package websockets

import (
"time"

"github.com/gorilla/websocket"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 256
)

type connection struct {
hub *Hub
ws *websocket.Conn
send chan interface{}
}

func (c *connection) readLoop() {
defer func() {
c.hub.unregisterChan <- c
c.ws.Close()
}()
c.ws.SetReadLimit(256)
c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if _, _, err := c.ws.NextReader(); err != nil {
return
}
}
}

func (c *connection) writeLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.ws.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
c.writeControl(websocket.CloseMessage)
return
}
if err := c.writeJSON(message); err != nil {
return
}
case <-ticker.C:
if err := c.writeControl(websocket.PingMessage); err != nil {
return
}
}
}
}

func (c *connection) writeJSON(message interface{}) error {
c.ws.SetWriteDeadline(time.Now().Add(writeWait))
return c.ws.WriteJSON(message)
}

func (c *connection) writeControl(messageType int) error {
c.ws.SetWriteDeadline(time.Now().Add(writeWait))
return c.ws.WriteMessage(messageType, []byte{})
}
73 changes: 73 additions & 0 deletions websockets/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package websockets

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/ian-kent/go-log/log"
)

type Hub struct {
upgrader websocket.Upgrader
connections map[*connection]bool
messages chan interface{}
registerChan chan *connection
unregisterChan chan *connection
}

func NewHub() *Hub {
hub := &Hub{
upgrader: websocket.Upgrader{
ReadBufferSize: 256,
WriteBufferSize: 4096,
},
connections: make(map[*connection]bool),
messages: make(chan interface{}),
registerChan: make(chan *connection),
unregisterChan: make(chan *connection),
}
go hub.run()
return hub
}

func (h *Hub) run() {
for {
select {
case c := <-h.registerChan:
h.connections[c] = true
case c := <-h.unregisterChan:
h.unregister(c)
case m := <-h.messages:
for c := range h.connections {
select {
case c.send <- m:
default:
h.unregister(c)
}
}
}
}
}

func (h *Hub) unregister(c *connection) {
if _, ok := h.connections[c]; ok {
close(c.send)
delete(h.connections, c)
}
}

func (h *Hub) Serve(w http.ResponseWriter, r *http.Request) {
ws, err := h.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
c := &connection{hub: h, ws: ws, send: make(chan interface{}, 256)}
h.registerChan <- c
go c.writeLoop()
go c.readLoop()
}

func (h *Hub) Broadcast(data interface{}) {
h.messages <- data
}

0 comments on commit e7f979c

Please sign in to comment.