From be7ff1b43e1246cebfb1786ed0aeaf1d995d15ea Mon Sep 17 00:00:00 2001 From: Caleb Hurshman <69878936+Caleb-Hurshman@users.noreply.github.com> Date: Wed, 18 Dec 2024 09:09:28 -0500 Subject: [PATCH] feat: SSAPI Receiver (BPS-289) (#1951) * chore: Update modules to v1.67.0 * fix: QRadar README typo (#2028) Fix README typo * fix: Shut down zombie goroutine in chronicleexporter (#2029) * Properly shut down chronicleexporter zombie goroutine * Fix lint * Fix the same problem for the GRPC workflow * ssapi mvp * lint * tls * WIP * ticker, other pr feedback * pagination functionality * break if results earlier than earliest_time * fix lint * check for earliest/latest in query * config unit tests * package comment * feat(chronicleexporter): Support dynamic namespace and ingestion labels (#1939) * add namespace and ingenstion logs initial commit * working except ingestion labels * ingestion labels from attributes * use proper log entry batch * namespace and ingestion logs no config overwrite * delete OverrideNamespace and OverrideIngestionLabeles * PR changes * fix unit tests * modify tests again * marshal changes * readme and namespace check * debug logs * rm unnecessary clauses * fix error wording * rm space * wip * client tests * checkpoint methods * WIP * functional checkpoint * debug logs, rm print * loadCheckpoint return error * splunk failure test * WIP * encode req body * stricter query validation * storage config test * lint, tidy * return error on export fail * tidy * receiver tests * receiver tests * lint * fix TestCheckpoint * rename structs * exporter fail test * fix search checkpointing * auth token * lint * fix struct name * rm prints, fix error messages * fix tests * default batch size * log end of export * readme * how-to * how-to example config * change how-to conf values * change test batch size * fix test case * fix client test * fix rebase errors * tidy * feat: Enforce request maximum size and number of logs (#2033) * feat: Enforce request maximum size and number of logs * Fix lint * Refactor to be more go-idiomatic * Update Chronicle exporter readme with new flags * fix: Delete empty values iterates through nested arrays (#2034) * delete empty values processor iterates through slices * log body implementation * pr review * initial feedback * chore: Minor cleanup of chronicle exporter names (#2046) * chore: Save component.TelemetrySettings on chronicle exporter (#2047) * chore: Minor cleanup of chronicle exporter names * chore: Chronicle exporter - save component.TelemetrySettings * safe shutdown() * chore: Localize chronicle exporter's metrics concerns (#2048) chore: Pull metrics-specific concerns into hostMetricsReporter * rm err checkk from time parsing * chore: Add debug logging (#2042) Add debug logging * chore: Add new tests for chronicle exporter with http and grpc servers (#2049) * ctx check, doc notes * chore: Rename to `bindplane-otel-collector` (#2043) * rename to bindplane-otel-collector * fix website links * update report card link * fix: Shut down zombie goroutine in chronicleexporter (#2029) * Properly shut down chronicleexporter zombie goroutine * Fix lint * Fix the same problem for the GRPC workflow * chore: Save component.TelemetrySettings on chronicle exporter (#2047) * chore: Minor cleanup of chronicle exporter names * chore: Chronicle exporter - save component.TelemetrySettings * chore: Localize chronicle exporter's metrics concerns (#2048) chore: Pull metrics-specific concerns into hostMetricsReporter * chore: Add new tests for chronicle exporter with http 
and grpc servers (#2049) * fix: Rebase cleanup (#2063) rebase cleanup * chore: separate http and grpc exporters (#2050) * fix: Shut down zombie goroutine in chronicleexporter (#2029) * Properly shut down chronicleexporter zombie goroutine * Fix lint * Fix the same problem for the GRPC workflow * ssapi mvp * initial feedback * chore: Save component.TelemetrySettings on chronicle exporter (#2047) * chore: Minor cleanup of chronicle exporter names * chore: Chronicle exporter - save component.TelemetrySettings * chore: Localize chronicle exporter's metrics concerns (#2048) chore: Pull metrics-specific concerns into hostMetricsReporter * chore: Add new tests for chronicle exporter with http and grpc servers (#2049) * chore: Save component.TelemetrySettings on chronicle exporter (#2047) * chore: Minor cleanup of chronicle exporter names * chore: Chronicle exporter - save component.TelemetrySettings * chore: Localize chronicle exporter's metrics concerns (#2048) chore: Pull metrics-specific concerns into hostMetricsReporter * chore: Add new tests for chronicle exporter with http and grpc servers (#2049) * fix: Shut down zombie goroutine in chronicleexporter (#2029) * Properly shut down chronicleexporter zombie goroutine * Fix lint * Fix the same problem for the GRPC workflow * fix rebase stuff --------- Co-authored-by: Dakota Paasman <122491662+dpaasman00@users.noreply.github.com> Co-authored-by: Sam Hazlehurst Co-authored-by: Ian Adams Co-authored-by: Justin Voss <90650155+justinianvoss22@users.noreply.github.com> Co-authored-by: Daniel Jaglowski --- factories/receivers.go | 2 + go.mod | 3 + receiver/splunksearchapireceiver/README.md | 103 +++++ receiver/splunksearchapireceiver/client.go | 175 +++++++++ .../splunksearchapireceiver/client_test.go | 176 +++++++++ receiver/splunksearchapireceiver/config.go | 128 +++++++ .../splunksearchapireceiver/config_test.go | 351 ++++++++++++++++++ receiver/splunksearchapireceiver/factory.go | 58 +++ .../splunksearchapireceiver/factory_test.go | 42 +++ receiver/splunksearchapireceiver/go.mod | 89 +++++ receiver/splunksearchapireceiver/go.sum | 213 +++++++++++ .../integration_test.go | 220 +++++++++++ .../splunksearchapireceiver/metadata.yaml | 7 + receiver/splunksearchapireceiver/model.go | 69 ++++ receiver/splunksearchapireceiver/receiver.go | 314 ++++++++++++++++ .../splunksearchapireceiver/receiver_test.go | 225 +++++++++++ .../logs/testPollJobStatus/input-done.xml | 212 +++++++++++ .../logs/testPollJobStatus/input-queued.xml | 212 +++++++++++ 18 files changed, 2599 insertions(+) create mode 100644 receiver/splunksearchapireceiver/README.md create mode 100644 receiver/splunksearchapireceiver/client.go create mode 100644 receiver/splunksearchapireceiver/client_test.go create mode 100644 receiver/splunksearchapireceiver/config.go create mode 100644 receiver/splunksearchapireceiver/config_test.go create mode 100644 receiver/splunksearchapireceiver/factory.go create mode 100644 receiver/splunksearchapireceiver/factory_test.go create mode 100644 receiver/splunksearchapireceiver/go.mod create mode 100644 receiver/splunksearchapireceiver/go.sum create mode 100644 receiver/splunksearchapireceiver/integration_test.go create mode 100644 receiver/splunksearchapireceiver/metadata.yaml create mode 100644 receiver/splunksearchapireceiver/model.go create mode 100644 receiver/splunksearchapireceiver/receiver.go create mode 100644 receiver/splunksearchapireceiver/receiver_test.go create mode 100644 
receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-done.xml create mode 100644 receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-queued.xml diff --git a/factories/receivers.go b/factories/receivers.go index eba6f6d19..472237e3f 100644 --- a/factories/receivers.go +++ b/factories/receivers.go @@ -23,6 +23,7 @@ import ( "github.com/observiq/bindplane-otel-collector/receiver/pluginreceiver" "github.com/observiq/bindplane-otel-collector/receiver/routereceiver" "github.com/observiq/bindplane-otel-collector/receiver/sapnetweaverreceiver" + "github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver" "github.com/observiq/bindplane-otel-collector/receiver/telemetrygeneratorreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver" @@ -157,6 +158,7 @@ var defaultReceivers = []receiver.Factory{ sapnetweaverreceiver.NewFactory(), simpleprometheusreceiver.NewFactory(), snmpreceiver.NewFactory(), + splunksearchapireceiver.NewFactory(), splunkhecreceiver.NewFactory(), sqlqueryreceiver.NewFactory(), sqlserverreceiver.NewFactory(), diff --git a/go.mod b/go.mod index 8cf753284..5e700cd9d 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/observiq/bindplane-otel-collector/receiver/pluginreceiver v1.67.0 github.com/observiq/bindplane-otel-collector/receiver/routereceiver v1.67.0 github.com/observiq/bindplane-otel-collector/receiver/sapnetweaverreceiver v1.67.0 + github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver v1.67.0 github.com/observiq/bindplane-otel-collector/receiver/telemetrygeneratorreceiver v1.67.0 github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.17.0 @@ -875,6 +876,8 @@ replace github.com/observiq/bindplane-otel-collector/internal/report => ./internal/report replace github.com/observiq/bindplane-otel-collector/internal/measurements => ./internal/measurements +replace github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver => ./receiver/splunksearchapireceiver + // Does not build with windows and only used in configschema executable // Relevant issue https://github.com/mattn/go-ieproxy/issues/45 replace github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1 diff --git a/receiver/splunksearchapireceiver/README.md b/receiver/splunksearchapireceiver/README.md new file mode 100644 index 000000000..c746d2d2e --- /dev/null +++ b/receiver/splunksearchapireceiver/README.md @@ -0,0 +1,103 @@ +# Splunk Search API Receiver +This receiver collects Splunk events using the [Splunk Search API](https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch). + +## Supported Pipelines +- Logs + +## Prerequisites +- Splunk admin credentials +- Configured storage extension + +## Use Case +Unlike other receivers, the SSAPI receiver is not built to collect live data. Instead, it collects a finite set of historical data and transfers it to a destination, preserving the timestamp from the source. For this reason, the SSAPI receiver only needs to be left running until all Splunk events have been migrated, which is denoted by the log message: "all search results exported". Until this message or some other error is printed, avoid shutting the collector down, as doing so interferes with the receiver's ability to protect against writing duplicate events.
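+
+Under the hood, the receiver drives the standard Splunk search job lifecycle: it creates a search job, polls the job's status until Splunk reports it complete, then pages through the results. A minimal sketch of that loop, assuming the `splunkSearchAPIClient` interface defined in this package (`isJobDone` is a hypothetical helper standing in for the receiver's actual status check, and checkpointing and error handling are abbreviated):
+```go
+package splunksearchapireceiver
+
+import "time"
+
+// runSearch sketches how a search is consumed; it is not the receiver's exact logic.
+// The query must already carry the starttime, endtime, and timeformat parameters
+// that the receiver appends to each search it dispatches.
+func runSearch(client splunkSearchAPIClient, query string, batchSize int, pollInterval time.Duration) error {
+	// Dispatch the search job (POST /services/search/jobs).
+	job, err := client.CreateSearchJob(query)
+	if err != nil {
+		return err
+	}
+	// Poll the job status (GET /services/search/v2/jobs/{sid}).
+	// isJobDone is a hypothetical stand-in for inspecting the dispatch state.
+	for !isJobDone(client, job.SID) {
+		time.Sleep(pollInterval)
+	}
+	// Page through the results (GET /services/search/v2/jobs/{sid}/results).
+	for offset := 0; ; offset += batchSize {
+		page, err := client.GetSearchResults(job.SID, offset, batchSize)
+		if err != nil {
+			return err
+		}
+		// ...convert page.Results to log records, checkpoint, and forward them...
+		if len(page.Results) < batchSize {
+			return nil // a short page means the results are exhausted
+		}
+	}
+}
+```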
+ +## Configuration +| Field | Type | Default | Description | +|---------------------|----------|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| +| endpoint | string | `required (no default)` | The endpoint of the Splunk instance to collect from. | +| splunk_username | string | `(no default)` | Specifies the username used to authenticate to Splunk using basic auth. | +| splunk_password | string | `(no default)` | Specifies the password used to authenticate to Splunk using basic auth. | +| auth_token | string | `(no default)` | Specifies the token used to authenticate to Splunk using token auth. Mutually exclusive with basic auth using `splunk_username` and `splunk_password`. | +| token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". | +| job_poll_interval | duration | `5s` | Specifies how long to wait between polls of the API endpoint the receiver uses to determine whether a search job has completed. | +| searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must start with `search` and must not contain additional commands, nor any time fields (e.g. `earliest`). | +| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in RFC3339 format. | +| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in RFC3339 format. | +| searches.event_batch_size | int | `100` | The number of events to query from Splunk in a single request. | +| storage | component | `required (no default)` | The component ID of a storage extension which can be used when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. | + +### Example Configuration +```yaml +receivers: + splunksearchapi: + endpoint: "https://splunk-c4-0.example.localnet:8089" + tls: + insecure_skip_verify: true + splunk_username: "user" + splunk_password: "pass" + job_poll_interval: 5s + searches: + - query: 'search index=my_index' + earliest_time: "2024-11-01T01:00:00.000-05:00" + latest_time: "2024-11-30T23:59:59.999-05:00" + event_batch_size: 500 + storage: file_storage + +extensions: + file_storage: + directory: "./local/storage" +``` + +## How To + +### Migrate historical events to Google Cloud Logging +1. Identify the Splunk index to migrate events from. Create a Splunk search to capture the events from that index. This will be the `searches.query` you pass to the receiver. + - Example: `search index=my_index` + - Note: queries must begin with the explicit `search` command, and must not include additional commands, nor any time fields (e.g. `earliest`). +2. Determine the timeframe you want to migrate events from, and set the `searches.earliest_time` and `searches.latest_time` config fields accordingly.
+ - To migrate events from December 2024, EST (UTC-5): + - `earliest_time: "2024-12-01T00:00:00.000-05:00"` + - `latest_time: "2024-12-31T23:59:59.999-05:00"` + - Note: By default, GCL will not accept logs with a timestamp older than 30 days. Contact Google to modify this rule. +3. Repeat steps 1 & 2 for each index you wish to collect from. +4. Configure a storage extension to store checkpointing data for the receiver. +5. Configure the rest of the receiver fields according to your Splunk environment. +6. Add a `googlecloud` exporter to your config. Configure the exporter to send to a GCP project where your service account has the Logging Admin role. To check the permissions of service accounts in your project, go to the [IAM page](https://console.cloud.google.com/iam-admin/iam). +7. Disable the `sending_queue` field on the GCP exporter. The sending queue introduces an asynchronous step to the pipeline, which will jeopardize the receiver's ability to checkpoint correctly and recover from errors. For this same reason, avoid using any asynchronous processors (e.g., batch processor). + +After following these steps, your configuration should look something like this: +```yaml +receivers: + splunksearchapi: + endpoint: "https://splunk-c4-0.example.localnet:8089" + tls: + insecure_skip_verify: true + splunk_username: "user" + splunk_password: "pass" + job_poll_interval: 5s + searches: + - query: 'search index=my_index' + earliest_time: "2024-12-01T00:00:00.000-05:00" + latest_time: "2024-12-31T23:59:59.999-05:00" + event_batch_size: 500 + storage: file_storage +exporters: + googlecloud: + project: "my-gcp-project" + log: + default_log_name: "splunk-events" + sending_queue: + enabled: false + +extensions: + file_storage: + directory: "./local/storage" + +service: + extensions: [file_storage] + pipelines: + logs: + receivers: [splunksearchapi] + exporters: [googlecloud] +``` +You are now ready to migrate events from Splunk to Google Cloud Logging. \ No newline at end of file diff --git a/receiver/splunksearchapireceiver/client.go b/receiver/splunksearchapireceiver/client.go new file mode 100644 index 000000000..3560d1b63 --- /dev/null +++ b/receiver/splunksearchapireceiver/client.go @@ -0,0 +1,175 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package splunksearchapireceiver contains the Splunk Search API receiver.
+package splunksearchapireceiver + +import ( + "bytes" + "context" + "encoding/json" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" +) + +type splunkSearchAPIClient interface { + CreateSearchJob(search string) (CreateJobResponse, error) + GetJobStatus(searchID string) (SearchJobStatusResponse, error) + GetSearchResults(searchID string, offset int, batchSize int) (SearchResults, error) +} + +type defaultSplunkSearchAPIClient struct { + client *http.Client + endpoint string + logger *zap.Logger + username string + password string + authToken string + tokenType string +} + +func newDefaultSplunkSearchAPIClient(ctx context.Context, settings component.TelemetrySettings, conf Config, host component.Host) (*defaultSplunkSearchAPIClient, error) { + client, err := conf.ClientConfig.ToClient(ctx, host, settings) + if err != nil { + return nil, err + } + + return &defaultSplunkSearchAPIClient{ + client: client, + endpoint: conf.Endpoint, + logger: settings.Logger, + username: conf.Username, + password: conf.Password, + authToken: conf.AuthToken, + tokenType: conf.TokenType, + }, nil +} + +func (c *defaultSplunkSearchAPIClient) CreateSearchJob(search string) (CreateJobResponse, error) { + endpoint := fmt.Sprintf("%s/services/search/jobs", c.endpoint) + + if !strings.Contains(search, "starttime=") || !strings.Contains(search, "endtime=") || !strings.Contains(search, "timeformat=") { + return CreateJobResponse{}, fmt.Errorf("search query must contain starttime, endtime, and timeformat") + } + + reqBody := fmt.Sprintf(`search=%s`, url.QueryEscape(search)) + resp, err := c.doSplunkRequest("POST", endpoint, bytes.NewBuffer([]byte(reqBody))) + if err != nil { + return CreateJobResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + return CreateJobResponse{}, fmt.Errorf("create search job: %d", resp.StatusCode) + } + + var jobResponse CreateJobResponse + body, err := io.ReadAll(resp.Body) + if err != nil { + return CreateJobResponse{}, fmt.Errorf("read search job create response: %w", err) + } + + err = xml.Unmarshal(body, &jobResponse) + if err != nil { + return CreateJobResponse{}, fmt.Errorf("unmarshal search job create response: %w", err) + } + return jobResponse, nil +} + +func (c *defaultSplunkSearchAPIClient) GetJobStatus(sid string) (SearchJobStatusResponse, error) { + endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", c.endpoint, sid) + + resp, err := c.doSplunkRequest("GET", endpoint, nil) + if err != nil { + return SearchJobStatusResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return SearchJobStatusResponse{}, fmt.Errorf("get search job status: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return SearchJobStatusResponse{}, fmt.Errorf("read search job status response: %w", err) + } + var jobStatusResponse SearchJobStatusResponse + err = xml.Unmarshal(body, &jobStatusResponse) + if err != nil { + return SearchJobStatusResponse{}, fmt.Errorf("unmarshal search job status response: %w", err) + } + + return jobStatusResponse, nil +} + +func (c *defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, batchSize int) (SearchResults, error) { + endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", c.endpoint, sid, offset, batchSize) + resp, err := c.doSplunkRequest("GET", endpoint,
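+		// nil body: output_mode, offset, and count are carried as URL query parameters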
nil) + if err != nil { + return SearchResults{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return SearchResults{}, fmt.Errorf("get search job results: %d", resp.StatusCode) + } + + var searchResults SearchResults + body, err := io.ReadAll(resp.Body) + if err != nil { + return SearchResults{}, fmt.Errorf("read search job results response: %w", err) + } + err = json.Unmarshal(body, &searchResults) + if err != nil { + return SearchResults{}, fmt.Errorf("unmarshal search job results response: %w", err) + } + + return searchResults, nil +} + +func (c *defaultSplunkSearchAPIClient) doSplunkRequest(method, endpoint string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequest(method, endpoint, body) + if err != nil { + return nil, fmt.Errorf("new http request: %w", err) + } + err = c.setSplunkRequestAuth(req) + if err != nil { + return nil, fmt.Errorf("set splunk request auth: %w", err) + } + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("client do request: %w", err) + } + return resp, nil +} + +func (c *defaultSplunkSearchAPIClient) setSplunkRequestAuth(req *http.Request) error { + if c.authToken != "" { + if strings.EqualFold(c.tokenType, TokenTypeBearer) { + req.Header.Set("Authorization", "Bearer "+string(c.authToken)) + } else if strings.EqualFold(c.tokenType, TokenTypeSplunk) { + req.Header.Set("Authorization", "Splunk "+string(c.authToken)) + } + } else { + req.SetBasicAuth(c.username, c.password) + } + return nil +} diff --git a/receiver/splunksearchapireceiver/client_test.go b/receiver/splunksearchapireceiver/client_test.go new file mode 100644 index 000000000..c7266f951 --- /dev/null +++ b/receiver/splunksearchapireceiver/client_test.go @@ -0,0 +1,176 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package splunksearchapireceiver + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCreateSearchJob(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.CreateSearchJob("index=otel starttime=\"\" endtime=\"\" timeformat=\"\"") + require.NoError(t, err) + require.Equal(t, "123456", resp.SID) + + // returns an error if the search doesn't have times + resp, err = testClient.CreateSearchJob("index=otel") + require.EqualError(t, err, "search query must contain starttime, endtime, and timeformat") + require.Empty(t, resp) + + // returns an error if the response status isn't 201 + resp, err = testClient.CreateSearchJob("index=fail_to_create_job starttime=\"\" endtime=\"\" timeformat=\"\"") + require.ErrorContains(t, err, "create search job") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.CreateSearchJob("index=fail_to_unmarshal starttime=\"\" endtime=\"\" timeformat=\"\"") + require.ErrorContains(t, err, "unmarshal search job create response") + require.Empty(t, resp) + +} + +func TestGetJobStatus(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.GetJobStatus("123456") + require.NoError(t, err) + require.Equal(t, "DONE", resp.Content.Dict.Keys[0].Value) + require.Equal(t, "text/xml", resp.Content.Type) + + // returns an error if the response status isn't 200 + resp, err = testClient.GetJobStatus("654321") + require.ErrorContains(t, err, "get search job status") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.GetJobStatus("098765") + require.ErrorContains(t, err, "unmarshal search job status response") + require.Empty(t, resp) +} + +func TestGetSearchResults(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.GetSearchResults("123456", 0, 5) + require.NoError(t, err) + require.Equal(t, 3, len(resp.Results)) + require.Equal(t, "Hello, world!", resp.Results[0].Raw) + + // returns an error if the response status isn't 200 + resp, err = testClient.GetSearchResults("654321", 0, 5) + require.ErrorContains(t, err, "get search job results") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.GetSearchResults("098765", 0, 5) + require.ErrorContains(t, err, "unmarshal search job results response") + require.Empty(t, resp) +} + +func TestSetSplunkRequestAuth(t *testing.T) { + client := defaultSplunkSearchAPIClient{ + username: "user", + password: "password", + } + req := httptest.NewRequest("GET", "http://localhost:8089", nil) + client.setSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Basic dXNlcjpwYXNzd29yZA==") + + client = defaultSplunkSearchAPIClient{ + authToken: "token", + tokenType: TokenTypeBearer, + } + client.setSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Bearer token") + + client = defaultSplunkSearchAPIClient{ + authToken: "token", + tokenType: TokenTypeSplunk, + } + client.setSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Splunk token") +} + +// mock Splunk servers 
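+// newMockServer emulates the three endpoints the client calls: job creation
+// (POST /services/search/jobs), job status (GET /services/search/v2/jobs/{sid}),
+// and paginated results (GET /services/search/v2/jobs/{sid}/results).
+// SID 654321 exercises the non-2xx error paths; SID 098765 exercises the
+// unmarshal-failure paths.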
+func newMockServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + switch req.URL.String() { + case "/services/search/jobs": + body, _ := io.ReadAll(req.Body) + if strings.Contains(string(body), "search=index%3Dotel") { + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(http.StatusCreated) + rw.Write([]byte(`<response><sid>123456</sid></response>`)) + } + if strings.Contains(string(body), "index%3Dfail_to_create_job") { + rw.WriteHeader(http.StatusNotFound) + } + if strings.Contains(string(body), "index%3Dfail_to_unmarshal") { + rw.WriteHeader(http.StatusCreated) + rw.Write([]byte(`invalid xml`)) + } + case "/services/search/v2/jobs/123456": + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(`<entry><content type="text/xml"><s:dict><s:key name="dispatchState">DONE</s:key></s:dict></content></entry>`)) + case "/services/search/v2/jobs/654321": + rw.WriteHeader(http.StatusNotFound) + case "/services/search/v2/jobs/098765": + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(`invalid xml`)) + case "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5": + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + rw.Write(splunkEventsResultsP1) + case "/services/search/v2/jobs/654321/results?output_mode=json&offset=0&count=5": + rw.WriteHeader(http.StatusNotFound) + case "/services/search/v2/jobs/098765/results?output_mode=json&offset=0&count=5": + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(`invalid json`)) + default: + rw.WriteHeader(http.StatusNotFound) + } + })) +} diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go new file mode 100644 index 000000000..da90653ef --- /dev/null +++ b/receiver/splunksearchapireceiver/config.go @@ -0,0 +1,128 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package splunksearchapireceiver + +import ( + "errors" + "fmt" + "strings" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" +) + +var ( + errNonStandaloneSearchQuery = errors.New("only standalone search commands can be used for scraping data") + // TokenTypeBearer is the token type for Bearer tokens + TokenTypeBearer = "Bearer" + // TokenTypeSplunk is the token type for Splunk tokens + TokenTypeSplunk = "Splunk" +) + +// Config struct to represent the configuration for the Splunk Search API receiver +type Config struct { + confighttp.ClientConfig `mapstructure:",squash"` + Endpoint string `mapstructure:"endpoint"` + Username string `mapstructure:"splunk_username,omitempty"` + Password string `mapstructure:"splunk_password,omitempty"` + AuthToken string `mapstructure:"auth_token,omitempty"` + TokenType string `mapstructure:"token_type,omitempty"` + Searches []Search `mapstructure:"searches"` + JobPollInterval time.Duration `mapstructure:"job_poll_interval"` + StorageID *component.ID `mapstructure:"storage"` +} + +// Search struct to represent a Splunk search +type Search struct { + Query string `mapstructure:"query"` + EarliestTime string `mapstructure:"earliest_time"` + LatestTime string `mapstructure:"latest_time"` + Limit int `mapstructure:"limit"` + EventBatchSize int `mapstructure:"event_batch_size"` +} + +// Validate validates the Splunk Search API receiver configuration +func (cfg *Config) Validate() error { + if cfg.Endpoint == "" { + return errors.New("missing Splunk server endpoint") + } + + if cfg.AuthToken != "" { + if cfg.TokenType == "" { + return errors.New("auth_token provided without a token type") + } + if !strings.EqualFold(cfg.TokenType, TokenTypeBearer) && !strings.EqualFold(cfg.TokenType, TokenTypeSplunk) { + return fmt.Errorf("auth_token provided without a correct token type, valid token types are %v", []string{TokenTypeBearer, TokenTypeSplunk}) + } + if cfg.Username != "" || cfg.Password != "" { + return errors.New("auth_token and username/password were both provided, only one can be provided to authenticate with Splunk") + } + } else if cfg.Username == "" || cfg.Password == "" { + return errors.New("missing Splunk basic auth credentials, need username and password") + } + + if len(cfg.Searches) == 0 { + return errors.New("at least one search must be provided") + } + + if cfg.StorageID == nil { + return errors.New("storage configuration is required for this receiver") + } + + for _, search := range cfg.Searches { + if search.Query == "" { + return errors.New("missing query in search") + } + + // query must start with "search" command + if !strings.HasPrefix(search.Query, "search ") { + return errNonStandaloneSearchQuery + } + + if strings.Contains(search.Query, "|") { + return errNonStandaloneSearchQuery + } + + // ensure user query does not include time parameters + if strings.Contains(search.Query, "earliest=") || + strings.Contains(search.Query, "latest=") || + strings.Contains(search.Query, "starttime=") || + strings.Contains(search.Query, "endtime=") || + strings.Contains(search.Query, "timeformat=") { + return errors.New("time query parameters must be configured using only the 'earliest_time' and 'latest_time' configuration parameters") + } + + if search.EarliestTime == "" { + return errors.New("missing earliest_time in search") + } + if search.LatestTime == "" { + return errors.New("missing latest_time in search") + } + + // parse time strings to time.Time + _, err := time.Parse(time.RFC3339, 
search.EarliestTime) + if err != nil { + return errors.New("earliest_time failed to parse as RFC3339") + } + + _, err = time.Parse(time.RFC3339, search.LatestTime) + if err != nil { + return errors.New("latest_time failed to parse as RFC3339") + } + + } + return nil +} diff --git a/receiver/splunksearchapireceiver/config_test.go b/receiver/splunksearchapireceiver/config_test.go new file mode 100644 index 000000000..e228b77bf --- /dev/null +++ b/receiver/splunksearchapireceiver/config_test.go @@ -0,0 +1,351 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunksearchapireceiver + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" +) + +func TestValidate(t *testing.T) { + testCases := []struct { + desc string + endpoint string + username string + password string + authToken string + tokenType string + storage string + searches []Search + errExpected bool + errText string + }{ + { + desc: "Missing endpoint", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing Splunk server endpoint", + }, + { + desc: "Missing username, no auth token", + endpoint: "http://localhost:8089", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing Splunk basic auth credentials, need username and password", + }, + { + desc: "Missing password, no auth token", + endpoint: "http://localhost:8089", + username: "user", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing Splunk basic auth credentials, need username and password", + }, + { + desc: "Auth token without token type", + endpoint: "http://localhost:8089", + authToken: "token", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token provided without a token type", + }, + { + desc: "Auth token with invalid token type", + endpoint: "http://localhost:8089", + authToken: "token", + tokenType: "invalid", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token provided without a correct token type, valid token types are [Bearer Splunk]", + }, + { + desc: "Auth token and username/password provided", + endpoint: "http://localhost:8089", + username: 
"user", + password: "password", + authToken: "token", + tokenType: "Bearer", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token and username/password were both provided, only one can be provided to authenticate with Splunk", + }, + { + desc: "Missing storage", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "storage configuration is required for this receiver", + }, + { + desc: "Missing searches", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + errExpected: true, + errText: "at least one search must be provided", + }, + { + desc: "Missing query", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing query in search", + }, + { + desc: "Missing earliest_time", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing earliest_time in search", + }, + { + desc: "Unparsable earliest_time", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "-1hr", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "earliest_time failed to parse as RFC3339", + }, + { + desc: "Missing latest_time", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + }, + }, + errExpected: true, + errText: "missing latest_time in search", + }, + { + desc: "Unparsable latest_time", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "-1hr", + }, + }, + errExpected: true, + errText: "latest_time failed to parse as RFC3339", + }, + { + desc: "Invalid query chaining", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal | stats count by sourcetype", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "only standalone search commands can be used for scraping data", + }, + { + desc: "Valid config", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: false, + }, + { + desc: "Valid config with auth token", + endpoint: "http://localhost:8089", + authToken: "token", + 
tokenType: "Bearer", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: false, + }, + { + desc: "Valid config with multiple searches", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + { + Query: "search index=_audit", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: false, + }, + { + desc: "Valid config with limit", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + Limit: 10, + }, + }, + errExpected: false, + }, + { + desc: "Query with earliest and latest time", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal earliest=2024-10-30T04:00:00.000Z latest=2024-10-30T14:00:00.000Z", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "time query parameters must be configured using only the 'earliest_time' and 'latest_time' configuration parameters", + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.Endpoint = tc.endpoint + cfg.Username = tc.username + cfg.Password = tc.password + cfg.AuthToken = tc.authToken + cfg.TokenType = tc.tokenType + cfg.Searches = tc.searches + if tc.storage != "" { + cfg.StorageID = &component.ID{} + } + err := cfg.Validate() + if tc.errExpected { + require.EqualError(t, err, tc.errText) + return + } + require.NoError(t, err) + }) + } +} diff --git a/receiver/splunksearchapireceiver/factory.go b/receiver/splunksearchapireceiver/factory.go new file mode 100644 index 000000000..9fa2edfb0 --- /dev/null +++ b/receiver/splunksearchapireceiver/factory.go @@ -0,0 +1,58 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package splunksearchapireceiver + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" +) + +var ( + typeStr = component.MustNewType("splunksearchapi") +) + +func createDefaultConfig() component.Config { + return &Config{ + ClientConfig: confighttp.NewDefaultClientConfig(), + JobPollInterval: 5 * time.Second, + } +} + +func createLogsReceiver(_ context.Context, + params receiver.Settings, + cfg component.Config, + consumer consumer.Logs, +) (receiver.Logs, error) { + ssapirConfig := cfg.(*Config) + ssapir := &splunksearchapireceiver{ + logger: params.Logger, + logsConsumer: consumer, + config: ssapirConfig, + id: params.ID, + settings: params.TelemetrySettings, + checkpointRecord: &EventRecord{}, + } + return ssapir, nil +} + +// NewFactory creates a factory for Splunk Search API receiver +func NewFactory() receiver.Factory { + return receiver.NewFactory(typeStr, createDefaultConfig, receiver.WithLogs(createLogsReceiver, component.StabilityLevelAlpha)) +} diff --git a/receiver/splunksearchapireceiver/factory_test.go b/receiver/splunksearchapireceiver/factory_test.go new file mode 100644 index 000000000..e3582a19a --- /dev/null +++ b/receiver/splunksearchapireceiver/factory_test.go @@ -0,0 +1,42 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package splunksearchapireceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestType(t *testing.T) { + factory := NewFactory() + ft := factory.Type() + require.EqualValues(t, "splunksearchapi", ft.String()) +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + test, err := factory.CreateLogs( + context.Background(), + receivertest.NewNopSettings(), + createDefaultConfig(), + consumertest.NewNop(), + ) + require.NoError(t, err) + require.NotNil(t, test) +} diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod new file mode 100644 index 000000000..b03ace39d --- /dev/null +++ b/receiver/splunksearchapireceiver/go.mod @@ -0,0 +1,89 @@ +module github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver + +go 1.22.7 + +require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.114.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/component v0.114.0 + go.opentelemetry.io/collector/component/componenttest v0.114.0 + go.opentelemetry.io/collector/consumer v0.114.0 + go.opentelemetry.io/collector/consumer/consumertest v0.114.0 + go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 + go.opentelemetry.io/collector/pdata v1.20.0 + go.opentelemetry.io/collector/receiver v0.114.0 + go.opentelemetry.io/collector/receiver/receivertest v0.114.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/lunes v0.1.0 // indirect + github.com/expr-lang/expr v1.16.9 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/goccy/go-json v0.10.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/leodido/go-syslog/v4 v4.2.0 // indirect + github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect + github.com/magefile/mage v1.15.0 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.114.0 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rs/cors v1.11.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect + github.com/valyala/fastjson v1.6.4 // indirect + go.opentelemetry.io/collector/client v1.20.0 // indirect + go.opentelemetry.io/collector/config/configauth v0.113.0 // indirect + go.opentelemetry.io/collector/config/configcompression v1.19.0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.20.0 // indirect + go.opentelemetry.io/collector/config/configtls v1.20.0 // indirect + go.opentelemetry.io/collector/config/internal v0.113.0 // indirect +
go.opentelemetry.io/collector/confmap v1.20.0 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.114.0 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0 // indirect + go.opentelemetry.io/collector/extension v0.114.0 // indirect + go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.114.0 // indirect + go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0 // indirect + go.opentelemetry.io/collector/semconv v0.114.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/otel/sdk v1.32.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect + gonum.org/v1/gonum v0.15.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +require ( + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + go.opentelemetry.io/collector/config/confighttp v0.113.0 + go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect + go.opentelemetry.io/collector/pipeline v0.114.0 // indirect + go.opentelemetry.io/otel v1.32.0 // indirect + go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.35.1 // indirect +) diff --git a/receiver/splunksearchapireceiver/go.sum b/receiver/splunksearchapireceiver/go.sum new file mode 100644 index 000000000..3446cd783 --- /dev/null +++ b/receiver/splunksearchapireceiver/go.sum @@ -0,0 +1,213 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/lunes v0.1.0 h1:amRtLPjwkWtzDF/RKzcEPMvSsSseLDLW+bnhfNSLRe4= +github.com/elastic/lunes v0.1.0/go.mod h1:xGphYIt3XdZRtyWosHQTErsQTd4OP1p9wsbVoHelrd4= +github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= +github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= 
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= +github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-syslog/v4 v4.2.0 h1:A7vpbYxsO4e2E8udaurkLlxP5LDpDbmPMsGnuhb7jVk= +github.com/leodido/go-syslog/v4 v4.2.0/go.mod h1:eJ8rUfDN5OS6dOkCOBYlg2a+hbAg6pJa99QXXgMrd98= +github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b h1:11UHH39z1RhZ5dc4y4r/4koJo6IYFgTRMe/LlwRTEw0= +github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= +github.com/magefile/mage v1.15.0 
h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.114.0 h1:mchuc816TxLpmsGvFbtGA3KBVx91vAXi7vJnlvsQdiU= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.114.0/go.mod h1:vgCMUWPVrfjNux9P9G053fRqGFF6BS3xtxNFZZdFTCM= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.114.0 h1:0LbaoE7Aof8J4CVQ5kYv1QbuL3usTxLRSMFisDNBX9U= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.114.0/go.mod h1:ByoGXMLeHE/k5ELO3EITitVmvq3bh4Z/GVwWZZxrQ5s= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.114.0 h1:d2wCLlENxH4I2axQWaogivx/5ZIjDYgn9MIf6sFxlJ4= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.114.0/go.mod h1:Psyligv8GKL9WI3TraW3BLwkOX4TRxaaa1BBQQyICzA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.114.0 h1:Xr3Hvm9cxOSQX94tLX1yX63uvuvtglJICrOz9YcxiuI= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.114.0/go.mod h1:cgIgmEg66RhVtAv4JkIhHdy70kn2EtVhrH8CtyvhfuI= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= +github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= 
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/client v1.20.0 h1:o60wPcj5nLtaRenF+1E5p4QXFS3TDL6vHlw+GOon3rg= +go.opentelemetry.io/collector/client v1.20.0/go.mod h1:6aqkszco9FaLWCxyJEVam6PP7cUa8mPRIXeS5eZGj0U= +go.opentelemetry.io/collector/component v0.114.0 h1:SVGbm5LvHGSTEDv7p92oPuBgK5tuiWR82I9+LL4TtBE= +go.opentelemetry.io/collector/component v0.114.0/go.mod h1:MLxtjZ6UVHjDxSdhGLuJfHBHvfl1iT/Y7IaQPD24Eww= +go.opentelemetry.io/collector/component/componenttest v0.114.0 h1:GM4FTTlfeXoVm6sZYBHImwlRN8ayh2oAfUhvaFj7Zo8= +go.opentelemetry.io/collector/component/componenttest v0.114.0/go.mod h1:ZZEJMtbJtoVC/3/9R1HzERq+cYQRxuMFQrPCpfZ4Xos= +go.opentelemetry.io/collector/config/configauth v0.113.0 h1:CBz43fGpN41MwLdwe3mw/XVSIDvGRMT8aaaPuqKukTU= +go.opentelemetry.io/collector/config/configauth v0.113.0/go.mod h1:Q8SlxrIvL3FJO51hXa4n9ARvox04lK8mmpjf4b3UNAU= +go.opentelemetry.io/collector/config/configcompression v1.19.0 h1:bTSjTLhnPXX1NSFM6GzguEM/NBe8QUPsXHc9kMOAJzE= +go.opentelemetry.io/collector/config/configcompression v1.19.0/go.mod h1:pnxkFCLUZLKWzYJvfSwZnPrnm0twX14CYj2ADth5xiU= +go.opentelemetry.io/collector/config/confighttp v0.113.0 h1:a6iO0y1ZM5CPDvwbryzU+GpqAtAQ3eSfNseoAUogw7c= +go.opentelemetry.io/collector/config/confighttp v0.113.0/go.mod h1:JZ9EwoiWMIrXt5v+d/q54TeUhPdAoLDimSEqTtddW6E= +go.opentelemetry.io/collector/config/configopaque v1.20.0 h1:2I48zKiyyyYqjm7y0B9OLp24ku2ZSX3nCHG0r5FdWOQ= +go.opentelemetry.io/collector/config/configopaque v1.20.0/go.mod h1:6zlLIyOoRpJJ+0bEKrlZOZon3rOp5Jrz9fMdR4twOS4= +go.opentelemetry.io/collector/config/configtelemetry v0.114.0 h1:kjLeyrumge6wsX6ZIkicdNOlBXaEyW2PI2ZdVXz/rzY= +go.opentelemetry.io/collector/config/configtelemetry v0.114.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc= +go.opentelemetry.io/collector/config/configtls v1.20.0 h1:hNlJdwfyY5Qe54RLJ41lfLqKTn9ypkR7sk7JNCcSe2U= +go.opentelemetry.io/collector/config/configtls v1.20.0/go.mod h1:sav/txSHguadTYlSSK+BJO2ljJeYEtRoBahgzWAguYg= +go.opentelemetry.io/collector/config/internal v0.113.0 h1:9RAzH8v7ItFT1npHpvP0SvUzBHcZDliCGRo9Spp6v7c= +go.opentelemetry.io/collector/config/internal v0.113.0/go.mod h1:yC7E4h1Uj0SubxcFImh6OvBHFTjMh99+A5PuyIgDWqc= +go.opentelemetry.io/collector/confmap v1.20.0 h1:ARfOwmkKxFOud1njl03yAHQ30+uenlzqCO6LBYamDTE= +go.opentelemetry.io/collector/confmap v1.20.0/go.mod h1:DMpd9Ay/ffls3JoQBQ73vWeRsz1rNuLbwjo6WtjSQus= +go.opentelemetry.io/collector/consumer v0.114.0 h1:1zVaHvfIZowGwZRitRBRo3i+RP2StlU+GClYiofSw0Q= +go.opentelemetry.io/collector/consumer v0.114.0/go.mod h1:d+Mrzt9hsH1ub3zmwSlnQVPLeTYir4Mgo7CrWfnncN4= +go.opentelemetry.io/collector/consumer/consumererror v0.114.0 h1:r2YiELfWerb40FHD23V04gNjIkLUcjEKGxI4Vtm2iO4= +go.opentelemetry.io/collector/consumer/consumererror v0.114.0/go.mod h1:MzIrLQ5jptO2egypolhlAbZsWZr29WC4FhSxQjnxcvg= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0 h1:5pXYy3E6UK5Huu3aQbsYL8B6E6MyWx4fvXXDn+oXZaA= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0/go.mod h1:PMq3f54KcJQO4v1tue0QxQScu7REFVADlXxXSAYMiN0= +go.opentelemetry.io/collector/consumer/consumertest v0.114.0 h1:isaTwJK5DOy8Bs7GuLq23ejfgj8gLIo5dOUvkRnLF4g= +go.opentelemetry.io/collector/consumer/consumertest v0.114.0/go.mod h1:GNeLPkfRPdh06n/Rv1UKa/cAtCKjN0a7ADyHjIj4HFE= +go.opentelemetry.io/collector/extension v0.114.0 h1:9Qb92y8hD2WDC5aMDoj4JNQN+/5BQYJWPUPzLXX+iGw= 
+go.opentelemetry.io/collector/extension v0.114.0/go.mod h1:Yk2/1ptVgfTr12t+22v93nYJpioP14pURv2YercSzU0= +go.opentelemetry.io/collector/extension/auth v0.113.0 h1:4ggRy1vepOabUiCWfU+6M9P/ftXojMUNAvBpeLihYj8= +go.opentelemetry.io/collector/extension/auth v0.113.0/go.mod h1:VbvAm2YZAqePkWgwn0m0vBaq3aC49CxPVwHmrJ24aeQ= +go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 h1:hLyX9UvmY0t6iBnk3CqvyNck2U0QjPACekj7pDRx2hA= +go.opentelemetry.io/collector/extension/experimental/storage v0.114.0/go.mod h1:WqYRQVJjJLE1rm+y/ks1wPdPRGWePEvE1VO07xm2J2k= +go.opentelemetry.io/collector/featuregate v1.20.0 h1:Mi7nMy/q52eruI+6jWnMKUOeM55XvwoPnGcdB1++O8c= +go.opentelemetry.io/collector/featuregate v1.20.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= +go.opentelemetry.io/collector/pdata v1.20.0 h1:ePcwt4bdtISP0loHaE+C9xYoU2ZkIvWv89Fob16o9SM= +go.opentelemetry.io/collector/pdata v1.20.0/go.mod h1:Ox1YVLe87cZDB/TL30i4SUz1cA5s6AM6SpFMfY61ICs= +go.opentelemetry.io/collector/pdata/pprofile v0.114.0 h1:pUNfTzsI/JUTiE+DScDM4lsrPoxnVNLI2fbTxR/oapo= +go.opentelemetry.io/collector/pdata/pprofile v0.114.0/go.mod h1:4aNcj6WM1n1uXyFSXlhVs4ibrERgNYsTbzcYI2zGhxA= +go.opentelemetry.io/collector/pdata/testdata v0.114.0 h1:+AzszWSL1i4K6meQ8rU0JDDW55SYCXa6FVqfDixhhTo= +go.opentelemetry.io/collector/pdata/testdata v0.114.0/go.mod h1:bv8XFdCTZxG2MQB5l9dKxSxf5zBrcodwO6JOy1+AxXM= +go.opentelemetry.io/collector/pipeline v0.114.0 h1:v3YOhc5z0tD6QbO5n/pnftpIeroihM2ks9Z2yKPCcwY= +go.opentelemetry.io/collector/pipeline v0.114.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= +go.opentelemetry.io/collector/receiver v0.114.0 h1:90SAnXAjNq7/k52/pFmmb06Cf1YauoPYtbio4aOXafY= +go.opentelemetry.io/collector/receiver v0.114.0/go.mod h1:KUGT0/D953LXbGH/D3lLPU8yrU3HfWnUqpt4W4hSOnE= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0 h1:ibhEfGpvNB3yrtpl2jYFabrunMk1hurxvMYpM0b1Ck4= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0/go.mod h1:UZyRfaasw+NLvN10AN8IQnmj5tQ3BOUH1uP2ctpO9f0= +go.opentelemetry.io/collector/receiver/receivertest v0.114.0 h1:D+Kh9t2n4asTnM+TiSxbrKlUemLZandWntj17BJWWb0= +go.opentelemetry.io/collector/receiver/receivertest v0.114.0/go.mod h1:mNSHQ13vFmqD+VAcRzLjStFBejbcWUn2Mp0pAd7Op+U= +go.opentelemetry.io/collector/semconv v0.114.0 h1:/eKcCJwZepQUtEuFuxa0thx2XIOvhFpaf214ZG1a11k= +go.opentelemetry.io/collector/semconv v0.114.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace 
v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors 
v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/splunksearchapireceiver/integration_test.go b/receiver/splunksearchapireceiver/integration_test.go new file mode 100644 index 000000000..624190e03 --- /dev/null +++ b/receiver/splunksearchapireceiver/integration_test.go @@ -0,0 +1,220 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
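+
+// The tests below run the receiver against an httptest stub of the Splunk
+// REST API: POST /services/search/jobs returns a <sid>, the job status
+// endpoint reports dispatchState DONE, and the results endpoint serves
+// paginated JSON pages, exercising the create/poll/page/export loop in
+// receiver.go end to end against local fixtures.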
+
+package splunksearchapireceiver
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/extension/experimental/storage"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.uber.org/zap"
+)
+
+// Test the case where some data is exported, but a subsequent call for paginated data fails
+func TestSplunkResultsPaginationFailure(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Searches = []Search{
+		{
+			Query:          "search index=otel",
+			EarliestTime:   "2024-11-14T00:00:00.000Z",
+			LatestTime:     "2024-11-14T23:59:59.000Z",
+			EventBatchSize: 5,
+		},
+	}
+	var callCount int
+	server := newMockSplunkServerPagination(&callCount)
+	defer server.Close()
+	settings := componenttest.NewNopTelemetrySettings()
+	ssapir := newSSAPIReceiver(zap.NewNop(), cfg, settings, component.NewID(typeStr))
+	ssapir.client, _ = newDefaultSplunkSearchAPIClient(context.Background(), settings, *cfg, componenttest.NewNopHost())
+	ssapir.client.(*defaultSplunkSearchAPIClient).client = server.Client()
+	ssapir.client.(*defaultSplunkSearchAPIClient).endpoint = server.URL
+	ssapir.logsConsumer = &consumertest.LogsSink{}
+
+	ssapir.storageClient = storage.NewNopClient()
+
+	ssapir.initCheckpoint(context.Background())
+	ssapir.runQueries(context.Background())
+	require.Equal(t, 3, ssapir.checkpointRecord.Offset)
+	require.Equal(t, 1, callCount)
+}
+
+func newMockSplunkServerPagination(callCount *int) *httptest.Server {
+	return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.URL.String() == "/services/search/jobs" {
+			rw.Header().Set("Content-Type", "application/xml")
+			rw.WriteHeader(201)
+			rw.Write([]byte(`
+			<response>
+				<sid>123456</sid>
+			</response>
+			`))
+		} else if req.URL.String() == "/services/search/v2/jobs/123456" {
+			rw.Header().Set("Content-Type", "application/xml")
+			rw.WriteHeader(200)
+			// minimal job status payload; the receiver only reads the dispatchState key
+			rw.Write([]byte(`
+			<response>
+				<content type="DISPATCH">
+					<dict>
+						<key name="dispatchState">DONE</key>
+					</dict>
+				</content>
+			</response>
+			`))
+		} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5" && req.URL.Query().Get("offset") == "0" {
+			rw.Header().Set("Content-Type", "application/json")
+			rw.WriteHeader(200)
+			rw.Write(splunkEventsResultsP1)
+			*callCount++
+		} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=5&count=5" && req.URL.Query().Get("offset") == "5" {
+			rw.Header().Set("Content-Type", "application/json")
+			rw.WriteHeader(400)
+			rw.Write([]byte("error, bad request"))
+		}
+	}))
+}
+
+// Test the case where the GCP exporter returns an error
+func TestExporterFailure(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.Searches = []Search{
+		{
+			Query:          "search index=otel",
+			EarliestTime:   "2024-11-14T00:00:00.000Z",
+			LatestTime:     "2024-11-14T23:59:59.000Z",
+			EventBatchSize: 3,
+		},
+	}
+	server := newMockSplunkServer()
+	defer server.Close()
+	settings := componenttest.NewNopTelemetrySettings()
+	ssapir := newSSAPIReceiver(zap.NewNop(), cfg, settings, component.NewID(typeStr))
+	logsConsumer := &mockLogsConsumerExporterErr{}
+	logsConsumer.On("ConsumeLogs", mock.Anything, mock.Anything).Return(nil)
+
+	ssapir.logsConsumer = logsConsumer
+	ssapir.client, _ = newDefaultSplunkSearchAPIClient(context.Background(), settings, *cfg, componenttest.NewNopHost())
+	ssapir.client.(*defaultSplunkSearchAPIClient).client = server.Client()
+	ssapir.client.(*defaultSplunkSearchAPIClient).endpoint = server.URL
+
+	ssapir.initCheckpoint(context.Background())
+	ssapir.runQueries(context.Background())
+	require.Equal(t, 5, ssapir.checkpointRecord.Offset)
+	require.Equal(t, "search index=otel", ssapir.checkpointRecord.Search)
+
+	// simulate the exporter failing
+	// the checkpoint should not be updated, and an error should be returned
+	ssapir.checkpointRecord.Offset = 0
+	offset = 0
+	logsConsumerErr := &mockLogsConsumerExporterErr{}
+	logsConsumerErr.On("ConsumeLogs", mock.Anything, mock.Anything).Return(errors.New("error exporting logs"))
+
+	ssapir.logsConsumer = logsConsumerErr
+	ssapir.initCheckpoint(context.Background())
+	ssapir.runQueries(context.Background())
+	require.Equal(t, 0, ssapir.checkpointRecord.Offset)
+	require.Equal(t, "search index=otel", ssapir.checkpointRecord.Search)
+}
+
+func newMockSplunkServer() *httptest.Server {
+	return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		if req.URL.String() == "/services/search/jobs" {
+			rw.Header().Set("Content-Type", "application/xml")
+			rw.WriteHeader(201)
+			rw.Write([]byte(`
+			<response>
+				<sid>123456</sid>
+			</response>
+			`))
+		} else if req.URL.String() == "/services/search/v2/jobs/123456" {
+			rw.Header().Set("Content-Type", "application/xml")
+			rw.WriteHeader(200)
+			// minimal job status payload; the receiver only reads the dispatchState key
+			rw.Write([]byte(`
+			<response>
+				<content type="DISPATCH">
+					<dict>
+						<key name="dispatchState">DONE</key>
+					</dict>
+				</content>
+			</response>
+			`))
+		} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=3" {
+			rw.Header().Set("Content-Type", "application/json")
+			rw.WriteHeader(200)
+			rw.Write(splunkEventsResultsP1)
+		} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=3&count=3" {
+			rw.Header().Set("Content-Type", "application/json")
+			rw.WriteHeader(200)
+			rw.Write(splunkEventsResultsP2)
+		}
+	}))
+}
+
+var splunkEventsResultsP1 = []byte(`{
+	"init_offset": 0,
+	"results": [
+		{
+			"_raw": "Hello, world!",
+			"_time": "2024-11-14T13:02:31.000-05:00"
+		},
+		{
+			"_raw": "Goodbye, world!",
+			"_time": "2024-11-14T13:02:30.000-05:00"
+		},
+		{
+			"_raw": "lorem ipsum",
+			"_time": "2024-11-14T13:02:29.000-05:00"
+		}
+	]
+}`)
+
+var splunkEventsResultsP2 = []byte(`{
+	"init_offset": 3,
+	"results": [
+		{
+			"_raw": "dolor sit amet",
+			"_time": "2024-11-14T13:02:28.000-05:00"
+		},
+		{
+			"_raw": "consectetur adipiscing elit",
+			"_time": "2024-11-14T13:02:27.000-05:00"
+		}
+	]
+}`)
+
+type mockLogsConsumerExporterErr struct {
+	mock.Mock
+}
+
+func (m *mockLogsConsumerExporterErr) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
+	args := m.Called(ctx, logs)
+	return args.Error(0)
+}
+
+func (m *mockLogsConsumerExporterErr) Capabilities() consumer.Capabilities {
+	return consumer.Capabilities{MutatesData: false}
+}
diff --git a/receiver/splunksearchapireceiver/metadata.yaml b/receiver/splunksearchapireceiver/metadata.yaml
new file mode 100644
index 000000000..3ca815db9
--- /dev/null
+++ b/receiver/splunksearchapireceiver/metadata.yaml
@@ -0,0 +1,7 @@
+type: splunksearchapi
+
+status:
+  class: receiver
+  stability:
+    alpha: [logs]
+  distributions: [observiq]
diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go
new file mode 100644
index 000000000..0eeed5a3f
--- /dev/null
+++ b/receiver/splunksearchapireceiver/model.go
@@ -0,0 +1,69 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package splunksearchapireceiver
+
+// CreateJobResponse struct to represent the XML response from Splunk create job endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs
+type CreateJobResponse struct {
+	SID string `xml:"sid"`
+}
+
+// SearchJobStatusResponse struct to represent the XML response from Splunk job status endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D
+type SearchJobStatusResponse struct {
+	Content SearchJobContent `xml:"content"`
+}
+
+// SearchJobContent struct to represent <content> elements
+type SearchJobContent struct {
+	Type string `xml:"type,attr"`
+	Dict Dict   `xml:"dict"`
+}
+
+// Dict struct to represent <dict> elements
+type Dict struct {
+	Keys []Key `xml:"key"`
+}
+
+// Key struct to represent <key> elements
+type Key struct {
+	Name  string `xml:"name,attr"`
+	Value string `xml:",chardata"`
+	Dict  *Dict  `xml:"dict,omitempty"`
+	List  *List  `xml:"list,omitempty"`
+}
+
+// List struct to represent <list> elements
+type List struct {
+	Items []struct {
+		Value string `xml:",chardata"`
+	} `xml:"item"`
+}
+
+// SearchResults struct to represent the JSON response from Splunk search results endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults
+type SearchResults struct {
+	InitOffset int `json:"init_offset"`
+	Results    []struct {
+		Raw  string `json:"_raw"`
+		Time string `json:"_time"`
+	} `json:"results"`
+}
+
+// EventRecord struct stores the offset of the last event exported successfully
+type EventRecord struct {
+	Offset int    `json:"offset"`
+	Search string `json:"search"`
+}
diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go
new file mode 100644
index 000000000..deb607ce2
--- /dev/null
+++ b/receiver/splunksearchapireceiver/receiver.go
@@ -0,0 +1,314 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
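+
+// receiver.go implements the receiver lifecycle: Start builds the Splunk API
+// and storage clients, loads any saved checkpoint, and launches runQueries,
+// which creates a search job per configured query, polls it to completion,
+// pages through results, converts them to plog.Logs, and checkpoints the
+// offset after each successfully exported batch.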
+ +package splunksearchapireceiver + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/extension/experimental/storage" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" +) + +const ( + eventStorageKey = "last_event_offset" + splunkDefaultEventBatchSize = 100 +) + +var ( + offset = 0 // offset for pagination and checkpointing + exportedEvents = 0 // track the number of events returned by the results endpoint that are exported + limitReached = false // flag to stop processing search results when limit is reached +) + +type splunksearchapireceiver struct { + logger *zap.Logger + logsConsumer consumer.Logs + config *Config + settings component.TelemetrySettings + id component.ID + cancel context.CancelFunc + client splunkSearchAPIClient + storageClient storage.Client + checkpointRecord *EventRecord +} + +func newSSAPIReceiver( + logger *zap.Logger, + config *Config, + settings component.TelemetrySettings, + id component.ID, +) *splunksearchapireceiver { + return &splunksearchapireceiver{ + logger: logger, + config: config, + settings: settings, + storageClient: storage.NewNopClient(), + id: id, + checkpointRecord: &EventRecord{}, + } +} + +func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error { + var err error + ssapir.client, err = newDefaultSplunkSearchAPIClient(ctx, ssapir.settings, *ssapir.config, host) + if err != nil { + return err + } + + // set cancel function + cancelCtx, cancel := context.WithCancel(ctx) + ssapir.cancel = cancel + + // create storage client + storageClient, err := adapter.GetStorageClient(ctx, host, ssapir.config.StorageID, ssapir.id) + if err != nil { + return fmt.Errorf("failed to get storage client: %w", err) + } + ssapir.storageClient = storageClient + + err = ssapir.initCheckpoint(cancelCtx) + if err != nil { + return fmt.Errorf("failed to initialize checkpoint: %w", err) + } + go ssapir.runQueries(cancelCtx) + return nil +} + +func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error { + ssapir.logger.Debug("shutting down logs receiver") + if ssapir.cancel != nil { + ssapir.cancel() + } + + if ssapir.storageClient != nil { + if err := ssapir.checkpoint(ctx); err != nil { + ssapir.logger.Error("failed checkpoint", zap.Error(err)) + } + return ssapir.storageClient.Close(ctx) + } + return nil +} + +func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) { + for _, search := range ssapir.config.Searches { + // set current search query + ssapir.checkpointRecord.Search = search.Query + + // set default event batch size (matches Splunk API default) + if search.EventBatchSize == 0 { + search.EventBatchSize = splunkDefaultEventBatchSize + } + + // parse time strings to time.Time + earliestTime, _ := time.Parse(time.RFC3339, search.EarliestTime) + latestTime, _ := time.Parse(time.RFC3339, search.LatestTime) + + // create search in Splunk + searchID, err := ssapir.createSplunkSearch(search) + if err != nil { + ssapir.logger.Error("error creating search", zap.Error(err)) + return + } + + // wait for search to complete + if err = ssapir.pollSearchCompletion(ctx, searchID); err != nil { + ssapir.logger.Error("error polling for search completion", zap.Error(err)) + return + } + + for { + if ctx.Err() != nil { + 
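+				// ctx is cancelled when Shutdown is called; the checkpoint written
+				// after the last successful batch lets the next Start resume this
+				// search from the saved offset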
+				ssapir.logger.Error("context cancelled, stopping search result export", zap.Error(ctx.Err()))
+				return
+			}
+
+			ssapir.logger.Info("fetching search results")
+			results, err := ssapir.getSplunkSearchResults(searchID, offset, search.EventBatchSize)
+			if err != nil {
+				ssapir.logger.Error("error fetching search results", zap.Error(err))
+				break
+			}
+			ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results)))
+
+			logs := plog.NewLogs()
+			for idx, splunkLog := range results.Results {
+				if (idx+exportedEvents) >= search.Limit && search.Limit != 0 {
+					limitReached = true
+					break
+				}
+				// parse the RFC 3339 log timestamp and normalize it to UTC for comparison
+				logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
+				if err != nil {
+					ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
+					break
+				}
+				if logTimestamp.UTC().Before(earliestTime.UTC()) {
+					// results are returned newest first, so every remaining event is also too old
+					ssapir.logger.Info("skipping remaining log entries - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
+					break
+				}
+				if logTimestamp.UTC().After(latestTime.UTC()) {
+					ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
+					continue
+				}
+				log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
+				// convert time to timestamp
+				timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
+				log.SetTimestamp(timestamp)
+				log.Body().SetStr(splunkLog.Raw)
+			}
+
+			if logs.ResourceLogs().Len() == 0 {
+				ssapir.logger.Info("search returned no logs within the given time range")
+				break
+			}
+
+			// pass logs, wait for exporter to confirm successful export to GCP
+			err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
+			if err != nil {
+				// error from down the pipeline; stop without checkpointing so this batch is retried on the next startup
+				ssapir.logger.Error("error exporting logs", zap.Error(err))
+				return
+			}
+			// last batch of logs has been successfully exported
+			exportedEvents += logs.ResourceLogs().Len()
+			offset += len(results.Results)
+
+			// update checkpoint
+			ssapir.checkpointRecord.Offset = offset
+			err = ssapir.checkpoint(ctx)
+			if err != nil {
+				ssapir.logger.Error("error writing checkpoint", zap.Error(err))
+			}
+			if limitReached {
+				ssapir.logger.Info("limit reached, stopping search result export")
+				break
+			}
+			// if the number of results is less than the results per request, we have queried all pages for the search
+			if len(results.Results) < search.EventBatchSize {
+				ssapir.logger.Debug("results less than batch size, stopping search result export")
+				break
+			}
+		}
+		ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents))
+	}
+	ssapir.logger.Info("all search results exported")
+}
+
+func (ssapir *splunksearchapireceiver) pollSearchCompletion(ctx context.Context, searchID string) error {
+	t := time.NewTicker(ssapir.config.JobPollInterval)
+	defer t.Stop()
+	for {
+		select {
+		case <-t.C:
+			ssapir.logger.Debug("polling for search completion")
+			resp, err := ssapir.client.GetJobStatus(searchID)
+			if err != nil {
+				return fmt.Errorf("error getting search job status: %w", err)
+			}
+			done := ssapir.isSearchCompleted(resp)
+			if done {
+				ssapir.logger.Info("search completed")
+				return nil
+			}
+			ssapir.logger.Debug("search not completed yet")
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func (ssapir *splunksearchapireceiver) createSplunkSearch(search Search) (string, error) {
+	timeFormat := "%Y-%m-%dT%H:%M:%S"
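+	// timeformat tells Splunk how to parse the starttime/endtime values appended
+	// below; for example, the test query "search index=otel" is dispatched as:
+	//   search index=otel starttime="2024-11-14T00:00:00.000Z" endtime="2024-11-14T23:59:59.000Z" timeformat="%Y-%m-%dT%H:%M:%S"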
+	searchQuery := fmt.Sprintf("%s starttime=\"%s\" endtime=\"%s\" timeformat=\"%s\"", search.Query, search.EarliestTime, search.LatestTime, timeFormat)
+	ssapir.logger.Info("creating search", zap.String("query", searchQuery))
+	resp, err := ssapir.client.CreateSearchJob(searchQuery)
+	if err != nil {
+		return "", err
+	}
+	return resp.SID, nil
+}
+
+func (ssapir *splunksearchapireceiver) isSearchCompleted(resp SearchJobStatusResponse) bool {
+	for _, key := range resp.Content.Dict.Keys {
+		if key.Name == "dispatchState" {
+			if key.Value == "DONE" {
+				return true
+			}
+			break
+		}
+	}
+	return false
+}
+
+func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset int, batchSize int) (SearchResults, error) {
+	resp, err := ssapir.client.GetSearchResults(sid, offset, batchSize)
+	if err != nil {
+		return SearchResults{}, err
+	}
+	return resp, nil
+}
+
+func (ssapir *splunksearchapireceiver) initCheckpoint(ctx context.Context) error {
+	ssapir.logger.Debug("initializing checkpoint")
+	// if a checkpoint already exists, use the offset from the checkpoint
+	if err := ssapir.loadCheckpoint(ctx); err != nil {
+		return fmt.Errorf("failed to load checkpoint: %w", err)
+	}
+	if ssapir.checkpointRecord.Offset != 0 {
+		// check if the search query in the checkpoint record matches any of the search queries in the config
+		for idx, search := range ssapir.config.Searches {
+			if search.Query == ssapir.checkpointRecord.Search {
+				ssapir.logger.Info("found offset checkpoint in storage extension", zap.Int("offset", ssapir.checkpointRecord.Offset), zap.String("search", ssapir.checkpointRecord.Search))
+				// skip searches that have already been processed, use the offset from the checkpoint
+				ssapir.config.Searches = ssapir.config.Searches[idx:]
+				offset = ssapir.checkpointRecord.Offset
+				return nil
+			}
+		}
+		ssapir.logger.Info("while initializing checkpoint, no matching search query found, starting from the beginning")
+	}
+	return nil
+}
+
+func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error {
+	if ssapir.checkpointRecord == nil {
+		return nil
+	}
+
+	marshalBytes, err := json.Marshal(ssapir.checkpointRecord)
+	if err != nil {
+		return fmt.Errorf("failed to marshal checkpoint: %w", err)
+	}
+	return ssapir.storageClient.Set(ctx, eventStorageKey, marshalBytes)
+}
+
+func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) error {
+	marshalBytes, err := ssapir.storageClient.Get(ctx, eventStorageKey)
+	if err != nil {
+		return err
+	}
+	if marshalBytes == nil {
+		ssapir.logger.Info("no checkpoint found")
+		return nil
+	}
+	return json.Unmarshal(marshalBytes, ssapir.checkpointRecord)
+}
diff --git a/receiver/splunksearchapireceiver/receiver_test.go b/receiver/splunksearchapireceiver/receiver_test.go
new file mode 100644
index 000000000..5e4d52066
--- /dev/null
+++ b/receiver/splunksearchapireceiver/receiver_test.go
@@ -0,0 +1,225 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
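+
+// These unit tests cover polling, job status parsing, and checkpointing in
+// isolation: job status responses come from the testdata Atom XML fixtures,
+// and an in-memory mockStorage stands in for the storage extension.
+// Checkpoints round-trip through storage as JSON, e.g.
+// {"offset":5,"search":"index=otel3"}.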
+
+package splunksearchapireceiver
+
+import (
+	"context"
+	"encoding/xml"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/extension/experimental/storage"
+	"go.uber.org/zap"
+)
+
+var (
+	logger        = zap.NewNop()
+	config        = &Config{}
+	settings      = component.TelemetrySettings{}
+	id            = component.ID{}
+	ssapireceiver = newSSAPIReceiver(logger, config, settings, id)
+)
+
+func TestPolling(t *testing.T) {
+	cfg := createDefaultConfig().(*Config)
+	cfg.JobPollInterval = 1 * time.Second
+	ssapireceiver.config = cfg
+
+	client := &mockLogsClient{}
+	ssapireceiver.client = client
+
+	file := filepath.Join("testdata", "logs", "testPollJobStatus", "input-done.xml")
+	client.On("GetJobStatus", mock.Anything).Return(client.loadTestStatusResponse(t, file), nil)
+	cancelCtx, cancel := context.WithCancel(context.Background())
+	ssapireceiver.cancel = cancel
+	ssapireceiver.checkpointRecord = &EventRecord{}
+
+	err := ssapireceiver.pollSearchCompletion(cancelCtx, "123456")
+	require.NoError(t, err)
+	client.AssertNumberOfCalls(t, "GetJobStatus", 1)
+
+	// Test polling for a job that is still running
+	file = filepath.Join("testdata", "logs", "testPollJobStatus", "input-queued.xml")
+	client.On("GetJobStatus", mock.Anything).Return(client.loadTestStatusResponse(t, file), nil)
+	err = ssapireceiver.pollSearchCompletion(cancelCtx, "123456")
+	require.NoError(t, err)
+	client.AssertNumberOfCalls(t, "GetJobStatus", 2)
+
+	err = ssapireceiver.Shutdown(context.Background())
+	require.NoError(t, err)
+}
+
+func TestIsSearchCompleted(t *testing.T) {
+	jobResponse := SearchJobStatusResponse{
+		Content: SearchJobContent{
+			Dict: Dict{
+				Keys: []Key{
+					{
+						Name:  "dispatchState",
+						Value: "DONE",
+					},
+				},
+			},
+		},
+	}
+
+	emptyResponse := SearchJobStatusResponse{}
+
+	done := ssapireceiver.isSearchCompleted(jobResponse)
+	require.True(t, done)
+
+	jobResponse.Content.Dict.Keys[0].Value = "RUNNING"
+	done = ssapireceiver.isSearchCompleted(jobResponse)
+	require.False(t, done)
+
+	done = ssapireceiver.isSearchCompleted(emptyResponse)
+	require.False(t, done)
+}
+
+func TestInitCheckpoint(t *testing.T) {
+	mockStorage := &mockStorage{}
+	searches := []Search{
+		{
+			Query: "index=otel",
+		},
+		{
+			Query: "index=otel2",
+		},
+		{
+			Query: "index=otel3",
+		},
+		{
+			Query: "index=otel4",
+		},
+		{
+			Query: "index=otel5",
+		},
+	}
+	ssapireceiver.config.Searches = searches
+	ssapireceiver.storageClient = mockStorage
+	err := ssapireceiver.initCheckpoint(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, 0, ssapireceiver.checkpointRecord.Offset)
+
+	mockStorage.Value = []byte(`{"offset":5,"search":"index=otel3"}`)
+	err = ssapireceiver.initCheckpoint(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, 5, ssapireceiver.checkpointRecord.Offset)
+	require.Equal(t, "index=otel3", ssapireceiver.checkpointRecord.Search)
+}
+
+func TestCheckpoint(t *testing.T) {
+	mockStorage := &mockStorage{}
+	ssapireceiver.storageClient = mockStorage
+	mockStorage.On("Set", mock.Anything, eventStorageKey, mock.Anything).Return(nil)
+	ssapireceiver.checkpointRecord = &EventRecord{
+		Offset: 0,
+		Search: "",
+	}
+	err := ssapireceiver.checkpoint(context.Background())
+	require.NoError(t, err)
+	mockStorage.AssertCalled(t, "Set", mock.Anything, eventStorageKey, []byte(`{"offset":0,"search":""}`))
+
+	ssapireceiver.checkpointRecord = &EventRecord{
+		Offset: 5,
+		Search: "index=otel3",
+	}
+
+	err = ssapireceiver.checkpoint(context.Background())
+	require.NoError(t, err)
+	mockStorage.AssertCalled(t, "Set", mock.Anything, eventStorageKey, []byte(`{"offset":5,"search":"index=otel3"}`))
+}
+
+func TestLoadCheckpoint(t *testing.T) {
+	mockStorage := &mockStorage{}
+	ssapireceiver.storageClient = mockStorage
+	mockStorage.Value = []byte(`{"offset":5,"search":"index=otel3"}`)
+	err := ssapireceiver.loadCheckpoint(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, 5, ssapireceiver.checkpointRecord.Offset)
+	require.Equal(t, "index=otel3", ssapireceiver.checkpointRecord.Search)
+
+	mockStorage.Value = []byte(`{"offset":10,"search":"index=otel4"}`)
+	err = ssapireceiver.loadCheckpoint(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, 10, ssapireceiver.checkpointRecord.Offset)
+	require.Equal(t, "index=otel4", ssapireceiver.checkpointRecord.Search)
+
+	mockStorage.Value = []byte(`{}`)
+	err = ssapireceiver.loadCheckpoint(context.Background())
+	require.NoError(t, err)
+}
+
+type mockLogsClient struct {
+	mock.Mock
+}
+
+func (m *mockLogsClient) loadTestStatusResponse(t *testing.T, file string) SearchJobStatusResponse {
+	logBytes, err := os.ReadFile(file)
+	require.NoError(t, err)
+	var resp SearchJobStatusResponse
+	err = xml.Unmarshal(logBytes, &resp)
+	require.NoError(t, err)
+	return resp
+}
+
+func (m *mockLogsClient) GetJobStatus(searchID string) (SearchJobStatusResponse, error) {
+	args := m.Called(searchID)
+	return args.Get(0).(SearchJobStatusResponse), args.Error(1)
+}
+
+func (m *mockLogsClient) CreateSearchJob(searchQuery string) (CreateJobResponse, error) {
+	args := m.Called(searchQuery)
+	return args.Get(0).(CreateJobResponse), args.Error(1)
+}
+
+func (m *mockLogsClient) GetSearchResults(searchID string, offset int, batchSize int) (SearchResults, error) {
+	args := m.Called(searchID, offset, batchSize)
+	return args.Get(0).(SearchResults), args.Error(1)
+}
+
+type mockStorage struct {
+	mock.Mock
+	Key   string
+	Value []byte
+}
+
+func (m *mockStorage) Get(_ context.Context, _ string) ([]byte, error) {
+	return m.Value, nil
+}
+
+func (m *mockStorage) Set(ctx context.Context, key string, value []byte) error {
+	args := m.Called(ctx, key, value)
+	m.Key = key
+	m.Value = value
+	return args.Error(0)
+}
+
+func (m *mockStorage) Batch(_ context.Context, _ ...storage.Operation) error {
+	return nil
+}
+
+func (m *mockStorage) Close(_ context.Context) error {
+	return nil
+}
+
+func (m *mockStorage) Delete(_ context.Context, _ string) error {
+	return nil
+}
diff --git a/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-done.xml b/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-done.xml
new file mode 100644
index 000000000..049f0d90a
--- /dev/null
+++ b/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-done.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<entry xmlns="http://www.w3.org/2005/Atom" xmlns:s="http://dev.splunk.com/ns/rest">
+  <title>search index</title>
+  <id>https://localhost:8089/services/search/jobs/mysearch_02151949</id>
+  <updated>2011-07-07T20:49:58.000-07:00</updated>
+  <published>2011-07-07T20:49:57.000-07:00</published>
+  <link href="/services/search/jobs/mysearch_02151949" rel="alternate"/>
+  <author>
+    <name>admin</name>
+  </author>
+  <content type="text/xml">
+    <s:dict>
+      <s:key name="cursorTime">1969-12-31T16:00:00.000-08:00</s:key>
+      <s:key name="diskUsage">2174976</s:key>
+      <s:key name="dispatchState">DONE</s:key>
+      <s:key name="doneProgress">1.00000</s:key>
+      <s:key name="dropCount">0</s:key>
+      <s:key name="earliestTime">2011-07-07T11:18:08.000-07:00</s:key>
+      <s:key name="eventAvailableCount">287</s:key>
+      <s:key name="eventCount">287</s:key>
+      <s:key name="eventFieldCount">6</s:key>
+      <s:key name="eventIsStreaming">1</s:key>
+      <s:key name="eventIsTruncated">0</s:key>
+      <s:key name="eventSearch">search index</s:key>
+      <s:key name="eventSorting">desc</s:key>
+      <s:key name="isDone">1</s:key>
+      <s:key name="isFailed">0</s:key>
+      <s:key name="isFinalized">0</s:key>
+      <s:key name="isPaused">0</s:key>
+      <s:key name="isPreviewEnabled">0</s:key>
+      <s:key name="isRealTimeSearch">0</s:key>
+      <s:key name="isRemoteTimeline">0</s:key>
+      <s:key name="isSaved">0</s:key>
+      <s:key name="isSavedSearch">0</s:key>
+      <s:key name="isZombie">0</s:key>
+      <s:key name="keywords">index</s:key>
+      <s:key name="label"></s:key>
+      <s:key name="latestTime">1969-12-31T16:00:00.000-08:00</s:key>
+      <s:key name="numPreviews">0</s:key>
+      <s:key name="priority">5</s:key>
+      <s:key name="remoteSearch">litsearch index | fields keepcolorder=t "host" "index" "linecount" "source" "sourcetype" "splunk_server"</s:key>
+      <s:key name="reportSearch"></s:key>
+      <s:key name="resultCount">287</s:key>
+      <s:key name="resultIsStreaming">1</s:key>
+      <s:key name="resultPreviewCount">287</s:key>
+      <s:key name="runDuration">1.004000</s:key>
+      <s:key name="scanCount">287</s:key>
+      <s:key name="sid">mysearch_02151949</s:key>
+      <s:key name="statusBuckets">0</s:key>
+      <s:key name="ttl">516</s:key>
+      <!-- per-command performance metrics (duration_secs, invocations, input_count, output_count) elided -->
+      <s:key name="request">
+        <s:dict>
+          <s:key name="label">mysearch_02151949</s:key>
+          <s:key name="search">search index</s:key>
+        </s:dict>
+      </s:key>
+      <s:key name="eai:acl">
+        <s:dict>
+          <s:key name="owner">admin</s:key>
+          <s:key name="perms">
+            <s:dict>
+              <s:key name="read">
+                <s:list>
+                  <s:item>admin</s:item>
+                </s:list>
+              </s:key>
+              <s:key name="write">
+                <s:list>
+                  <s:item>admin</s:item>
+                </s:list>
+              </s:key>
+            </s:dict>
+          </s:key>
+          <s:key name="can_write">true</s:key>
+          <s:key name="sharing">global</s:key>
+          <s:key name="app">search</s:key>
+          <s:key name="modifiable">true</s:key>
+        </s:dict>
+      </s:key>
+      <s:key name="searchProviders">
+        <s:list>
+          <s:item>mbp15.splunk.com</s:item>
+        </s:list>
+      </s:key>
+    </s:dict>
+  </content>
+</entry>
diff --git a/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-queued.xml b/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-queued.xml
new file mode 100644
index 000000000..17b01d572
--- /dev/null
+++ b/receiver/splunksearchapireceiver/testdata/logs/testPollJobStatus/input-queued.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<entry xmlns="http://www.w3.org/2005/Atom" xmlns:s="http://dev.splunk.com/ns/rest">
+  <title>search index</title>
+  <id>https://localhost:8089/services/search/jobs/mysearch_02151949</id>
+  <updated>2011-07-07T20:49:58.000-07:00</updated>
+  <published>2011-07-07T20:49:57.000-07:00</published>
+  <link href="/services/search/jobs/mysearch_02151949" rel="alternate"/>
+  <author>
+    <name>admin</name>
+  </author>
+  <content type="text/xml">
+    <s:dict>
+      <s:key name="cursorTime">1969-12-31T16:00:00.000-08:00</s:key>
+      <s:key name="diskUsage">2174976</s:key>
+      <s:key name="dispatchState">QUEUED</s:key>
+      <s:key name="doneProgress">1.00000</s:key>
+      <s:key name="dropCount">0</s:key>
+      <s:key name="earliestTime">2011-07-07T11:18:08.000-07:00</s:key>
+      <s:key name="eventAvailableCount">287</s:key>
+      <s:key name="eventCount">287</s:key>
+      <s:key name="eventFieldCount">6</s:key>
+      <s:key name="eventIsStreaming">1</s:key>
+      <s:key name="eventIsTruncated">0</s:key>
+      <s:key name="eventSearch">search index</s:key>
+      <s:key name="eventSorting">desc</s:key>
+      <s:key name="isDone">1</s:key>
+      <s:key name="isFailed">0</s:key>
+      <s:key name="isFinalized">0</s:key>
+      <s:key name="isPaused">0</s:key>
+      <s:key name="isPreviewEnabled">0</s:key>
+      <s:key name="isRealTimeSearch">0</s:key>
+      <s:key name="isRemoteTimeline">0</s:key>
+      <s:key name="isSaved">0</s:key>
+      <s:key name="isSavedSearch">0</s:key>
+      <s:key name="isZombie">0</s:key>
+      <s:key name="keywords">index</s:key>
+      <s:key name="label"></s:key>
+      <s:key name="latestTime">1969-12-31T16:00:00.000-08:00</s:key>
+      <s:key name="numPreviews">0</s:key>
+      <s:key name="priority">5</s:key>
+      <s:key name="remoteSearch">litsearch index | fields keepcolorder=t "host" "index" "linecount" "source" "sourcetype" "splunk_server"</s:key>
+      <s:key name="reportSearch"></s:key>
+      <s:key name="resultCount">287</s:key>
+      <s:key name="resultIsStreaming">1</s:key>
+      <s:key name="resultPreviewCount">287</s:key>
+      <s:key name="runDuration">1.004000</s:key>
+      <s:key name="scanCount">287</s:key>
+      <s:key name="sid">mysearch_02151949</s:key>
+      <s:key name="statusBuckets">0</s:key>
+      <s:key name="ttl">516</s:key>
+      <!-- per-command performance metrics (duration_secs, invocations, input_count, output_count) elided -->
+      <s:key name="request">
+        <s:dict>
+          <s:key name="label">mysearch_02151949</s:key>
+          <s:key name="search">search index</s:key>
+        </s:dict>
+      </s:key>
+      <s:key name="eai:acl">
+        <s:dict>
+          <s:key name="owner">admin</s:key>
+          <s:key name="perms">
+            <s:dict>
+              <s:key name="read">
+                <s:list>
+                  <s:item>admin</s:item>
+                </s:list>
+              </s:key>
+              <s:key name="write">
+                <s:list>
+                  <s:item>admin</s:item>
+                </s:list>
+              </s:key>
+            </s:dict>
+          </s:key>
+          <s:key name="can_write">true</s:key>
+          <s:key name="sharing">global</s:key>
+          <s:key name="app">search</s:key>
+          <s:key name="modifiable">true</s:key>
+        </s:dict>
+      </s:key>
+      <s:key name="searchProviders">
+        <s:list>
+          <s:item>mbp15.splunk.com</s:item>
+        </s:list>
+      </s:key>
+    </s:dict>
+  </content>
+</entry>