Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry client updated to stream structured events #5

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,9 +2603,12 @@ func TestAuthCapabilities(t *testing.T) {
}

func TestClient(t *testing.T) {
// sonic-host:device-test-event is a test event.
// Events client will drop it on floor.
events := [] sdc.Evt_rcvd {
{ "test0", 7, 777 },
{ "test1", 6, 677 },
{ "{\"sonic-host:device-test-event\"", 5, 577 },
{ "test2", 5, 577 },
{ "test3", 4, 477 },
}
Expand Down Expand Up @@ -2653,7 +2656,6 @@ func TestClient(t *testing.T) {

qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET)
q := createEventsQuery(t, qstr)
// q := createEventsQuery(t, "all")
q.Addrs = []string{"127.0.0.1:8081"}

tests := []struct {
Expand Down Expand Up @@ -2694,16 +2696,18 @@ func TestClient(t *testing.T) {
}()

// wait for half second for subscribeRequest to sync
// and to receive events via notification handler.
time.Sleep(time.Millisecond * 2000)

if len(events) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events))
// -1 to discount test event, which receiver would drop.
if (len(events) - 1) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
}

if (heartbeat != HEARTBEAT_SET) {
t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
}
fmt.Printf("DONE: events:%d gotNoti=%d\n", len(events), len(gotNoti))
fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti))
})
time.Sleep(time.Millisecond * 1000)

Expand Down
69 changes: 56 additions & 13 deletions sonic_data_client/events_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import "C"

import (
"strconv"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"time"
"unsafe"
Expand All @@ -31,7 +33,11 @@ const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds
const EVENT_BUFFSZ = 4096

const LATENCY_LIST_SIZE = 10 // Size of list of latencies.
const PQ_MAX_SIZE = 10240 // Max cnt of pending events in PQ.
const PQ_DEF_SIZE = 10240 // Def size for pending events in PQ.
const PQ_MIN_SIZE = 1024 // Min size for pending events in PQ.
const PQ_MAX_SIZE = 102400 // Max size for pending events in PQ.

const HEARTBEAT_MAX = 600 // 10 mins

// STATS counters
const MISSED = "COUNTERS_EVENTS:missed_internal"
Expand All @@ -45,15 +51,19 @@ const STATS_FIELD_NAME = "value"

const EVENTD_PUBLISHER_SOURCE = "{\"sonic-events-eventd"

const TEST_EVENT = "{\"sonic-host:device-test-event"

// Path parameter
const PARAM_HEARTBEAT = "heartbeat"
const PARAM_QSIZE = "qsize"

type EventClient struct {

prefix *gnmipb.Path
path *gnmipb.Path

q *queue.PriorityQueue
pq_max int
channel chan struct{}

wg *sync.WaitGroup // wait for all sub go routines to finish
Expand Down Expand Up @@ -87,6 +97,9 @@ func C_init_subs() unsafe.Pointer {
func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) {
var evtc EventClient
evtc.prefix = prefix
evtc.pq_max = PQ_DEF_SIZE
log.V(4).Infof("Events priority Q max set default = %v", evtc.pq_max)

for _, path := range paths {
// Only one path is expected. Take the last if many
evtc.path = path
Expand All @@ -97,10 +110,28 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl
for k, v := range keys {
if (k == PARAM_HEARTBEAT) {
if val, err := strconv.Atoi(v); err == nil {
if (val > HEARTBEAT_MAX) {
log.V(4).Infof("heartbeat req %v > max %v; default to max", val, HEARTBEAT_MAX)
val = HEARTBEAT_MAX
}
log.V(7).Infof("evtc.heartbeat_interval is set to %d", val)
Set_heartbeat(val)
}
break
} else if (k == PARAM_QSIZE) {
if val, err := strconv.Atoi(v); err == nil {
qval := val
if (val < PQ_MIN_SIZE) {
val = PQ_MIN_SIZE
} else if val > PQ_MAX_SIZE) {
val = PQ_MAX_SIZE
}
if val != qval {
log.V(4).Infof("Events priority Q request %v updated to nearest limit %v",
qval, val)
}
evtc.pq_max = val
log.V(7).Infof("Events priority Q max set by qsize param = %v", evtc.pq_max)
}
}
}
}
Expand Down Expand Up @@ -279,18 +310,30 @@ func get_events(evtc *EventClient) {

if rc == 0 {
evtc.counters[MISSED] += (uint64)(evt.Missed_cnt)
qlen := evtc.q.Len()

if (qlen < PQ_MAX_SIZE) {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_StringVal {
StringVal: evt.Event_str,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return

if !strings.HasPrefix(evt.Event_str, TEST_EVENT) {
qlen := evtc.q.Len()

if (qlen < evtc.pq_max) {
var fvp map[string]interface{}
json.Unmarshal([]byte(evt.Event_str), &fvp)

jv, err := json.Marshal(fvp)

if err == nil {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_JsonIetfVal {
JsonIetfVal: jv,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return
}
} else {
log.V(1).Infof("Invalid event string: %v", evt.Event_str)
}
} else {
evtc.counters[DROPPED] += 1
}
} else {
evtc.counters[DROPPED] += 1
}
}
if evtc.stopped == 1 {
Expand Down