diff --git a/.chloggen/cloudfoundryreceiver_add_logs_support.yaml b/.chloggen/cloudfoundryreceiver_add_logs_support.yaml new file mode 100644 index 000000000000..736ec244e465 --- /dev/null +++ b/.chloggen/cloudfoundryreceiver_add_logs_support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cloudfoundryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support to receive CloudFoundry Logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32671] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/cloudfoundryreceiver/README.md b/receiver/cloudfoundryreceiver/README.md index 2d2336d782b6..de29f57c024c 100644 --- a/receiver/cloudfoundryreceiver/README.md +++ b/receiver/cloudfoundryreceiver/README.md @@ -3,12 +3,14 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: metrics | +| Stability | [development]: logs | +| | [beta]: metrics | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fcloudfoundry%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fcloudfoundry) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fcloudfoundry%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fcloudfoundry) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@crobert-1](https://www.github.com/crobert-1) \| Seeking more code owners! | | Emeritus | [@agoallikmaa](https://www.github.com/agoallikmaa), [@pellared](https://www.github.com/pellared) | +[development]: https://github.com/open-telemetry/opentelemetry-collector#development [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib @@ -42,7 +44,7 @@ The receiver takes the following configuration options: | --- | --- | --- | | `rlp_gateway.endpoint` | required | URL of the RLP gateway, typically `https://log-stream.` | | `rlp_gateway.tls.insecure_skip_verify` | `false` | whether to skip TLS verify for the RLP gateway endpoint | -| `rlp_gateway.shard_id` | `opentelemetry` | metrics are load balanced among receivers that use the same shard ID, therefore this must only be set if there are multiple receivers which must both receive all the metrics instead of them being balanced between them | +| `rlp_gateway.shard_id` | `opentelemetry` | metrics or logs are load balanced among receivers that use the same shard ID, therefore this must only be set if there are multiple receivers which must both receive all the metrics instead of them being balanced between them. This string will be a prefix used to build a different ShardID for each envelope type; for logs the final ShardID will have the `_logs` suffix, for metrics will be `_metrics` | | `uaa.endpoint` | required | URL of the UAA provider, typically `https://uaa.` | | `uaa.tls.insecure_skip_verify` | `false` | whether to skip TLS verify for the UAA endpoint | | `uaa.username` | required | name of the UAA user (required grant types/authorities described above) | @@ -73,18 +75,11 @@ receivers: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). -## Metrics - -Reported metrics are grouped under an instrumentation library named `otelcol/cloudfoundry`. Metric names are as -specified by [Cloud Foundry metrics documentation](https://docs.cloudfoundry.org/running/all_metrics.html), but the -origin name is prepended to the metric name with `.` separator. All metrics either of type `Gauge` or `Sum`. - -### Attributes +## Telemetry common Attributes -All the metrics have the following attributes: +The receiver maps the envelope attribute tags to the following OpenTelemetry attributes: * `origin` - origin name as documented by Cloud Foundry -* `source` - for applications, the GUID of the application, otherwise equal to `origin` For Cloud Foundry/Tanzu Application Service deployed in BOSH, the following attributes are also present, using their canonical BOSH meanings: @@ -94,21 +89,47 @@ canonical BOSH meanings: * `ip` - BOSH instance IP * `job` - BOSH job name -For metrics originating with `rep` origin name (specific to applications), the following metrics are present: +On TAS/PCF versions 2.8.0+ and cf-deployment versions v11.1.0+, the following additional attributes are present for application metrics: `app_id`, `app_name`, `space_id`, `space_name`, `organization_id`, `organization_name` which provide the GUID and name of application, space and organization respectively. -* `instance_id` - numerical index of the application instance. However, also present for `bbs` origin, where it matches - the value of `index` -* `process_id` - process ID (GUID). For a process of type "web" which is the main process of an application, this is - equal to `source_id` and `app_id` +This might not be a comprehensive list of attributes, as the receiver passes on whatever attributes the gateway +provides, which may include some that are specific to TAS and possibly new ones in future Cloud Foundry versions as +well. + +## Metrics + +Reported metrics are grouped under an instrumentation library named `otelcol/cloudfoundry`. Metric names are as +specified by [Cloud Foundry metrics documentation](https://docs.cloudfoundry.org/running/all_metrics.html), but the +origin name is prepended to the metric name with `.` separator. All metrics either of type `Gauge` or `Sum`. + +### Attributes + +The receiver maps the envelope attribute to the following OpenTelemetry attributes: + +* `source_id` - for applications, the GUID of the application, otherwise equal to `origin` + +For metrics originating with `rep` origin name (specific to applications), the following attributes are present: + +* `instance_id` - numerical index of the application instance. However, also present for `bbs` origin, where it matches the value of `index` +* `process_id` - process ID (GUID). For a process of type "web" which is the main process of an application, this is equal to `source_id` and `app_id` * `process_instance_id` - unique ID of a process instance, should be treated as an opaque string * `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of other processes -On TAS/PCF versions 2.8.0+ and cf-deployment versions v11.1.0+, the following additional attributes are present for -application metrics: `app_id`, `app_name`, `space_id`, `space_name`, `organization_id`, `organization_name` which -provide the GUID and name of application, space and organization respectively. -This might not be a comprehensive list of attributes, as the receiver passes on whatever attributes the gateway -provides, which may include some that are specific to TAS and possibly new ones in future Cloud Foundry versions as -well. +## Logs +The receiver maps loggregator envelopes of these types to the following OpenTelemetry log severity text and severity number: +* type `OUT` becomes `info` and severity number `9` +* type `ERR` becomes `error` and severity number `17` +* If any other log types are received, they're discarded and result in an error log message in the collector. + +### Attributes + +The receiver maps the envelope attribute tags to the following OpenTelemetry attributes: + +* `source_id` - for applications, the GUID of the application, otherwise the GUID of the log generator +* `source_type` - The source of the log, any subset of `{API|APP|CELL|HEALTH|LGR|RTR|SSH|STG}`, for `APP` type extra labels are separated by a dash, example: `APP/PROC/WEB` +* `instance_id` - numerical index of the origin. If origin is `rep` (`source_type` is `APP`) this is the application index. However, for other cases this is the instance index. +* `process_id` - process ID (GUID) +* `process_instance_id` - unique ID of a process instance, should be treated as an opaque string +* `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of other processes \ No newline at end of file diff --git a/receiver/cloudfoundryreceiver/config.go b/receiver/cloudfoundryreceiver/config.go index f871288aac18..ed38825c2ad5 100644 --- a/receiver/cloudfoundryreceiver/config.go +++ b/receiver/cloudfoundryreceiver/config.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/url" + "strings" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -47,6 +48,10 @@ func (c *Config) Validate() error { return err } + if strings.TrimSpace(c.RLPGateway.ShardID) == "" { + return errors.New("shardID cannot be empty") + } + err = validateURLOption("uaa.endpoint", c.UAA.Endpoint) if err != nil { return err diff --git a/receiver/cloudfoundryreceiver/config_test.go b/receiver/cloudfoundryreceiver/config_test.go index 54b3741e6311..326446267303 100644 --- a/receiver/cloudfoundryreceiver/config_test.go +++ b/receiver/cloudfoundryreceiver/config_test.go @@ -63,6 +63,31 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "invalid"), errorMessage: "failed to parse rlp_gateway.endpoint as url: parse \"https://[invalid\": missing ']' in host", }, + { + id: component.NewIDWithName(metadata.Type, "shardidnotdefined"), + expected: &Config{ + RLPGateway: RLPGatewayConfig{ + ClientConfig: confighttp.ClientConfig{ + Endpoint: "https://log-stream.sys.example.internal", + TLSSetting: configtls.ClientConfig{ + InsecureSkipVerify: true, + }, + Timeout: time.Second * 20, + }, + ShardID: "opentelemetry", + }, + UAA: UAAConfig{ + LimitedClientConfig: LimitedClientConfig{ + Endpoint: "https://uaa.sys.example.internal", + TLSSetting: LimitedTLSClientSetting{ + InsecureSkipVerify: true, + }, + }, + Username: "admin", + Password: "test", + }, + }, + }, } for _, tt := range tests { t.Run(tt.id.String(), func(t *testing.T) { @@ -96,6 +121,10 @@ func TestInvalidConfigValidation(t *testing.T) { configuration.UAA.Password = "" require.Error(t, configuration.Validate()) + configuration = loadSuccessfulConfig(t) + configuration.RLPGateway.ShardID = "" + require.Error(t, configuration.Validate()) + configuration = loadSuccessfulConfig(t) configuration.UAA.Endpoint = "https://[invalid" require.Error(t, configuration.Validate()) diff --git a/receiver/cloudfoundryreceiver/converter.go b/receiver/cloudfoundryreceiver/converter.go index 53917c0ffec4..b03cac337df9 100644 --- a/receiver/cloudfoundryreceiver/converter.go +++ b/receiver/cloudfoundryreceiver/converter.go @@ -4,10 +4,12 @@ package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver" import ( + "fmt" "time" "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -19,7 +21,6 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme namePrefix := envelope.Tags["origin"] + "." switch message := envelope.Message.(type) { - case *loggregator_v2.Envelope_Log: case *loggregator_v2.Envelope_Counter: metric := metricSlice.AppendEmpty() metric.SetName(namePrefix + message.Counter.GetName()) @@ -41,15 +42,34 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme } } +func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) error { + log := logSlice.AppendEmpty() + log.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) + log.SetObservedTimestamp(pcommon.NewTimestampFromTime(startTime)) + logLine := string(envelope.GetLog().GetPayload()) + log.Body().SetStr(logLine) + //exhaustive:enforce + switch envelope.GetLog().GetType() { + case loggregator_v2.Log_OUT: + log.SetSeverityText(plog.SeverityNumberInfo.String()) + log.SetSeverityNumber(plog.SeverityNumberInfo) + case loggregator_v2.Log_ERR: + log.SetSeverityText(plog.SeverityNumberError.String()) + log.SetSeverityNumber(plog.SeverityNumberError) + default: + return fmt.Errorf("unsupported envelope log type: %s", envelope.GetLog().GetType()) + } + copyEnvelopeAttributes(log.Attributes(), envelope) + return nil +} + func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Envelope) { for key, value := range envelope.Tags { attributes.PutStr(attributeNamePrefix+key, value) } - if envelope.SourceId != "" { attributes.PutStr(attributeNamePrefix+"source_id", envelope.SourceId) } - if envelope.InstanceId != "" { attributes.PutStr(attributeNamePrefix+"instance_id", envelope.InstanceId) } diff --git a/receiver/cloudfoundryreceiver/converter_test.go b/receiver/cloudfoundryreceiver/converter_test.go index 6cc3d51c88f1..53b3452b3d8b 100644 --- a/receiver/cloudfoundryreceiver/converter_test.go +++ b/receiver/cloudfoundryreceiver/converter_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -53,14 +54,14 @@ func TestConvertCountEnvelope(t *testing.T) { assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) assert.Equal(t, 10.0, dataPoint.DoubleValue()) - assertAttributes(t, dataPoint.Attributes(), map[string]string{ + assertAttributes(t, map[string]string{ "org.cloudfoundry.source_id": "uaa", "org.cloudfoundry.origin": "gorouter", "org.cloudfoundry.deployment": "cf", "org.cloudfoundry.job": "router", "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", "org.cloudfoundry.ip": "10.244.0.34", - }) + }, dataPoint.Attributes()) } func TestConvertGaugeEnvelope(t *testing.T) { @@ -129,7 +130,7 @@ func TestConvertGaugeEnvelope(t *testing.T) { assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) assert.Equal(t, 17046641.0, dataPoint.DoubleValue()) - assertAttributes(t, dataPoint.Attributes(), expectedAttributes) + assertAttributes(t, expectedAttributes, dataPoint.Attributes()) metric = metricSlice.At(1 - memoryMetricPosition) assert.Equal(t, "rep.disk", metric.Name()) @@ -139,10 +140,96 @@ func TestConvertGaugeEnvelope(t *testing.T) { assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) assert.Equal(t, 10231808.0, dataPoint.DoubleValue()) - assertAttributes(t, dataPoint.Attributes(), expectedAttributes) + assertAttributes(t, expectedAttributes, dataPoint.Attributes()) } -func assertAttributes(t *testing.T, attributes pcommon.Map, expected map[string]string) { +func TestConvertLogsEnvelope(t *testing.T) { + now := time.Now() + before := time.Now().Add(-time.Second) + t.Parallel() + tests := []struct { + id string + envelope loggregator_v2.Envelope + expected map[string]any + }{ + { + id: "normal-without-sourcetype-tag", + envelope: loggregator_v2.Envelope{ + Timestamp: before.UnixNano(), + SourceId: "744e75bb-69d1-4cf4-b037-76875368097b", + Tags: map[string]string{}, + Message: &loggregator_v2.Envelope_Log{ + Log: &loggregator_v2.Log{ + Payload: []byte(`test-app. Says Hello. on index: 0`), + Type: loggregator_v2.Log_OUT, + }, + }, + }, + expected: map[string]any{ + "Timestamp": before, + "Attributes": map[string]string{ + "org.cloudfoundry.source_id": "744e75bb-69d1-4cf4-b037-76875368097b", + }, + "Body": `test-app. Says Hello. on index: 0`, + "SeverityNumber": plog.SeverityNumberInfo, + "SeverityText": plog.SeverityNumberInfo.String(), + }, + }, + { + id: "json-log-with-sourcetype-error", + envelope: loggregator_v2.Envelope{ + Timestamp: before.UnixNano(), + SourceId: "df75aec8-b937-4dc8-9b4d-c336e36e3895", + Tags: map[string]string{ + "source_type": "APP/PROC/WEB", + "origin": "rep", + "deployment": "cf", + "job": "diego-cell", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.80.0.2", + }, + Message: &loggregator_v2.Envelope_Log{ + Log: &loggregator_v2.Log{ + Payload: []byte(`{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`), + Type: loggregator_v2.Log_ERR, + }, + }, + }, + expected: map[string]any{ + "Timestamp": before, + "Attributes": map[string]string{ + "org.cloudfoundry.source_id": "df75aec8-b937-4dc8-9b4d-c336e36e3895", + "org.cloudfoundry.source_type": "APP/PROC/WEB", + "org.cloudfoundry.origin": "rep", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "diego-cell", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.80.0.2", + }, + "Body": `{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`, + "SeverityNumber": plog.SeverityNumberError, + "SeverityText": plog.SeverityNumberError.String(), + }, + }, + } + for i := range tests { + tt := tests[i] + t.Run(tt.id, func(t *testing.T) { + logSlice := plog.NewLogRecordSlice() + e := convertEnvelopeToLogs(&tt.envelope, logSlice, now) + require.Equal(t, nil, e) + require.Equal(t, 1, logSlice.Len()) + log := logSlice.At(0) + assert.Equal(t, tt.expected["Body"], log.Body().AsString()) + assert.Equal(t, tt.expected["SeverityText"], log.SeverityText()) + assert.Equal(t, pcommon.NewTimestampFromTime(tt.expected["Timestamp"].(time.Time)), log.Timestamp()) + assert.Equal(t, pcommon.NewTimestampFromTime(now), log.ObservedTimestamp()) + assertAttributes(t, tt.expected["Attributes"].(map[string]string), log.Attributes()) + }) + } +} + +func assertAttributes(t *testing.T, expected map[string]string, attributes pcommon.Map) { assert.Equal(t, len(expected), attributes.Len()) for key, expectedValue := range expected { diff --git a/receiver/cloudfoundryreceiver/doc.go b/receiver/cloudfoundryreceiver/doc.go index e4869b736cf1..6c9f6fc62a78 100644 --- a/receiver/cloudfoundryreceiver/doc.go +++ b/receiver/cloudfoundryreceiver/doc.go @@ -4,7 +4,7 @@ //go:generate mdatagen metadata.yaml // Package cloudfoundryreceiver implements a receiver that can be used by the -// Opentelemetry collector to receive Cloud Foundry metrics via its Reverse +// OpenTelemetry collector to receive Cloud Foundry metrics and logs via its Reverse // Log Proxy (RLP) Gateway component. The protocol is handled by the // go-loggregator library, which uses HTTP to connect to the gateway and receive // JSON-protobuf encoded v2 Envelope messages as documented by loggregator-api. diff --git a/receiver/cloudfoundryreceiver/factory.go b/receiver/cloudfoundryreceiver/factory.go index 9be0e5f6d8b6..5af72b622c5b 100644 --- a/receiver/cloudfoundryreceiver/factory.go +++ b/receiver/cloudfoundryreceiver/factory.go @@ -28,7 +28,8 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithLogs(createLogsReceiver, metadata.LogsStability)) } func createDefaultConfig() component.Config { @@ -61,5 +62,15 @@ func createMetricsReceiver( nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { c := cfg.(*Config) - return newCloudFoundryReceiver(params, *c, nextConsumer) + return newCloudFoundryMetricsReceiver(params, *c, nextConsumer) +} + +func createLogsReceiver( + _ context.Context, + params receiver.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (receiver.Logs, error) { + c := cfg.(*Config) + return newCloudFoundryLogsReceiver(params, *c, nextConsumer) } diff --git a/receiver/cloudfoundryreceiver/factory_test.go b/receiver/cloudfoundryreceiver/factory_test.go index ea6c6b918398..2d14d99c7b63 100644 --- a/receiver/cloudfoundryreceiver/factory_test.go +++ b/receiver/cloudfoundryreceiver/factory_test.go @@ -21,12 +21,22 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } -func TestCreateReceiver(t *testing.T) { +func TestCreateMetricsReceiver(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() params := receivertest.NewNopSettings() tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, consumertest.NewNop()) assert.NoError(t, err) - assert.NotNil(t, tReceiver, "receiver creation failed") + assert.NotNil(t, tReceiver, "metrics receiver creation failed") +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + params := receivertest.NewNopSettings() + tReceiver, err := factory.CreateLogsReceiver(context.Background(), params, cfg, consumertest.NewNop()) + assert.NoError(t, err) + assert.NotNil(t, tReceiver, "logs receiver creation failed") } diff --git a/receiver/cloudfoundryreceiver/generated_component_test.go b/receiver/cloudfoundryreceiver/generated_component_test.go index daa9624f7379..3c909184dc93 100644 --- a/receiver/cloudfoundryreceiver/generated_component_test.go +++ b/receiver/cloudfoundryreceiver/generated_component_test.go @@ -31,6 +31,13 @@ func TestComponentLifecycle(t *testing.T) { createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) }{ + { + name: "logs", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + { name: "metrics", createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { diff --git a/receiver/cloudfoundryreceiver/internal/metadata/generated_status.go b/receiver/cloudfoundryreceiver/internal/metadata/generated_status.go index a4f057498366..5a2901c8158b 100644 --- a/receiver/cloudfoundryreceiver/internal/metadata/generated_status.go +++ b/receiver/cloudfoundryreceiver/internal/metadata/generated_status.go @@ -11,5 +11,6 @@ var ( ) const ( + LogsStability = component.StabilityLevelDevelopment MetricsStability = component.StabilityLevelBeta ) diff --git a/receiver/cloudfoundryreceiver/metadata.yaml b/receiver/cloudfoundryreceiver/metadata.yaml index 9406c6140ae3..bb69b2d0e930 100644 --- a/receiver/cloudfoundryreceiver/metadata.yaml +++ b/receiver/cloudfoundryreceiver/metadata.yaml @@ -5,6 +5,7 @@ status: class: receiver stability: beta: [metrics] + development: [logs] distributions: [contrib] codeowners: active: [crobert-1] diff --git a/receiver/cloudfoundryreceiver/receiver.go b/receiver/cloudfoundryreceiver/receiver.go index 33be6ad8a312..0ef8e1fe85a9 100644 --- a/receiver/cloudfoundryreceiver/receiver.go +++ b/receiver/cloudfoundryreceiver/receiver.go @@ -13,6 +13,7 @@ import ( "code.cloudfoundry.org/go-loggregator" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" @@ -25,23 +26,25 @@ const ( ) var _ receiver.Metrics = (*cloudFoundryReceiver)(nil) +var _ receiver.Logs = (*cloudFoundryReceiver)(nil) -// newCloudFoundryReceiver implements the receiver.Metrics for Cloud Foundry protocol. +// newCloudFoundryReceiver implements the receiver.Metrics and receiver.Logs for the Cloud Foundry protocol. type cloudFoundryReceiver struct { settings component.TelemetrySettings cancel context.CancelFunc config Config - nextConsumer consumer.Metrics + nextMetrics consumer.Metrics + nextLogs consumer.Logs obsrecv *receiverhelper.ObsReport goroutines sync.WaitGroup receiverStartTime time.Time } -// newCloudFoundryReceiver creates the Cloud Foundry receiver with the given parameters. -func newCloudFoundryReceiver( +// newCloudFoundryMetricsReceiver creates the Cloud Foundry receiver with the given parameters. +func newCloudFoundryMetricsReceiver( settings receiver.Settings, config Config, - nextConsumer consumer.Metrics) (receiver.Metrics, error) { + nextConsumer consumer.Metrics) (*cloudFoundryReceiver, error) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: settings.ID, @@ -51,22 +54,50 @@ func newCloudFoundryReceiver( if err != nil { return nil, err } + result := &cloudFoundryReceiver{ + settings: settings.TelemetrySettings, + config: config, + nextMetrics: nextConsumer, + obsrecv: obsrecv, + receiverStartTime: time.Now(), + } + return result, nil +} - return &cloudFoundryReceiver{ +// newCloudFoundryLogsReceiver creates the Cloud Foundry logs receiver with the given parameters. +func newCloudFoundryLogsReceiver( + settings receiver.Settings, + config Config, + nextConsumer consumer.Logs) (*cloudFoundryReceiver, error) { + + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: settings.ID, + Transport: transport, + ReceiverCreateSettings: settings, + }) + if err != nil { + return nil, err + } + result := &cloudFoundryReceiver{ settings: settings.TelemetrySettings, config: config, - nextConsumer: nextConsumer, + nextLogs: nextConsumer, obsrecv: obsrecv, receiverStartTime: time.Now(), - }, nil + } + return result, nil } func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host) error { - tokenProvider, tokenErr := newUAATokenProvider(cfr.settings.Logger, cfr.config.UAA.LimitedClientConfig, cfr.config.UAA.Username, string(cfr.config.UAA.Password)) + tokenProvider, tokenErr := newUAATokenProvider( + cfr.settings.Logger, + cfr.config.UAA.LimitedClientConfig, + cfr.config.UAA.Username, + string(cfr.config.UAA.Password), + ) if tokenErr != nil { - return fmt.Errorf("create cloud foundry UAA token provider: %w", tokenErr) + return fmt.Errorf("cloudfoundry receiver failed to create UAA token provider: %w", tokenErr) } - streamFactory, streamErr := newEnvelopeStreamFactory( ctx, cfr.settings, @@ -75,34 +106,32 @@ func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host) host, ) if streamErr != nil { - return fmt.Errorf("creating cloud foundry RLP envelope stream factory: %w", streamErr) + return fmt.Errorf("cloudfoundry receiver failed to create RLP envelope stream factory: %w", streamErr) } innerCtx, cancel := context.WithCancel(ctx) cfr.cancel = cancel - cfr.goroutines.Add(1) go func() { defer cfr.goroutines.Done() - cfr.settings.Logger.Debug("cloud foundry receiver starting") - + cfr.settings.Logger.Debug("cloudfoundry receiver starting") _, tokenErr = tokenProvider.ProvideToken() if tokenErr != nil { - cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("cloud foundry receiver failed to fetch initial token from UAA: %w", tokenErr))) + cfr.settings.ReportStatus( + component.NewFatalErrorEvent( + fmt.Errorf("cloudfoundry receiver failed to fetch initial token from UAA: %w", tokenErr), + ), + ) return } - - envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID) - if err != nil { - cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope stream: %w", err))) - return + if cfr.nextLogs != nil { + cfr.streamLogs(innerCtx, streamFactory.CreateLogsStream(innerCtx, cfr.config.RLPGateway.ShardID)) + } else if cfr.nextMetrics != nil { + cfr.streamMetrics(innerCtx, streamFactory.CreateMetricsStream(innerCtx, cfr.config.RLPGateway.ShardID)) } - - cfr.streamMetrics(innerCtx, envelopeStream) - cfr.settings.Logger.Debug("cloudfoundry metrics streamer stopped") + cfr.settings.Logger.Debug("cloudfoundry receiver stopped") }() - return nil } @@ -125,15 +154,16 @@ func (cfr *cloudFoundryReceiver) streamMetrics( if envelopes == nil { // If context has not been cancelled, then nil means the shutdown was due to an error within stream if ctx.Err() == nil { - cfr.settings.ReportStatus(component.NewFatalErrorEvent(errors.New("RLP gateway streamer shut down due to an error"))) + cfr.settings.ReportStatus( + component.NewFatalErrorEvent( + errors.New("RLP gateway metrics streamer shut down due to an error"), + ), + ) } - break } - metrics := pmetric.NewMetrics() libraryMetrics := createLibraryMetricsSlice(metrics) - for _, envelope := range envelopes { if envelope != nil { // There is no concept of startTime in CF loggregator, and we do not know the uptime of the component @@ -141,21 +171,56 @@ func (cfr *cloudFoundryReceiver) streamMetrics( convertEnvelopeToMetrics(envelope, libraryMetrics, cfr.receiverStartTime) } } - if libraryMetrics.Len() > 0 { obsCtx := cfr.obsrecv.StartMetricsOp(ctx) - err := cfr.nextConsumer.ConsumeMetrics(ctx, metrics) + err := cfr.nextMetrics.ConsumeMetrics(ctx, metrics) cfr.obsrecv.EndMetricsOp(obsCtx, dataFormat, metrics.DataPointCount(), err) } } } +func (cfr *cloudFoundryReceiver) streamLogs( + ctx context.Context, + stream loggregator.EnvelopeStream) { + + for { + envelopes := stream() + if envelopes == nil { + if ctx.Err() == nil { + cfr.settings.ReportStatus( + component.NewFatalErrorEvent( + errors.New("RLP gateway log streamer shut down due to an error"), + ), + ) + } + break + } + logs := plog.NewLogs() + libraryLogs := createLibraryLogsSlice(logs) + observedTime := time.Now() + for _, envelope := range envelopes { + if envelope != nil { + _ = convertEnvelopeToLogs(envelope, libraryLogs, observedTime) + } + } + if libraryLogs.Len() > 0 { + obsCtx := cfr.obsrecv.StartLogsOp(ctx) + err := cfr.nextLogs.ConsumeLogs(ctx, logs) + cfr.obsrecv.EndLogsOp(obsCtx, dataFormat, logs.LogRecordCount(), err) + } + } +} + func createLibraryMetricsSlice(metrics pmetric.Metrics) pmetric.MetricSlice { - resourceMetrics := metrics.ResourceMetrics() - resourceMetric := resourceMetrics.AppendEmpty() - resourceMetric.Resource().Attributes() - libraryMetricsSlice := resourceMetric.ScopeMetrics() - libraryMetrics := libraryMetricsSlice.AppendEmpty() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + libraryMetrics := resourceMetric.ScopeMetrics().AppendEmpty() libraryMetrics.Scope().SetName(instrumentationLibName) return libraryMetrics.Metrics() } + +func createLibraryLogsSlice(logs plog.Logs) plog.LogRecordSlice { + resourceLog := logs.ResourceLogs().AppendEmpty() + libraryLogs := resourceLog.ScopeLogs().AppendEmpty() + libraryLogs.Scope().SetName(instrumentationLibName) + return libraryLogs.LogRecords() +} diff --git a/receiver/cloudfoundryreceiver/receiver_test.go b/receiver/cloudfoundryreceiver/receiver_test.go index 81fa7b09e55f..eec1da92e50c 100644 --- a/receiver/cloudfoundryreceiver/receiver_test.go +++ b/receiver/cloudfoundryreceiver/receiver_test.go @@ -13,13 +13,38 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" ) -// Test to make sure a new receiver can be created properly, started and shutdown with the default config -func TestDefaultValidReceiver(t *testing.T) { +// Test to make sure a new metrics receiver can be created properly, started and shutdown with the default config +func TestDefaultValidMetricsReceiver(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) params := receivertest.NewNopSettings() - receiver, err := newCloudFoundryReceiver( + receiver, err := newCloudFoundryMetricsReceiver( + params, + *cfg, + consumertest.NewNop(), + ) + + require.NoError(t, err) + require.NotNil(t, receiver, "receiver creation failed") + + // Test start + ctx := context.Background() + err = receiver.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err) + + // Test shutdown + err = receiver.Shutdown(ctx) + require.NoError(t, err) +} + +// Test to make sure a new logs receiver can be created properly, started and shutdown with the default config +func TestDefaultValidLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + params := receivertest.NewNopSettings() + + receiver, err := newCloudFoundryLogsReceiver( params, *cfg, consumertest.NewNop(), diff --git a/receiver/cloudfoundryreceiver/stream.go b/receiver/cloudfoundryreceiver/stream.go index fc86e0569459..027639a7c065 100644 --- a/receiver/cloudfoundryreceiver/stream.go +++ b/receiver/cloudfoundryreceiver/stream.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "net/http" - "strings" "code.cloudfoundry.org/go-loggregator" "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" @@ -45,31 +44,41 @@ func newEnvelopeStreamFactory( return &EnvelopeStreamFactory{gatewayClient}, nil } -func (rgc *EnvelopeStreamFactory) CreateStream( - ctx context.Context, - shardID string) (loggregator.EnvelopeStream, error) { - - if strings.TrimSpace(shardID) == "" { - return nil, errors.New("shardID cannot be empty") - } - - stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{ - ShardId: shardID, - Selectors: []*loggregator_v2.Selector{ - { - Message: &loggregator_v2.Selector_Counter{ - Counter: &loggregator_v2.CounterSelector{}, - }, +func (rgc *EnvelopeStreamFactory) CreateMetricsStream(ctx context.Context, baseShardID string) loggregator.EnvelopeStream { + newShardID := baseShardID + "_metrics" + selectors := []*loggregator_v2.Selector{ + { + Message: &loggregator_v2.Selector_Counter{ + Counter: &loggregator_v2.CounterSelector{}, }, - { - Message: &loggregator_v2.Selector_Gauge{ - Gauge: &loggregator_v2.GaugeSelector{}, - }, + }, + { + Message: &loggregator_v2.Selector_Gauge{ + Gauge: &loggregator_v2.GaugeSelector{}, }, }, + } + stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{ + ShardId: newShardID, + Selectors: selectors, }) + return stream +} - return stream, nil +func (rgc *EnvelopeStreamFactory) CreateLogsStream(ctx context.Context, baseShardID string) loggregator.EnvelopeStream { + newShardID := baseShardID + "_logs" + selectors := []*loggregator_v2.Selector{ + { + Message: &loggregator_v2.Selector_Log{ + Log: &loggregator_v2.LogSelector{}, + }, + }, + } + stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{ + ShardId: newShardID, + Selectors: selectors, + }) + return stream } type authorizationProvider struct { diff --git a/receiver/cloudfoundryreceiver/stream_test.go b/receiver/cloudfoundryreceiver/stream_test.go index 95dac0f6c445..2d2b6c11b3d9 100644 --- a/receiver/cloudfoundryreceiver/stream_test.go +++ b/receiver/cloudfoundryreceiver/stream_test.go @@ -13,7 +13,7 @@ import ( ) // Ensure stream create works as expected -func TestValidStream(t *testing.T) { +func TestValidMetricsStream(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) @@ -38,19 +38,15 @@ func TestValidStream(t *testing.T) { innerCtx, cancel := context.WithCancel(context.Background()) - envelopeStream, createErr := streamFactory.CreateStream( - innerCtx, - cfg.RLPGateway.ShardID) + envelopeStream := streamFactory.CreateMetricsStream(innerCtx, cfg.RLPGateway.ShardID) - require.NoError(t, createErr) require.NotNil(t, envelopeStream) cancel() } -// Ensure stream create fails when it should -func TestInvalidStream(t *testing.T) { - +// Ensure stream create works as expected +func TestValidLogsStream(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) @@ -63,7 +59,6 @@ func TestInvalidStream(t *testing.T) { require.NoError(t, err) require.NotNil(t, uaa) - // Stream create should fail if given empty shard ID streamFactory, streamErr := newEnvelopeStreamFactory( context.Background(), componenttest.NewNopTelemetrySettings(), @@ -76,13 +71,9 @@ func TestInvalidStream(t *testing.T) { innerCtx, cancel := context.WithCancel(context.Background()) - invalidShardID := "" - envelopeStream, createErr := streamFactory.CreateStream( - innerCtx, - invalidShardID) + envelopeStream := streamFactory.CreateLogsStream(innerCtx, cfg.RLPGateway.ShardID) - require.EqualError(t, createErr, "shardID cannot be empty") - require.Nil(t, envelopeStream) + require.NotNil(t, envelopeStream) cancel() } diff --git a/receiver/cloudfoundryreceiver/testdata/config.yaml b/receiver/cloudfoundryreceiver/testdata/config.yaml index 1bcbcbe4a8be..7b3a7cff8560 100644 --- a/receiver/cloudfoundryreceiver/testdata/config.yaml +++ b/receiver/cloudfoundryreceiver/testdata/config.yaml @@ -31,3 +31,16 @@ cloudfoundry/invalid: password: "test" tls: insecure_skip_verify: true + +cloudfoundry/shardidnotdefined: + rlp_gateway: + endpoint: "https://log-stream.sys.example.internal" + timeout: "20s" + tls: + insecure_skip_verify: true + uaa: + endpoint: "https://uaa.sys.example.internal" + username: "admin" + password: "test" + tls: + insecure_skip_verify: true