Skip to content

Commit

Permalink
add ExecuteWithClient decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Jan 15, 2025
1 parent 935b419 commit dce673d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 42 deletions.
33 changes: 24 additions & 9 deletions cmd/conduit/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
)

type Client struct {
conn *grpc.ClientConn
PipelineService apiv1.PipelineServiceClient
HealthService healthgrpc.HealthClient
conn *grpc.ClientConn
apiv1.PipelineServiceClient
healthgrpc.HealthClient
}

func NewClient(_ context.Context, address string) (*Client, error) {
func NewClient(ctx context.Context, address string) (*Client, error) {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -39,11 +39,26 @@ func NewClient(_ context.Context, address string) (*Client, error) {
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
}

return &Client{
conn: conn,
PipelineService: apiv1.NewPipelineServiceClient(conn),
HealthService: healthgrpc.NewHealthClient(conn),
}, nil
client := &Client{
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
}

if err := client.CheckHealth(ctx); err != nil {
client.Close()
return nil, err
}

return client, nil
}

func (c *Client) CheckHealth(ctx context.Context) error {
healthResp, err := c.HealthClient.Check(ctx, &healthgrpc.HealthCheckRequest{})
if err != nil || healthResp.Status != healthgrpc.HealthCheckResponse_SERVING {
return fmt.Errorf("notice: to inspect the API, Conduit needs to be running\nPlease execute `conduit run`")
}
return nil
}

func (c *Client) Close() error {
Expand Down
67 changes: 67 additions & 0 deletions cmd/conduit/cecdysis/decorators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cecdysis

import (
"context"

"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/ecdysis"
"github.com/spf13/cobra"
)

// ------------------- CommandWithClient

// CommandWithExecuteWithClient can be implemented by a command that requires a client to interact
// with the Conduit API during the execution.
type CommandWithExecuteWithClient interface {
ecdysis.Command

// ExecuteWithClient is the actual work function. Most commands will implement this.
ExecuteWithClient(context.Context, *api.Client) error
}

// CommandWithExecuteWithClientDecorator is a decorator that adds a Conduit API client to the command execution.
type CommandWithExecuteWithClientDecorator struct{}

func (CommandWithExecuteWithClientDecorator) Decorate(_ *ecdysis.Ecdysis, cmd *cobra.Command, c ecdysis.Command) error {
v, ok := c.(CommandWithExecuteWithClient)
if !ok {
return nil
}

old := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if old != nil {
err := old(cmd, args)
if err != nil {
return err
}
}

// TODO: Make sure address is fetched from flags
client, err := api.NewClient(cmd.Context(), ":8084")
if err != nil {
cmd.SilenceUsage = true
return err
}
defer client.Close()

v.ExecuteWithClient(cmd.Context(), client)

Check failure on line 62 in cmd/conduit/cecdysis/decorators.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `v.ExecuteWithClient` is not checked (errcheck)
return nil
}

return nil
}
3 changes: 2 additions & 1 deletion cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"fmt"
"os"

"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/cmd/conduit/root"
"github.com/conduitio/ecdysis"
)

func main() {
e := ecdysis.New()
e := ecdysis.New(ecdysis.WithDecorators(cecdysis.CommandWithExecuteWithClientDecorator{}))

cmd := e.MustBuildCobraCommand(&root.RootCommand{})
cmd.CompletionOptions.DisableDefaultCmd = true
Expand Down
34 changes: 6 additions & 28 deletions cmd/conduit/root/pipelines/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@ import (

"github.com/alexeyco/simpletable"
"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/cmd/conduit/root/run"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
)

var (
_ ecdysis.CommandWithExecute = (*ListCommand)(nil)
_ ecdysis.CommandWithAliases = (*ListCommand)(nil)
_ ecdysis.CommandWithDocs = (*ListCommand)(nil)
// with API client ?
_ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil)
_ ecdysis.CommandWithAliases = (*ListCommand)(nil)
_ ecdysis.CommandWithDocs = (*ListCommand)(nil)
)

type ListCommand struct {
client *api.Client
RunCmd *run.RunCommand
}

Expand All @@ -52,28 +50,8 @@ func (c *ListCommand) Aliases() []string { return []string{"ls"} }

func (c *ListCommand) Usage() string { return "list" }

func (c *ListCommand) Execute(ctx context.Context) error {
// TODO: Move this elsewhere since it'll be common for all commands that require having Conduit Running
// --------- START
conduitNotRunning := "Notice: To inspect the API, Conduit needs to be running" +
"\nPlease execute `conduit run`"

conduitGRPCAddr := c.RunCmd.GRPCAddress()

conduitClient, err := api.NewClient(ctx, conduitGRPCAddr)
if err != nil {
return fmt.Errorf("failed to connect to Conduit server: %w", err)
}
defer conduitClient.Close()

sourceHealthResp, err := conduitClient.HealthService.Check(ctx, &healthgrpc.HealthCheckRequest{})
if err != nil || sourceHealthResp.Status != healthgrpc.HealthCheckResponse_SERVING {
fmt.Println(conduitNotRunning)
return nil
}
// --------- END

resp, err := conduitClient.PipelineService.ListPipelines(ctx, &apiv1.ListPipelinesRequest{})
func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
resp, err := client.PipelineServiceClient.ListPipelines(ctx, &apiv1.ListPipelinesRequest{})
if err != nil {
return fmt.Errorf("failed to list pipelines: %w", err)
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,3 @@ func (c *RunCommand) Docs() ecdysis.Docs {
Long: `Starts the Conduit server and runs the configured pipelines.`,
}
}

func (c *RunCommand) GRPCAddress() string {
return c.Cfg.API.GRPC.Address
}

0 comments on commit dce673d

Please sign in to comment.