From c3d68f6a8ff1affc07d590ebc4cac6ca95768543 Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Fri, 27 Jan 2023 11:33:12 +0800 Subject: [PATCH] exporter/clickhouse: use endpoint instead of raw dsn in configuration. (#18067) --- .chloggen/clickhouseexporter-tidy-config.yaml | 16 +++++ exporter/clickhouseexporter/README.md | 17 ++--- exporter/clickhouseexporter/config.go | 54 +++++++++----- exporter/clickhouseexporter/config_test.go | 71 ++++++++++++++++++- .../example/datasource.yaml | 3 +- .../example/otel-collector-config.yml | 3 +- exporter/clickhouseexporter/exporter_logs.go | 28 ++++---- .../clickhouseexporter/exporter_logs_test.go | 32 +-------- .../exporter_metrics_test.go | 14 ++-- .../clickhouseexporter/exporter_traces.go | 3 +- .../exporter_traces_test.go | 2 +- exporter/clickhouseexporter/factory.go | 1 + exporter/clickhouseexporter/factory_test.go | 4 +- .../clickhouseexporter/testdata/config.yaml | 12 +++- internal/components/exporters_test.go | 2 +- 15 files changed, 169 insertions(+), 93 deletions(-) create mode 100644 .chloggen/clickhouseexporter-tidy-config.yaml diff --git a/.chloggen/clickhouseexporter-tidy-config.yaml b/.chloggen/clickhouseexporter-tidy-config.yaml new file mode 100644 index 000000000000..57ef912bab87 --- /dev/null +++ b/.chloggen/clickhouseexporter-tidy-config.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: clickhouseexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: use endpoint instead of raw dsn in configuration. + +# One or more tracking issues related to the change +issues: [8028] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/exporter/clickhouseexporter/README.md b/exporter/clickhouseexporter/README.md index 5bfe4c55fb07..e6113f7cf07d 100644 --- a/exporter/clickhouseexporter/README.md +++ b/exporter/clickhouseexporter/README.md @@ -261,17 +261,17 @@ around 40k/s logs entry per CPU cores, add more collector node can increase line The following settings are required: -- `dsn` (no default): The ClickHouse server DSN (Data Source Name), for - example `tcp://user:password@127.0.0.1:9000/default` - For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn). - For http protocol - reference: [ClickHouse/clickhouse-go#http-support-experimental](https://github.com/ClickHouse/clickhouse-go/tree/main#http-support-experimental) - . +- `endpoint` (no default): The ClickHouse server endpoint, support multi host, for example: + tcp protocol `tcp://ip1:port,ip2:port` + http protocol `http://ip:port,ip2:port` The following settings can be optionally configured: -- `ttl_days` (default= 0): The data time-to-live in days, 0 means no ttl. +- `username` (default = ): The authentication username. +- `password` (default = ): The authentication password. +- `ttl_days` (default = 0): The data time-to-live in days, 0 means no ttl. - `database` (default = otel): The database name. +- `connection_params` (default = {}). Params is the extra connection parameters with map format. for example compression/dial_timeout. - `logs_table_name` (default = otel_logs): The table name for logs. - `traces_table_name` (default = otel_traces): The table name for traces. - `metrics_table_name` (default = otel_metrics): The table name for metrics. @@ -297,7 +297,8 @@ processors: send_batch_size: 100000 exporters: clickhouse: - dsn: tcp://127.0.0.1:9000/otel + endpoint: tcp://127.0.0.1:9000 + database: otel ttl_days: 3 logs_table: otel_logs traces_table: otel_traces diff --git a/exporter/clickhouseexporter/config.go b/exporter/clickhouseexporter/config.go index 8e0a01de8af8..99cfbd98bfa7 100644 --- a/exporter/clickhouseexporter/config.go +++ b/exporter/clickhouseexporter/config.go @@ -16,9 +16,7 @@ package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-co import ( "errors" - "fmt" "net/url" - "strings" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/multierr" @@ -32,10 +30,18 @@ type Config struct { // because only QueueSize is user-settable. QueueSettings QueueSettings `mapstructure:"sending_queue"` - // DSN is the ClickHouse server Data Source Name. - // For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn). - // For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn). - DSN string `mapstructure:"dsn"` + // Endpoint is the clickhouse server endpoint. + // TCP endpoint: tcp://ip1:port,ip2:port + // HTTP endpoint: http://ip:port,ip2:port + Endpoint string `mapstructure:"endpoint"` + // Username is the authentication username. + Username string `mapstructure:"username"` + // Username is the authentication password. + Password string `mapstructure:"password"` + // Database is the database name to export. + Database string `mapstructure:"database"` + // ConnectionParams is the extra connection parameters with map format. for example compression/dial_timeout + ConnectionParams map[string]string `mapstructure:"connection_params"` // LogsTableName is the table name for logs. default is `otel_logs`. LogsTableName string `mapstructure:"logs_table_name"` // TracesTableName is the table name for logs. default is `otel_traces`. @@ -53,31 +59,24 @@ type QueueSettings struct { } var ( - errConfigNoDSN = errors.New("dsn must be specified") + errConfigNoEndpoint = errors.New("endpoint must be specified") + errConfigInvalidEndpoint = errors.New("endpoint must be url format") ) // Validate validates the clickhouse server configuration. func (cfg *Config) Validate() (err error) { - if cfg.DSN == "" { - err = multierr.Append(err, errConfigNoDSN) + if cfg.Endpoint == "" { + err = multierr.Append(err, errConfigNoEndpoint) } - _, e := parseDSNDatabase(cfg.DSN) + _, e := cfg.buildDSN(cfg.Database) if e != nil { - err = multierr.Append(err, fmt.Errorf("invalid dsn format:%w", err)) + err = multierr.Append(err, e) } return err } const defaultDatabase = "default" -func parseDSNDatabase(dsn string) (string, error) { - u, err := url.Parse(dsn) - if err != nil { - return defaultDatabase, err - } - return strings.TrimPrefix(u.Path, "/"), nil -} - func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings { return exporterhelper.QueueSettings{ Enabled: true, @@ -85,3 +84,20 @@ func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings { QueueSize: cfg.QueueSettings.QueueSize, } } + +func (cfg *Config) buildDSN(database string) (string, error) { + dsn, err := url.Parse(cfg.Endpoint) + if err != nil { + return "", errConfigInvalidEndpoint + } + if cfg.Username != "" { + dsn.User = url.UserPassword(cfg.Username, cfg.Password) + } + dsn.Path = "/" + database + params := url.Values{} + for k, v := range cfg.ConnectionParams { + params.Set(k, v) + } + dsn.RawQuery = params.Encode() + return dsn.String(), nil +} diff --git a/exporter/clickhouseexporter/config_test.go b/exporter/clickhouseexporter/config_test.go index 94e6bae80335..8942435bc90e 100644 --- a/exporter/clickhouseexporter/config_test.go +++ b/exporter/clickhouseexporter/config_test.go @@ -26,7 +26,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" ) -const defaultDSN = "tcp://127.0.0.1:9000/otel" +const defaultEndpoint = "tcp://127.0.0.1:9000" func TestLoadConfig(t *testing.T) { t.Parallel() @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) defaultCfg := createDefaultConfig() - defaultCfg.(*Config).DSN = defaultDSN + defaultCfg.(*Config).Endpoint = defaultEndpoint tests := []struct { id component.ID @@ -49,7 +49,14 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(typeStr, "full"), expected: &Config{ - DSN: defaultDSN, + Endpoint: defaultEndpoint, + Database: "otel", + Username: "foo", + Password: "bar", + ConnectionParams: map[string]string{ + "compression": "zstd", + "dial_timeout": "5s", + }, TTLDays: 3, LogsTableName: "otel_logs", TracesTableName: "otel_traces", @@ -92,3 +99,61 @@ func withDefaultConfig(fns ...func(*Config)) *Config { } return cfg } + +func TestConfig_buildDSN(t *testing.T) { + type fields struct { + Endpoint string + Username string + Password string + Database string + Params map[string]string + } + type args struct { + database string + } + tests := []struct { + name string + fields fields + args args + want string + wantErr error + }{ + { + name: "valid config", + fields: fields{ + Endpoint: defaultEndpoint, + Username: "foo", + Password: "bar", + Database: "otel", + Params: map[string]string{ + "compression": "zstd", + }, + }, + args: args{ + database: defaultDatabase, + }, + want: "tcp://foo:bar@127.0.0.1:9000/default?compression=zstd", + }, + { + name: "invalid config", + fields: fields{ + Endpoint: "127.0.0.1:9000", + }, + wantErr: errConfigInvalidEndpoint, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + Endpoint: tt.fields.Endpoint, + Username: tt.fields.Username, + Password: tt.fields.Password, + Database: tt.fields.Database, + ConnectionParams: tt.fields.Params, + } + got, err := cfg.buildDSN(tt.args.database) + assert.Equalf(t, tt.wantErr, err, "buildDSN(%v)", tt.args.database) + assert.Equalf(t, tt.want, got, "buildDSN(%v)", tt.args.database) + }) + } +} diff --git a/exporter/clickhouseexporter/example/datasource.yaml b/exporter/clickhouseexporter/example/datasource.yaml index 44410655f87e..af798699aba0 100644 --- a/exporter/clickhouseexporter/example/datasource.yaml +++ b/exporter/clickhouseexporter/example/datasource.yaml @@ -8,9 +8,10 @@ datasources: defaultDatabase: otel port: 9000 server: clickhouse + protocol: native username: tlsSkipVerify: true - secureJsonData: + secureJsonData: password: - name: ClickHouse-vertamedia type: vertamedia-clickhouse-datasource diff --git a/exporter/clickhouseexporter/example/otel-collector-config.yml b/exporter/clickhouseexporter/example/otel-collector-config.yml index d7c086ffa097..5bf5e2856084 100644 --- a/exporter/clickhouseexporter/example/otel-collector-config.yml +++ b/exporter/clickhouseexporter/example/otel-collector-config.yml @@ -24,7 +24,8 @@ processors: action: upsert exporters: clickhouse: - dsn: tcp://clickhouse:9000/otel + endpoint: tcp://clickhouse:9000 + database: otel logs_table_name: otel_logs traces_table_name: otel_traces ttl_days: 3 diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index 741d2c8c0e69..3cb6fea48b88 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -18,7 +18,6 @@ import ( "context" "database/sql" "fmt" - "strings" "time" _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. @@ -183,16 +182,23 @@ var driverName = "clickhouse" // for testing // newClickhouseClient create a clickhouse client. func newClickhouseClient(cfg *Config) (*sql.DB, error) { - return sql.Open(driverName, cfg.DSN) + dsn, err := cfg.buildDSN(cfg.Database) + if err != nil { + return nil, err + } + db, err := sql.Open(driverName, dsn) + if err != nil { + return nil, err + } + return db, nil } func createDatabase(cfg *Config) error { - database, _ := parseDSNDatabase(cfg.DSN) - if database == defaultDatabase { + if cfg.Database == defaultDatabase { return nil } // use default database to create new database - dsnUseDefaultDatabase, err := getDefaultDSN(cfg.DSN, database) + dsnUseDefaultDatabase, err := cfg.buildDSN(defaultDatabase) if err != nil { return err } @@ -203,7 +209,7 @@ func createDatabase(cfg *Config) error { defer func() { _ = db.Close() }() - query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database) + query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database) _, err = db.Exec(query) if err != nil { return fmt.Errorf("create database:%w", err) @@ -211,16 +217,6 @@ func createDatabase(cfg *Config) error { return nil } -func getDefaultDSN(dsn string, database string) (string, error) { - if strings.LastIndex(dsn, database) == -1 { - return "", fmt.Errorf("database not present in dsn") - } - if dsn[strings.LastIndex(dsn, database):] == defaultDatabase { - return dsn, nil - } - return fmt.Sprintf("%s%s", dsn[0:strings.LastIndex(dsn, database)], defaultDatabase), nil -} - func createLogsTable(cfg *Config, db *sql.DB) error { if _, err := db.Exec(renderCreateLogsTableSQL(cfg)); err != nil { return fmt.Errorf("exec create logs table sql: %w", err) diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index fd1699ae2780..900faa9c28a0 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -93,7 +93,7 @@ func TestExporter_pushLogsData(t *testing.T) { return nil }) - exporter := newTestLogsExporter(t, defaultDSN) + exporter := newTestLogsExporter(t, defaultEndpoint) mustPushLogsData(t, exporter, simpleLogs(1)) mustPushLogsData(t, exporter, simpleLogs(2)) @@ -101,32 +101,6 @@ func TestExporter_pushLogsData(t *testing.T) { }) } -func TestLogsExporter_getDefaultDns(t *testing.T) { - t.Run("database name is a substring of the DSN", func(t *testing.T) { - dsn := "tcp://mydatabase-clickhouse-headless:9000/mydatabase" - defaultDSN, err := getDefaultDSN(dsn, "mydatabase") - require.NoError(t, err) - require.Equal(t, defaultDSN, "tcp://mydatabase-clickhouse-headless:9000/default") - }) - t.Run("database name isn't a substring of the DSN", func(t *testing.T) { - dsn := "tcp://newdatabase-clickhouse-headless:9000/otel" - defaultDSN, err := getDefaultDSN(dsn, "otel") - require.NoError(t, err) - require.Equal(t, defaultDSN, "tcp://newdatabase-clickhouse-headless:9000/default") - }) - t.Run("error param for database", func(t *testing.T) { - dsn := "tcp://mydatabase-clickhouse-headless:9000/mydatabase" - _, err := getDefaultDSN(dsn, "otel") - require.Error(t, err) - }) - t.Run("database name is same as default database", func(t *testing.T) { - dsn := "tcp://mydatabase-clickhouse-headless:9000/default" - defaultDSN, err := getDefaultDSN(dsn, "default") - require.NoError(t, err) - require.Equal(t, defaultDSN, "tcp://mydatabase-clickhouse-headless:9000/default") - }) -} - func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter { exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) require.NoError(t, err) @@ -136,10 +110,10 @@ func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsEx } func withTestExporterConfig(fns ...func(*Config)) func(string) *Config { - return func(dsn string) *Config { + return func(endpoint string) *Config { var configMods []func(*Config) configMods = append(configMods, func(cfg *Config) { - cfg.DSN = dsn + cfg.Endpoint = endpoint }) configMods = append(configMods, fns...) return withDefaultConfig(configMods...) diff --git a/exporter/clickhouseexporter/exporter_metrics_test.go b/exporter/clickhouseexporter/exporter_metrics_test.go index 482bbcee215d..845226026a9a 100644 --- a/exporter/clickhouseexporter/exporter_metrics_test.go +++ b/exporter/clickhouseexporter/exporter_metrics_test.go @@ -42,7 +42,7 @@ func TestExporter_pushMetricsData(t *testing.T) { } return nil }) - exporter := newTestMetricsExporter(t, defaultDSN) + exporter := newTestMetricsExporter(t) mustPushMetricsData(t, exporter, simpleMetrics(1)) }) t.Run("push failure", func(t *testing.T) { @@ -52,9 +52,7 @@ func TestExporter_pushMetricsData(t *testing.T) { } return nil }) - exporter := newTestMetricsExporter(t, "tcp://127.0.0.1:9000/db", func(config *Config) { - config.DSN = "tcp://127.0.0.1:9000/db" - }) + exporter := newTestMetricsExporter(t) err := exporter.pushMetricsData(context.TODO(), simpleMetrics(2)) require.Error(t, err) }) @@ -76,7 +74,7 @@ func TestExporter_pushMetricsData(t *testing.T) { } return nil }) - exporter := newTestMetricsExporter(t, defaultDSN) + exporter := newTestMetricsExporter(t) mustPushMetricsData(t, exporter, simpleMetrics(1)) require.Equal(t, 5, items) @@ -85,7 +83,7 @@ func TestExporter_pushMetricsData(t *testing.T) { func Benchmark_pushMetricsData(b *testing.B) { pm := simpleMetrics(1) - exporter := newTestMetricsExporter(&testing.T{}, defaultDSN) + exporter := newTestMetricsExporter(&testing.T{}) b.ReportAllocs() b.ResetTimer() for n := 0; n < b.N; n++ { @@ -326,8 +324,8 @@ func mustPushMetricsData(t *testing.T, exporter *metricsExporter, md pmetric.Met require.NoError(t, err) } -func newTestMetricsExporter(t *testing.T, dsn string, fns ...func(*Config)) *metricsExporter { - exporter, err := newMetricsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) +func newTestMetricsExporter(t *testing.T) *metricsExporter { + exporter, err := newMetricsExporter(zaptest.NewLogger(t), withTestExporterConfig()(defaultEndpoint)) require.NoError(t, err) t.Cleanup(func() { _ = exporter.shutdown(context.TODO()) }) diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go index 3de10c31e75b..95919fdd6762 100644 --- a/exporter/clickhouseexporter/exporter_traces.go +++ b/exporter/clickhouseexporter/exporter_traces.go @@ -308,7 +308,6 @@ func renderCreateTraceIDTsTableSQL(cfg *Config) string { } func renderTraceIDTsMaterializedViewSQL(cfg *Config) string { - database, _ := parseDSNDatabase(cfg.DSN) return fmt.Sprintf(createTraceIDTsMaterializedViewSQL, cfg.TracesTableName, - database, cfg.TracesTableName, database, cfg.TracesTableName) + cfg.Database, cfg.TracesTableName, cfg.Database, cfg.TracesTableName) } diff --git a/exporter/clickhouseexporter/exporter_traces_test.go b/exporter/clickhouseexporter/exporter_traces_test.go index 7f303d735921..2d3f7ef5ca35 100644 --- a/exporter/clickhouseexporter/exporter_traces_test.go +++ b/exporter/clickhouseexporter/exporter_traces_test.go @@ -39,7 +39,7 @@ func TestExporter_pushTracesData(t *testing.T) { return nil }) - exporter := newTestTracesExporter(t, defaultDSN) + exporter := newTestTracesExporter(t, defaultEndpoint) mustPushTracesData(t, exporter, simpleTraces(1)) mustPushTracesData(t, exporter, simpleTraces(2)) diff --git a/exporter/clickhouseexporter/factory.go b/exporter/clickhouseexporter/factory.go index 14779949628c..e1729fdfbc4b 100644 --- a/exporter/clickhouseexporter/factory.go +++ b/exporter/clickhouseexporter/factory.go @@ -46,6 +46,7 @@ func createDefaultConfig() component.Config { TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize}, RetrySettings: exporterhelper.NewDefaultRetrySettings(), + Database: defaultDatabase, LogsTableName: "otel_logs", TracesTableName: "otel_traces", MetricsTableName: "otel_metrics", diff --git a/exporter/clickhouseexporter/factory_test.go b/exporter/clickhouseexporter/factory_test.go index 18ea6f60b9b7..22bced3a2c3d 100644 --- a/exporter/clickhouseexporter/factory_test.go +++ b/exporter/clickhouseexporter/factory_test.go @@ -34,7 +34,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestFactory_CreateLogsExporter(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { - cfg.DSN = defaultDSN + cfg.Endpoint = defaultEndpoint }) params := exportertest.NewNopCreateSettings() exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) @@ -47,7 +47,7 @@ func TestFactory_CreateLogsExporter(t *testing.T) { func TestFactory_CreateTracesExporter(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { - cfg.DSN = defaultDSN + cfg.Endpoint = defaultEndpoint }) params := exportertest.NewNopCreateSettings() exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) diff --git a/exporter/clickhouseexporter/testdata/config.yaml b/exporter/clickhouseexporter/testdata/config.yaml index 6b47e66c65e6..b02323a561e8 100644 --- a/exporter/clickhouseexporter/testdata/config.yaml +++ b/exporter/clickhouseexporter/testdata/config.yaml @@ -1,7 +1,13 @@ clickhouse: - dsn: tcp://127.0.0.1:9000/otel + endpoint: tcp://127.0.0.1:9000 clickhouse/full: - dsn: tcp://127.0.0.1:9000/otel + endpoint: tcp://127.0.0.1:9000 + username: foo + password: bar + database: otel + connection_params: + compression: zstd + dial_timeout: 5s ttl_days: 3 logs_table_name: otel_logs traces_table_name: otel_traces @@ -13,3 +19,5 @@ clickhouse/full: max_elapsed_time: 300s sending_queue: queue_size: 100 +clickhouse/invalid-endpoint: + endpoint: 127.0.0.1:9000 \ No newline at end of file diff --git a/internal/components/exporters_test.go b/internal/components/exporters_test.go index 62083f848e0d..e5c3a2436ba1 100644 --- a/internal/components/exporters_test.go +++ b/internal/components/exporters_test.go @@ -291,7 +291,7 @@ func TestDefaultExporters(t *testing.T) { exporter: "clickhouse", getConfigFn: func() component.Config { cfg := expFactories["clickhouse"].CreateDefaultConfig().(*clickhouseexporter.Config) - cfg.DSN = "clickhouse://" + endpoint + cfg.Endpoint = "tcp://" + endpoint return cfg }, },