Skip to content

Commit

Permalink
Fixes notification limit.
Browse files Browse the repository at this point in the history
- Adds helpful logging
- Makes notifcation subscriber event.Event compliant

Closes #15
  • Loading branch information
wfernandes committed Jan 9, 2016
1 parent 877b335 commit d459010
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 12 deletions.
2 changes: 1 addition & 1 deletion notification_processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ func main() {
// Initialize twilio notification service
notifier := notifiers.NewTwilio(config.TwilioAccountSid, config.TwilioAuthToken, config.TwilioFromPhone, config.To)

service := notification.New(notifier, alertChan, config.NotificationIntervalMinutes*time.Minute)
service := notification.New(notifier, alertChan, time.Duration(config.NotificationIntervalMinutes)*time.Minute)
service.Start()
}
4 changes: 4 additions & 0 deletions notification_processor/notification/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (n *NotificationService) Start() {
var err error
logging.Log.Info("Notification service started...")
for event := range n.inputChan {
logging.Log.Debugf("Received event: %s", event.Name)
err = n.notify(event)
if err != nil {
logging.Log.Error("Error notifying", err)
Expand All @@ -49,11 +50,14 @@ func (n *NotificationService) notify(evnt *event.Event) error {
lastNotified, ok := n.sensorsNotified[evnt.Name]
if !ok {
n.sensorsNotified[evnt.Name] = time.Now()
logging.Log.Debugf("Notifying event for %s", evnt.Name)
return n.notifier.Notify(evnt.Data)
}
if lastNotified.Add(n.duration).After(time.Now()) {
return nil
} else {
n.sensorsNotified[evnt.Name] = time.Now()
logging.Log.Debugf("Notifying event for %s", evnt.Name)
return n.notifier.Notify(evnt.Data)
}

Expand Down
14 changes: 11 additions & 3 deletions notification_processor/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"sync"

"github.com/wfernandes/iot/event"
"github.com/wfernandes/iot/logging"
)

Expand All @@ -13,7 +14,7 @@ const SENSORS_LIST_KEY = "/wff/v1/sp1/sensors/"
type Subscribe struct {
broker Broker
subscriptions Sensors
outputChan chan string
outputChan chan *event.Event
lock sync.Mutex
}

Expand All @@ -27,7 +28,7 @@ type Sensors struct {
Sensors []string `json:"sensors"`
}

func New(broker Broker, outputChan chan string) *Subscribe {
func New(broker Broker, outputChan chan *event.Event) *Subscribe {
return &Subscribe{
broker: broker,
outputChan: outputChan,
Expand Down Expand Up @@ -64,5 +65,12 @@ func (s *Subscribe) Stop() {
}

func (s *Subscribe) sensorHandler(dat []byte) {
s.outputChan <- string(dat)
var evnt *event.Event
err := json.Unmarshal(dat, &evnt)
if err != nil {
logging.Log.Error("Unable to unmarshall sensor data", err)
logging.Log.Debugf("Sensor Data: %s", string(dat))
return
}
s.outputChan <- evnt
}
23 changes: 17 additions & 6 deletions notification_processor/subscribe/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@ import (

"github.com/wfernandes/iot/notification_processor/subscribe"

"encoding/json"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/wfernandes/iot/event"
)

var _ = Describe("Subscriber", func() {

var (
outputChan chan string
outputChan chan *event.Event
broker *mockBroker
)
Context("without errors", func() {

BeforeEach(func() {
outputChan = make(chan string, 100)
outputChan = make(chan *event.Event, 100)
broker = newMockBroker()
})

Expand Down Expand Up @@ -53,15 +56,23 @@ var _ = Describe("Subscriber", func() {
getSensorList := <-broker.SubscribeInput.arg1
getSensorList(data)

evnt := &event.Event{
Name: "sensor1",
Data: "some touch data",
}
evntBytes, _ := json.Marshal(evnt)

Eventually(broker.SubscribeCalled).Should(Receive(BeTrue()))
touchSensorHandler := <-broker.SubscribeInput.arg1
touchSensorHandler([]byte("some touch data"))
Eventually(outputChan).Should(Receive(Equal("some touch data")))
touchSensorHandler(evntBytes)
Eventually(outputChan).Should(Receive(Equal(evnt)))

evnt.Data = "some sound data"
evntBytes, _ = json.Marshal(evnt)
Eventually(broker.SubscribeCalled).Should(Receive(BeTrue()))
soundSensorHandler := <-broker.SubscribeInput.arg1
soundSensorHandler([]byte("some sound data"))
Eventually(outputChan).Should(Receive(Equal("some sound data")))
soundSensorHandler(evntBytes)
Eventually(outputChan).Should(Receive(Equal(evnt)))

})
})
Expand Down
4 changes: 2 additions & 2 deletions sensor_processor/sensors/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func (s *SensorService) NewSoundSensor(pin string) {

func (s *SensorService) publish(event *event.Event) {
if s.broker.IsConnected() {
eventData, _ := json.Marshal(event)
eventBytes, _ := json.Marshal(event)
logging.Log.Debugf("Publishing %s", SENSOR_KEY+event.Name)
s.broker.Publish(SENSOR_KEY+event.Name, eventData)
s.broker.Publish(SENSOR_KEY+event.Name, eventBytes)
}
}

Expand Down

0 comments on commit d459010

Please sign in to comment.