Skip to content

Commit

Permalink
feat: add processor describe (#2089)
Browse files Browse the repository at this point in the history
* feat: add processor describe

* check empty cases
  • Loading branch information
raulb authored Jan 22, 2025
1 parent d8169e0 commit 873e9e3
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cmd/conduit/internal/print_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func DisplayProcessors(processors []*apiv1.Processor, indent int) {
fmt.Printf("%s- ID: %s\n", Indentation(indent+1), p.Id)
fmt.Printf("%sPlugin: %s\n", Indentation(indent+2), p.Plugin)

if p.Condition != "" {
if !IsEmpty(p.Condition) {
fmt.Printf("%sCondition: %s\n", Indentation(indent+2), p.Condition)
}

Expand Down
113 changes: 113 additions & 0 deletions cmd/conduit/root/processors/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import (
"context"
"fmt"

"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/cmd/conduit/internal"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
)

var (
_ cecdysis.CommandWithExecuteWithClient = (*DescribeCommand)(nil)
_ ecdysis.CommandWithAliases = (*DescribeCommand)(nil)
_ ecdysis.CommandWithDocs = (*DescribeCommand)(nil)
_ ecdysis.CommandWithArgs = (*DescribeCommand)(nil)
)

type DescribeArgs struct {
ProcessorID string
}

type DescribeCommand struct {
args DescribeArgs
}

func (c *DescribeCommand) Usage() string { return "describe" }

func (c *DescribeCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Describe an existing processor",
Long: `This command requires Conduit to be already running since it will describe a processor registered
by Conduit. You can list existing processors with the 'conduit processors list' command.`,
Example: "conduit processors describe pipeline-processor\n" +
"conduit processor desc connector-processor",
}
}

func (c *DescribeCommand) Aliases() []string { return []string{"desc"} }

func (c *DescribeCommand) Args(args []string) error {
if len(args) == 0 {
return cerrors.Errorf("requires a processor ID")
}

if len(args) > 1 {
return cerrors.Errorf("too many arguments")
}

c.args.ProcessorID = args[0]
return nil
}

func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
resp, err := client.ProcessorServiceClient.GetProcessor(ctx, &apiv1.GetProcessorRequest{
Id: c.args.ProcessorID,
})
if err != nil {
return fmt.Errorf("failed to get processor: %w", err)
}

displayProcessor(resp.Processor)
return nil
}

func displayProcessor(p *apiv1.Processor) {
fmt.Printf("ID: %s\n", p.Id)
fmt.Printf("Plugin: %s\n", p.Plugin)

processorType := "Processor "
switch p.Parent.Type.String() {
case "TYPE_PIPELINE":
processorType += "for Pipeline"
case "TYPE_CONNECTOR":
processorType += "for Connector"
default:
processorType += "associated to"
}

fmt.Printf("%s: %s\n", processorType, p.Parent.Id)

if !internal.IsEmpty(p.Condition) {
fmt.Printf("Condition: %s\n", p.Condition)
}

if len(p.Config.Settings) > 0 {
fmt.Println("Config:")
for name, value := range p.Config.Settings {
fmt.Printf("%s%s: %s\n", internal.Indentation(1), name, value)
}
}
fmt.Printf("Workers: %d\n", p.Config.Workers)

fmt.Printf("Created At: %s\n", internal.PrintTime(p.CreatedAt))
fmt.Printf("Updated At: %s\n", internal.PrintTime(p.UpdatedAt))
}
56 changes: 56 additions & 0 deletions cmd/conduit/root/processors/describe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import (
"testing"

"github.com/matryer/is"
)

func TestDescribeExecutionNoArgs(t *testing.T) {
is := is.New(t)

c := DescribeCommand{}
err := c.Args([]string{})

expected := "requires a processor ID"

is.True(err != nil)
is.Equal(err.Error(), expected)
}

func TestDescribeExecutionMultipleArgs(t *testing.T) {
is := is.New(t)

c := DescribeCommand{}
err := c.Args([]string{"foo", "bar"})

expected := "too many arguments"

is.True(err != nil)
is.Equal(err.Error(), expected)
}

func TestDescribeExecutionCorrectArgs(t *testing.T) {
is := is.New(t)
processorID := "my-processor"

c := DescribeCommand{}
err := c.Args([]string{processorID})

is.NoErr(err)
is.Equal(c.args.ProcessorID, processorID)
}
15 changes: 15 additions & 0 deletions cmd/conduit/root/processors/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,31 @@ func displayProcessors(processors []*apiv1.Processor) {
{Align: simpletable.AlignCenter, Text: "ID"},
{Align: simpletable.AlignCenter, Text: "PLUGIN"},
{Align: simpletable.AlignCenter, Text: "CONDITION"},
{Align: simpletable.AlignCenter, Text: "TYPE"},
{Align: simpletable.AlignCenter, Text: "CREATED"},
{Align: simpletable.AlignCenter, Text: "LAST_UPDATED"},
},
}

for _, p := range processors {
var processorType string

switch p.Parent.Type.String() {
case "TYPE_PIPELINE":
processorType = "Pipeline"
case "TYPE_CONNECTOR":
processorType = "Connector"
default:
processorType = "Unknown"
}

processorType = fmt.Sprintf("%s (%s)", processorType, p.Parent.Id)

r := []*simpletable.Cell{
{Align: simpletable.AlignLeft, Text: p.Id},
{Align: simpletable.AlignLeft, Text: p.Plugin},
{Align: simpletable.AlignLeft, Text: p.Condition},
{Align: simpletable.AlignLeft, Text: processorType},
{Align: simpletable.AlignLeft, Text: internal.PrintTime(p.CreatedAt)},
{Align: simpletable.AlignLeft, Text: internal.PrintTime(p.UpdatedAt)},
}
Expand Down
1 change: 1 addition & 0 deletions cmd/conduit/root/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (c *ProcessorsCommand) Aliases() []string { return []string{"processor"} }
func (c *ProcessorsCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&ListCommand{},
&DescribeCommand{},
}
}

Expand Down

0 comments on commit 873e9e3

Please sign in to comment.