Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CLI flag naming #2070

Merged
merged 9 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ the [Processors documentation](https://conduit.io/docs/using/processors/getting-
Conduit exposes a gRPC API and an HTTP API.

The gRPC API is by default running on port 8084. You can define a custom address
using the CLI flag `-grpc.address`. To learn more about the gRPC API please have
using the CLI flag `-api.grpc.address`. To learn more about the gRPC API please have
a look at
the [protobuf file](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto).

The HTTP API is by default running on port 8080. You can define a custom address
using the CLI flag `-http.address`. It is generated
using the CLI flag `-api.http.address`. It is generated
using [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway) and is thus
providing the same functionality as the gRPC API. To learn more about the HTTP
API please have a look at the [API documentation](https://www.conduit.io/api),
Expand Down
2 changes: 1 addition & 1 deletion cmd/conduit/cecdysis/decorators.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func getGRPCAddress(cmd *cobra.Command) (string, error) {

path, err = cmd.Flags().GetString("config.path")
if err != nil || path == "" {
path = conduit.DefaultConfig().ConduitCfgPath
path = conduit.DefaultConfig().ConduitCfg.Path
}

var usrCfg conduit.Config
Expand Down
14 changes: 9 additions & 5 deletions cmd/conduit/root/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ the set environment variables, and the flags used. This command will show the co
}
}

func printStruct(v reflect.Value, parentPath string) {
func printStruct(ctx context.Context, v reflect.Value, parentPath string) {
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
Expand All @@ -68,22 +68,26 @@ func printStruct(v reflect.Value, parentPath string) {

if fieldValue.Kind() == reflect.Struct ||
(fieldValue.Kind() == reflect.Ptr && !fieldValue.IsNil() && fieldValue.Elem().Kind() == reflect.Struct) {
printStruct(fieldValue, fullPath)
printStruct(ctx, fieldValue, fullPath)
continue
}

if longName != "" {
value := fmt.Sprintf("%v", fieldValue.Interface())
if value != "" {
fmt.Printf("%s: %s\n", fullPath, value)
cobraCmd := ecdysis.CobraCmdFromContext(ctx)
_, err := fmt.Fprintf(cobraCmd.OutOrStdout(), "%s: %s\n", fullPath, value)
if err != nil {
fmt.Printf("failed writing config value to out: %v", err)
}
}
}
}
}

func (c *ConfigCommand) Usage() string { return "config" }

func (c ConfigCommand) Execute(_ context.Context) error {
printStruct(reflect.ValueOf(c.RunCmd.Cfg), "")
func (c ConfigCommand) Execute(ctx context.Context) error {
printStruct(ctx, reflect.ValueOf(c.RunCmd.Cfg), "")
return nil
}
178 changes: 138 additions & 40 deletions cmd/conduit/root/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,157 @@ import (
"bytes"
"io"
"os"
"reflect"
"slices"
"strings"
"testing"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/ecdysis"
"github.com/matryer/is"
)

func TestPrintStructOutput(t *testing.T) {
is := is.New(t)

cfg := conduit.DefaultConfig()

oldStdout := os.Stdout
defer func() { os.Stdout = oldStdout }()
func TestConfig_WithFlags(t *testing.T) {
testCases := []struct {
name string
args []string
wantLines []string
}{
{
name: "with flags (api, config, connectors, db, log)",
args: []string{
"--api.enabled=false",
"--api.grpc.address", "localhost:9999",
"--api.http.address", "localhost:8888",
"--config.path", "/etc/conduit/config.yaml",
"--connectors.path", "/opt/conduit/connectors",
"--db.badger.path", "/var/lib/conduit/data.db",
"--db.postgres.connection-string", "postgres://user:pass@localhost:5432/conduit",
"--db.postgres.table", "my_conduit_store",
"--db.sqlite.path", "/var/lib/conduit/conduit.sqlite",
"--db.sqlite.table", "my_sqlite_store",
"--db.type", "postgres",
"--log.format", "json",
"--log.level", "debug",
},
wantLines: []string{
"config.path: /etc/conduit/config.yaml",
"db.type: postgres",
"db.postgres.table: my_conduit_store",
"db.sqlite.table: my_sqlite_store",
"api.enabled: false",
"api.http.address: localhost:8888",
"api.grpc.address: localhost:9999",
"log.level: debug",
"log.format: json",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
},
},
{
name: "with flags (pipelines, preview, processors, schema, dev)",
args: []string{
"--pipelines.error-recovery.backoff-factor", "5",
"--pipelines.error-recovery.max-delay", "30m",
"--pipelines.error-recovery.max-retries", "10",
"--pipelines.error-recovery.max-retries-window", "15m",
"--pipelines.error-recovery.min-delay", "5s",
"--pipelines.exit-on-degraded=true",
"--pipelines.path", "/var/lib/conduit/pipelines",
"--preview.pipeline-arch-v2=true",
"--processors.path", "/opt/conduit/processors",
"--schema-registry.confluent.connection-string", "http://localhost:8081",
"--schema-registry.type", "confluent",
"--dev.blockprofile", "/tmp/block.prof",
"--dev.cpuprofile", "/tmp/cpu.prof",
"--dev.memprofile", "/tmp/mem.prof",
},
wantLines: []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"processors.path: /opt/conduit/processors",
"pipelines.path: /var/lib/conduit/pipelines",
"pipelines.exit-on-degraded: true",
"pipelines.error-recovery.min-delay: 5s",
"pipelines.error-recovery.max-delay: 30m0s",
"pipelines.error-recovery.backoff-factor: 5",
"pipelines.error-recovery.max-retries: 10",
"pipelines.error-recovery.max-retries-window: 15m0s",
"schema-registry.type: confluent",
"schema-registry.confluent.connection-string: http://localhost:8081",
"preview.pipeline-arch-v2: false",
"dev.cpuprofile: /tmp/cpu.prof",
"dev.memprofile: /tmp/mem.prof",
"dev.blockprofile: /tmp/block.prof",
},
},
{
name: "default values (no flags)",
args: []string{},
wantLines: []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
},
},
}

r, w, err := os.Pipe()
is.NoErr(err)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

os.Stdout = w
t.Cleanup(func() { os.Stdout = oldStdout })
readFrom, writeTo, err := os.Pipe()
is.NoErr(err)

printStruct(reflect.ValueOf(cfg), "")
e := ecdysis.New()
cmd := e.MustBuildCobraCommand(&ConfigCommand{RunCmd: &run.RunCommand{}})
cmd.SetArgs(tc.args)
cmd.SetOut(writeTo)
cmd.SetErr(writeTo)

err = w.Close()
is.NoErr(err)
err = cmd.Execute()
is.NoErr(err)

var buf bytes.Buffer
_, err = io.Copy(&buf, r)
is.NoErr(err)
err = writeTo.Close()
is.NoErr(err)

output := buf.String()
var buf bytes.Buffer
_, err = io.Copy(&buf, readFrom)
is.NoErr(err)

expectedLines := []string{
"db.type: badger",
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"pipelines.exit-on-degraded: false",
"pipelines.error-recovery.min-delay: 1s",
"pipelines.error-recovery.max-delay: 10m0s",
"pipelines.error-recovery.backoff-factor: 2",
"pipelines.error-recovery.max-retries: -1",
"pipelines.error-recovery.max-retries-window: 5m0s",
"schema-registry.type: builtin",
"preview.pipeline-arch-v2: false",
}
output := buf.String()
is.True(output != "")

for _, line := range expectedLines {
is.True(strings.Contains(output, line))
outputLines := strings.Split(output, "\n")
for _, line := range tc.wantLines {
if !slices.Contains(outputLines, line) {
t.Errorf("output does not contain expected line: %q", line)
}
}
})
}
}
2 changes: 1 addition & 1 deletion cmd/conduit/root/initialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type InitCommand struct {

func (c *InitCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath))
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfg.Path))
return flags
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func (c *RunCommand) Execute(_ context.Context) error {
}

