Skip to content

Commit

Permalink
Merge pull request #37 from opsramp/release/v0.112.x
Browse files Browse the repository at this point in the history
Release/v0.112.x
  • Loading branch information
opsrampdeveloper authored Nov 13, 2024
2 parents a11ffe0 + 6dd4a23 commit 940e683
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 25 deletions.
43 changes: 37 additions & 6 deletions exporter/opsrampdebugexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

var (
LogsOpsRampChannel = make(chan plog.Logs, 1000)
LogsOpsRampChannel = make(chan plog.Logs, 1000)
EventsOpsRampChannel = make(chan plog.Logs, 100)
)

type debugExporter struct {
Expand Down Expand Up @@ -92,11 +93,41 @@ func (s *debugExporter) pushLogs(_ context.Context, ld plog.Logs) error {
zap.Int("resource logs", ld.ResourceLogs().Len()),
zap.Int("log records", ld.LogRecordCount()))

select {
case LogsOpsRampChannel <- ld:
s.logger.Info("#######LogsExporter: Successfully sent to channel")
default:
s.logger.Info("#######LogsExporter: failed sent to channel")
eventsSlice := plog.NewResourceLogsSlice()
logsSlice := plog.NewResourceLogsSlice()

rlSlice := ld.ResourceLogs()
for i := 0; i < rlSlice.Len(); i++ {
rl := rlSlice.At(i)
resource := rl.Resource()

if val, found := resource.Attributes().Get("type"); found && val.Str() == "event" {
rl.CopyTo(eventsSlice.AppendEmpty())
} else {
rl.CopyTo(logsSlice.AppendEmpty())
}
}

if logsSlice.Len() != 0 {
logs := plog.NewLogs()
logsSlice.CopyTo(logs.ResourceLogs())
select {
case LogsOpsRampChannel <- logs:
s.logger.Info("#######LogsExporter: Successfully sent to logs channel")
default:
s.logger.Info("#######LogsExporter: failed sent to logs channel")
}
}

if eventsSlice.Len() != 0 {
eventLogs := plog.NewLogs()
eventsSlice.CopyTo(eventLogs.ResourceLogs())
select {
case EventsOpsRampChannel <- eventLogs:
s.logger.Info("#######LogsExporter: Successfully sent to events channel")
default:
s.logger.Info("#######LogsExporter: failed sent to eventschannel")
}
}

if s.verbosity == configtelemetry.LevelBasic {
Expand Down
15 changes: 14 additions & 1 deletion receiver/k8seventsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,24 @@ type Config struct {

type InvolvedObjectProperties struct {
// Include only the specified reasons. If its empty, list events of all reasons.
IncludeReasons []string `mapstructure:"include_reasons,omitempty"`
IncludeReasons []ReasonProperties `mapstructure:"include_reasons,omitempty"`

//Can be enhanced to take in object names with reg ex etc.
}

type ReasonProperties struct {
Name string `mapstructure:"name"`
Attributes []KeyValue `mapstructure:"attributes,omitempty"`
}

type KeyValue struct {
// This is a required field.
Key string `mapstructure:"key"`

// This is a required field.
Value any `mapstructure:"value"`
}

func (cfg *Config) Validate() error {

for _, eventType := range cfg.EventTypes {
Expand Down
13 changes: 12 additions & 1 deletion receiver/k8seventsreceiver/k8s_event_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var severityMap = map[string]plog.SeverityNumber{
}

// k8sEventToLogRecord converts Kubernetes event to plog.LogRecordSlice and adds the resource attributes.
func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event, attributes []KeyValue) plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
Expand Down Expand Up @@ -74,6 +74,8 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
attrs := lr.Attributes()
attrs.EnsureCapacity(totalLogAttributes)

attrs.PutStr("k8s.event.type", ev.Type)
attrs.PutStr("k8s.event.sourceComponent", ev.Source.Component)
attrs.PutStr("k8s.event.reason", ev.Reason)
attrs.PutStr("k8s.event.action", ev.Action)
attrs.PutStr("k8s.event.start_time", ev.ObjectMeta.CreationTimestamp.String())
Expand All @@ -87,5 +89,14 @@ func k8sEventToLogData(logger *zap.Logger, ev *corev1.Event) plog.Logs {
attrs.PutInt("k8s.event.count", int64(ev.Count))
}

for _, kv := range attributes {
val := pcommon.NewValueEmpty()
err := val.FromRaw(kv.Value)
if err != nil {
continue
}
val.CopyTo(attrs.PutEmpty(kv.Key))
}

return ld
}
36 changes: 19 additions & 17 deletions receiver/k8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) {
}

func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) {
if kr.allowEvent(ev) {
ld := k8sEventToLogData(kr.settings.Logger, ev)
if attributes, allow := kr.allowEvent(ev); allow {
ld := k8sEventToLogData(kr.settings.Logger, ev, attributes)

ctx := kr.obsrecv.StartLogsOp(kr.ctx)
consumerErr := kr.logsConsumer.ConsumeLogs(ctx, ld)
Expand Down Expand Up @@ -138,10 +138,10 @@ func (kr *k8seventsReceiver) startWatchingNamespace(
// Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp)
// not older than the receiver start time so that
// event flood can be avoided upon startup.
func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool {
func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) (attributes []KeyValue, allow bool) {
eventTimestamp := getEventTimestamp(ev)
if eventTimestamp.Before(kr.startTime) {
return false
return attributes, false
}

if len(kr.config.EventTypes) != 0 {
Expand All @@ -154,42 +154,44 @@ func (kr *k8seventsReceiver) allowEvent(ev *corev1.Event) bool {
}
}
if !found {
return false
return attributes, false
}
}

existsInSlice := func(key string, slice []string) bool {
found := false
existsInSlice := func(key string, slice []ReasonProperties) ([]KeyValue, bool) {
for _, k := range slice {
if key == k {
found = true
break
if key == k.Name {
return k.Attributes, true
}
}
return found
return []KeyValue{}, false
}

if len(kr.config.IncludeInvolvedObject) != 0 {
if prop, exists := kr.config.IncludeInvolvedObject[ev.InvolvedObject.Kind]; !exists {
if prop, exists := kr.config.IncludeInvolvedObject["Other"]; !exists {
return false
return attributes, false
} else {
if len(prop.IncludeReasons) != 0 {
if !existsInSlice(ev.Reason, prop.IncludeReasons) {
return false
if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists {
return attributes, false
} else {
attributes = reasonAttributes
}
}
}
} else {
if len(prop.IncludeReasons) != 0 {
if !existsInSlice(ev.Reason, prop.IncludeReasons) {
return false
if reasonAttributes, exists := existsInSlice(ev.Reason, prop.IncludeReasons); !exists {
return attributes, false
} else {
attributes = reasonAttributes
}
}
}
}

return true
return attributes, true
}

// Return the EventTimestamp based on the populated k8s event timestamps.
Expand Down

0 comments on commit 940e683

Please sign in to comment.