Skip to content

Commit

Permalink
use proper log entry batch
Browse files Browse the repository at this point in the history
  • Loading branch information
justinianvoss22 committed Oct 30, 2024
1 parent b5233a8 commit 5323ccd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 32 deletions.
53 changes: 31 additions & 22 deletions exporter/chronicleexporter/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleexporter

import (
Expand Down Expand Up @@ -78,10 +79,10 @@ func (m *protoMarshaler) MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*a
}
return m.constructPayloads(rawLogs, namespace, ingestionLabels), nil
}
func (m *protoMarshaler) extractRawLogs(ctx context.Context, ld plog.Logs) (map[string][]*api.LogEntry, map[string]string, map[string]map[string]string, error) {
func (m *protoMarshaler) extractRawLogs(ctx context.Context, ld plog.Logs) (map[string][]*api.LogEntry, map[string]string, map[string][]*api.Label, error) {
entries := make(map[string][]*api.LogEntry)
namespaceMap := make(map[string]string)
ingestionLabelsMap := make(map[string]map[string]string)
ingestionLabelsMap := make(map[string][]*api.Label)
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
Expand Down Expand Up @@ -113,18 +114,18 @@ func (m *protoMarshaler) extractRawLogs(ctx context.Context, ld plog.Logs) (map[
}
if len(ingestionLabels) != 0 {
if ingestionLabelsMap[logType] == nil {
ingestionLabelsMap[logType] = make(map[string]string)
ingestionLabelsMap[logType] = make([]*api.Label, 0)
}
for k, v := range ingestionLabels {
ingestionLabelsMap[logType][k] = v
for _, v := range ingestionLabels {
ingestionLabelsMap[logType] = append(ingestionLabelsMap[logType], v)
}
}
}
}
}
return entries, namespaceMap, ingestionLabelsMap, nil
}
func (m *protoMarshaler) processLogRecord(ctx context.Context, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, string, string, map[string]string, error) {
func (m *protoMarshaler) processLogRecord(ctx context.Context, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, string, string, []*api.Label, error) {
rawLog, err := m.getRawLog(ctx, logRecord, scope, resource)
if err != nil {
return "", "", "", nil, err
Expand Down Expand Up @@ -209,24 +210,31 @@ func (m *protoMarshaler) getNamespace(ctx context.Context, logRecord plog.LogRec
}
return m.cfg.Namespace, nil
}
func (m *protoMarshaler) getIngestionLabels(logRecord plog.LogRecord) (map[string]string, error) {
func (m *protoMarshaler) getIngestionLabels(logRecord plog.LogRecord) ([]*api.Label, error) {
configLabels := make([]*api.Label, 0)
for key, value := range m.cfg.IngestionLabels {
configLabels = append(configLabels, &api.Label{
Key: key,
Value: value,
})
}
ingestionLabels, err := m.getRawNestedFields(chronicleIngestionLabelsPrefix, logRecord)
if err != nil {
return m.cfg.IngestionLabels, fmt.Errorf("get chronicle ingestion labels: %w", err)
return configLabels, fmt.Errorf("get chronicle ingestion labels: %w", err)
}
if len(ingestionLabels) != 0 {
return ingestionLabels, nil
}
if m.cfg.OverrideIngestionLabels {
ingestionLabels, err := m.getRawNestedFields(ingestionLabelsPrefix, logRecord)
if err != nil {
return m.cfg.IngestionLabels, fmt.Errorf("get chronicle ingestion labels: %w", err)
return configLabels, fmt.Errorf("get chronicle ingestion labels: %w", err)
}
if len(ingestionLabels) != 0 {
return ingestionLabels, nil
}
}
return m.cfg.IngestionLabels, nil
return configLabels, nil
}
func (m *protoMarshaler) getRawField(ctx context.Context, field string, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, error) {
lrExpr, err := expr.NewOTTLLogRecordExpression(field, m.teleSettings)
Expand Down Expand Up @@ -255,11 +263,14 @@ func (m *protoMarshaler) getRawField(ctx context.Context, field string, logRecor
}
}

func (m *protoMarshaler) getRawNestedFields(field string, logRecord plog.LogRecord) (map[string]string, error) {
nestedFields := make(map[string]string)
func (m *protoMarshaler) getRawNestedFields(field string, logRecord plog.LogRecord) ([]*api.Label, error) {
nestedFields := make([]*api.Label, 0)
logRecord.Attributes().Range(func(key string, value pcommon.Value) bool {
if strings.HasPrefix(key, field) {
nestedFields[key] = value.AsString()
nestedFields = append(nestedFields, &api.Label{
Key: key,
Value: value.AsString(),
})
}
return true
})
Expand All @@ -269,7 +280,7 @@ func (m *protoMarshaler) getRawNestedFields(field string, logRecord plog.LogReco
return nestedFields, nil
}

func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry, namespaceMap map[string]string, ingestionLabelsMap map[string]map[string]string) []*api.BatchCreateLogsRequest {
func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry, namespaceMap map[string]string, ingestionLabelsMap map[string][]*api.Label) []*api.BatchCreateLogsRequest {
payloads := make([]*api.BatchCreateLogsRequest, 0, len(rawLogs))
for logType, entries := range rawLogs {
if len(entries) > 0 {
Expand All @@ -280,16 +291,14 @@ func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry, n
ingestionLabels := ingestionLabelsMap[logType]
payloads = append(payloads, &api.BatchCreateLogsRequest{
Batch: &api.LogEntryBatch{
StartTime: timestamppb.New(m.startTime),
Entries: entries,
LogType: logType,
Namespace: namespace,
IngestionLabels: ingestionLabels,
StartTime: timestamppb.New(m.startTime),
Entries: entries,
LogType: logType,
Source: &api.EventSource{
CollectorId: m.collectorID,
CustomerId: m.customerID,
Labels: m.labels,
Namespace: m.cfg.Namespace,
Labels: ingestionLabels,
Namespace: namespace,
},
},
})
Expand Down
18 changes: 11 additions & 7 deletions exporter/chronicleexporter/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) {
require.Len(t, requests, 1)
batch := requests[0].Batch
require.Equal(t, "WINEVTLOG", batch.LogType)
require.Equal(t, "test", batch.Namespace)
require.Equal(t, "test", batch.Source.Namespace)
require.Len(t, batch.Entries, 1)

// Convert Data (byte slice) to string for comparison
Expand Down Expand Up @@ -202,9 +202,11 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) {
require.Len(t, requests, 1)
batch := requests[0].Batch
require.Equal(t, "WINEVTLOG", batch.LogType, "Expected log type to be overridden by attribute")
require.Equal(t, "test", batch.Namespace, "Expected namespace to be overridden by attribute")
require.Equal(t, "realvalue1", batch.IngestionLabels[`ingestion_label["realkey1"]`], "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue2", batch.IngestionLabels[`ingestion_label["realkey2"]`], "Expected ingestion label to be overridden by attribute")
require.Equal(t, "test", batch.Source.Namespace, "Expected namespace to be overridden by attribute")
require.Equal(t, `ingestion_label["realkey1"]`, batch.Source.Labels[0].Key, "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue1", batch.Source.Labels[0].Value, "Expected ingestion label to be overridden by attribute")
require.Equal(t, `ingestion_label["realkey2"]`, batch.Source.Labels[1].Key, "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue2", batch.Source.Labels[1].Value, "Expected ingestion label to be overridden by attribute")
},
},
{
Expand All @@ -227,9 +229,11 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) {
require.Len(t, requests, 1)
batch := requests[0].Batch
require.Equal(t, "ASOC_ALERT", batch.LogType, "Expected log type to be overridden by attribute")
require.Equal(t, "test", batch.Namespace, "Expected namespace to be overridden by attribute")
require.Equal(t, "realvalue1", batch.IngestionLabels[`chronicle_ingestion_label["realkey1"]`], "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue2", batch.IngestionLabels[`chronicle_ingestion_label["realkey2"]`], "Expected ingestion label to be overridden by attribute")
require.Equal(t, "test", batch.Source.Namespace, "Expected namespace to be overridden by attribute")
require.Equal(t, `chronicle_ingestion_label["realkey1"]`, batch.Source.Labels[0].Key, "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue1", batch.Source.Labels[0].Value, "Expected ingestion label to be overridden by attribute")
require.Equal(t, `chronicle_ingestion_label["realkey2"]`, batch.Source.Labels[1].Key, "Expected ingestion label to be overridden by attribute")
require.Equal(t, "realvalue2", batch.Source.Labels[1].Value, "Expected ingestion label to be overridden by attribute")
},
},
}
Expand Down
3 changes: 0 additions & 3 deletions exporter/chronicleexporter/protos/api/chronicle_log.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5323ccd

Please sign in to comment.