Skip to content

Commit

Permalink
Merge pull request #4 from renukamanavalan/statistics
Browse files Browse the repository at this point in the history
Send event as JsonIetfVal instead of StringVal
  • Loading branch information
renukamanavalan authored Oct 5, 2022
2 parents 38132b6 + 39d4be8 commit 9299ec7
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 17 deletions.
12 changes: 8 additions & 4 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,9 +2603,12 @@ func TestAuthCapabilities(t *testing.T) {
}

func TestClient(t *testing.T) {
// sonic-host:device-test-event is a test event.
// Events client will drop it on floor.
events := [] sdc.Evt_rcvd {
{ "test0", 7, 777 },
{ "test1", 6, 677 },
{ "{\"sonic-host:device-test-event\"", 5, 577 },
{ "test2", 5, 577 },
{ "test3", 4, 477 },
}
Expand Down Expand Up @@ -2653,7 +2656,6 @@ func TestClient(t *testing.T) {

qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET)
q := createEventsQuery(t, qstr)
// q := createEventsQuery(t, "all")
q.Addrs = []string{"127.0.0.1:8081"}

tests := []struct {
Expand Down Expand Up @@ -2694,16 +2696,18 @@ func TestClient(t *testing.T) {
}()

// wait for half second for subscribeRequest to sync
// and to receive events via notification handler.
time.Sleep(time.Millisecond * 2000)

if len(events) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events))
// -1 to discount test event, which receiver would drop.
if (len(events) - 1) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
}

if (heartbeat != HEARTBEAT_SET) {
t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
}
fmt.Printf("DONE: events:%d gotNoti=%d\n", len(events), len(gotNoti))
fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti))
})
time.Sleep(time.Millisecond * 1000)

Expand Down
69 changes: 56 additions & 13 deletions sonic_data_client/events_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import "C"

import (
"strconv"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"time"
"unsafe"
Expand All @@ -31,7 +33,11 @@ const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds
const EVENT_BUFFSZ = 4096

const LATENCY_LIST_SIZE = 10 // Size of list of latencies.
const PQ_MAX_SIZE = 10240 // Max cnt of pending events in PQ.
const PQ_DEF_SIZE = 10240 // Def size for pending events in PQ.
const PQ_MIN_SIZE = 1024 // Min size for pending events in PQ.
const PQ_MAX_SIZE = 102400 // Max size for pending events in PQ.

const HEARTBEAT_MAX = 600 // 10 mins

// STATS counters
const MISSED = "COUNTERS_EVENTS:missed_internal"
Expand All @@ -45,15 +51,19 @@ const STATS_FIELD_NAME = "value"

const EVENTD_PUBLISHER_SOURCE = "{\"sonic-events-eventd"

const TEST_EVENT = "{\"sonic-host:device-test-event"

// Path parameter
const PARAM_HEARTBEAT = "heartbeat"
const PARAM_QSIZE = "qsize"

type EventClient struct {

prefix *gnmipb.Path
path *gnmipb.Path

q *queue.PriorityQueue
pq_max int
channel chan struct{}

wg *sync.WaitGroup // wait for all sub go routines to finish
Expand Down Expand Up @@ -87,6 +97,9 @@ func C_init_subs() unsafe.Pointer {
func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) {
var evtc EventClient
evtc.prefix = prefix
evtc.pq_max = PQ_DEF_SIZE
log.V(4).Infof("Events priority Q max set default = %v", evtc.pq_max)

for _, path := range paths {
// Only one path is expected. Take the last if many
evtc.path = path
Expand All @@ -97,10 +110,28 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl
for k, v := range keys {
if (k == PARAM_HEARTBEAT) {
if val, err := strconv.Atoi(v); err == nil {
if (val > HEARTBEAT_MAX) {
log.V(4).Infof("heartbeat req %v > max %v; default to max", val, HEARTBEAT_MAX)
val = HEARTBEAT_MAX
}
log.V(7).Infof("evtc.heartbeat_interval is set to %d", val)
Set_heartbeat(val)
}
break
} else if (k == PARAM_QSIZE) {
if val, err := strconv.Atoi(v); err == nil {
qval := val
if (val < PQ_MIN_SIZE) {
val = PQ_MIN_SIZE
} else if val > PQ_MAX_SIZE) {
val = PQ_MAX_SIZE
}
if val != qval {
log.V(4).Infof("Events priority Q request %v updated to nearest limit %v",
qval, val)
}
evtc.pq_max = val
log.V(7).Infof("Events priority Q max set by qsize param = %v", evtc.pq_max)
}
}
}
}
Expand Down Expand Up @@ -279,18 +310,30 @@ func get_events(evtc *EventClient) {

if rc == 0 {
evtc.counters[MISSED] += (uint64)(evt.Missed_cnt)
qlen := evtc.q.Len()

if (qlen < PQ_MAX_SIZE) {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_StringVal {
StringVal: evt.Event_str,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return

if !strings.HasPrefix(evt.Event_str, TEST_EVENT) {
qlen := evtc.q.Len()

if (qlen < evtc.pq_max) {
var fvp map[string]interface{}
json.Unmarshal([]byte(evt.Event_str), &fvp)

jv, err := json.Marshal(fvp)

if err == nil {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_JsonIetfVal {
JsonIetfVal: jv,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return
}
} else {
log.V(1).Infof("Invalid event string: %v", evt.Event_str)
}
} else {
evtc.counters[DROPPED] += 1
}
} else {
evtc.counters[DROPPED] += 1
}
}
if evtc.stopped == 1 {
Expand Down

0 comments on commit 9299ec7

Please sign in to comment.