func (c *RunCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)
path := filepath.Dir(c.flags.ConduitCfg.Path)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.Cfg,
Path: c.flags.ConduitCfgPath,
Path: c.flags.ConduitCfg.Path,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
}
}
Expand All @@ -75,7 +75,7 @@ func (c *RunCommand) Flags() []ecdysis.Flag {
}

c.Cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfgPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfg.Path)
flags.SetDefault("db.type", c.Cfg.DB.Type)
flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString)
Expand Down
21 changes: 12 additions & 9 deletions cmd/conduit/root/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestRunCommandFlags(t *testing.T) {
is := is.New(t)

expectedFlags := []struct {
wantFlags := []struct {
longName string
shortName string
usage string
Expand Down Expand Up @@ -66,17 +66,20 @@ func TestRunCommandFlags(t *testing.T) {
persistentFlags := c.PersistentFlags()
cmdFlags := c.Flags()

for _, f := range expectedFlags {
for _, wantFlag := range wantFlags {
var cf *pflag.Flag

if f.persistent {
cf = persistentFlags.Lookup(f.longName)
if wantFlag.persistent {
cf = persistentFlags.Lookup(wantFlag.longName)
} else {
cf = cmdFlags.Lookup(f.longName)
cf = cmdFlags.Lookup(wantFlag.longName)
}
is.True(cf != nil)
is.Equal(f.longName, cf.Name)
is.Equal(f.shortName, cf.Shorthand)
is.Equal(cf.Usage, f.usage)
if cf == nil {
t.Logf("flag %q expected, but not found", wantFlag.longName)
t.FailNow()
}
is.Equal(wantFlag.longName, cf.Name)
is.Equal(wantFlag.shortName, cf.Shorthand)
is.Equal(cf.Usage, wantFlag.usage)
}
}
24 changes: 13 additions & 11 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ const (

// Config holds all configurable values for Conduit.
type Config struct {
ConduitCfgPath string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
ConduitCfg struct {
Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
} `mapstructure:"config"`
raulb marked this conversation as resolved.
Show resolved Hide resolved

DB struct {
// When Driver is specified it takes precedence over other DB related
Expand Down Expand Up @@ -90,19 +92,19 @@ type Config struct {

Pipelines struct {
Path string `long:"pipelines.path" usage:"path to pipelines' directory"`
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
ExitOnDegraded bool `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
ErrorRecovery struct {
// MinDelay is the minimum delay before restart: Default: 1 second
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"`
MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"`
// MaxDelay is the maximum delay before restart: Default: 10 minutes
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"`
MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"`
// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"`
BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"`
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"`
MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"`
// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
}
MaxRetriesWindow time.Duration `long:"pipelines.error-recovery.max-retries-window" mapstructure:"max-retries-window" usage:"amount of time running without any errors after which a pipeline is considered healthy"`
} `mapstructure:"error-recovery"`
}

ConnectorPlugins map[string]sdk.Connector
Expand All @@ -111,9 +113,9 @@ type Config struct {
Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`

Confluent struct {
ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"`
ConnectionString string `long:"schema-registry.confluent.connection-string" mapstructure:"connection-string" usage:"confluent schema registry connection string"`
}
}
} `mapstructure:"schema-registry"`

Preview struct {
// PipelineArchV2 enables the new pipeline architecture.
Expand All @@ -139,7 +141,7 @@ func DefaultConfig() Config {
func DefaultConfigWithBasePath(basePath string) Config {
var cfg Config

cfg.ConduitCfgPath = filepath.Join(basePath, "conduit.yaml")
cfg.ConduitCfg.Path = filepath.Join(basePath, "conduit.yaml")

cfg.DB.Type = DBTypeBadger
cfg.DB.Badger.Path = filepath.Join(basePath, "conduit.db")
Expand Down
Loading
Loading