Skip to content

Commit

Permalink
Fixes and adjustments from local testing
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Oct 5, 2023
1 parent 53922cf commit db274dc
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 112 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/common/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
var state binlogdatapb.VReplicationWorkflowState
switch strings.ToLower(cmd.Name()) {
case "start":
if err := CanRestartWorkflow(workflowUpdateOptions.Workflow, workflowOptions.Keyspace); err != nil {
if err := CanRestartWorkflow(workflowOptions.Keyspace, workflowUpdateOptions.Workflow); err != nil {
return err
}
state = binlogdatapb.VReplicationWorkflowState_Running
case "stop":
state = binlogdatapb.VReplicationWorkflowState_Stopped
default:
return fmt.Errorf("invalid workstate: %s", args[0])
return fmt.Errorf("invalid workflow state: %s", args[0])
}

// The only thing we're updating is the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/vt/topo/topoproto"
)

var (
Expand All @@ -37,10 +38,12 @@ func registerCommands(root *cobra.Command) {
common.AddCommonFlags(base)
root.AddCommand(base)

common.AddCommonCreateFlags(create)
create.Flags().StringSliceVarP(&common.CreateOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to copy table data from.")
create.Flags().Var((*topoproto.TabletTypeListFlag)(&common.CreateOptions.TabletTypes), "tablet-types", "Source tablet types to replicate table data from (e.g. PRIMARY,REPLICA,RDONLY).")
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
create.Flags().StringVar(&createOptions.SourceKeyspace, "source-keyspace", "", "Keyspace where the tables are being moved from.")
create.MarkFlagRequired("source-keyspace")
create.Flags().Var(&createOptions.TableSettings, "table-settings", "A JSON array where each value must contain two key/value pairs. The first key is 'target_table' and it is the name of the table in the target-keyspace to store the results in. The second key is 'source_expression' and its value is the select to run against the source table. An optional k/v pair can be specified for 'create_ddl' which provides the DDL to create the target table if it does not exist.")
create.Flags().Var(&createOptions.TableSettings, "table-settings", "A JSON array where each value must contain two key/value pairs. The first key is 'target_table' and it is the name of the table in the target-keyspace to store the results in. The second key is 'source_expression' and its value is the select to run against the source table. An optional k/v pair can be specified for 'create_ddl' which provides the DDL to create the target table if it does not exist (you can specify a value of 'copy' if the target-table should be copied as-is from the source keyspace).")
create.MarkFlagRequired("table-settings")
base.AddCommand(create)

Expand Down
27 changes: 9 additions & 18 deletions go/cmd/vtctldclient/command/vreplication/workflow/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,31 @@ import (
)

var (
workflowDeleteOptions = struct {
Workflow string
deleteOptions = struct {
KeepData bool
KeepRoutingRules bool
}{}

// WorkflowDelete makes a WorkflowDelete gRPC call to a vtctld.
workflowDelete = &cobra.Command{
// delete makes a WorkflowDelete gRPC call to a vtctld.
delete = &cobra.Command{
Use: "delete",
Short: "Delete a VReplication workflow.",
Example: `vtctldclient --server localhost:15999 workflow --keyspace customer delete --workflow commerce2customer`,
DisableFlagsInUseLine: true,
Aliases: []string{"Delete"},
Args: cobra.NoArgs,
RunE: commandWorkflowDelete,
RunE: commandDelete,
}
)

func commandWorkflowDelete(cmd *cobra.Command, args []string) error {
func commandDelete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: workflowOptions.Keyspace,
Workflow: workflowDeleteOptions.Workflow,
KeepData: workflowDeleteOptions.KeepData,
KeepRoutingRules: workflowDeleteOptions.KeepRoutingRules,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand All @@ -75,11 +74,3 @@ func commandWorkflowDelete(cmd *cobra.Command, args []string) error {

return nil
}

func addWorkflowDeleteFlags(cmd *cobra.Command) {
workflowDelete.Flags().StringVarP(&workflowDeleteOptions.Workflow, "workflow", "w", "", "The workflow you want to delete (required).")
workflowDelete.MarkFlagRequired("workflow")
workflowDelete.Flags().BoolVar(&workflowDeleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
workflowDelete.Flags().BoolVar(&workflowDeleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")

}
4 changes: 0 additions & 4 deletions go/cmd/vtctldclient/command/vreplication/workflow/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,3 @@ func commandGetWorkflows(cmd *cobra.Command, args []string) error {

return nil
}

func addGetWorkflowsFlags(cmd *cobra.Command) {
cmd.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows.")
}
19 changes: 7 additions & 12 deletions go/cmd/vtctldclient/command/vreplication/workflow/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,27 @@ var (
DisableFlagsInUseLine: true,
Aliases: []string{"List"},
Args: cobra.NoArgs,
RunE: commandWorkflowShow,
RunE: commandShow,
}

// WorkflowShow makes a GetWorkflows gRPC call to a vtctld.
workflowShow = &cobra.Command{
// show makes a GetWorkflows gRPC call to a vtctld.
show = &cobra.Command{
Use: "show",
Short: "Show the details for a VReplication workflow.",
Example: `vtctldclient --server localhost:15999 workflow --keyspace customer show --workflow commerce2customer`,
DisableFlagsInUseLine: true,
Aliases: []string{"Show"},
Args: cobra.NoArgs,
RunE: commandWorkflowShow,
RunE: commandShow,
}
)

func commandWorkflowShow(cmd *cobra.Command, args []string) error {
func commandShow(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.GetWorkflowsRequest{
Keyspace: workflowOptions.Keyspace,
Workflow: workflowDeleteOptions.Workflow,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
}
resp, err := common.GetClient().GetWorkflows(common.GetCommandCtx(), req)
if err != nil {
Expand All @@ -82,8 +82,3 @@ func commandWorkflowShow(cmd *cobra.Command, args []string) error {

return nil
}

func addWorkflowShowFlags(cmd *cobra.Command) {
workflowShow.Flags().StringVarP(&workflowDeleteOptions.Workflow, "workflow", "w", "", "The workflow you want the details for (required).")
workflowShow.MarkFlagRequired("workflow")
}
22 changes: 11 additions & 11 deletions go/cmd/vtctldclient/command/vreplication/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,50 +34,50 @@ import (
)

var (
// WorkflowStart makes a WorfklowUpdate gRPC call to a vtctld.
workflowStart = &cobra.Command{
// start makes a WorfklowUpdate gRPC call to a vtctld.
start = &cobra.Command{
Use: "start",
Short: "Start a VReplication workflow.",
Example: `vtctldclient --server localhost:15999 workflow --keyspace customer start --workflow commerce2customer`,
DisableFlagsInUseLine: true,
Aliases: []string{"Start"},
Args: cobra.NoArgs,
RunE: commandWorkflowUpdateState,
RunE: commandUpdateState,
}

// WorkflowStop makes a WorfklowUpdate gRPC call to a vtctld.
workflowStop = &cobra.Command{
// stop makes a WorfklowUpdate gRPC call to a vtctld.
stop = &cobra.Command{
Use: "stop",
Short: "Stop a VReplication workflow.",
Example: `vtctldclient --server localhost:15999 workflow --keyspace customer stop --workflow commerce2customer`,
DisableFlagsInUseLine: true,
Aliases: []string{"Stop"},
Args: cobra.NoArgs,
RunE: commandWorkflowUpdateState,
RunE: commandUpdateState,
}
)

func commandWorkflowUpdateState(cmd *cobra.Command, args []string) error {
func commandUpdateState(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

var state binlogdatapb.VReplicationWorkflowState
switch strings.ToLower(cmd.Name()) {
case "start":
if err := common.CanRestartWorkflow(workflowUpdateOptions.Workflow, workflowOptions.Keyspace); err != nil {
if err := common.CanRestartWorkflow(baseOptions.Keyspace, baseOptions.Workflow); err != nil {
return err
}
state = binlogdatapb.VReplicationWorkflowState_Running
case "stop":
state = binlogdatapb.VReplicationWorkflowState_Stopped
default:
return fmt.Errorf("invalid workstate: %s", args[0])
return fmt.Errorf("invalid workflow state: %s", args[0])
}

// The only thing we're updating is the state.
req := &vtctldatapb.WorkflowUpdateRequest{
Keyspace: workflowOptions.Keyspace,
Keyspace: baseOptions.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflowUpdateOptions.Workflow,
Workflow: baseOptions.Workflow,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
Expand Down
50 changes: 23 additions & 27 deletions go/cmd/vtctldclient/command/vreplication/workflow/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/topo/topoproto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand All @@ -35,8 +34,15 @@ import (
)

var (
// WorkflowUpdate makes a WorkflowUpdate gRPC call to a vtctld.
workflowUpdate = &cobra.Command{
updateOptions = struct {
Cells []string
TabletTypes []topodatapb.TabletType
TabletTypesInPreferenceOrder bool
OnDDL string
}{}

// update makes a WorkflowUpdate gRPC call to a vtctld.
update = &cobra.Command{
Use: "update",
Short: "Update the configuration parameters for a VReplication workflow.",
Example: `vtctldclient --server localhost:15999 workflow --keyspace customer update --workflow commerce2customer --cells zone1 --cells zone2 -c "zone3,zone4" -c zone5`,
Expand All @@ -47,59 +53,59 @@ var (
changes := false
if cmd.Flags().Lookup("cells").Changed { // Validate the provided value(s)
changes = true
for i, cell := range workflowUpdateOptions.Cells { // Which only means trimming whitespace
workflowUpdateOptions.Cells[i] = strings.TrimSpace(cell)
for i, cell := range updateOptions.Cells { // Which only means trimming whitespace
updateOptions.Cells[i] = strings.TrimSpace(cell)
}
} else {
workflowUpdateOptions.Cells = textutil.SimulatedNullStringSlice
updateOptions.Cells = textutil.SimulatedNullStringSlice
}
if cmd.Flags().Lookup("tablet-types").Changed {
changes = true
} else {
workflowUpdateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}
updateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}
}
if cmd.Flags().Lookup("on-ddl").Changed { // Validate the provided value
changes = true
if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; !ok {
return fmt.Errorf("invalid on-ddl value: %s", workflowUpdateOptions.OnDDL)
if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; !ok {
return fmt.Errorf("invalid on-ddl value: %s", updateOptions.OnDDL)
}
} // Simulated NULL will need to be handled in command
if !changes {
return fmt.Errorf("no configuration options specified to update")
}
return nil
},
RunE: commandWorkflowUpdate,
RunE: commandUpdate,
}
)

func commandWorkflowUpdate(cmd *cobra.Command, args []string) error {
func commandUpdate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
onddl := int32(textutil.SimulatedNullInt) // Simulated NULL when no value provided
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; ok {
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok {
onddl = val
}

// Simulated NULL when no value is provided.
tsp := tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN
if cmd.Flags().Lookup("tablet-types-in-order").Changed {
if workflowUpdateOptions.TabletTypesInPreferenceOrder {
if updateOptions.TabletTypesInPreferenceOrder {
tsp = tabletmanagerdatapb.TabletSelectionPreference_INORDER
} else {
tsp = tabletmanagerdatapb.TabletSelectionPreference_ANY
}
}

req := &vtctldatapb.WorkflowUpdateRequest{
Keyspace: workflowOptions.Keyspace,
Keyspace: baseOptions.Keyspace,
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflowUpdateOptions.Workflow,
Cells: workflowUpdateOptions.Cells,
TabletTypes: workflowUpdateOptions.TabletTypes,
Workflow: baseOptions.Workflow,
Cells: updateOptions.Cells,
TabletTypes: updateOptions.TabletTypes,
TabletSelectionPreference: tsp,
OnDdl: binlogdatapb.OnDDLAction(onddl),
},
Expand All @@ -124,13 +130,3 @@ func commandWorkflowUpdate(cmd *cobra.Command, args []string) error {

return nil
}

func addWorkflowUpdateFlags(cmd *cobra.Command) {
workflowUpdate.Flags().StringVarP(&workflowUpdateOptions.Workflow, "workflow", "w", "", "The workflow you want to update (required).")
workflowUpdate.MarkFlagRequired("workflow")
workflowUpdate.Flags().StringSliceVarP(&workflowUpdateOptions.Cells, "cells", "c", nil, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
workflowUpdate.Flags().VarP((*topoproto.TabletTypeListFlag)(&workflowUpdateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).")
workflowUpdate.Flags().BoolVar(&workflowUpdateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
workflowUpdate.Flags().StringVar(&workflowUpdateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.")

}
Loading

0 comments on commit db274dc

Please sign in to comment.