diff --git a/go.mod b/go.mod index 978848859..fe6f29126 100644 --- a/go.mod +++ b/go.mod @@ -371,7 +371,7 @@ require ( github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/observiq/bindplane-otel-collector/counter v1.68.0 // indirect github.com/observiq/bindplane-otel-collector/expr v1.68.0 // indirect - github.com/observiq/bindplane-otel-collector/internal/rehydration v1.62.0 // indirect + github.com/observiq/bindplane-otel-collector/internal/rehydration v1.68.0 // indirect github.com/okta/okta-sdk-golang/v2 v2.20.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlemanagedprometheusexporter v0.116.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.116.0 // indirect diff --git a/receiver/splunksearchapireceiver/README.md b/receiver/splunksearchapireceiver/README.md index c746d2d2e..74264924b 100644 --- a/receiver/splunksearchapireceiver/README.md +++ b/receiver/splunksearchapireceiver/README.md @@ -21,8 +21,8 @@ Unlike other receivers, the SSAPI receiver is not built to collect live data. In | token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". | | job_poll_interval | duration | `5s` | The receiver uses an API call to determine if a search has completed. Specifies how long to wait between polling for search job completion. | | searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must start with `search` and should not contain additional commands, nor any time fields (e.g. `earliesttime`) | -| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. | -| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. | +| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm' format (UTC). | +| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm' format (UTC). | | searches.event_batch_size | int | `100` | The amount of events to query from Splunk for a single request. | | storage | component | `required (no default)` | The component ID of a storage extension which can be used when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. | @@ -100,4 +100,4 @@ service: receivers: [splunksearchapi] exporters: [googlecloud] ``` -You are now ready to migrate events from Splunk to Google Cloud Logging. \ No newline at end of file +You are now ready to migrate events from Splunk to Google Cloud Logging. diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index da90653ef..9ef5ec586 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/observiq/bindplane-otel-collector/internal/rehydration" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" ) @@ -113,14 +114,14 @@ func (cfg *Config) Validate() error { } // parse time strings to time.Time - _, err := time.Parse(time.RFC3339, search.EarliestTime) + _, err := time.Parse(rehydration.TimeFormat, search.EarliestTime) if err != nil { - return errors.New("earliest_time failed to parse as RFC3339") + return errors.New("earliest_time failed to parse") } - _, err = time.Parse(time.RFC3339, search.LatestTime) + _, err = time.Parse(rehydration.TimeFormat, search.LatestTime) if err != nil { - return errors.New("latest_time failed to parse as RFC3339") + return errors.New("latest_time failed to parse") } } diff --git a/receiver/splunksearchapireceiver/config_test.go b/receiver/splunksearchapireceiver/config_test.go index e228b77bf..04ea2950d 100644 --- a/receiver/splunksearchapireceiver/config_test.go +++ b/receiver/splunksearchapireceiver/config_test.go @@ -42,8 +42,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -57,8 +57,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -72,8 +72,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -87,8 +87,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -103,8 +103,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -121,8 +121,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -136,8 +136,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -160,8 +160,8 @@ func TestValidate(t *testing.T) { storage: "file_storage", searches: []Search{ { - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -176,7 +176,7 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - LatestTime: "2024-10-30T14:00:00.000Z", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -192,11 +192,11 @@ func TestValidate(t *testing.T) { { Query: "search index=_internal", EarliestTime: "-1hr", - LatestTime: "2024-10-30T14:00:00.000Z", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, - errText: "earliest_time failed to parse as RFC3339", + errText: "earliest_time failed to parse", }, { desc: "Missing latest_time", @@ -207,7 +207,7 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", + EarliestTime: "2024-10-30T04:00", }, }, errExpected: true, @@ -222,12 +222,12 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", + EarliestTime: "2024-10-30T04:00", LatestTime: "-1hr", }, }, errExpected: true, - errText: "latest_time failed to parse as RFC3339", + errText: "latest_time failed to parse", }, { desc: "Invalid query chaining", @@ -238,8 +238,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal | stats count by sourcetype", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, @@ -254,8 +254,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: false, @@ -269,8 +269,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: false, @@ -284,13 +284,13 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, { Query: "search index=_audit", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: false, @@ -304,8 +304,8 @@ func TestValidate(t *testing.T) { searches: []Search{ { Query: "search index=_internal", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", Limit: 10, }, }, @@ -319,9 +319,9 @@ func TestValidate(t *testing.T) { storage: "file_storage", searches: []Search{ { - Query: "search index=_internal earliest=2024-10-30T04:00:00.000Z latest=2024-10-30T14:00:00.000Z", - EarliestTime: "2024-10-30T04:00:00.000Z", - LatestTime: "2024-10-30T14:00:00.000Z", + Query: "search index=_internal earliest=2024-10-30T04:00 latest=2024-10-30T14:00", + EarliestTime: "2024-10-30T04:00", + LatestTime: "2024-10-30T14:00", }, }, errExpected: true, diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod index df6206841..aa9ecd9da 100644 --- a/receiver/splunksearchapireceiver/go.mod +++ b/receiver/splunksearchapireceiver/go.mod @@ -3,6 +3,7 @@ module github.com/observiq/bindplane-otel-collector/receiver/splunksearchapirece go 1.22.7 require ( + github.com/observiq/bindplane-otel-collector/internal/rehydration v1.68.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.116.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.116.0 @@ -87,3 +88,7 @@ require ( google.golang.org/grpc v1.68.1 // indirect google.golang.org/protobuf v1.35.2 // indirect ) + +replace github.com/observiq/bindplane-otel-collector/internal/rehydration => ../../internal/rehydration + +replace github.com/observiq/bindplane-otel-collector/internal/testutils => ../../internal/testutils diff --git a/receiver/splunksearchapireceiver/integration_test.go b/receiver/splunksearchapireceiver/integration_test.go index 624190e03..5170c7b06 100644 --- a/receiver/splunksearchapireceiver/integration_test.go +++ b/receiver/splunksearchapireceiver/integration_test.go @@ -39,8 +39,8 @@ func TestSplunkResultsPaginationFailure(t *testing.T) { cfg.Searches = []Search{ { Query: "search index=otel", - EarliestTime: "2024-11-14T00:00:00.000Z", - LatestTime: "2024-11-14T23:59:59.000Z", + EarliestTime: "2024-11-14T00:00", + LatestTime: "2024-11-14T23:59", EventBatchSize: 5, }, } @@ -104,8 +104,8 @@ func TestExporterFailure(t *testing.T) { cfg.Searches = []Search{ { Query: "search index=otel", - EarliestTime: "2024-11-14T00:00:00.000Z", - LatestTime: "2024-11-14T23:59:59.000Z", + EarliestTime: "2024-11-14T00:00", + LatestTime: "2024-11-14T23:59", EventBatchSize: 3, }, } diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index deb607ce2..bc7429a37 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -20,6 +20,8 @@ import ( "fmt" "time" + "github.com/observiq/bindplane-otel-collector/internal/rehydration" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -120,8 +122,16 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) { } // parse time strings to time.Time - earliestTime, _ := time.Parse(time.RFC3339, search.EarliestTime) - latestTime, _ := time.Parse(time.RFC3339, search.LatestTime) + earliestTime, err := time.Parse(rehydration.TimeFormat, search.EarliestTime) + if err != nil { + ssapir.logger.Error("error parsing earliest time", zap.Error(err)) + return + } + latestTime, err := time.Parse(rehydration.TimeFormat, search.LatestTime) + if err != nil { + ssapir.logger.Error("error parsing earliest time", zap.Error(err)) + return + } // create search in Splunk searchID, err := ssapir.createSplunkSearch(search) @@ -155,18 +165,18 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) { limitReached = true break } - // convert log timestamp to ISO 8601 (UTC() makes RFC 3339 into ISO 8601) + logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time) if err != nil { ssapir.logger.Error("error parsing log timestamp", zap.Error(err)) break } - if logTimestamp.UTC().Before(earliestTime.UTC()) { - ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC())) + if logTimestamp.UTC().Before(earliestTime) { + ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime)) break } - if logTimestamp.UTC().After(latestTime.UTC()) { - ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC())) + if logTimestamp.UTC().After(latestTime) { + ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime)) continue } log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()