Skip to content

Commit

Permalink
Merge branch 'main' into balance-metrics-by-resources
Browse files Browse the repository at this point in the history
  • Loading branch information
SHaaD94 authored Mar 25, 2024
2 parents 9b4bea7 + 008f032 commit ca9f993
Show file tree
Hide file tree
Showing 18 changed files with 425 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_clickhouse_replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: clickhouseexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Allow configuring `ON CLUSTER` and `ENGINE` when creating database and tables"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24649]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "Increases table creation flexibility with the ability to add replication for fault tolerance"

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
19 changes: 18 additions & 1 deletion exporter/clickhouseexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<!-- end autogenerated section -->


This exporter supports sending OpenTelemetry data to [ClickHouse](https://clickhouse.com/).
This exporter supports sending OpenTelemetry data to [ClickHouse](https://clickhouse.com/).
> ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using
> SQL.
> Throughput can be measured in rows per second or megabytes per second.
Expand Down Expand Up @@ -290,6 +290,19 @@ ClickHouse tables:
- `traces_table_name` (default = otel_traces): The table name for traces.
- `metrics_table_name` (default = otel_metrics): The table name for metrics.

Cluster definition:

- `cluster_name` (default = ): Optional. If present, will include `ON CLUSTER cluster_name` when creating tables.

Table engine:

- `table_engine`
- `name` (default = MergeTree)
- `params` (default = )

Modifies `ENGINE` definition when table is created. If not set then `ENGINE` defaults to `MergeTree()`.
Can be combined with `cluster_name` to enable [replication for fault tolerance](https://clickhouse.com/docs/en/architecture/replication).

Processing:

- `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
Expand Down Expand Up @@ -335,6 +348,10 @@ exporters:
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
# cluster_name: my_cluster
# table_engine:
# name: ReplicatedMergeTree
# params:
service:
pipelines:
logs:
Expand Down
36 changes: 34 additions & 2 deletions exporter/clickhouseexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Config struct {
ConnectionParams map[string]string `mapstructure:"connection_params"`
// LogsTableName is the table name for logs. default is `otel_logs`.
LogsTableName string `mapstructure:"logs_table_name"`
// TracesTableName is the table name for logs. default is `otel_traces`.
// TracesTableName is the table name for traces. default is `otel_traces`.
TracesTableName string `mapstructure:"traces_table_name"`
// MetricsTableName is the table name for metrics. default is `otel_metrics`.
MetricsTableName string `mapstructure:"metrics_table_name"`
Expand All @@ -43,17 +43,28 @@ type Config struct {
TTLDays uint `mapstructure:"ttl_days"`
// TTL is The data time-to-live example 30m, 48h. 0 means no ttl.
TTL time.Duration `mapstructure:"ttl"`
// TableEngine is the table engine to use. default is `MergeTree()`.
TableEngine TableEngine `mapstructure:"table_engine"`
// ClusterName if set will append `ON CLUSTER` with the provided name when creating tables.
ClusterName string `mapstructure:"cluster_name"`
}

// TableEngine defines the ENGINE string value when creating the table.
type TableEngine struct {
Name string `mapstructure:"name"`
Params string `mapstructure:"params"`
}

const defaultDatabase = "default"
const defaultTableEngineName = "MergeTree"

var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
errConfigInvalidEndpoint = errors.New("endpoint must be url format")
errConfigTTL = errors.New("both 'ttl_days' and 'ttl' can not be provided. 'ttl_days' is deprecated, use 'ttl' instead")
)

// Validate the clickhouse server configuration.
// Validate the ClickHouse server configuration.
func (cfg *Config) Validate() (err error) {
if cfg.Endpoint == "" {
err = errors.Join(err, errConfigNoEndpoint)
Expand Down Expand Up @@ -134,5 +145,26 @@ func (cfg *Config) buildDB(database string) (*sql.DB, error) {
}

return conn, nil
}

// TableEngineString generates the ENGINE string.
func (cfg *Config) TableEngineString() string {
engine := cfg.TableEngine.Name
params := cfg.TableEngine.Params

if cfg.TableEngine.Name == "" {
engine = defaultTableEngineName
params = ""
}

return fmt.Sprintf("%s(%s)", engine, params)
}

// ClusterString generates the ON CLUSTER string. Returns empty string if not set.
func (cfg *Config) ClusterString() string {
if cfg.ClusterName == "" {
return ""
}

return fmt.Sprintf("ON CLUSTER %s", cfg.ClusterName)
}
76 changes: 76 additions & 0 deletions exporter/clickhouseexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package clickhouseexporter

import (
"fmt"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -273,3 +274,78 @@ func TestConfig_buildDSN(t *testing.T) {
})
}
}

func TestTableEngineConfigParsing(t *testing.T) {
t.Parallel()
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected string
}{
{
id: component.NewIDWithName(metadata.Type, "table-engine-empty"),
expected: "MergeTree()",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-name-only"),
expected: "ReplicatedReplacingMergeTree()",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-full"),
expected: "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/table_name', '{replica}', ver)",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-params-only"),
expected: "MergeTree()",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg.(*Config).TableEngineString())
})
}
}

func TestClusterString(t *testing.T) {
t.Parallel()

tests := []struct {
input string
expected string
}{
{
input: "",
expected: "",
},
{
input: "cluster_a_b",
expected: "ON CLUSTER cluster_a_b",
},
{
input: "cluster a b",
expected: "ON CLUSTER cluster a b",
},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("ClusterString case %d", i), func(t *testing.T) {
cfg := createDefaultConfig()
cfg.(*Config).Endpoint = defaultEndpoint
cfg.(*Config).ClusterName = tt.input

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg.(*Config).ClusterString())
})
}
}
8 changes: 4 additions & 4 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func attributesToMap(attributes pcommon.Map) map[string]string {
const (
// language=ClickHouse SQL
createLogsTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
CREATE TABLE IF NOT EXISTS %s %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
Expand All @@ -152,7 +152,7 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
) ENGINE = %s
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
Expand Down Expand Up @@ -218,7 +218,7 @@ func createDatabase(ctx context.Context, cfg *Config) error {
defer func() {
_ = db.Close()
}()
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s %s", cfg.Database, cfg.ClusterString())
_, err = db.ExecContext(ctx, query)
if err != nil {
return fmt.Errorf("create database:%w", err)
Expand All @@ -235,7 +235,7 @@ func createLogsTable(ctx context.Context, cfg *Config, db *sql.DB) error {

func renderCreateLogsTableSQL(cfg *Config) string {
ttlExpr := generateTTLExpr(cfg.TTLDays, cfg.TTL, "Timestamp")
return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, ttlExpr)
return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, cfg.ClusterString(), cfg.TableEngineString(), ttlExpr)
}

func renderInsertLogsSQL(cfg *Config) string {
Expand Down
15 changes: 14 additions & 1 deletion exporter/clickhouseexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,20 @@ func TestExporter_pushLogsData(t *testing.T) {
})
}

// nolint:unparam // not need to check this func
func TestLogsClusterConfig(t *testing.T) {
testClusterConfig(t, func(t *testing.T, dsn string, clusterTest clusterTestConfig, fns ...func(*Config)) {
exporter := newTestLogsExporter(t, dsn, fns...)
clusterTest.verifyConfig(t, exporter.cfg)
})
}

func TestLogsTableEngineConfig(t *testing.T) {
testTableEngineConfig(t, func(t *testing.T, dsn string, engineTest tableEngineTestConfig, fns ...func(*Config)) {
exporter := newTestLogsExporter(t, dsn, fns...)
engineTest.verifyConfig(t, exporter.cfg.TableEngine)
})
}

func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhouseexporter/exporter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
internal.SetLogger(e.logger)

ttlExpr := generateTTLExpr(e.cfg.TTLDays, e.cfg.TTL, "TimeUnix")
return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, ttlExpr, e.client)
return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, e.cfg.ClusterString(), e.cfg.TableEngineString(), ttlExpr, e.client)
}

// shutdown will shut down the exporter.
Expand Down
14 changes: 14 additions & 0 deletions exporter/clickhouseexporter/exporter_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ import (
"go.uber.org/zap/zaptest"
)

func TestMetricsClusterConfig(t *testing.T) {
testClusterConfig(t, func(t *testing.T, dsn string, clusterTest clusterTestConfig, fns ...func(*Config)) {
exporter := newTestMetricsExporter(t, dsn, fns...)
clusterTest.verifyConfig(t, exporter.cfg)
})
}

func TestMetricsTableEngineConfig(t *testing.T) {
testTableEngineConfig(t, func(t *testing.T, dsn string, engineTest tableEngineTestConfig, fns ...func(*Config)) {
exporter := newTestMetricsExporter(t, dsn, fns...)
engineTest.verifyConfig(t, exporter.cfg.TableEngine)
})
}

func TestExporter_pushMetricsData(t *testing.T) {
t.Parallel()
t.Run("push success", func(t *testing.T) {
Expand Down
Loading

0 comments on commit ca9f993

Please sign in to comment.