Skip to content

Commit

Permalink
Use log_type in upload batch endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
shazlehu committed Sep 4, 2024
1 parent 36026dd commit d075a42
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 29 deletions.
13 changes: 6 additions & 7 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log
return fmt.Errorf("marshal logs: %w", err)
}

for _, payload := range payloads {
if err := ce.uploadToChronicleHTTP(ctx, payload); err != nil {
for logType, payload := range payloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
}
}
Expand All @@ -268,14 +268,13 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import
func buildEndpoint(cfg *Config) string {
// TODO handle override of LogType
func buildEndpoint(cfg *Config, logType string) string {
// Location Endpoint Version Project Location Instance LogType
formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, cfg.LogType)
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest) error {
func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {

data, err := protojson.Marshal(logs)
if err != nil {
Expand All @@ -298,7 +297,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg), body)
request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand Down
17 changes: 8 additions & 9 deletions exporter/chronicleexporter/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var supportedLogTypes = map[string]string{
//go:generate mockery --name logMarshaler --filename mock_log_marshaler.go --structname MockMarshaler --inpackage
type logMarshaler interface {
MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*api.BatchCreateLogsRequest, error)
MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) ([]*api.ImportLogsRequest, error)
MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string]*api.ImportLogsRequest, error)
}

type protoMarshaler struct {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry) [
return payloads
}

func (m *protoMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) ([]*api.ImportLogsRequest, error) {
func (m *protoMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string]*api.ImportLogsRequest, error) {
rawLogs, err := m.extractRawHTTPLogs(ctx, ld)
if err != nil {
return nil, fmt.Errorf("extract raw logs: %w", err)
Expand Down Expand Up @@ -279,15 +279,15 @@ func buildForwarderString(cfg Config) string {
return fmt.Sprintf(format, cfg.Project, cfg.Location, cfg.CustomerID, cfg.Forwarder)
}

func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) []*api.ImportLogsRequest {
payloads := make([]*api.ImportLogsRequest, 0, len(rawLogs))
func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) map[string]*api.ImportLogsRequest {
payloads := make(map[string]*api.ImportLogsRequest, len(rawLogs))

// TODO logType
for _, entries := range rawLogs {
for logType, entries := range rawLogs {
if len(entries) > 0 {
payloads = append(payloads,
payloads[logType] =
&api.ImportLogsRequest{
// TODO: Add parent and hint
// We don't yet have solid guidance on what these should be
Parent: "",
Hint: "",

Expand All @@ -297,8 +297,7 @@ func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) []
Logs: entries,
},
},
},
)
}
}
}
return payloads
Expand Down
16 changes: 8 additions & 8 deletions exporter/chronicleexporter/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
cfg Config
labels []*api.Label
logRecords func() plog.Logs
expectations func(t *testing.T, requests []*api.ImportLogsRequest)
expectations func(t *testing.T, requests map[string]*api.ImportLogsRequest)
}{
{
name: "Single log record with expected data",
Expand All @@ -221,9 +221,9 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
logRecords: func() plog.Logs {
return mockLogs(mockLogRecord("Test log message", map[string]any{"log_type": "WINEVTLOG"}))
},
expectations: func(t *testing.T, requests []*api.ImportLogsRequest) {
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1)
logs := requests[0].GetInlineSource().Logs
logs := requests["WINEVTLOG"].GetInlineSource().Logs
require.Len(t, logs, 1)

// Convert Data (byte slice) to string for comparison
Expand Down Expand Up @@ -251,9 +251,9 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
record2.Body().SetStr("Second log message")
return logs
},
expectations: func(t *testing.T, requests []*api.ImportLogsRequest) {
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1, "Expected a single batch request")
logs := requests[0].GetInlineSource().Logs
logs := requests["WINEVTLOG"].GetInlineSource().Logs
require.Len(t, logs, 2, "Expected two log entries in the batch")
// Verifying the first log entry data
require.Equal(t, "First log message", string(logs[0].Data))
Expand All @@ -273,9 +273,9 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
logRecords: func() plog.Logs {
return mockLogs(mockLogRecord("", map[string]any{"key1": "value1", "log_type": "WINEVTLOG"}))
},
expectations: func(t *testing.T, requests []*api.ImportLogsRequest) {
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1)
logs := requests[0].GetInlineSource().Logs
logs := requests["WINEVTLOG"].GetInlineSource().Logs
require.Len(t, logs, 1)

// Assuming the attributes are marshaled into the Data field as a JSON string
Expand All @@ -296,7 +296,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
logRecords: func() plog.Logs {
return plog.NewLogs() // No log records added
},
expectations: func(t *testing.T, requests []*api.ImportLogsRequest) {
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 0, "Expected no requests due to no log records")
},
},
Expand Down
10 changes: 5 additions & 5 deletions exporter/chronicleexporter/mock_log_marshaler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d075a42

Please sign in to comment.