diff --git a/go/cmd/vtctldclient/command/vreplication/common/update.go b/go/cmd/vtctldclient/command/vreplication/common/update.go index 6beecb58ffa..7875c9412ac 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/update.go +++ b/go/cmd/vtctldclient/command/vreplication/common/update.go @@ -127,14 +127,14 @@ func commandUpdateState(cmd *cobra.Command, args []string) error { var state binlogdatapb.VReplicationWorkflowState switch strings.ToLower(cmd.Name()) { case "start": - if err := CanRestartWorkflow(workflowUpdateOptions.Workflow, workflowOptions.Keyspace); err != nil { + if err := CanRestartWorkflow(workflowOptions.Keyspace, workflowUpdateOptions.Workflow); err != nil { return err } state = binlogdatapb.VReplicationWorkflowState_Running case "stop": state = binlogdatapb.VReplicationWorkflowState_Stopped default: - return fmt.Errorf("invalid workstate: %s", args[0]) + return fmt.Errorf("invalid workflow state: %s", args[0]) } // The only thing we're updating is the state. diff --git a/go/cmd/vtctldclient/command/vreplication/materialize/materialize.go b/go/cmd/vtctldclient/command/vreplication/materialize/materialize.go index 97adc8518e2..403ba15fbff 100644 --- a/go/cmd/vtctldclient/command/vreplication/materialize/materialize.go +++ b/go/cmd/vtctldclient/command/vreplication/materialize/materialize.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" + "vitess.io/vitess/go/vt/topo/topoproto" ) var ( @@ -37,10 +38,12 @@ func registerCommands(root *cobra.Command) { common.AddCommonFlags(base) root.AddCommand(base) - common.AddCommonCreateFlags(create) + create.Flags().StringSliceVarP(&common.CreateOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to copy table data from.") + create.Flags().Var((*topoproto.TabletTypeListFlag)(&common.CreateOptions.TabletTypes), "tablet-types", "Source tablet types to replicate table data from (e.g. PRIMARY,REPLICA,RDONLY).") + create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") create.Flags().StringVar(&createOptions.SourceKeyspace, "source-keyspace", "", "Keyspace where the tables are being moved from.") create.MarkFlagRequired("source-keyspace") - create.Flags().Var(&createOptions.TableSettings, "table-settings", "A JSON array where each value must contain two key/value pairs. The first key is 'target_table' and it is the name of the table in the target-keyspace to store the results in. The second key is 'source_expression' and its value is the select to run against the source table. An optional k/v pair can be specified for 'create_ddl' which provides the DDL to create the target table if it does not exist.") + create.Flags().Var(&createOptions.TableSettings, "table-settings", "A JSON array where each value must contain two key/value pairs. The first key is 'target_table' and it is the name of the table in the target-keyspace to store the results in. The second key is 'source_expression' and its value is the select to run against the source table. An optional k/v pair can be specified for 'create_ddl' which provides the DDL to create the target table if it does not exist (you can specify a value of 'copy' if the target-table should be copied as-is from the source keyspace).") create.MarkFlagRequired("table-settings") base.AddCommand(create) diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/delete.go b/go/cmd/vtctldclient/command/vreplication/workflow/delete.go index 7713a5aa3e5..4eae8076fec 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/delete.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/delete.go @@ -29,32 +29,31 @@ import ( ) var ( - workflowDeleteOptions = struct { - Workflow string + deleteOptions = struct { KeepData bool KeepRoutingRules bool }{} - // WorkflowDelete makes a WorkflowDelete gRPC call to a vtctld. - workflowDelete = &cobra.Command{ + // delete makes a WorkflowDelete gRPC call to a vtctld. + delete = &cobra.Command{ Use: "delete", Short: "Delete a VReplication workflow.", Example: `vtctldclient --server localhost:15999 workflow --keyspace customer delete --workflow commerce2customer`, DisableFlagsInUseLine: true, Aliases: []string{"Delete"}, Args: cobra.NoArgs, - RunE: commandWorkflowDelete, + RunE: commandDelete, } ) -func commandWorkflowDelete(cmd *cobra.Command, args []string) error { +func commandDelete(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) req := &vtctldatapb.WorkflowDeleteRequest{ - Keyspace: workflowOptions.Keyspace, - Workflow: workflowDeleteOptions.Workflow, - KeepData: workflowDeleteOptions.KeepData, - KeepRoutingRules: workflowDeleteOptions.KeepRoutingRules, + Keyspace: baseOptions.Keyspace, + Workflow: baseOptions.Workflow, + KeepData: deleteOptions.KeepData, + KeepRoutingRules: deleteOptions.KeepRoutingRules, } resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req) if err != nil { @@ -75,11 +74,3 @@ func commandWorkflowDelete(cmd *cobra.Command, args []string) error { return nil } - -func addWorkflowDeleteFlags(cmd *cobra.Command) { - workflowDelete.Flags().StringVarP(&workflowDeleteOptions.Workflow, "workflow", "w", "", "The workflow you want to delete (required).") - workflowDelete.MarkFlagRequired("workflow") - workflowDelete.Flags().BoolVar(&workflowDeleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.") - workflowDelete.Flags().BoolVar(&workflowDeleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.") - -} diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/get.go b/go/cmd/vtctldclient/command/vreplication/workflow/get.go index 95cc80ffbbb..affa8ffc2fa 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/get.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/get.go @@ -64,7 +64,3 @@ func commandGetWorkflows(cmd *cobra.Command, args []string) error { return nil } - -func addGetWorkflowsFlags(cmd *cobra.Command) { - cmd.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows.") -} diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/show.go b/go/cmd/vtctldclient/command/vreplication/workflow/show.go index 6f80e821559..d16125168c6 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/show.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/show.go @@ -37,27 +37,27 @@ var ( DisableFlagsInUseLine: true, Aliases: []string{"List"}, Args: cobra.NoArgs, - RunE: commandWorkflowShow, + RunE: commandShow, } - // WorkflowShow makes a GetWorkflows gRPC call to a vtctld. - workflowShow = &cobra.Command{ + // show makes a GetWorkflows gRPC call to a vtctld. + show = &cobra.Command{ Use: "show", Short: "Show the details for a VReplication workflow.", Example: `vtctldclient --server localhost:15999 workflow --keyspace customer show --workflow commerce2customer`, DisableFlagsInUseLine: true, Aliases: []string{"Show"}, Args: cobra.NoArgs, - RunE: commandWorkflowShow, + RunE: commandShow, } ) -func commandWorkflowShow(cmd *cobra.Command, args []string) error { +func commandShow(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) req := &vtctldatapb.GetWorkflowsRequest{ - Keyspace: workflowOptions.Keyspace, - Workflow: workflowDeleteOptions.Workflow, + Keyspace: baseOptions.Keyspace, + Workflow: baseOptions.Workflow, } resp, err := common.GetClient().GetWorkflows(common.GetCommandCtx(), req) if err != nil { @@ -82,8 +82,3 @@ func commandWorkflowShow(cmd *cobra.Command, args []string) error { return nil } - -func addWorkflowShowFlags(cmd *cobra.Command) { - workflowShow.Flags().StringVarP(&workflowDeleteOptions.Workflow, "workflow", "w", "", "The workflow you want the details for (required).") - workflowShow.MarkFlagRequired("workflow") -} diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/state.go b/go/cmd/vtctldclient/command/vreplication/workflow/state.go index 573c504d9d5..89e75312ab2 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/state.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/state.go @@ -34,50 +34,50 @@ import ( ) var ( - // WorkflowStart makes a WorfklowUpdate gRPC call to a vtctld. - workflowStart = &cobra.Command{ + // start makes a WorfklowUpdate gRPC call to a vtctld. + start = &cobra.Command{ Use: "start", Short: "Start a VReplication workflow.", Example: `vtctldclient --server localhost:15999 workflow --keyspace customer start --workflow commerce2customer`, DisableFlagsInUseLine: true, Aliases: []string{"Start"}, Args: cobra.NoArgs, - RunE: commandWorkflowUpdateState, + RunE: commandUpdateState, } - // WorkflowStop makes a WorfklowUpdate gRPC call to a vtctld. - workflowStop = &cobra.Command{ + // stop makes a WorfklowUpdate gRPC call to a vtctld. + stop = &cobra.Command{ Use: "stop", Short: "Stop a VReplication workflow.", Example: `vtctldclient --server localhost:15999 workflow --keyspace customer stop --workflow commerce2customer`, DisableFlagsInUseLine: true, Aliases: []string{"Stop"}, Args: cobra.NoArgs, - RunE: commandWorkflowUpdateState, + RunE: commandUpdateState, } ) -func commandWorkflowUpdateState(cmd *cobra.Command, args []string) error { +func commandUpdateState(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) var state binlogdatapb.VReplicationWorkflowState switch strings.ToLower(cmd.Name()) { case "start": - if err := common.CanRestartWorkflow(workflowUpdateOptions.Workflow, workflowOptions.Keyspace); err != nil { + if err := common.CanRestartWorkflow(baseOptions.Keyspace, baseOptions.Workflow); err != nil { return err } state = binlogdatapb.VReplicationWorkflowState_Running case "stop": state = binlogdatapb.VReplicationWorkflowState_Stopped default: - return fmt.Errorf("invalid workstate: %s", args[0]) + return fmt.Errorf("invalid workflow state: %s", args[0]) } // The only thing we're updating is the state. req := &vtctldatapb.WorkflowUpdateRequest{ - Keyspace: workflowOptions.Keyspace, + Keyspace: baseOptions.Keyspace, TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ - Workflow: workflowUpdateOptions.Workflow, + Workflow: baseOptions.Workflow, Cells: textutil.SimulatedNullStringSlice, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/update.go b/go/cmd/vtctldclient/command/vreplication/workflow/update.go index ca366b42bd8..404900ddf8e 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/update.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/update.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/cmd/vtctldclient/cli" "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" "vitess.io/vitess/go/textutil" - "vitess.io/vitess/go/vt/topo/topoproto" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -35,8 +34,15 @@ import ( ) var ( - // WorkflowUpdate makes a WorkflowUpdate gRPC call to a vtctld. - workflowUpdate = &cobra.Command{ + updateOptions = struct { + Cells []string + TabletTypes []topodatapb.TabletType + TabletTypesInPreferenceOrder bool + OnDDL string + }{} + + // update makes a WorkflowUpdate gRPC call to a vtctld. + update = &cobra.Command{ Use: "update", Short: "Update the configuration parameters for a VReplication workflow.", Example: `vtctldclient --server localhost:15999 workflow --keyspace customer update --workflow commerce2customer --cells zone1 --cells zone2 -c "zone3,zone4" -c zone5`, @@ -47,21 +53,21 @@ var ( changes := false if cmd.Flags().Lookup("cells").Changed { // Validate the provided value(s) changes = true - for i, cell := range workflowUpdateOptions.Cells { // Which only means trimming whitespace - workflowUpdateOptions.Cells[i] = strings.TrimSpace(cell) + for i, cell := range updateOptions.Cells { // Which only means trimming whitespace + updateOptions.Cells[i] = strings.TrimSpace(cell) } } else { - workflowUpdateOptions.Cells = textutil.SimulatedNullStringSlice + updateOptions.Cells = textutil.SimulatedNullStringSlice } if cmd.Flags().Lookup("tablet-types").Changed { changes = true } else { - workflowUpdateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)} + updateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)} } if cmd.Flags().Lookup("on-ddl").Changed { // Validate the provided value changes = true - if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; !ok { - return fmt.Errorf("invalid on-ddl value: %s", workflowUpdateOptions.OnDDL) + if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; !ok { + return fmt.Errorf("invalid on-ddl value: %s", updateOptions.OnDDL) } } // Simulated NULL will need to be handled in command if !changes { @@ -69,25 +75,25 @@ var ( } return nil }, - RunE: commandWorkflowUpdate, + RunE: commandUpdate, } ) -func commandWorkflowUpdate(cmd *cobra.Command, args []string) error { +func commandUpdate(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) // We've already validated any provided value, if one WAS provided. // Now we need to do the mapping from the string representation to // the enum value. onddl := int32(textutil.SimulatedNullInt) // Simulated NULL when no value provided - if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(workflowUpdateOptions.OnDDL)]; ok { + if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok { onddl = val } // Simulated NULL when no value is provided. tsp := tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN if cmd.Flags().Lookup("tablet-types-in-order").Changed { - if workflowUpdateOptions.TabletTypesInPreferenceOrder { + if updateOptions.TabletTypesInPreferenceOrder { tsp = tabletmanagerdatapb.TabletSelectionPreference_INORDER } else { tsp = tabletmanagerdatapb.TabletSelectionPreference_ANY @@ -95,11 +101,11 @@ func commandWorkflowUpdate(cmd *cobra.Command, args []string) error { } req := &vtctldatapb.WorkflowUpdateRequest{ - Keyspace: workflowOptions.Keyspace, + Keyspace: baseOptions.Keyspace, TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ - Workflow: workflowUpdateOptions.Workflow, - Cells: workflowUpdateOptions.Cells, - TabletTypes: workflowUpdateOptions.TabletTypes, + Workflow: baseOptions.Workflow, + Cells: updateOptions.Cells, + TabletTypes: updateOptions.TabletTypes, TabletSelectionPreference: tsp, OnDdl: binlogdatapb.OnDDLAction(onddl), }, @@ -124,13 +130,3 @@ func commandWorkflowUpdate(cmd *cobra.Command, args []string) error { return nil } - -func addWorkflowUpdateFlags(cmd *cobra.Command) { - workflowUpdate.Flags().StringVarP(&workflowUpdateOptions.Workflow, "workflow", "w", "", "The workflow you want to update (required).") - workflowUpdate.MarkFlagRequired("workflow") - workflowUpdate.Flags().StringSliceVarP(&workflowUpdateOptions.Cells, "cells", "c", nil, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from.") - workflowUpdate.Flags().VarP((*topoproto.TabletTypeListFlag)(&workflowUpdateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).") - workflowUpdate.Flags().BoolVar(&workflowUpdateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") - workflowUpdate.Flags().StringVar(&workflowUpdateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.") - -} diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/workflow.go b/go/cmd/vtctldclient/command/vreplication/workflow/workflow.go index f1f515a0b2f..76170e85569 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/workflow.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/workflow.go @@ -20,14 +20,12 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" ) var ( - - // workflow is a parent command for Workflow* sub commands. - workflow = &cobra.Command{ + // base is a parent command for Workflow commands. + base = &cobra.Command{ Use: "Workflow --keyspace [command] [command-flags]", Short: "Administer VReplication workflows (Reshard, MoveTables, etc) in the given keyspace.", DisableFlagsInUseLine: true, @@ -38,42 +36,49 @@ var ( ) var ( - workflowOptions = struct { + baseOptions = struct { Keyspace string - }{} - - workflowUpdateOptions = struct { - Workflow string - Cells []string - TabletTypes []topodatapb.TabletType - TabletTypesInPreferenceOrder bool - OnDDL string + Workflow string }{} ) -func RegisterWorkflowCommands(root *cobra.Command) { - workflow.PersistentFlags().StringVarP(&workflowOptions.Keyspace, "keyspace", "k", "", "Keyspace context for the workflow (required).") - workflow.MarkPersistentFlagRequired("keyspace") - root.AddCommand(workflow) - - addGetWorkflowsFlags(getWorkflows) - root.AddCommand(getWorkflows) - - addWorkflowDeleteFlags(workflowDelete) - workflow.AddCommand(workflowDelete) - - workflow.AddCommand(workflowList) - - addWorkflowShowFlags(workflowShow) - workflow.AddCommand(workflowShow) - - workflow.AddCommand(workflowStart) - workflow.AddCommand(workflowStop) - - addWorkflowUpdateFlags(workflowUpdate) - workflow.AddCommand(workflowUpdate) +func registerCommands(root *cobra.Command) { + base.PersistentFlags().StringVarP(&baseOptions.Keyspace, "keyspace", "k", "", "Keyspace context for the workflow.") + base.MarkPersistentFlagRequired("keyspace") + root.AddCommand(base) + + getWorkflows.Flags().BoolVarP(&getWorkflowsOptions.ShowAll, "show-all", "a", false, "Show all workflows instead of just active workflows.") + root.AddCommand(getWorkflows) // Yes this is supposed to be root as GetWorkflows is a top-level command. + + delete.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to delete.") + delete.MarkFlagRequired("workflow") + delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.") + delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.") + base.AddCommand(delete) + + base.AddCommand(workflowList) + + show.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want the details for.") + show.MarkFlagRequired("workflow") + base.AddCommand(show) + + start.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to start.") + start.MarkFlagRequired("workflow") + base.AddCommand(start) + + stop.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to stop.") + stop.MarkFlagRequired("workflow") + base.AddCommand(stop) + + update.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to update.") + update.MarkFlagRequired("workflow") + update.Flags().StringSliceVarP(&updateOptions.Cells, "cells", "c", nil, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from.") + update.Flags().VarP((*topoproto.TabletTypeListFlag)(&updateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).") + update.Flags().BoolVar(&updateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") + update.Flags().StringVar(&updateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.") + base.AddCommand(update) } func init() { - common.RegisterCommandHandler("Workflow", RegisterWorkflowCommands) + common.RegisterCommandHandler("Workflow", registerCommands) } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 08d6b1b2d71..d41a7c8195b 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -4876,6 +4876,7 @@ func (s *VtctldServer) WorkflowUpdate(ctx context.Context, req *vtctldatapb.Work span.Annotate("cells", req.TabletRequest.Cells) span.Annotate("tablet_types", req.TabletRequest.TabletTypes) span.Annotate("on_ddl", req.TabletRequest.OnDdl) + span.Annotate("state", req.TabletRequest.State) resp, err = s.ws.WorkflowUpdate(ctx, req) return resp, err diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8ace54020d8..39021615856 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1991,6 +1991,7 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp span.Annotate("cells", req.TabletRequest.Cells) span.Annotate("tablet_types", req.TabletRequest.TabletTypes) span.Annotate("on_ddl", req.TabletRequest.OnDdl) + span.Annotate("state", req.TabletRequest.State) vx := vexec.NewVExec(req.Keyspace, req.TabletRequest.Workflow, s.ts, s.tmc) callback := func(ctx context.Context, tablet *topo.TabletInfo) (*querypb.QueryResult, error) {