Skip to content

Commit

Permalink
feat: add pipelines ls cmd (+client API) (#2063)
Browse files Browse the repository at this point in the history
* rename pkg name

* feat: add pipelines ls

* pipeline ls work

* hide processors for now

* include alias for pipelines cmd

* hide columns for now

We'll augment these on pipeline describe and see how it feels

* fix api flags (+test)

* use pipeline service directly

* update flag names

* remove testing boolean

* add ExecuteWithClient decorator

* fix double conn err msg

* better approach

* include comment

* read from the same addresss

* fix ci

* fix ci II

* fix tests

* pipelines ls with config

* uses ecdysis parsing

* fix ci

* remove test

* return default value in case it's not parsed

* give additional context if there's an error

* early return

* fix lint
  • Loading branch information
raulb authored Jan 16, 2025
1 parent b8bfbfa commit 1745afd
Show file tree
Hide file tree
Showing 61 changed files with 360 additions and 75 deletions.
68 changes: 68 additions & 0 deletions cmd/conduit/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"fmt"

apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
)

type Client struct {
conn *grpc.ClientConn
apiv1.PipelineServiceClient
healthgrpc.HealthClient
}

func NewClient(ctx context.Context, address string) (*Client, error) {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
}

client := &Client{
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
}

if err := client.CheckHealth(ctx, address); err != nil {
client.Close()
return nil, err
}

return client, nil
}

func (c *Client) CheckHealth(ctx context.Context, address string) error {
healthResp, err := c.HealthClient.Check(ctx, &healthgrpc.HealthCheckRequest{})
if err != nil || healthResp.Status != healthgrpc.HealthCheckResponse_SERVING {
return fmt.Errorf("we couldn't connect to Conduit at the configured address %q\n"+
"Please execute `conduit run` to start it.\nTo check the current configured `api.grpc.address`, run `conduit config`\n\n"+
"Error details: %v", address, err)
}
return nil
}

func (c *Client) Close() error {
return c.conn.Close()
}
106 changes: 106 additions & 0 deletions cmd/conduit/cecdysis/decorators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cecdysis

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/ecdysis"
"github.com/spf13/cobra"
)

// ------------------- CommandWithClient

// CommandWithExecuteWithClient can be implemented by a command that requires a client to interact
// with the Conduit API during the execution.
type CommandWithExecuteWithClient interface {
ecdysis.Command

// ExecuteWithClient is the actual work function. Most commands will implement this.
ExecuteWithClient(context.Context, *api.Client) error
}

// CommandWithExecuteWithClientDecorator is a decorator that adds a Conduit API client to the command execution.
type CommandWithExecuteWithClientDecorator struct{}

func (CommandWithExecuteWithClientDecorator) Decorate(_ *ecdysis.Ecdysis, cmd *cobra.Command, c ecdysis.Command) error {
v, ok := c.(CommandWithExecuteWithClient)
if !ok {
return nil
}

old := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if old != nil {
err := old(cmd, args)
if err != nil {
return err
}
}

grpcAddress, err := getGRPCAddress(cmd)
if err != nil {
return fmt.Errorf("error reading gRPC address: %w", err)
}

client, err := api.NewClient(cmd.Context(), grpcAddress)
if err != nil {
// Not an error we need to escalate to the main CLI execution. We'll print it out and not execute further.
_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
return nil
}
defer client.Close()

ctx := ecdysis.ContextWithCobraCommand(cmd.Context(), cmd)
return v.ExecuteWithClient(ctx, client)
}

return nil
}

func getGRPCAddress(cmd *cobra.Command) (string, error) {
var (
path string
err error
)

path, err = cmd.Flags().GetString("config.path")
if err != nil || path == "" {
path = conduit.DefaultConfig().ConduitCfgPath
}

var usrCfg conduit.Config
defaultConfigValues := conduit.DefaultConfigWithBasePath(filepath.Dir(path))

cfg := ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &usrCfg,
Path: path,
DefaultValues: defaultConfigValues,
}

// If it can't be parsed, we return the default value
err = ecdysis.ParseConfig(cfg, cmd)
if err != nil || usrCfg.API.GRPC.Address == "" {
return defaultConfigValues.API.GRPC.Address, nil
}

return usrCfg.API.GRPC.Address, nil
}
3 changes: 2 additions & 1 deletion cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"fmt"
"os"

"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/cmd/conduit/root"
"github.com/conduitio/ecdysis"
)

func main() {
e := ecdysis.New()
e := ecdysis.New(ecdysis.WithDecorators(cecdysis.CommandWithExecuteWithClientDecorator{}))

cmd := e.MustBuildCobraCommand(&root.RootCommand{})
cmd.CompletionOptions.DisableDefaultCmd = true
Expand Down
4 changes: 2 additions & 2 deletions cmd/conduit/root/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestPrintStructOutput(t *testing.T) {
"db.postgres.table: conduit_kv_store",
"db.sqlite.table: conduit_kv_store",
"api.enabled: true",
"http.address: :8080",
"grpc.address: :8084",
"api.http.address: :8080",
"api.grpc.address: :8084",
"log.level: info",
"log.format: cli",
"pipelines.exit-on-degraded: false",
Expand Down
89 changes: 89 additions & 0 deletions cmd/conduit/root/pipelines/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines

import (
"context"
"fmt"

"github.com/alexeyco/simpletable"
"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
)

var (
_ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil)
_ ecdysis.CommandWithAliases = (*ListCommand)(nil)
_ ecdysis.CommandWithDocs = (*ListCommand)(nil)
)

type ListCommand struct{}

func (c *ListCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "List existing Conduit pipelines",
Long: `This command requires Conduit to be already running since it will list all pipelines registered
by Conduit. This will depend on the configured pipelines directory, which by default is /pipelines; however, it could
be configured via --pipelines.path at the time of running Conduit.`,
Example: "conduit pipelines ls",
}
}

