diff --git a/cmd/conduit/api/client.go b/cmd/conduit/api/client.go index d2bf95a5a..2b030a04b 100644 --- a/cmd/conduit/api/client.go +++ b/cmd/conduit/api/client.go @@ -27,6 +27,7 @@ import ( type Client struct { conn *grpc.ClientConn apiv1.PipelineServiceClient + apiv1.ProcessorServiceClient apiv1.ConnectorServiceClient healthgrpc.HealthClient } @@ -43,6 +44,7 @@ func NewClient(ctx context.Context, address string) (*Client, error) { client := &Client{ conn: conn, PipelineServiceClient: apiv1.NewPipelineServiceClient(conn), + ProcessorServiceClient: apiv1.NewProcessorServiceClient(conn), ConnectorServiceClient: apiv1.NewConnectorServiceClient(conn), HealthClient: healthgrpc.NewHealthClient(conn), } diff --git a/cmd/conduit/root/processors/list.go b/cmd/conduit/root/processors/list.go new file mode 100644 index 000000000..6bae15b6f --- /dev/null +++ b/cmd/conduit/root/processors/list.go @@ -0,0 +1,90 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import ( + "context" + "fmt" + + "github.com/alexeyco/simpletable" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/cecdysis" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" +) + +var ( + _ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil) + _ ecdysis.CommandWithAliases = (*ListCommand)(nil) + _ ecdysis.CommandWithDocs = (*ListCommand)(nil) +) + +type ListCommand struct{} + +func (c *ListCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "List existing Conduit processors", + Long: `This command requires Conduit to be already running since it will list all processors registered +by Conduit.`, + Example: "conduit processors list\nconduit processors ls", + } +} + +func (c *ListCommand) Aliases() []string { return []string{"ls"} } + +func (c *ListCommand) Usage() string { return "list" } + +func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error { + resp, err := client.ProcessorServiceClient.ListProcessors(ctx, &apiv1.ListProcessorsRequest{}) + if err != nil { + return fmt.Errorf("failed to list processors: %w", err) + } + + displayProcessors(resp.Processors) + + return nil +} + +func displayProcessors(processors []*apiv1.Processor) { + if len(processors) == 0 { + return + } + + table := simpletable.New() + + table.Header = &simpletable.Header{ + Cells: []*simpletable.Cell{ + {Align: simpletable.AlignCenter, Text: "ID"}, + {Align: simpletable.AlignCenter, Text: "PLUGIN"}, + {Align: simpletable.AlignCenter, Text: "CONDITION"}, + {Align: simpletable.AlignCenter, Text: "CREATED"}, + {Align: simpletable.AlignCenter, Text: "LAST_UPDATED"}, + }, + } + + for _, p := range processors { + r := []*simpletable.Cell{ + {Align: simpletable.AlignLeft, Text: p.Id}, + {Align: simpletable.AlignLeft, Text: p.Plugin}, + {Align: simpletable.AlignLeft, Text: p.Condition}, + {Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()}, + {Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()}, + } + + table.Body.Cells = append(table.Body.Cells, r) + } + table.SetStyle(simpletable.StyleCompact) + fmt.Println(table.String()) +} diff --git a/cmd/conduit/root/processors/processors.go b/cmd/conduit/root/processors/processors.go new file mode 100644 index 000000000..59187247c --- /dev/null +++ b/cmd/conduit/root/processors/processors.go @@ -0,0 +1,43 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import ( + "github.com/conduitio/ecdysis" +) + +var ( + _ ecdysis.CommandWithDocs = (*ProcessorsCommand)(nil) + _ ecdysis.CommandWithSubCommands = (*ProcessorsCommand)(nil) + _ ecdysis.CommandWithAliases = (*ProcessorsCommand)(nil) +) + +type ProcessorsCommand struct{} + +func (c *ProcessorsCommand) Aliases() []string { return []string{"processor"} } + +func (c *ProcessorsCommand) SubCommands() []ecdysis.Command { + return []ecdysis.Command{ + &ListCommand{}, + } +} + +func (c *ProcessorsCommand) Usage() string { return "processors" } + +func (c *ProcessorsCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Manage processors", + } +} diff --git a/cmd/conduit/root/root.go b/cmd/conduit/root/root.go index 71c88062a..cf40bc000 100644 --- a/cmd/conduit/root/root.go +++ b/cmd/conduit/root/root.go @@ -24,6 +24,7 @@ import ( "github.com/conduitio/conduit/cmd/conduit/root/connectors" "github.com/conduitio/conduit/cmd/conduit/root/initialize" "github.com/conduitio/conduit/cmd/conduit/root/pipelines" + "github.com/conduitio/conduit/cmd/conduit/root/processors" "github.com/conduitio/conduit/cmd/conduit/root/run" "github.com/conduitio/conduit/cmd/conduit/root/version" "github.com/conduitio/conduit/pkg/conduit" @@ -83,6 +84,7 @@ func (c *RootCommand) SubCommands() []ecdysis.Command { &initialize.InitCommand{Cfg: &runCmd.Cfg}, &version.VersionCommand{}, &pipelines.PipelinesCommand{}, + &processors.ProcessorsCommand{}, &connectors.ConnectorsCommand{}, &connectorplugins.ConnectorPluginsCommand{}, runCmd,