Skip to content

Commit

Permalink
[exporter/doris] Second PR (with metrics Implementation) of New compo…
Browse files Browse the repository at this point in the history
…nent: Doris Exporter (open-telemetry#35338)

**Description:** <Describe what has changed.>

Second PR of New component: Doris Exporter. Implementation of metrics.

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:** open-telemetry#33479 

**Testing:** 

Tested via unit test.

**Documentation:** <Describe the documentation added.>

No additional documentation.
  • Loading branch information
joker-star-l authored and sbylica-splunk committed Dec 17, 2024
1 parent 63d3b2a commit d6c44e0
Show file tree
Hide file tree
Showing 19 changed files with 1,449 additions and 7 deletions.
27 changes: 27 additions & 0 deletions .chloggen/doris-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: metrics implementation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions exporter/dorisexporter/exporter_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []by
req.Header.Set("format", "json")
req.Header.Set("Expect", "100-continue")
req.Header.Set("strip_outer_array", "true")
if cfg.ClientConfig.Timeout != 0 {
req.Header.Set("timeout", fmt.Sprintf("%d", cfg.ClientConfig.Timeout/time.Second))
}
req.SetBasicAuth(cfg.Username, string(cfg.Password))

return req, nil
Expand Down
3 changes: 2 additions & 1 deletion exporter/dorisexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ func TestPushLogData(t *testing.T) {
}()

err0 := fmt.Errorf("Not Started")
for err0 != nil { // until server started
for i := 0; err0 != nil && i < 10; i++ { // until server started
err0 = exporter.pushLogData(ctx, simpleLogs(10))
time.Sleep(100 * time.Millisecond)
}
require.NoError(t, err0)

_ = server.Shutdown(ctx)
}
Expand Down
234 changes: 234 additions & 0 deletions exporter/dorisexporter/exporter_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
"go.uber.org/zap"
)

var ddls = []string{
metricsGaugeDDL,
metricsSumDDL,
metricsHistogramDDL,
metricsExponentialHistogramDDL,
metricsSummaryDDL,
}

func initMetricMap(maxLen int) map[pmetric.MetricType]metricModel {
return map[pmetric.MetricType]metricModel{
pmetric.MetricTypeGauge: &metricModelGauge{
data: make([]*dMetricGauge, 0, maxLen),
},
pmetric.MetricTypeSum: &metricModelSum{
data: make([]*dMetricSum, 0, maxLen),
},
pmetric.MetricTypeHistogram: &metricModelHistogram{
data: make([]*dMetricHistogram, 0, maxLen),
},
pmetric.MetricTypeExponentialHistogram: &metricModelExponentialHistogram{
data: make([]*dMetricExponentialHistogram, 0, maxLen),
},
pmetric.MetricTypeSummary: &metricModelSummary{
data: make([]*dMetricSummary, 0, maxLen),
},
}
}

type metricsExporter struct {
*commonExporter
}

func newMetricsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *metricsExporter {
return &metricsExporter{
commonExporter: newExporter(logger, cfg, set),
}
}

func (e *metricsExporter) start(ctx context.Context, host component.Host) error {
client, err := createDorisHTTPClient(ctx, e.cfg, host, e.TelemetrySettings)
if err != nil {
return err
}
e.client = client

if !e.cfg.CreateSchema {
return nil
}

conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
}
defer conn.Close()

err = createAndUseDatabase(ctx, conn, e.cfg)
if err != nil {
return err
}

for _, ddlTemplate := range ddls {
ddl := fmt.Sprintf(ddlTemplate, e.cfg.Table.Metrics, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
if err != nil {
return err
}
}

return nil
}

func (e *metricsExporter) shutdown(_ context.Context) error {
if e.client != nil {
e.client.CloseIdleConnections()
}
return nil
}

func (e *metricsExporter) pushMetricData(ctx context.Context, md pmetric.Metrics) error {
metricMap := initMetricMap(md.DataPointCount())

for i := 0; i < md.ResourceMetrics().Len(); i++ {
resourceMetric := md.ResourceMetrics().At(i)
resource := resourceMetric.Resource()
resourceAttributes := resource.Attributes()
serviceName := ""
v, ok := resourceAttributes.Get(semconv.AttributeServiceName)
if ok {
serviceName = v.AsString()
}

for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)

for k := 0; k < scopeMetric.Metrics().Len(); k++ {
metric := scopeMetric.Metrics().At(k)

dm := &dMetric{
ServiceName: serviceName,
MetricName: metric.Name(),
MetricDescription: metric.Description(),
MetricUnit: metric.Unit(),
ResourceAttributes: resourceAttributes.AsRaw(),
ScopeName: scopeMetric.Scope().Name(),
ScopeVersion: scopeMetric.Scope().Version(),
}

metricM, ok := metricMap[metric.Type()]
if !ok {
return fmt.Errorf("invalid metric type: %v", metric.Type().String())
}

err := metricM.add(metric, dm, e)
if err != nil {
return err
}
}
}

}

return e.pushMetricDataParallel(ctx, metricMap)
}

func (e *metricsExporter) pushMetricDataParallel(ctx context.Context, metricMap map[pmetric.MetricType]metricModel) error {
errChan := make(chan error, len(metricMap))
wg := &sync.WaitGroup{}
for _, m := range metricMap {
if m.size() <= 0 {
continue
}

wg.Add(1)
go func(m metricModel, wg *sync.WaitGroup) {
errChan <- e.pushMetricDataInternal(ctx, m)
wg.Done()
}(m, wg)
}
wg.Wait()
close(errChan)
var errs error
for err := range errChan {
errs = errors.Join(errs, err)
}
return errs
}

func (e *metricsExporter) pushMetricDataInternal(ctx context.Context, metrics metricModel) error {
if metrics.size() <= 0 {
return nil
}

marshal, err := metrics.bytes()
if err != nil {
return err
}

req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Metrics+metrics.tableSuffix(), marshal)
if err != nil {
return err
}

res, err := e.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return err
}

response := streamLoadResponse{}
err = json.Unmarshal(body, &response)
if err != nil {
return err
}

if !response.success() {
return fmt.Errorf("failed to push metric data: %s", response.Message)
}

return nil
}

func (e *metricsExporter) getNumberDataPointValue(dp pmetric.NumberDataPoint) float64 {
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
return float64(dp.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
return dp.DoubleValue()
case pmetric.NumberDataPointValueTypeEmpty:
e.logger.Warn("data point value type is unset, use 0.0 as default")
return 0.0
default:
e.logger.Warn("data point value type is invalid, use 0.0 as default")
return 0.0
}
}

func (e *metricsExporter) getExemplarValue(ep pmetric.Exemplar) float64 {
switch ep.ValueType() {
case pmetric.ExemplarValueTypeInt:
return float64(ep.IntValue())
case pmetric.ExemplarValueTypeDouble:
return ep.DoubleValue()
case pmetric.ExemplarValueTypeEmpty:
e.logger.Warn("exemplar value type is unset, use 0.0 as default")
return 0.0
default:
e.logger.Warn("exemplar value type is invalid, use 0.0 as default")
return 0.0
}
}
Loading

0 comments on commit d6c44e0

Please sign in to comment.