func (c *ListCommand) Aliases() []string { return []string{"ls"} }

func (c *ListCommand) Usage() string { return "list" }

func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
resp, err := client.PipelineServiceClient.ListPipelines(ctx, &apiv1.ListPipelinesRequest{})
if err != nil {
return fmt.Errorf("failed to list pipelines: %w", err)
}

displayPipelines(resp.Pipelines)

return nil
}

func displayPipelines(pipelines []*apiv1.Pipeline) {
if len(pipelines) == 0 {
return
}

table := simpletable.New()

table.Header = &simpletable.Header{
Cells: []*simpletable.Cell{
{Align: simpletable.AlignCenter, Text: "ID"},
{Align: simpletable.AlignCenter, Text: "STATE"},
{Align: simpletable.AlignCenter, Text: "CREATED"},
{Align: simpletable.AlignCenter, Text: "LAST_UPDATED"},
},
}

for _, p := range pipelines {
r := []*simpletable.Cell{
{Align: simpletable.AlignRight, Text: p.Id},
{Align: simpletable.AlignLeft, Text: p.State.Status.String()},
{Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()},
{Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()},
}

table.Body.Cells = append(table.Body.Cells, r)
}
table.SetStyle(simpletable.StyleCompact)
fmt.Println(table.String())
}
4 changes: 4 additions & 0 deletions cmd/conduit/root/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
var (
_ ecdysis.CommandWithDocs = (*PipelinesCommand)(nil)
_ ecdysis.CommandWithSubCommands = (*PipelinesCommand)(nil)
_ ecdysis.CommandWithAliases = (*PipelinesCommand)(nil)
)

type PipelinesCommand struct{}

func (c *PipelinesCommand) Aliases() []string { return []string{"pipeline"} }

func (c *PipelinesCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&InitCommand{},
&ListCommand{},
}
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ var (

type RootFlags struct {
Version bool `long:"version" short:"v" usage:"show the current Conduit version"`

// Global Flags
GRPCAddress string `long:"api.grpc.address" usage:"address where Conduit is running" persistent:"true"`
ConfigPath string `long:"config.path" usage:"path to the configuration file" persistent:"true"`
}

type RootCommand struct {
Expand Down Expand Up @@ -77,6 +81,6 @@ func (c *RootCommand) SubCommands() []ecdysis.Command {
&initialize.InitCommand{Cfg: &runCmd.Cfg},
&version.VersionCommand{},
&pipelines.PipelinesCommand{},
&run.RunCommand{},
runCmd,
}
}
2 changes: 2 additions & 0 deletions cmd/conduit/root/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func TestRootCommandFlags(t *testing.T) {
persistent bool
}{
{longName: "version", shortName: "v", usage: "show the current Conduit version"},
{longName: "api.grpc.address", usage: "address where Conduit is running", persistent: true},
{longName: "config.path", usage: "path to the configuration file", persistent: true},
}

e := ecdysis.New()
Expand Down
11 changes: 9 additions & 2 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package run

import (
"context"
"fmt"
"os"
"path/filepath"

Expand All @@ -42,6 +43,12 @@ type RunCommand struct {

func (c *RunCommand) Execute(_ context.Context) error {
e := &conduit.Entrypoint{}

if !c.Cfg.API.Enabled {
fmt.Print("Warning: API is currently disabled. Most Conduit CLI commands won't work without the API enabled." +
"To enable it, run conduit with `--api.enabled=true` or set `CONDUIT_API_ENABLED=true` in your environment.")
}

e.Serve(c.Cfg)
return nil
}
Expand Down Expand Up @@ -76,8 +83,8 @@ func (c *RunCommand) Flags() []ecdysis.Flag {
flags.SetDefault("db.sqlite.path", c.Cfg.DB.SQLite.Path)
flags.SetDefault("db.sqlite.table", c.Cfg.DB.SQLite.Table)
flags.SetDefault("api.enabled", c.Cfg.API.Enabled)
flags.SetDefault("http.address", c.Cfg.API.HTTP.Address)
flags.SetDefault("grpc.address", c.Cfg.API.GRPC.Address)
flags.SetDefault("api.http.address", c.Cfg.API.HTTP.Address)
flags.SetDefault("api.grpc.address", c.Cfg.API.GRPC.Address)
flags.SetDefault("log.level", c.Cfg.Log.Level)
flags.SetDefault("log.format", c.Cfg.Log.Format)
flags.SetDefault("connectors.path", c.Cfg.Connectors.Path)
Expand Down
4 changes: 2 additions & 2 deletions cmd/conduit/root/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestRunCommandFlags(t *testing.T) {
{longName: "db.sqlite.path", usage: "path to sqlite3 DB"},
{longName: "db.sqlite.table", usage: "sqlite3 table in which to store data (will be created if it does not exist)"},
{longName: "api.enabled", usage: "enable HTTP and gRPC API"},
{longName: "http.address", usage: "address for serving the HTTP API"},
{longName: "grpc.address", usage: "address for serving the gRPC API"},
{longName: "api.http.address", usage: "address for serving the HTTP API"},
{longName: "api.grpc.address", usage: "address for serving the gRPC API"},
{longName: "log.level", usage: "sets logging level; accepts debug, info, warn, error, trace"},
{longName: "log.format", usage: "sets the format of the logging; accepts json, cli"},
{longName: "connectors.path", usage: "path to standalone connectors' directory"},
Expand Down
Loading

0 comments on commit 1745afd

Please sign in to comment.