-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnats.go
132 lines (122 loc) · 3.12 KB
/
nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"encoding/json"
"errors"
"time"
nats "github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
//TODO: This needs connection handling logic added. Currently it's pretty rudimentary on failures
//NatsConn struct to satisfy the interface
type NatsConn struct {
Conn *nats.Conn
JS nats.JetStreamContext
}
//Connect to the NATS message queue
func (natsConn *NatsConn) Connect(host, port string, errChan chan error) {
log.Info().Msgf("Connecting to NATS: %v:%v", host, port)
nh := "nats://" + host + ":" + port
conn, err := nats.Connect(nh,
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
errChan <- err
}),
nats.DisconnectHandler(func(_ *nats.Conn) {
errChan <- errors.New("unexpectedly disconnected from nats")
}),
)
if err != nil {
errChan <- err
return
}
log.Debug().Msg("setting up nats connection")
natsConn.Conn = conn
log.Debug().Msg("setting up jetstream")
natsConn.JS, err = conn.JetStream()
if err != nil {
log.Debug().Msg("error connecting to jetstream")
errChan <- err
return
}
log.Debug().Msg("creating streams")
err = natsConn.createStream()
if err != nil {
log.Debug().Msg("error creating stream")
errChan <- err
return
}
log.Info().Msg("successfully connected to nats")
}
//Publish push messages to NATS
func (natsConn *NatsConn) Publish(run *Run) error {
data, err := json.Marshal(run)
if err != nil {
return err
}
log.Debug().RawJSON("results-topic", data)
//log.Debug().Msgf("Publishing scan: %v to topic: %v", string(data), publish)
_, err = natsConn.JS.Publish(publish, data)
if err != nil {
return err
}
return nil
}
/*
* TODO: There's a bug here where a message needs to be acked back after a scan is finished
*/
//Subscribe subscribe to a topic in NATS
func (natsConn *NatsConn) Subscribe(workers int, errChan chan error) chan *Message {
log.Info().Msgf("Listening on topic: %v", subscription)
bch := make(chan *Message, 1)
sub, err := natsConn.JS.PullSubscribe(subscription, durableName, nats.PullMaxWaiting(128), nats.ManualAck())
if err != nil {
errChan <- err
return nil
}
go func() {
for {
msgs, err := sub.Fetch(workers, nats.MaxWait(10*time.Second))
for _, msg := range msgs {
log.Debug().Msgf("got message: %f", msg)
if err != nil {
errChan <- err
}
message := newMessage(msg.Data)
bch <- message
ack := message.Processed()
if !ack {
msg.Nak()
continue
}
msg.Ack()
}
}
}()
return bch
}
//Close the connection
func (natsConn *NatsConn) Close() {
natsConn.Conn.Drain()
}
//Setup the streams
func (natsConn *NatsConn) createStream() error {
log.Debug().Msg("starting create stream function")
stream, err := natsConn.JS.StreamInfo(streamName)
log.Debug().Msg("stream created")
if err != nil {
log.Error().Err(err)
}
log.Debug().Msg("setting up stream config")
natsConfig := &nats.StreamConfig{
Name: streamName,
Subjects: []string{subscription},
}
if stream == nil {
log.Info().Msgf("creating stream %s", subscription)
_, err := natsConn.JS.AddStream(natsConfig)
log.Debug().Msg("stream created")
if err != nil {
return err
}
}
return nil
}