diff --git a/README.md b/README.md index 2f4ae2b3d..ec1c9974c 100644 --- a/README.md +++ b/README.md @@ -250,12 +250,12 @@ the [Processors documentation](https://conduit.io/docs/using/processors/getting- Conduit exposes a gRPC API and an HTTP API. The gRPC API is by default running on port 8084. You can define a custom address -using the CLI flag `-grpc.address`. To learn more about the gRPC API please have +using the CLI flag `-api.grpc.address`. To learn more about the gRPC API please have a look at the [protobuf file](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto). The HTTP API is by default running on port 8080. You can define a custom address -using the CLI flag `-http.address`. It is generated +using the CLI flag `-api.http.address`. It is generated using [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway) and is thus providing the same functionality as the gRPC API. To learn more about the HTTP API please have a look at the [API documentation](https://www.conduit.io/api), diff --git a/cmd/conduit/cecdysis/decorators.go b/cmd/conduit/cecdysis/decorators.go index 31a0e5853..fc090a097 100644 --- a/cmd/conduit/cecdysis/decorators.go +++ b/cmd/conduit/cecdysis/decorators.go @@ -83,7 +83,7 @@ func getGRPCAddress(cmd *cobra.Command) (string, error) { path, err = cmd.Flags().GetString("config.path") if err != nil || path == "" { - path = conduit.DefaultConfig().ConduitCfgPath + path = conduit.DefaultConfig().ConduitCfg.Path } var usrCfg conduit.Config diff --git a/cmd/conduit/root/config/config.go b/cmd/conduit/root/config/config.go index 52314259e..f7e131e62 100644 --- a/cmd/conduit/root/config/config.go +++ b/cmd/conduit/root/config/config.go @@ -50,7 +50,7 @@ the set environment variables, and the flags used. This command will show the co } } -func printStruct(v reflect.Value, parentPath string) { +func printStruct(ctx context.Context, v reflect.Value, parentPath string) { if v.Kind() == reflect.Ptr { v = v.Elem() } @@ -68,14 +68,18 @@ func printStruct(v reflect.Value, parentPath string) { if fieldValue.Kind() == reflect.Struct || (fieldValue.Kind() == reflect.Ptr && !fieldValue.IsNil() && fieldValue.Elem().Kind() == reflect.Struct) { - printStruct(fieldValue, fullPath) + printStruct(ctx, fieldValue, fullPath) continue } if longName != "" { value := fmt.Sprintf("%v", fieldValue.Interface()) if value != "" { - fmt.Printf("%s: %s\n", fullPath, value) + cobraCmd := ecdysis.CobraCmdFromContext(ctx) + _, err := fmt.Fprintf(cobraCmd.OutOrStdout(), "%s: %s\n", fullPath, value) + if err != nil { + fmt.Printf("failed writing config value to out: %v", err) + } } } } @@ -83,7 +87,7 @@ func printStruct(v reflect.Value, parentPath string) { func (c *ConfigCommand) Usage() string { return "config" } -func (c ConfigCommand) Execute(_ context.Context) error { - printStruct(reflect.ValueOf(c.RunCmd.Cfg), "") +func (c ConfigCommand) Execute(ctx context.Context) error { + printStruct(ctx, reflect.ValueOf(c.RunCmd.Cfg), "") return nil } diff --git a/cmd/conduit/root/config/config_test.go b/cmd/conduit/root/config/config_test.go index aa65d0153..0e7362cc7 100644 --- a/cmd/conduit/root/config/config_test.go +++ b/cmd/conduit/root/config/config_test.go @@ -18,59 +18,157 @@ import ( "bytes" "io" "os" - "reflect" + "slices" "strings" "testing" - "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/cmd/conduit/root/run" + "github.com/conduitio/ecdysis" "github.com/matryer/is" ) -func TestPrintStructOutput(t *testing.T) { - is := is.New(t) - - cfg := conduit.DefaultConfig() - - oldStdout := os.Stdout - defer func() { os.Stdout = oldStdout }() +func TestConfig_WithFlags(t *testing.T) { + testCases := []struct { + name string + args []string + wantLines []string + }{ + { + name: "with flags (api, config, connectors, db, log)", + args: []string{ + "--api.enabled=false", + "--api.grpc.address", "localhost:9999", + "--api.http.address", "localhost:8888", + "--config.path", "/etc/conduit/config.yaml", + "--connectors.path", "/opt/conduit/connectors", + "--db.badger.path", "/var/lib/conduit/data.db", + "--db.postgres.connection-string", "postgres://user:pass@localhost:5432/conduit", + "--db.postgres.table", "my_conduit_store", + "--db.sqlite.path", "/var/lib/conduit/conduit.sqlite", + "--db.sqlite.table", "my_sqlite_store", + "--db.type", "postgres", + "--log.format", "json", + "--log.level", "debug", + }, + wantLines: []string{ + "config.path: /etc/conduit/config.yaml", + "db.type: postgres", + "db.postgres.table: my_conduit_store", + "db.sqlite.table: my_sqlite_store", + "api.enabled: false", + "api.http.address: localhost:8888", + "api.grpc.address: localhost:9999", + "log.level: debug", + "log.format: json", + "pipelines.exit-on-degraded: false", + "pipelines.error-recovery.min-delay: 1s", + "pipelines.error-recovery.max-delay: 10m0s", + "pipelines.error-recovery.backoff-factor: 2", + "pipelines.error-recovery.max-retries: -1", + "pipelines.error-recovery.max-retries-window: 5m0s", + "schema-registry.type: builtin", + "preview.pipeline-arch-v2: false", + }, + }, + { + name: "with flags (pipelines, preview, processors, schema, dev)", + args: []string{ + "--pipelines.error-recovery.backoff-factor", "5", + "--pipelines.error-recovery.max-delay", "30m", + "--pipelines.error-recovery.max-retries", "10", + "--pipelines.error-recovery.max-retries-window", "15m", + "--pipelines.error-recovery.min-delay", "5s", + "--pipelines.exit-on-degraded=true", + "--pipelines.path", "/var/lib/conduit/pipelines", + "--preview.pipeline-arch-v2=true", + "--processors.path", "/opt/conduit/processors", + "--schema-registry.confluent.connection-string", "http://localhost:8081", + "--schema-registry.type", "confluent", + "--dev.blockprofile", "/tmp/block.prof", + "--dev.cpuprofile", "/tmp/cpu.prof", + "--dev.memprofile", "/tmp/mem.prof", + }, + wantLines: []string{ + "db.type: badger", + "db.postgres.table: conduit_kv_store", + "db.sqlite.table: conduit_kv_store", + "api.enabled: true", + "api.http.address: :8080", + "api.grpc.address: :8084", + "log.level: info", + "log.format: cli", + "processors.path: /opt/conduit/processors", + "pipelines.path: /var/lib/conduit/pipelines", + "pipelines.exit-on-degraded: true", + "pipelines.error-recovery.min-delay: 5s", + "pipelines.error-recovery.max-delay: 30m0s", + "pipelines.error-recovery.backoff-factor: 5", + "pipelines.error-recovery.max-retries: 10", + "pipelines.error-recovery.max-retries-window: 15m0s", + "schema-registry.type: confluent", + "schema-registry.confluent.connection-string: http://localhost:8081", + "preview.pipeline-arch-v2: false", + "dev.cpuprofile: /tmp/cpu.prof", + "dev.memprofile: /tmp/mem.prof", + "dev.blockprofile: /tmp/block.prof", + }, + }, + { + name: "default values (no flags)", + args: []string{}, + wantLines: []string{ + "db.type: badger", + "db.postgres.table: conduit_kv_store", + "db.sqlite.table: conduit_kv_store", + "api.enabled: true", + "api.http.address: :8080", + "api.grpc.address: :8084", + "log.level: info", + "log.format: cli", + "pipelines.exit-on-degraded: false", + "pipelines.error-recovery.min-delay: 1s", + "pipelines.error-recovery.max-delay: 10m0s", + "pipelines.error-recovery.backoff-factor: 2", + "pipelines.error-recovery.max-retries: -1", + "pipelines.error-recovery.max-retries-window: 5m0s", + "schema-registry.type: builtin", + "preview.pipeline-arch-v2: false", + }, + }, + } - r, w, err := os.Pipe() - is.NoErr(err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) - os.Stdout = w - t.Cleanup(func() { os.Stdout = oldStdout }) + readFrom, writeTo, err := os.Pipe() + is.NoErr(err) - printStruct(reflect.ValueOf(cfg), "") + e := ecdysis.New() + cmd := e.MustBuildCobraCommand(&ConfigCommand{RunCmd: &run.RunCommand{}}) + cmd.SetArgs(tc.args) + cmd.SetOut(writeTo) + cmd.SetErr(writeTo) - err = w.Close() - is.NoErr(err) + err = cmd.Execute() + is.NoErr(err) - var buf bytes.Buffer - _, err = io.Copy(&buf, r) - is.NoErr(err) + err = writeTo.Close() + is.NoErr(err) - output := buf.String() + var buf bytes.Buffer + _, err = io.Copy(&buf, readFrom) + is.NoErr(err) - expectedLines := []string{ - "db.type: badger", - "db.postgres.table: conduit_kv_store", - "db.sqlite.table: conduit_kv_store", - "api.enabled: true", - "api.http.address: :8080", - "api.grpc.address: :8084", - "log.level: info", - "log.format: cli", - "pipelines.exit-on-degraded: false", - "pipelines.error-recovery.min-delay: 1s", - "pipelines.error-recovery.max-delay: 10m0s", - "pipelines.error-recovery.backoff-factor: 2", - "pipelines.error-recovery.max-retries: -1", - "pipelines.error-recovery.max-retries-window: 5m0s", - "schema-registry.type: builtin", - "preview.pipeline-arch-v2: false", - } + output := buf.String() + is.True(output != "") - for _, line := range expectedLines { - is.True(strings.Contains(output, line)) + outputLines := strings.Split(output, "\n") + for _, line := range tc.wantLines { + if !slices.Contains(outputLines, line) { + t.Errorf("output does not contain expected line: %q", line) + } + } + }) } } diff --git a/cmd/conduit/root/initialize/init.go b/cmd/conduit/root/initialize/init.go index 6e0191448..59a60af4c 100644 --- a/cmd/conduit/root/initialize/init.go +++ b/cmd/conduit/root/initialize/init.go @@ -46,7 +46,7 @@ type InitCommand struct { func (c *InitCommand) Flags() []ecdysis.Flag { flags := ecdysis.BuildFlags(&c.flags) - flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath)) + flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfg.Path)) return flags } diff --git a/cmd/conduit/root/run/run.go b/cmd/conduit/root/run/run.go index 0dd030650..732c465de 100644 --- a/cmd/conduit/root/run/run.go +++ b/cmd/conduit/root/run/run.go @@ -54,12 +54,12 @@ func (c *RunCommand) Execute(_ context.Context) error { } func (c *RunCommand) Config() ecdysis.Config { - path := filepath.Dir(c.flags.ConduitCfgPath) + path := filepath.Dir(c.flags.ConduitCfg.Path) return ecdysis.Config{ EnvPrefix: "CONDUIT", Parsed: &c.Cfg, - Path: c.flags.ConduitCfgPath, + Path: c.flags.ConduitCfg.Path, DefaultValues: conduit.DefaultConfigWithBasePath(path), } } @@ -75,7 +75,7 @@ func (c *RunCommand) Flags() []ecdysis.Flag { } c.Cfg = conduit.DefaultConfigWithBasePath(currentPath) - flags.SetDefault("config.path", c.Cfg.ConduitCfgPath) + flags.SetDefault("config.path", c.Cfg.ConduitCfg.Path) flags.SetDefault("db.type", c.Cfg.DB.Type) flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path) flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString) diff --git a/cmd/conduit/root/run/run_test.go b/cmd/conduit/root/run/run_test.go index 535bc8561..b676498d2 100644 --- a/cmd/conduit/root/run/run_test.go +++ b/cmd/conduit/root/run/run_test.go @@ -25,7 +25,7 @@ import ( func TestRunCommandFlags(t *testing.T) { is := is.New(t) - expectedFlags := []struct { + wantFlags := []struct { longName string shortName string usage string @@ -66,17 +66,20 @@ func TestRunCommandFlags(t *testing.T) { persistentFlags := c.PersistentFlags() cmdFlags := c.Flags() - for _, f := range expectedFlags { + for _, wantFlag := range wantFlags { var cf *pflag.Flag - if f.persistent { - cf = persistentFlags.Lookup(f.longName) + if wantFlag.persistent { + cf = persistentFlags.Lookup(wantFlag.longName) } else { - cf = cmdFlags.Lookup(f.longName) + cf = cmdFlags.Lookup(wantFlag.longName) } - is.True(cf != nil) - is.Equal(f.longName, cf.Name) - is.Equal(f.shortName, cf.Shorthand) - is.Equal(cf.Usage, f.usage) + if cf == nil { + t.Logf("flag %q expected, but not found", wantFlag.longName) + t.FailNow() + } + is.Equal(wantFlag.longName, cf.Name) + is.Equal(wantFlag.shortName, cf.Shorthand) + is.Equal(cf.Usage, wantFlag.usage) } } diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 45442ecb1..6fa8fe92c 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -41,7 +41,9 @@ const ( // Config holds all configurable values for Conduit. type Config struct { - ConduitCfgPath string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"` + ConduitCfg struct { + Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"` + } `mapstructure:"config"` DB struct { // When Driver is specified it takes precedence over other DB related @@ -90,19 +92,19 @@ type Config struct { Pipelines struct { Path string `long:"pipelines.path" usage:"path to pipelines' directory"` - ExitOnDegraded bool `long:"pipelines.exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"` + ExitOnDegraded bool `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"` ErrorRecovery struct { // MinDelay is the minimum delay before restart: Default: 1 second - MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"` + MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"` // MaxDelay is the maximum delay before restart: Default: 10 minutes - MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"` + MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"` // BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2 - BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"` + BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"` // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) - MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"` + MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"` // MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes - MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"` - } + MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" mapstructure:"max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"` + } `mapstructure:"error-recovery"` } ConnectorPlugins map[string]sdk.Connector @@ -111,9 +113,9 @@ type Config struct { Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"` Confluent struct { - ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"` + ConnectionString string `long:"schema-registry.confluent.connection-string" mapstructure:"connection-string" usage:"confluent schema registry connection string"` } - } + } `mapstructure:"schema-registry"` Preview struct { // PipelineArchV2 enables the new pipeline architecture. @@ -139,7 +141,7 @@ func DefaultConfig() Config { func DefaultConfigWithBasePath(basePath string) Config { var cfg Config - cfg.ConduitCfgPath = filepath.Join(basePath, "conduit.yaml") + cfg.ConduitCfg.Path = filepath.Join(basePath, "conduit.yaml") cfg.DB.Type = DBTypeBadger cfg.DB.Badger.Path = filepath.Join(basePath, "conduit.db") diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index dddc83422..849e04508 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -353,6 +353,8 @@ func (r *Runtime) Run(ctx context.Context) (err error) { r.logger.Info(ctx).Send() r.logger.Info(ctx).Msgf("click here to navigate to explore the HTTP API: http://localhost:%d/openapi", port) r.logger.Info(ctx).Send() + } else { + r.logger.Info(ctx).Msg("API is disabled") } close(r.Ready)