Commit

Merge pull request #15 from StackVista/STAC-20729-added-support-for-replicated-tables

STAC-20729 added support for replicated tables
LukaszMarchewka authored Oct 8, 2024
2 parents 572199a + 96a35aa commit a57125d
Showing 24 changed files with 723 additions and 146 deletions.
17 changes: 17 additions & 0 deletions exporter/clickhousestsexporter/README.md
@@ -291,6 +291,19 @@ ClickHouse tables:
- `metrics_table_name` (default = otel_metrics): The table name for metrics.
- `create_traces_table` (default = true): Create the traces table on startup

Cluster definition:

- `cluster_name` (default = ): Optional. If set, `ON CLUSTER cluster_name` is added when creating tables.

Table engine:

- `table_engine`
  - `name` (default = MergeTree)
  - `params` (default = )

Modifies the `ENGINE` definition used when the table is created. If not set, `ENGINE` defaults to `MergeTree()`.
Can be combined with `cluster_name` to enable [replication for fault tolerance](https://clickhouse.com/docs/en/architecture/replication).
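
For example, with `cluster_name: my_cluster` and `table_engine.name: ReplicatedMergeTree`, the exporter issues DDL with an `ON CLUSTER` clause and the configured engine. A minimal Go sketch of that mapping, mirroring the `TableEngineString` and `ClusterString` helpers added in `config.go` (the `otel_logs` table name and the elided column list are placeholders):

```go
package main

import "fmt"

func main() {
	// Values as they would arrive from the exporter configuration.
	clusterName := "my_cluster"         // cluster_name
	engineName := "ReplicatedMergeTree" // table_engine.name
	engineParams := ""                  // table_engine.params

	clusterClause := ""
	if clusterName != "" {
		clusterClause = fmt.Sprintf("ON CLUSTER %s", clusterName)
	}
	engineClause := fmt.Sprintf("%s(%s)", engineName, engineParams)

	// Prints:
	// CREATE TABLE IF NOT EXISTS otel_logs ON CLUSTER my_cluster (...) ENGINE = ReplicatedMergeTree()
	fmt.Printf("CREATE TABLE IF NOT EXISTS otel_logs %s (...) ENGINE = %s\n",
		clusterClause, engineClause)
}
```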

Processing:

- `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
@@ -337,6 +350,10 @@ exporters:
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
# cluster_name: my_cluster
# table_engine:
# name: ReplicatedMergeTree
# params:
service:
pipelines:
logs:
54 changes: 51 additions & 3 deletions exporter/clickhousestsexporter/config.go
@@ -26,15 +26,15 @@ type Config struct {
Endpoint string `mapstructure:"endpoint"`
// Username is the authentication username.
Username string `mapstructure:"username"`
// Username is the authentication password.
// Password is the authentication password.
Password configopaque.String `mapstructure:"password"`
// Database is the database name to export.
Database string `mapstructure:"database"`
// ConnectionParams is the extra connection parameters with map format. for example compression/dial_timeout
ConnectionParams map[string]string `mapstructure:"connection_params"`
// LogsTableName is the table name for logs. default is `otel_logs`.
LogsTableName string `mapstructure:"logs_table_name"`
// TracesTableName is the table name for logs. default is `otel_traces`.
// TracesTableName is the table name for traces. default is `otel_traces`.
TracesTableName string `mapstructure:"traces_table_name"`
// MetricsTableName is the table name for metrics. default is `otel_metrics`.
MetricsTableName string `mapstructure:"metrics_table_name"`
@@ -45,21 +45,35 @@ type Config struct {
TTLDays uint `mapstructure:"ttl_days"`
// TTL is The data time-to-live example 30m, 48h. 0 means no ttl.
TTL time.Duration `mapstructure:"ttl"`
// TableEngine is the table engine to use. default is `MergeTree()`.
TableEngine TableEngine `mapstructure:"table_engine"`
// DeduplicatingTableEngine is the table engine used for tables that remove duplicate entries with the same sorting key. default is `ReplacingMergeTree()`.
DeduplicatingTableEngine TableEngine `mapstructure:"deduplicating_table_engine"`
// ClusterName if set will append `ON CLUSTER` with the provided name when creating tables.
ClusterName string `mapstructure:"cluster_name"`
// Create the traces table on startup
CreateTracesTable bool `mapstructure:"create_traces_table"`
// Create the resources table on startup
CreateResourcesTable bool `mapstructure:"create_resources_table"`
}

// TableEngine defines the ENGINE string value when creating the table.
type TableEngine struct {
Name string `mapstructure:"name"`
Params string `mapstructure:"params"`
}

const defaultDatabase = "default"
const defaultTableEngineName = "MergeTree"
const defaultDeduplicatingTableEngineName = "ReplacingMergeTree"

var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
errConfigInvalidEndpoint = errors.New("endpoint must be url format")
errConfigTTL = errors.New("both 'ttl_days' and 'ttl' can not be provided. 'ttl_days' is deprecated, use 'ttl' instead")
)

// Validate the clickhouse server configuration.
// Validate the ClickHouse server configuration.
func (cfg *Config) Validate() (err error) {
if cfg.Endpoint == "" {
err = errors.Join(err, errConfigNoEndpoint)
@@ -140,5 +154,39 @@ func (cfg *Config) buildDB(database string) (*sql.DB, error) {
}

return conn, nil
}

// TableEngineString generates the ENGINE string.
func (cfg *Config) TableEngineString() string {
engine := cfg.TableEngine.Name
params := cfg.TableEngine.Params

if cfg.TableEngine.Name == "" {
engine = defaultTableEngineName
params = ""
}

return fmt.Sprintf("%s(%s)", engine, params)
}

// DeduplicatingTableEngineString generates the ENGINE string for tables that remove duplicate entries with the same sorting key.
func (cfg *Config) DeduplicatingTableEngineString() string {
engine := cfg.DeduplicatingTableEngine.Name
params := cfg.DeduplicatingTableEngine.Params

if cfg.DeduplicatingTableEngine.Name == "" {
engine = defaultDeduplicatingTableEngineName
params = ""
}

return fmt.Sprintf("%s(%s)", engine, params)
}

// ClusterString generates the ON CLUSTER string. Returns empty string if not set.
func (cfg *Config) ClusterString() string {
if cfg.ClusterName == "" {
return ""
}

return fmt.Sprintf("ON CLUSTER %s", cfg.ClusterName)
}
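
As a quick usage sketch (illustrative only, not part of the change; it assumes just the `Config` and `TableEngine` types above and the `fmt` import already used in this file), the helpers fall back to their defaults when no engine name is set, and parameters are only rendered together with a name:

```go
// ExampleEngineStrings shows the strings the new helpers produce; a hypothetical
// example function, not part of the exporter.
func ExampleEngineStrings() {
	var cfg Config
	fmt.Println(cfg.TableEngineString())              // MergeTree()
	fmt.Println(cfg.DeduplicatingTableEngineString()) // ReplacingMergeTree()
	fmt.Println(cfg.ClusterString() == "")            // true: no ON CLUSTER clause

	cfg = Config{
		ClusterName:              "my_cluster",
		TableEngine:              TableEngine{Name: "ReplicatedMergeTree"},
		DeduplicatingTableEngine: TableEngine{Name: "ReplicatedReplacingMergeTree", Params: "ver"},
	}
	fmt.Println(cfg.TableEngineString())              // ReplicatedMergeTree()
	fmt.Println(cfg.DeduplicatingTableEngineString()) // ReplicatedReplacingMergeTree(ver)
	fmt.Println(cfg.ClusterString())                  // ON CLUSTER my_cluster
}
```
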
78 changes: 78 additions & 0 deletions exporter/clickhousestsexporter/config_test.go
@@ -4,6 +4,7 @@
package clickhousestsexporter

import (
"fmt"
"path/filepath"
"testing"
"time"
@@ -73,6 +74,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 100,
StorageID: &storageID,
},
CreateResourcesTable: true,
CreateTracesTable: true,
},
},
}
@@ -274,3 +277,78 @@ func TestConfig_buildDSN(t *testing.T) {
})
}
}

func TestTableEngineConfigParsing(t *testing.T) {
t.Parallel()
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected string
}{
{
id: component.NewIDWithName(metadata.Type, "table-engine-empty"),
expected: "MergeTree()",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-name-only"),
expected: "ReplicatedReplacingMergeTree()",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-full"),
expected: "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/table_name', '{replica}', ver)",
},
{
id: component.NewIDWithName(metadata.Type, "table-engine-params-only"),
expected: "MergeTree()",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg.(*Config).TableEngineString())
})
}
}

func TestClusterString(t *testing.T) {
t.Parallel()

tests := []struct {
input string
expected string
}{
{
input: "",
expected: "",
},
{
input: "cluster_a_b",
expected: "ON CLUSTER cluster_a_b",
},
{
input: "cluster a b",
expected: "ON CLUSTER cluster a b",
},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("ClusterString case %d", i), func(t *testing.T) {
cfg := createDefaultConfig()
cfg.(*Config).Endpoint = defaultEndpoint
cfg.(*Config).ClusterName = tt.input

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg.(*Config).ClusterString())
})
}
}
10 changes: 5 additions & 5 deletions exporter/clickhousestsexporter/exporter_logs.go
@@ -7,10 +7,10 @@ import (
"context"
"database/sql"
"fmt"
"github.com/stackvista/sts-opentelemetry-collector/exporter/clickhousestsexporter/internal"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/stackvista/sts-opentelemetry-collector/exporter/clickhousestsexporter/internal"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
@@ -127,7 +127,7 @@ func attributesToMap(attributes pcommon.Map) map[string]string {
const (
// language=ClickHouse SQL
createLogsTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
CREATE TABLE IF NOT EXISTS %s %s (
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
@@ -151,7 +151,7 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
) ENGINE = %s
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
@@ -217,7 +217,7 @@ func createDatabase(ctx context.Context, cfg *Config) error {
defer func() {
_ = db.Close()
}()
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)
query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s %s", cfg.Database, cfg.ClusterString())
_, err = db.ExecContext(ctx, query)
if err != nil {
return fmt.Errorf("create database:%w", err)
@@ -234,7 +234,7 @@ func createLogsTable(ctx context.Context, cfg *Config, db *sql.DB) error {

func renderCreateLogsTableSQL(cfg *Config) string {
ttlExpr := internal.GenerateTTLExpr(cfg.TTLDays, cfg.TTL, "Timestamp")
return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, ttlExpr)
return fmt.Sprintf(createLogsTableSQL, cfg.LogsTableName, cfg.ClusterString(), cfg.TableEngineString(), ttlExpr)
}

func renderInsertLogsSQL(cfg *Config) string {
28 changes: 25 additions & 3 deletions exporter/clickhousestsexporter/exporter_logs_test.go
@@ -39,7 +39,7 @@ func TestLogsExporter_New(t *testing.T) {
}

failWithMsg := func(msg string) validate {
return func(t *testing.T, exporter *logsExporter, err error) {
return func(t *testing.T, _ *logsExporter, err error) {
require.Error(t, err)
require.Contains(t, err.Error(), msg)
}
@@ -121,6 +121,20 @@ func TestExporter_pushLogsData(t *testing.T) {
})
}

func TestLogsClusterConfig(t *testing.T) {
testClusterConfig(t, func(t *testing.T, dsn string, clusterTest clusterTestConfig, fns ...func(*Config)) {
exporter := newTestLogsExporter(t, dsn, fns...)
clusterTest.verifyConfig(t, exporter.cfg)
})
}

func TestLogsTableEngineConfig(t *testing.T) {
testTableEngineConfig(t, func(t *testing.T, dsn string, engineTest tableEngineTestConfig, fns ...func(*Config)) {
exporter := newTestLogsExporter(t, dsn, fns...)
engineTest.verifyConfig(t, exporter.cfg.TableEngine)
})
}

func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn))
require.NoError(t, err)
@@ -151,10 +165,18 @@ func simpleLogs(count int) plog.Logs {
sl.Scope().SetName("io.opentelemetry.contrib.clickhouse")
sl.Scope().SetVersion("1.0.0")
sl.Scope().Attributes().PutStr("lib", "clickhouse")
timestamp := time.Unix(1703498029, 0)
for i := 0; i < count; i++ {
r := sl.LogRecords().AppendEmpty()
r.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
r.Attributes().PutStr(conventions.AttributeServiceName, "v")
r.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetSeverityNumber(plog.SeverityNumberError2)
r.SetSeverityText("error")
r.Body().SetStr("error message")
r.Attributes().PutStr(conventions.AttributeServiceNamespace, "default")
r.SetFlags(plog.DefaultLogRecordFlags)
r.SetTraceID([16]byte{1, 2, 3, byte(i)})
r.SetSpanID([8]byte{1, 2, 3, byte(i)})
}
return logs
}
2 changes: 1 addition & 1 deletion exporter/clickhousestsexporter/exporter_metrics.go
@@ -44,7 +44,7 @@ func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
internal.SetLogger(e.logger)

ttlExpr := internal.GenerateTTLExpr(e.cfg.TTLDays, e.cfg.TTL, "TimeUnix")
return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, ttlExpr, e.client)
return internal.NewMetricsTable(ctx, e.cfg.MetricsTableName, e.cfg.ClusterString(), e.cfg.TableEngineString(), ttlExpr, e.client)
}

// shutdown will shut down the exporter.