Skip to content

Commit

Permalink
feat(CLI): add processors ls (#2077)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb authored Jan 17, 2025
1 parent 0f0e5db commit 84c87ed
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/conduit/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type Client struct {
conn *grpc.ClientConn
apiv1.PipelineServiceClient
apiv1.ProcessorServiceClient
apiv1.ConnectorServiceClient
healthgrpc.HealthClient
}
Expand All @@ -43,6 +44,7 @@ func NewClient(ctx context.Context, address string) (*Client, error) {
client := &Client{
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
ProcessorServiceClient: apiv1.NewProcessorServiceClient(conn),
ConnectorServiceClient: apiv1.NewConnectorServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
}
Expand Down
90 changes: 90 additions & 0 deletions cmd/conduit/root/processors/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import (
"context"
"fmt"

"github.com/alexeyco/simpletable"
"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
)

var (
_ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil)
_ ecdysis.CommandWithAliases = (*ListCommand)(nil)
_ ecdysis.CommandWithDocs = (*ListCommand)(nil)
)

type ListCommand struct{}

func (c *ListCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "List existing Conduit processors",
Long: `This command requires Conduit to be already running since it will list all processors registered
by Conduit.`,
Example: "conduit processors list\nconduit processors ls",
}
}

func (c *ListCommand) Aliases() []string { return []string{"ls"} }

func (c *ListCommand) Usage() string { return "list" }

func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
resp, err := client.ProcessorServiceClient.ListProcessors(ctx, &apiv1.ListProcessorsRequest{})
if err != nil {
return fmt.Errorf("failed to list processors: %w", err)
}

displayProcessors(resp.Processors)

return nil
}

func displayProcessors(processors []*apiv1.Processor) {
if len(processors) == 0 {
return
}

table := simpletable.New()

table.Header = &simpletable.Header{
Cells: []*simpletable.Cell{
{Align: simpletable.AlignCenter, Text: "ID"},
{Align: simpletable.AlignCenter, Text: "PLUGIN"},
{Align: simpletable.AlignCenter, Text: "CONDITION"},
{Align: simpletable.AlignCenter, Text: "CREATED"},
{Align: simpletable.AlignCenter, Text: "LAST_UPDATED"},
},
}

for _, p := range processors {
r := []*simpletable.Cell{
{Align: simpletable.AlignLeft, Text: p.Id},
{Align: simpletable.AlignLeft, Text: p.Plugin},
{Align: simpletable.AlignLeft, Text: p.Condition},
{Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()},
{Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()},
}

table.Body.Cells = append(table.Body.Cells, r)
}
table.SetStyle(simpletable.StyleCompact)
fmt.Println(table.String())
}
43 changes: 43 additions & 0 deletions cmd/conduit/root/processors/processors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import (
"github.com/conduitio/ecdysis"
)

var (
_ ecdysis.CommandWithDocs = (*ProcessorsCommand)(nil)
_ ecdysis.CommandWithSubCommands = (*ProcessorsCommand)(nil)
_ ecdysis.CommandWithAliases = (*ProcessorsCommand)(nil)
)

type ProcessorsCommand struct{}

func (c *ProcessorsCommand) Aliases() []string { return []string{"processor"} }

func (c *ProcessorsCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&ListCommand{},
}
}

func (c *ProcessorsCommand) Usage() string { return "processors" }

func (c *ProcessorsCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Manage processors",
}
}
2 changes: 2 additions & 0 deletions cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/conduitio/conduit/cmd/conduit/root/connectors"
"github.com/conduitio/conduit/cmd/conduit/root/initialize"
"github.com/conduitio/conduit/cmd/conduit/root/pipelines"
"github.com/conduitio/conduit/cmd/conduit/root/processors"
"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/conduit/cmd/conduit/root/version"
"github.com/conduitio/conduit/pkg/conduit"
Expand Down Expand Up @@ -83,6 +84,7 @@ func (c *RootCommand) SubCommands() []ecdysis.Command {
&initialize.InitCommand{Cfg: &runCmd.Cfg},
&version.VersionCommand{},
&pipelines.PipelinesCommand{},
&processors.ProcessorsCommand{},
&connectors.ConnectorsCommand{},
&connectorplugins.ConnectorPluginsCommand{},
runCmd,
Expand Down

0 comments on commit 84c87ed

Please sign in to comment.