Skip to content

Commit

Permalink
make pipeline name optional (#2130)
Browse files Browse the repository at this point in the history
fixes #2129
  • Loading branch information
raulb authored Feb 5, 2025
1 parent de1579d commit 75cbcd7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 17 deletions.
55 changes: 38 additions & 17 deletions cmd/conduit/root/pipelines/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ var (
)

const (
defaultSource = "generator"
defaultDestination = "file"
defaultSource = "generator"
defaultDestination = "file"
defaultPipelineName = "generator-to-file"
)

type InitArgs struct {
name string
pipelineName string
}

type InitFlags struct {
Expand Down Expand Up @@ -76,26 +77,27 @@ func (c *InitCommand) Flags() []ecdysis.Flag {
}

func (c *InitCommand) Args(args []string) error {
if len(args) == 0 {
return cerrors.Errorf("requires a pipeline name")
}

if len(args) > 1 {
switch len(args) {
case 0:
c.args.pipelineName = defaultPipelineName
case 1:
c.args.pipelineName = args[0]
default:
return cerrors.Errorf("too many arguments")
}
c.args.name = args[0]
return nil
}

func (c *InitCommand) Usage() string { return "init PIPELINE_NAME" }
func (c *InitCommand) Usage() string { return "init [PIPELINE_NAME]" }

func (c *InitCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Initialize an example pipeline.",
Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors
initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then
a simple and runnable generator-to-log pipeline is configured.`,
Example: "conduit pipelines init awesome-pipeline-name --source postgres --destination kafka \n" +
a simple and runnable generator-to-log pipeline is configured. `,
Example: "conduit pipelines init\n" +
"conduit pipelines init awesome-pipeline-name --source postgres --destination kafka \n" +
"conduit pipelines init file-to-pg --source file --destination postgres --pipelines.path ./my-pipelines",
}
}
Expand Down Expand Up @@ -201,7 +203,7 @@ func (c *InitCommand) buildTemplatePipeline() (pipelineTemplate, error) {
}

return pipelineTemplate{
Name: c.args.name,
Name: c.getPipelineName(),
SourceSpec: srcSpec,
DestinationSpec: dstSpec,
}, nil
Expand Down Expand Up @@ -233,8 +235,28 @@ func (c *InitCommand) write(pipeline pipelineTemplate) error {
return nil
}

// getPipelineName returns the desired pipeline name based on configuration.
// If user provided one, it'll respect it. Otherwise, it'll be based on source and dest connectors.
func (c *InitCommand) getPipelineName() string {
src := defaultSource
dest := defaultDestination

if c.args.pipelineName != defaultPipelineName {
return c.args.pipelineName
}

if c.flags.Source != "" {
src = c.flags.Source
}

if c.flags.Destination != "" {
dest = c.flags.Destination
}
return fmt.Sprintf("%s-to-%s", src, dest)
}

func (c *InitCommand) Execute(_ context.Context) error {
c.configFilePath = filepath.Join(c.flags.PipelinesPath, fmt.Sprintf("pipeline-%s.yaml", c.args.name))
c.configFilePath = filepath.Join(c.flags.PipelinesPath, fmt.Sprintf("pipeline-%s.yaml", c.getPipelineName()))

pipeline, err := c.buildTemplatePipeline()
if err != nil {
Expand All @@ -245,9 +267,8 @@ func (c *InitCommand) Execute(_ context.Context) error {
return cerrors.Errorf("could not write pipeline: %w", err)
}

fmt.Printf(`Your pipeline has been initialized and created at %s.
To run the pipeline, simply run 'conduit run'.`, c.configFilePath)
fmt.Printf("Your pipeline has been initialized and created at %q.\n"+
"To run the pipeline, simply run `conduit run`.\n", c.configFilePath)

return nil
}
111 changes: 111 additions & 0 deletions cmd/conduit/root/pipelines/init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines

import (
"fmt"
"testing"

"github.com/matryer/is"
)

func TestInitExecutionNoArgs(t *testing.T) {
is := is.New(t)

c := InitCommand{}
err := c.Args([]string{})
is.NoErr(err)
is.Equal(defaultPipelineName, c.args.pipelineName)
}

func TestInitExecutionMultipleArgs(t *testing.T) {
is := is.New(t)

c := InitCommand{}
err := c.Args([]string{"foo", "bar"})
is.True((err != nil))
is.Equal(err.Error(), "too many arguments")
}

func TestInitExecutionOneArg(t *testing.T) {
is := is.New(t)

pipelineName := "pipeline-name"

c := InitCommand{}
err := c.Args([]string{pipelineName})
is.NoErr(err)
is.Equal(pipelineName, c.args.pipelineName)
}

func TestInit_getPipelineName(t *testing.T) {
tests := []struct {
name string
argsPipelineName string
flagsSource string
flagsDestination string
expected string
}{
{
name: "Custom pipeline name",
argsPipelineName: "custom-pipeline",
flagsSource: "",
flagsDestination: "",
expected: "custom-pipeline",
},
{
name: "Default pipeline name with custom source and destination",
argsPipelineName: defaultPipelineName,
flagsSource: "custom-source",
flagsDestination: "custom-destination",
expected: "custom-source-to-custom-destination",
},
{
name: "Default pipeline name with custom source only",
argsPipelineName: defaultPipelineName,
flagsSource: "custom-source",
flagsDestination: "",
expected: fmt.Sprintf("custom-source-to-%s", defaultDestination),
},
{
name: "Default pipeline name with custom destination only",
argsPipelineName: defaultPipelineName,
flagsSource: "",
flagsDestination: "custom-destination",
expected: fmt.Sprintf("%s-to-custom-destination", defaultSource),
},
{
name: "Default pipeline name with default source and destination",
argsPipelineName: defaultPipelineName,
flagsSource: "",
flagsDestination: "",
expected: fmt.Sprintf("%s-to-%s", defaultSource, defaultDestination),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
is := is.New(t)
c := &InitCommand{}

c.args.pipelineName = tt.argsPipelineName
c.flags.Source = tt.flagsSource
c.flags.Destination = tt.flagsDestination

got := c.getPipelineName()
is.Equal(got, tt.expected)
})
}
}

0 comments on commit 75cbcd7

Please sign in to comment.