From e8d9e8ff0431fecac52d5481e4a56f286f84334e Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 3 Jul 2020 23:00:37 +0200 Subject: [PATCH 01/10] vtctl vexec/workflow actions: initial commit Signed-off-by: Rohit Nayak --- go.mod | 6 +- go.sum | 28 ++ .../vreplication/vreplication_test.go | 2 +- go/vt/vtctl/vtctl.go | 95 +++++ go/vt/wrangler/vexec.go | 396 ++++++++++++++++++ go/vt/wrangler/vexec_plan.go | 217 ++++++++++ go/vt/wrangler/vexec_test.go | 200 +++++++++ go/vt/wrangler/wrangler_env_test.go | 298 +++++++++++++ 8 files changed, 1240 insertions(+), 2 deletions(-) create mode 100644 go/vt/wrangler/vexec.go create mode 100644 go/vt/wrangler/vexec_plan.go create mode 100644 go/vt/wrangler/vexec_test.go create mode 100644 go/vt/wrangler/wrangler_env_test.go diff --git a/go.mod b/go.mod index 47f25d9b6fa..26c93a97894 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-autorest/autorest v0.10.0 // indirect github.com/GeertJohan/go.rice v1.0.0 github.com/PuerkitoBio/goquery v1.5.1 + github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v1.28.8 github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13 @@ -51,7 +52,9 @@ require ( github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect - github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 + github.com/manifoldco/promptui v0.7.0 + github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 + github.com/olekukonko/tablewriter v0.0.4 github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 @@ -64,6 +67,7 @@ require ( github.com/prometheus/common v0.9.1 github.com/satori/go.uuid v1.2.0 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect + github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.4.0 github.com/tchap/go-patricia v0.0.0-20160729071656-dd168db6051b github.com/tebeka/selenium v0.9.9 diff --git a/go.sum b/go.sum index c41fd8aff02..24a3e2134c3 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcNzSCgu66NMgdjwoMHQRbv2SoOVNFb4kRkE= +github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= @@ -96,6 +98,10 @@ github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13/go.mod h1:tgcrVJ8 github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible h1:C29Ae4G5GtYyYMm1aztcyj/J5ckgJm2zwdDajFbx1NY= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= @@ -354,6 +360,8 @@ github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpR github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= @@ -380,6 +388,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/krishicks/yaml-patch v0.0.10 h1:H4FcHpnNwVmw8u0MjPRjWyIXtco6zM2F78t+57oNM3E= github.com/krishicks/yaml-patch v0.0.10/go.mod h1:Sm5TchwZS6sm7RJoyg87tzxm2ZcKzdRE4Q7TjNhPrME= +github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a h1:weJVJJRzAJBFRlAiJQROKQs8oC9vOxvm4rZmBBk0ONw= +github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= @@ -390,6 +400,9 @@ github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= +github.com/manifoldco/promptui v0.7.0 h1:3l11YT8tm9MnwGFQ4kETwkzpAwY2Jt9lCrumCUW4+z4= +github.com/manifoldco/promptui v0.7.0/go.mod h1:n4zTdgP0vr0S3w7/O/g98U+e0gwLScEXGwov2nIKuGQ= +github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= @@ -397,6 +410,7 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149 h1:HfxbT6/JcvIljmERptWhwa8XzP7H3T+Z2N26gTsaDaA= github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= @@ -405,8 +419,13 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2 h1:UnlwIPBGaTZfPQ6T1IGzPI0EkYAQmT9fAEJ/poFC63o= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +<<<<<<< HEAD github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +======= +github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= +github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +>>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= @@ -422,7 +441,10 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= +<<<<<<< HEAD github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= +======= +>>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= @@ -447,8 +469,13 @@ github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5 h1:58+kh9C6jJVXYjt8IE48G2eWl6BjwU5Gj0gqY84fy78= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +<<<<<<< HEAD github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 h1:Mm4XQCBICntJzH8fKglsRuEiFUJYnTnM4BBFvpP5BWs= github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +======= +github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= +github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +>>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -688,6 +715,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 7d657f6d9f6..655cbb0a5e5 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -50,7 +50,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { vc = InitCluster(t, cellName) assert.NotNil(t, vc) - defer vc.TearDown() + //defer vc.TearDown() cell = vc.Cells[cellName] vc.AddKeyspace(t, cell, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index eb103f4db9d..e9ff0e9cf4e 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -94,6 +94,8 @@ import ( "sync" "time" + querypb "vitess.io/vitess/go/vt/proto/query" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -454,6 +456,22 @@ var commands = []commandGroup{ "Outputs a JSON structure that contains information about the ShardReplication."}, }, }, + { + "Workflow", []command{ + {"VExec", commandVExec, + " --dry-run", + "Runs query on all tablets in workflow. Example: VExec merchant.morders \"update _vt.vreplication set Status='Running'\"", + }, + }, + }, + { + "Workflow", []command{ + {"Workflow", commandWorkflow, + " --dry-run", + "Start/Stop/Delete Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", + }, + }, + }, } func init() { @@ -2805,6 +2823,83 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Flag return nil } +func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + json := subFlags.Bool("json", false, "Output JSON instead of human-readable table") + dryRun := subFlags.Bool("dry_run", false, "Does a dry run of VExec and only reports the final query and list of masters on which it will be applied") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 2 { + return fmt.Errorf("usage: VExec --dry-run keyspace.workflow \"\"") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } + _, err = wr.TopoServer().GetKeyspace(ctx, keyspace) + if err != nil { + wr.Logger().Errorf("keyspace %s not found", keyspace) + } + query := subFlags.Arg(1) + + results, err := wr.VExec(ctx, workflow, keyspace, query, *dryRun) + if err != nil { + return err + } + if *dryRun { + return nil + } + if len(results) == 0 { + wr.Logger().Printf("no result returned\n") + } + var qr *sqltypes.Result = &sqltypes.Result{} + qr.RowsAffected = uint64(len(results)) + qr.Fields = []*querypb.Field{{ + Name: "Tablet", + Type: sqltypes.VarBinary, + }} + for _, result := range results { + fields := result.Fields + qr.Fields = append(qr.Fields, fields...) + break + } + for tablet, result := range results { + for _, row := range result.Rows { + var row2 []sqltypes.Value + row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) + row2 = append(row2, row...) + qr.Rows = append(qr.Rows, row2) + } + } + if *json { + return printJSON(wr.Logger(), qr) + } + + printQueryResult(loggerWriter{wr.Logger()}, qr) + return nil +} + +func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + dryRun := subFlags.Bool("dry_run", false, "Does a dry run of VExec and only reports the final query and list of masters on which it will be applied") + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 2 { + return fmt.Errorf("usage: Workflow --dry-run keyspace.workflow start/stop/delete") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } + _, err = wr.TopoServer().GetKeyspace(ctx, keyspace) + if err != nil { + wr.Logger().Errorf("Keyspace %s not found", keyspace) + } + action := subFlags.Arg(1) + + return wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun) +} + func commandPanic(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { panic(fmt.Errorf("this command panics on purpose")) } diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go new file mode 100644 index 00000000000..ec5a8a32791 --- /dev/null +++ b/go/vt/wrangler/vexec.go @@ -0,0 +1,396 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/olekukonko/tablewriter" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/concurrency" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" +) + +const ( + vreplicationTableName = "_vt.vreplication" + copyStateTableName = "_vt.copy_state" +) + +type vexec struct { + ctx context.Context + workflow string + keyspace string + query string + + wr *Wrangler + + plan *vexecPlan + masters []*topo.TabletInfo +} + +func newVExec(ctx context.Context, workflow, keyspace, query string, wr *Wrangler) *vexec { + return &vexec{ + ctx: ctx, + workflow: workflow, + keyspace: keyspace, + query: query, + wr: wr, + } +} + +// VExec executes queries on _vt.vreplication on all masters in the target keyspace of the workflow +func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { + results, err := wr._vexec(ctx, workflow, keyspace, query, dryRun) + retResults := make(map[*topo.TabletInfo]*sqltypes.Result) + for tablet, result := range results { + retResults[tablet] = sqltypes.Proto3ToResult(result) + } + return retResults, err +} + +func (wr *Wrangler) _vexec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + vx := newVExec(ctx, workflow, keyspace, query, wr) + if err := vx.getMasters(); err != nil { + return nil, err + } + if _, err := vx.buildVExecPlan(); err != nil { + return nil, err + } + fullQuery := vx.plan.parsedQuery.Query + if dryRun { + return nil, vx.outputDryRunInfo(wr) + } + return vx.exec(fullQuery) +} + +func (vx *vexec) outputDryRunInfo(wr *Wrangler) error { + rsr, err := vx.wr.getStreams(vx.ctx, vx.workflow, vx.keyspace) + if err != nil { + return err + } + + wr.Logger().Printf("Query: %s\nwill be run on the following streams in keyspace %s for workflow %s:\n\n", + vx.plan.parsedQuery.Query, vx.keyspace, vx.workflow) + tableString := &strings.Builder{} + table := tablewriter.NewWriter(tableString) + table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"}) + for _, master := range vx.masters { + key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString()) + for _, stream := range rsr.Statuses[key] { + table.Append([]string{key, fmt.Sprintf("%d", stream.ID), fmt.Sprintf("%v", stream.Bls), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) + } + } + table.SetRowLine(true) + table.Render() + wr.Logger().Printf(tableString.String()) + wr.Logger().Printf("\n\n") + + return nil +} + +func (vx *vexec) exec(query string) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + var wg sync.WaitGroup + workflow := vx.workflow + allErrors := &concurrency.AllErrorRecorder{} + results := make(map[*topo.TabletInfo]*querypb.QueryResult) + var mu sync.Mutex + ctx, cancel := context.WithTimeout(vx.ctx, 10*time.Second) + defer cancel() + for _, master := range vx.masters { + wg.Add(1) + go func(ctx context.Context, master *topo.TabletInfo) { + defer wg.Done() + fmt.Printf("Running %s on %s\n", query, master.AliasString()) + qr, err := vx.wr.VReplicationExec(ctx, master.Alias, query) + if err != nil { + allErrors.RecordError(err) + } else { + if qr.RowsAffected == 0 { + allErrors.RecordError(fmt.Errorf("\nNo streams found for workflow %s tablet %s", workflow, master.Alias)) + } else { + mu.Lock() + results[master] = qr + mu.Unlock() + } + } + }(ctx, master) + } + wg.Wait() + return results, allErrors.AggrError(vterrors.Aggregate) +} + +func (vx *vexec) getMasters() error { + var err error + shards, err := vx.wr.ts.GetShardNames(vx.ctx, vx.keyspace) + if err != nil { + return err + } + if len(shards) == 0 { + return fmt.Errorf("no shards found in keyspace %s", vx.keyspace) + } + var allMasters []*topo.TabletInfo + var master *topo.TabletInfo + for _, shard := range shards { + if master, err = vx.getMasterForShard(shard); err != nil { + return err + } + if master == nil { + return fmt.Errorf("no master found for shard %s", shard) + } + allMasters = append(allMasters, master) + } + vx.masters = allMasters + return nil +} + +func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { + si, err := vx.wr.ts.GetShard(vx.ctx, vx.keyspace, shard) + if err != nil { + return nil, err + } + if si.MasterAlias == nil { + return nil, fmt.Errorf("no master found for shard %s", shard) + } + master, err := vx.wr.ts.GetTablet(vx.ctx, si.MasterAlias) + if err != nil { + return nil, err + } + if master == nil { + return nil, fmt.Errorf("could not get tablet for %s:%s", vx.keyspace, si.MasterAlias) + } + return master, nil +} + +// WorkflowAction can start/stop/delete or list strams in _vt.vreplication on all masters in the target keyspace of the workflow +func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) error { + if action == "list" { + return wr.listStreams(ctx, workflow, keyspace) + } + _, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) + return err +} + +func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) { + var query string + updateSQL := "update _vt.vreplication set state = %s" + switch action { + case "stop": + query = fmt.Sprintf(updateSQL, encodeString("Stopped")) + case "start": + query = fmt.Sprintf(updateSQL, encodeString("Running")) + case "delete": + query = "delete from _vt.vreplication" + default: + return "", fmt.Errorf("invalid action found: %s", action) + } + return query, nil +} + +func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + var err error + query, err := wr.getWorkflowActionQuery(action) + if err != nil { + return nil, err + } + vx := newVExec(ctx, workflow, keyspace, query, wr) + err = vx.getMasters() + if err != nil { + return nil, err + } + if _, err := vx.buildVExecPlan(); err != nil { + return nil, err + } + fullQuery := vx.plan.parsedQuery.Query + if dryRun { + return nil, vx.outputDryRunInfo(wr) + } + results, err := vx.exec(fullQuery) + return results, err +} + +type replicationStatusResult struct { + Workflow string + SourceKeyspace string + TargetKeyspace string + + Statuses map[string][]*replicationStatus +} + +type copyState struct { + Table string + LastPK string +} + +type replicationStatus struct { + Shard string + Tablet string + ID int64 + Bls binlogdatapb.BinlogSource + Pos string + StopPos string + State string + MaxReplicationLag int64 + DBName string + TimeUpdated int64 + Message string + + CopyState []copyState +} + +func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*replicationStatus, string, error) { + var err error + var id, maxReplicationLag, timeUpdated int64 + var state, dbName, pos, stopPos, message string + var bls binlogdatapb.BinlogSource + id, err = evalengine.ToInt64(row[0]) + if err != nil { + return nil, "", err + } + if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil { + return nil, "", err + } + pos = row[2].ToString() + stopPos = row[3].ToString() + maxReplicationLag, err = evalengine.ToInt64(row[4]) + if err != nil { + return nil, "", err + } + state = row[5].ToString() + dbName = row[6].ToString() + timeUpdated, err = evalengine.ToInt64(row[7]) + if err != nil { + return nil, "", err + } + message = row[8].ToString() + //fmt.Printf("%d: %s\n", id, state) + status := &replicationStatus{ + Shard: master.Shard, + Tablet: master.AliasString(), + ID: id, + Bls: bls, + Pos: pos, + StopPos: stopPos, + State: state, + DBName: dbName, + MaxReplicationLag: maxReplicationLag, + TimeUpdated: timeUpdated, + Message: message, + } + status.CopyState, err = wr.getCopyState(ctx, master, id) + if err != nil { + return nil, "", err + } + + status.State = updateState(message, status.State, status.CopyState, timeUpdated) + return status, bls.Keyspace, nil +} + +func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*replicationStatusResult, error) { + var rsr replicationStatusResult + rsr.Statuses = make(map[string][]*replicationStatus) + rsr.Workflow = workflow + rsr.TargetKeyspace = keyspace + var results map[*topo.TabletInfo]*querypb.QueryResult + query := fmt.Sprintf("select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication") + results, err := wr._vexec(ctx, workflow, keyspace, query, false) + if err != nil { + return nil, err + } + for master, result := range results { + //fmt.Printf("master %s, result %v\n", master.Alias, result) + var rsrStatus []*replicationStatus + qr := sqltypes.Proto3ToResult(result) + for _, row := range qr.Rows { + status, sourceKeyspace, err := wr.getReplicationStatusFromRow(ctx, row, master) + fmt.Printf("getReplicationStatusFromRow status for master %s is %v\n", master.AliasString(), status) + if err != nil { + return nil, err + } + rsr.SourceKeyspace = sourceKeyspace + + rsrStatus = append(rsrStatus, status) + } + rsr.Statuses[fmt.Sprintf("%s/%s", master.Shard, master.AliasString())] = rsrStatus + } + return &rsr, nil +} + +func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) error { + replStatus, err := wr.getStreams(ctx, workflow, keyspace) + if err != nil { + return err + } + if err := dumpStreamListAsJSON(replStatus, wr); err != nil { + return err + } + + return nil +} + +func updateState(message, state string, cs []copyState, timeUpdated int64) string { + if message != "" { + state = "Error" + } else if state == "Running" && int64(time.Now().Second())-timeUpdated > 10 /* seconds */ { + state = "Lagging" + } else if state == "Running" && len(cs) > 0 { + state = "Copying" + } + return state +} + +func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) error { + text, err := json.MarshalIndent(replStatus, "", "\t") + if err != nil { + return err + } + wr.Logger().Printf("%s\n", text) + return nil +} + +func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) { + var cs []copyState + query := fmt.Sprintf(`select table_name, lastpk from _vt.copy_state where vrepl_id = %d`, id) + qr, err := wr.ExecuteFetchAsApp(ctx, tablet.Alias, true, query, 10000) + if err != nil { + return nil, err + } + if qr != nil { + for _, row := range qr.Rows { + table := string(row.Values[0]) + lastPK := string(row.Values[1]) + copyState := copyState{ + Table: table, + LastPK: lastPK, + } + cs = append(cs, copyState) + } + } + + return cs, nil +} + +//TODOs: add to vreplication_test.go endtoend: definitely ListStreams, maybe stopstream/insert/check/startstream/check ... diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go new file mode 100644 index 00000000000..339f097ea2e --- /dev/null +++ b/go/vt/wrangler/vexec_plan.go @@ -0,0 +1,217 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + + "vitess.io/vitess/go/vt/sqlparser" +) + +type vexecPlan struct { + query string + opcode int + parsedQuery *sqlparser.ParsedQuery +} + +const ( + insertQuery = iota + updateQuery + deleteQuery + selectQuery +) + +func (vx *vexec) buildVExecPlan() (*vexecPlan, error) { + stmt, err := sqlparser.Parse(vx.query) + if err != nil { + return nil, err + } + var plan *vexecPlan + switch stmt := stmt.(type) { + case *sqlparser.Insert: + plan, err = vx.buildInsertPlan(stmt) + case *sqlparser.Update: + plan, err = vx.buildUpdatePlan(stmt) + case *sqlparser.Delete: + plan, err = vx.buildDeletePlan(stmt) + case *sqlparser.Select: + plan, err = vx.buildSelectPlan(stmt) + default: + return nil, fmt.Errorf("query not supported by vexec: %s", sqlparser.String(stmt)) + } + if err != nil { + return nil, err + } + plan.query = vx.query + vx.plan = plan + return plan, nil +} + +func (vx *vexec) addDefaultWheres(where *sqlparser.Where) *sqlparser.Where { + newWhere := where + expr := &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("db_name")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.masters[0].DbName())), + } + if newWhere == nil { + newWhere = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: expr, + } + } else { + newWhere.Expr = &sqlparser.AndExpr{ + Left: newWhere.Expr, + Right: expr, + } + } + expr = &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("workflow")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.workflow)), + } + newWhere.Expr = &sqlparser.AndExpr{ + Left: newWhere.Expr, + Right: expr, + } + return newWhere +} + +func (vx *vexec) buildInsertPlan(ins *sqlparser.Insert) (*vexecPlan, error) { + switch sqlparser.String(ins.Table) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("vexec does not support: %v", sqlparser.String(ins.Table)) + } + + if ins.Action != sqlparser.InsertStr { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) + } + if ins.Ignore != "" { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) + } + if ins.Partitions != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) + } + if ins.OnDup != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) + } + rows, ok := ins.Rows.(sqlparser.Values) + if !ok { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) + } + idPos := 0 + if len(ins.Columns) != 0 { + idPos = -1 + for i, col := range ins.Columns { + if col.EqualString("id") { + idPos = i + break + } + } + } + if idPos >= 0 { + for _, row := range rows { + if idPos >= len(row) { + return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins)) + } + if _, ok := row[idPos].(*sqlparser.NullVal); !ok { + return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins)) + } + } + } + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", ins) + + return &vexecPlan{ + opcode: insertQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} + +func (vx *vexec) buildUpdatePlan(upd *sqlparser.Update) (*vexecPlan, error) { + switch sqlparser.String(upd.TableExprs) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("vexec does not support: %v", sqlparser.String(upd.TableExprs)) + } + if upd.OrderBy != nil || upd.Limit != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(upd)) + } + for _, expr := range upd.Exprs { + if expr.Name.Name.EqualString("id") { + return nil, fmt.Errorf("id cannot be changed: %v", sqlparser.String(expr)) + } + } + + upd.Where = vx.addDefaultWheres(upd.Where) + + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", upd) + + return &vexecPlan{ + opcode: updateQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} + +func (vx *vexec) buildDeletePlan(del *sqlparser.Delete) (*vexecPlan, error) { + switch sqlparser.String(del.TableExprs) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(del.TableExprs)) + } + if del.Targets != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + if del.Partitions != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + if del.OrderBy != nil || del.Limit != nil { + return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del)) + } + + del.Where = vx.addDefaultWheres(del.Where) + + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", del) + + return &vexecPlan{ + opcode: deleteQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} + +func (vx *vexec) buildSelectPlan(sel *sqlparser.Select) (*vexecPlan, error) { + switch sqlparser.String(sel.From) { + case vreplicationTableName: + // no-op + default: + return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From)) + } + sel.Where = vx.addDefaultWheres(sel.Where) + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("%v", sel) + + return &vexecPlan{ + opcode: selectQuery, + parsedQuery: buf.ParsedQuery(), + }, nil +} diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go new file mode 100644 index 00000000000..568bea14d20 --- /dev/null +++ b/go/vt/wrangler/vexec_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/logutil" +) + +func TestListStreams(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + var logger = logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + //query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where workflow = 'wrWorkflow' and db_name = 'vt_target'" + + wr.WorkflowAction(ctx, workflow, keyspace, "ListStreams", false) + +} + +func TestVExec(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + query := "update _vt.vreplication set state = 'Running'" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + var logger = logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + + vx := newVExec(ctx, workflow, keyspace, query, wr) + err := vx.getMasters() + require.Nil(t, err) + masters := vx.masters + require.NotNil(t, masters) + require.Equal(t, len(masters), 2) + var shards []string + for _, master := range masters { + shards = append(shards, master.Shard) + } + sort.Strings(shards) + require.Equal(t, fmt.Sprintf("%v", shards), "[-80 80-]") + plan, err := vx.buildVExecPlan() + require.NoError(t, err) + require.NotNil(t, plan) + + addWheres := func(query string) string { + if strings.Contains(query, " where ") { + query += " and " + } else { + query += " where " + } + query += fmt.Sprintf("db_name = %s and workflow = %s", encodeString("vt_"+keyspace), encodeString(workflow)) + return query + } + want := addWheres(query) + require.Equal(t, want, plan.parsedQuery.Query) + + query = plan.parsedQuery.Query + vx.exec(query) + + type TestCase struct { + name string + query string + result *sqltypes.Result + errorString string + } + + var result *sqltypes.Result + var testCases []*TestCase + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + "1|keyspace:\"source\" shard:\"0\" filter: >|", + ) + testCases = append(testCases, &TestCase{ + name: "select", + query: "select id, source, message from _vt.vreplication", + result: result, + }) + result = &sqltypes.Result{ + RowsAffected: 1, + Rows: [][]sqltypes.Value{}, + } + testCases = append(testCases, &TestCase{ + name: "delete", + query: "delete from _vt.vreplication where message != ''", + result: result, + }) + result = &sqltypes.Result{ + RowsAffected: 1, + Rows: [][]sqltypes.Value{}, + } + testCases = append(testCases, &TestCase{ + name: "update", + query: "update _vt.vreplication set state='Stopped', message='for wrangler test'", + result: result, + }) + result = &sqltypes.Result{ + RowsAffected: 2, + Rows: [][]sqltypes.Value{}, + } + testCases = append(testCases, &TestCase{ + name: "insert", + query: "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", + result: result, + }) + + errorString := "id should not have a value" + testCases = append(testCases, &TestCase{ + name: "insert invalid-id", + query: "insert into _vt.vreplication(id, state) values (1, 'Running'), (2, 'Stopped')", + errorString: errorString, + }) + + errorString = "invalid table name" + testCases = append(testCases, &TestCase{ + name: "delete invalid-other-table", + query: "delete from _vt.copy_state", + errorString: errorString, + }) + + for _, testCase := range testCases { + t.Run(testCase.query, func(t *testing.T) { + results, err := wr.VExec(ctx, workflow, keyspace, testCase.query, false) + if testCase.errorString == "" { + require.NoError(t, err) + for _, result := range results { + utils.MustMatch(t, testCase.result, result, "Incorrect result") + } + } else { + require.Error(t, err) + if !strings.Contains(err.Error(), testCase.errorString) { + t.Fatalf("Wrong error, want %s, got %s", testCase.errorString, err.Error()) + } + } + }) + } + + query = "delete from _vt.vreplication" + _, err = wr.VExec(ctx, workflow, keyspace, query, true) + require.NoError(t, err) + dryRunResults := []string{ + "Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", + "will be run on the following streams in keyspace target for workflow wrWorkflow:", + "+----------------------+----+------------------+---------+-----------+", + "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME |", + "+----------------------+----+------------------+---------+-----------+", + "| -80/zone1-0000000200 | 1 | zone1-0000000200 | Running | vt_target |", + "+----------------------+----+------------------+---------+-----------+", + "| 80-/zone1-0000000210 | 1 | zone1-0000000210 | Running | vt_target |", + "+----------------------+----+------------------+---------+-----------+", + } + require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) +} + +/* +func TextVExecValidations(t *testing.T) { + ctx := context.Background() + workflow := "" + keyspace := "" + query := "" + env := newWranglerTestEnv([]string{"0"}, []string{"-80","80-"}, "", nil) + defer env.close() + + wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + //validations: empty workflow/ks/query + // valid workflow: no masters found? valid keyspace + // invalid query + // insert query + +} + +*/ diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go new file mode 100644 index 00000000000..ca167a41dbb --- /dev/null +++ b/go/vt/wrangler/wrangler_env_test.go @@ -0,0 +1,298 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "flag" + "fmt" + "sync" + + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +const ( + testStopPosition = "MariaDB/5-456-892" + testSourceGtid = "MariaDB/5-456-893" + testTargetMasterPosition = "MariaDB/6-456-892" +) + +type testWranglerEnv struct { + wr *Wrangler + workflow string + topoServ *topo.Server + cell string + tabletType topodatapb.TabletType + tmc *testWranglerTMClient + + mu sync.Mutex + tablets map[int]*testWranglerTablet +} + +// wranglerEnv has to be a global for RegisterDialer to work. +var wranglerEnv *testWranglerEnv + +func init() { + tabletconn.RegisterDialer("WranglerTest", func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + wranglerEnv.mu.Lock() + defer wranglerEnv.mu.Unlock() + fmt.Println("In WranglerTest dialer") + if qs, ok := wranglerEnv.tablets[int(tablet.Alias.Uid)]; ok { + fmt.Printf("query service is %v", qs) + return qs, nil + } + return nil, fmt.Errorf("tablet %d not found", tablet.Alias.Uid) + }) +} + +//---------------------------------------------- +// testWranglerEnv + +func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testWranglerEnv { + flag.Set("tablet_protocol", "WranglerTest") + env := &testWranglerEnv{ + workflow: "wrWorkflow", + tablets: make(map[int]*testWranglerTablet), + topoServ: memorytopo.NewServer("zone1"), + cell: "zone1", + tabletType: topodatapb.TabletType_REPLICA, + tmc: newTestWranglerTMClient(), + } + env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + tabletID := 100 + for _, shard := range sourceShards { + _ = env.addTablet(tabletID, "source", shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID+1, "source", shard, topodatapb.TabletType_REPLICA) + env.tmc.waitpos[tabletID+1] = testStopPosition + + tabletID += 10 + } + tabletID = 200 + for _, shard := range targetShards { + master := env.addTablet(tabletID, "target", shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID+1, "target", shard, topodatapb.TabletType_REPLICA) + + var rows []string + var posRows []string + var bls *binlogdatapb.BinlogSource + for j, sourceShard := range sourceShards { + bls = &binlogdatapb.BinlogSource{ + Keyspace: "source", + Shard: sourceShard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: query, + }}, + }, + } + rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls)) + position := testStopPosition + if pos := positions[sourceShard+shard]; pos != "" { + position = pos + } + posRows = append(posRows, fmt.Sprintf("%v|%s", bls, position)) + + env.tmc.setVRResults( + master.tablet, + fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for wrangler test' where id=%d", testSourceGtid, j+1), + &sqltypes.Result{}, + ) + } + // migrater buildMigrationTargets + env.tmc.setVRResults( + master.tablet, + "select id, source, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + rows..., + ), + ) + + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state = 'Stopped', message = 'for wrangler test' where db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "delete from _vt.vreplication where message != '' and db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2}) + + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|message", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|varchar"), + fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|", bls), + ) + env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) + //" + env.tmc.setVRResults( + master.tablet, + "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "source|pos", + "varchar|varchar"), + posRows..., + ), + ) + + env.tmc.vrpos[tabletID] = testSourceGtid + env.tmc.pos[tabletID] = testTargetMasterPosition + + env.tmc.waitpos[tabletID+1] = testTargetMasterPosition + + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state='Running', message='', stop_pos='' where db_name='vt_target' and workflow='wrWorkflow'", &sqltypes.Result{}) + + tabletID += 10 + } + wranglerEnv = env + return env +} + +func (env *testWranglerEnv) close() { + env.mu.Lock() + defer env.mu.Unlock() + for _, t := range env.tablets { + env.topoServ.DeleteTablet(context.Background(), t.tablet.Alias) + } + env.tablets = nil +} + +func (env *testWranglerEnv) addTablet(id int, keyspace, shard string, tabletType topodatapb.TabletType) *testWranglerTablet { + env.mu.Lock() + defer env.mu.Unlock() + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = newTestWranglerTablet(tablet) + if err := env.wr.InitTablet(context.Background(), tablet, false /* allowMasterOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + if tabletType == topodatapb.TabletType_MASTER { + _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { + si.MasterAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + } + env.tablets[id].queryResults = make(map[string]*querypb.QueryResult) + return env.tablets[id] +} + +//---------------------------------------------- +// testWranglerTablet + +type testWranglerTablet struct { + queryservice.QueryService + tablet *topodatapb.Tablet + queryResults map[string]*querypb.QueryResult + gotQueries []string +} + +func newTestWranglerTablet(tablet *topodatapb.Tablet) *testWranglerTablet { + return &testWranglerTablet{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + } +} + +func (tvt *testWranglerTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: tvt.tablet.Keyspace, + Shard: tvt.tablet.Shard, + TabletType: tvt.tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) +} + +//---------------------------------------------- +// testWranglerTMClient + +type testWranglerTMClient struct { + tmclient.TabletManagerClient + schema *tabletmanagerdatapb.SchemaDefinition + vrQueries map[int]map[string]*querypb.QueryResult + waitpos map[int]string + vrpos map[int]string + pos map[int]string +} + +func newTestWranglerTMClient() *testWranglerTMClient { + return &testWranglerTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + waitpos: make(map[int]string), + vrpos: make(map[int]string), + pos: make(map[int]string), + } +} + +func (tmc *testWranglerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + return tmc.schema, nil +} + +func (tmc *testWranglerTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.vrQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testWranglerTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if !ok { + return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) + } + return result, nil +} + +func (tmc *testWranglerTMClient) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, query []byte, maxRows int) (*querypb.QueryResult, error) { + // fmt.Printf("tablet: %d query: %s\n", tablet.Alias.Uid, string(query)) + t := wranglerEnv.tablets[int(tablet.Alias.Uid)] + t.gotQueries = append(t.gotQueries, string(query)) + result, ok := t.queryResults[string(query)] + if !ok { + result = &querypb.QueryResult{} + log.Errorf("QUery: %s, Result :%v\n", query, result) + } + return result, nil +} From 9afb76fc09a64c8b7eb7594d0f424bbc190cc101 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 3 Jul 2020 23:11:54 +0200 Subject: [PATCH 02/10] Fix go.mod/sum Signed-off-by: Rohit Nayak --- go.mod | 4 +--- go.sum | 14 +------------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 26c93a97894..e31c6c15878 100644 --- a/go.mod +++ b/go.mod @@ -47,13 +47,11 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/krishicks/yaml-patch v0.0.10 github.com/magiconair/properties v1.8.1 - github.com/mattn/go-runewidth v0.0.3 // indirect + github.com/manifoldco/promptui v0.7.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect - github.com/manifoldco/promptui v0.7.0 - github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 github.com/olekukonko/tablewriter v0.0.4 github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect diff --git a/go.sum b/go.sum index 24a3e2134c3..54df9e7836d 100644 --- a/go.sum +++ b/go.sum @@ -97,11 +97,11 @@ github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13 h1:+qUNY4VRkEH46b github.com/buger/jsonparser v0.0.0-20200322175846-f7e751efca13/go.mod h1:tgcrVJ81GPSF0mz+0nu1Xaz0fazGPrmmJfJtxjbHhUQ= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible h1:C29Ae4G5GtYyYMm1aztcyj/J5ckgJm2zwdDajFbx1NY= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible h1:C29Ae4G5GtYyYMm1aztcyj/J5ckgJm2zwdDajFbx1NY= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= @@ -419,13 +419,9 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2 h1:UnlwIPBGaTZfPQ6T1IGzPI0EkYAQmT9fAEJ/poFC63o= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -<<<<<<< HEAD -github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -======= github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= ->>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= @@ -441,10 +437,7 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= -<<<<<<< HEAD github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= -======= ->>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= @@ -469,13 +462,8 @@ github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5 h1:58+kh9C6jJVXYjt8IE48G2eWl6BjwU5Gj0gqY84fy78= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -<<<<<<< HEAD -github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 h1:Mm4XQCBICntJzH8fKglsRuEiFUJYnTnM4BBFvpP5BWs= -github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -======= github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= ->>>>>>> 475a13b23... vtctl vexec/workflow actions: initial commit github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From 6b580b69c3b0ddd271effdf0699c053030398413 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 4 Jul 2020 17:24:49 +0200 Subject: [PATCH 03/10] vtctl vexec/workflow actions: add e2e test, improve list output Signed-off-by: Rohit Nayak --- go.mod | 2 +- go.sum | 2 + go/test/endtoend/vreplication/config.go | 3 + .../vreplication/vreplication_test.go | 76 ++++++++++++++++++- go/vt/wrangler/vexec.go | 4 +- go/vt/wrangler/vexec_test.go | 18 +++-- 6 files changed, 93 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index e31c6c15878..1ff58dc17cc 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect - github.com/olekukonko/tablewriter v0.0.4 + github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6 github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 diff --git a/go.sum b/go.sum index 54df9e7836d..d017a2f9d3b 100644 --- a/go.sum +++ b/go.sum @@ -464,6 +464,8 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5 h1:58+kh9C6 github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= +github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6 h1:F721VBMijn0OBFZ5wUSuMVVLQj2IJiiupn6UNd7UbBE= +github.com/olekukonko/tablewriter v0.0.5-0.20200416053754-163badb3bac6/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index 2e831d2c43e..c22a486e39c 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -146,6 +146,9 @@ create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) co }, "cproduct": { "type": "reference" + }, + "vproduct": { + "type": "reference" } } } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 655cbb0a5e5..aa27faa297f 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" @@ -50,7 +52,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { vc = InitCluster(t, cellName) assert.NotNil(t, vc) - //defer vc.TearDown() + defer vc.TearDown() cell = vc.Cells[cellName] vc.AddKeyspace(t, cell, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) @@ -67,6 +69,8 @@ func TestBasicVreplicationWorkflow(t *testing.T) { shardMerchant(t) materializeProduct(t) + materializeProductUsingVExec(t) + materializeMerchantOrders(t) materializeSales(t) materializeMerchantSales(t) @@ -508,6 +512,76 @@ func vdiff(t *testing.T, workflow string) { } } +func materializeProductUsingVExec(t *testing.T) { + workflow := "vproduct" + insertQuery := "insert into _vt.vreplication(workflow, source, state, db_name, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp) values \n" + insertQuery += "('vproduct', 'keyspace:\"product\" shard:\"0\" filter: > ', " + insertQuery += "'Stopped', 'vt_customer', '', 0, 0, '', '', 0, 0)" + customerTablets := vc.getVttabletsInKeyspace(t, cell, "customer", "master") + createVProduct := "create table vproduct(pid bigint, description varchar(128), primary key(pid))" + fmt.Printf("Insert Query:\n%s\n", createVProduct) + for _, tab := range customerTablets { + tab.QueryTabletWithDB(createVProduct, "vt_customer") + } + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("VExec", "customer.vproduct", insertQuery); err != nil { + t.Fatalf("%s:%v", output, err) + } + if err := vc.VtctlClient.ExecuteCommand("VExec", "customer.vproduct", "update _vt.vreplication set state = 'Running'"); err != nil { + t.Fatal(err) + } + for _, tab := range customerTablets { + if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_customer", 3*time.Second) != nil { + t.Fatal("Materialize vproduct timed out") + } + } + for _, tab := range customerTablets { + assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 2)) + } + + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "stop"); err != nil { + t.Fatalf("%s:%v", output, err) + } + productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + productTab.QueryTabletWithDB("insert into product(pid, description) values (3, 'mouse')", "vt_product") + time.Sleep(100 * time.Millisecond) + for _, tab := range customerTablets { + assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 2)) + } + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "start"); err != nil { + t.Fatalf("%s:%v", output, err) + } + time.Sleep(1 * time.Second) + for _, tab := range customerTablets { + assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 3)) + } + var row string + for _, tab := range customerTablets { + qr, err := tab.QueryTabletWithDB("select count(*) from vreplication where workflow = 'vproduct'", "_vt") + if err != nil { + t.Fatalf("Error: %s", err) + } + if qr == nil || len(qr.Rows) != 1 { + t.Fatalf("Invalid result") + } + row = fmt.Sprintf("%v", qr.Rows[0]) + require.Equal(t, "[INT64(1)]", row) + } + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "delete"); err != nil { + t.Fatalf("%s:%v", output, err) + } + for _, tab := range customerTablets { + qr, err := tab.QueryTabletWithDB("select count(*) from vreplication where workflow = 'vproduct'", "_vt") + if err != nil { + t.Fatalf("Error: %s", err) + } + if qr == nil || len(qr.Rows) != 1 { + t.Fatalf("Invalid result") + } + row = fmt.Sprintf("%v", qr.Rows[0]) + require.Equal(t, "[INT64(0)]", row) + } +} + func materializeProduct(t *testing.T) { workflow := "cproduct" if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "-vschema", materializeProductVSchema, "customer"); err != nil { diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index ec5a8a32791..e059d306f48 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -37,7 +37,6 @@ import ( const ( vreplicationTableName = "_vt.vreplication" - copyStateTableName = "_vt.copy_state" ) type vexec struct { @@ -101,9 +100,10 @@ func (vx *vexec) outputDryRunInfo(wr *Wrangler) error { for _, master := range vx.masters { key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString()) for _, stream := range rsr.Statuses[key] { - table.Append([]string{key, fmt.Sprintf("%d", stream.ID), fmt.Sprintf("%v", stream.Bls), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) + table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) } } + table.SetAutoMergeCellsByColumnIndex([]int{0}) table.SetRowLine(true) table.Render() wr.Logger().Printf(tableString.String()) diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 568bea14d20..ecb24778ffd 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -167,14 +167,16 @@ func TestVExec(t *testing.T) { require.NoError(t, err) dryRunResults := []string{ "Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", - "will be run on the following streams in keyspace target for workflow wrWorkflow:", - "+----------------------+----+------------------+---------+-----------+", - "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME |", - "+----------------------+----+------------------+---------+-----------+", - "| -80/zone1-0000000200 | 1 | zone1-0000000200 | Running | vt_target |", - "+----------------------+----+------------------+---------+-----------+", - "| 80-/zone1-0000000210 | 1 | zone1-0000000210 | Running | vt_target |", - "+----------------------+----+------------------+---------+-----------+", + "will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Running | vt_target | pos | 0 |", + "| | | filter: > | | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Running | vt_target | pos | 0 |", + "| | | filter: > | | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", } require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) } From d01b80b636a8de649f850bfcca60df709e79697e Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 5 Jul 2020 20:33:09 +0200 Subject: [PATCH 04/10] vtctl vexec/workflow actions: added more tests Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 10 +- go/vt/wrangler/vexec_test.go | 206 ++++++++++++++++++++++++++-- go/vt/wrangler/wrangler_env_test.go | 9 ++ 3 files changed, 206 insertions(+), 19 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index e059d306f48..f8e064f9ffd 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -315,7 +315,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( rsr.Workflow = workflow rsr.TargetKeyspace = keyspace var results map[*topo.TabletInfo]*querypb.QueryResult - query := fmt.Sprintf("select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication") + query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication" results, err := wr._vexec(ctx, workflow, keyspace, query, false) if err != nil { return nil, err @@ -354,10 +354,10 @@ func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) func updateState(message, state string, cs []copyState, timeUpdated int64) string { if message != "" { state = "Error" - } else if state == "Running" && int64(time.Now().Second())-timeUpdated > 10 /* seconds */ { - state = "Lagging" } else if state == "Running" && len(cs) > 0 { state = "Copying" + } else if state == "Running" && int64(time.Now().Second())-timeUpdated > 10 /* seconds */ { + state = "Lagging" } return state } @@ -374,7 +374,7 @@ func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) err func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) { var cs []copyState query := fmt.Sprintf(`select table_name, lastpk from _vt.copy_state where vrepl_id = %d`, id) - qr, err := wr.ExecuteFetchAsApp(ctx, tablet.Alias, true, query, 10000) + qr, err := wr.VReplicationExec(ctx, tablet.Alias, query) if err != nil { return nil, err } @@ -392,5 +392,3 @@ func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, i return cs, nil } - -//TODOs: add to vreplication_test.go endtoend: definitely ListStreams, maybe stopstream/insert/check/startstream/check ... diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index ecb24778ffd..9f2113979b9 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -22,6 +22,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" @@ -171,32 +172,211 @@ func TestVExec(t *testing.T) { "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |", "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", - "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Running | vt_target | pos | 0 |", + "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", "| | | filter: > | | | | |", "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", - "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Running | vt_target | pos | 0 |", + "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", "| | | filter: > | | | | |", "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", } require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) } -/* -func TextVExecValidations(t *testing.T) { +func TestWorkflowStatusUpdate(t *testing.T) { + require.Equal(t, "Error", updateState("master tablet not contactable", "Running", nil, 0)) + require.Equal(t, "Lagging", updateState("", "Running", nil, int64(time.Now().Second())-100)) + require.Equal(t, "Copying", updateState("", "Running", []copyState{{Table: "t1", LastPK: "[[INT64(10)]]"}}, int64(time.Now().Second()))) +} + +func TestWorkflowListStreams(t *testing.T) { + ctx := context.Background() + workflow := "wrWorkflow" + keyspace := "target" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + logger := logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + + err := wr.listStreams(ctx, workflow, keyspace) + require.Nil(t, err) + want := `{ + "Workflow": "wrWorkflow", + "SourceKeyspace": "source", + "TargetKeyspace": "target", + "Statuses": { + "-80/zone1-0000000200": [ + { + "Shard": "-80", + "Tablet": "zone1-0000000200", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ], + "80-/zone1-0000000210": [ + { + "Shard": "80-", + "Tablet": "zone1-0000000210", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "pos", + "StopPos": "", + "State": "Copying", + "MaxReplicationLag": 0, + "DBName": "vt_target", + "TimeUpdated": 1234, + "Message": "", + "CopyState": [ + { + "Table": "t", + "LastPK": "1" + } + ] + } + ] + } +} + +` + require.Equal(t, want, logger.String()) + + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false) + require.Nil(t, err) + require.Equal(t, "map[Tablet{zone1-0000000200}:rows_affected:1 Tablet{zone1-0000000210}:rows_affected:1 ]", fmt.Sprintf("%v", results)) + + logger.Clear() + results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true) + require.Nil(t, err) + require.Equal(t, "map[]", fmt.Sprintf("%v", results)) + dryRunResult := `Query: update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow' +will be run on the following streams in keyspace target for workflow wrWorkflow: + + ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | +| | | filter: > | | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ +| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | +| | | filter: > | | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ + + + + +` + require.Equal(t, dryRunResult, logger.String()) +} + +func TestVExecValidations(t *testing.T) { ctx := context.Background() - workflow := "" - keyspace := "" + workflow := "wf" + keyspace := "ks" query := "" - env := newWranglerTestEnv([]string{"0"}, []string{"-80","80-"}, "", nil) + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) defer env.close() wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) - //validations: empty workflow/ks/query - // valid workflow: no masters found? valid keyspace - // invalid query - // insert query + vx := newVExec(ctx, workflow, keyspace, query, wr) -} + type badQuery struct { + name string + query string + errorString string + } + badQueries := []badQuery{ + { + name: "invalid", + query: "bad query", + errorString: "syntax error at position 4 near 'bad'", + }, + { + name: "incorrect table", + query: "select * from _vt.vreplication2", + errorString: "invalid table name: _vt.vreplication2", + }, + { + name: "unsupported query", + query: "describe _vt.vreplication", + errorString: "query not supported by vexec: otherread", + }, + } + for _, bq := range badQueries { + t.Run(bq.name, func(t *testing.T) { + vx.query = bq.query + plan, err := vx.buildVExecPlan() + require.EqualError(t, err, bq.errorString) + require.Nil(t, plan) + }) + } -*/ + type action struct { + name string + want string + expectedError error + } + updateSQL := "update _vt.vreplication set state = %s" + actions := []action{ + { + name: "start", + want: fmt.Sprintf(updateSQL, encodeString("Running")), + expectedError: nil, + }, + { + name: "stop", + want: fmt.Sprintf(updateSQL, encodeString("Stopped")), + expectedError: nil, + }, + { + name: "delete", + want: "delete from _vt.vreplication", + expectedError: nil, + }, + { + name: "other", + want: "", + expectedError: fmt.Errorf("invalid action found: other"), + }} + + for _, a := range actions { + t.Run(a.name, func(t *testing.T) { + sql, err := wr.getWorkflowActionQuery(a.name) + require.Equal(t, a.expectedError, err) + require.Equal(t, a.want, sql) + }) + } +} diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index ca167a41dbb..fd2c9af4c3f 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -139,6 +139,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit ) env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state = 'Stopped', message = 'for wrangler test' where db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) + env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) env.tmc.setVRResults(master.tablet, "delete from _vt.vreplication where message != '' and db_name = 'vt_target' and workflow = 'wrWorkflow'", &sqltypes.Result{RowsAffected: 1}) env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2}) @@ -159,6 +160,14 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit ), ) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "table|lastpk", + "varchar|varchar"), + "t1|pk1", + ) + + env.tmc.setVRResults(master.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result) + env.tmc.vrpos[tabletID] = testSourceGtid env.tmc.pos[tabletID] = testTargetMasterPosition From 2e995985380b54f39242f9ab4e0a54670666959b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 13 Jul 2020 20:10:38 +0200 Subject: [PATCH 05/10] vtctl vexec/workflow actions: make adding of db_name/workflow conditional Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 2 - go/vt/wrangler/vexec_plan.go | 81 +++++++++++++++++++++++++++--------- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index f8e064f9ffd..5fb122a2821 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -286,7 +286,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty return nil, "", err } message = row[8].ToString() - //fmt.Printf("%d: %s\n", id, state) status := &replicationStatus{ Shard: master.Shard, Tablet: master.AliasString(), @@ -321,7 +320,6 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( return nil, err } for master, result := range results { - //fmt.Printf("master %s, result %v\n", master.Alias, result) var rsrStatus []*replicationStatus qr := sqltypes.Proto3ToResult(result) for _, row := range qr.Rows { diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index 339f097ea2e..290453bd461 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -61,33 +61,76 @@ func (vx *vexec) buildVExecPlan() (*vexecPlan, error) { return plan, nil } +func splitAndExpression(filters []sqlparser.Expr, node sqlparser.Expr) []sqlparser.Expr { + if node == nil { + return filters + } + switch node := node.(type) { + case *sqlparser.AndExpr: + filters = splitAndExpression(filters, node.Left) + return splitAndExpression(filters, node.Right) + } + return append(filters, node) +} + +func (vx *vexec) analyzeWhere(where *sqlparser.Where) []string { + var cols []string + if where == nil { + return cols + } + exprs := splitAndExpression(nil, where.Expr) + for _, expr := range exprs { + switch expr := expr.(type) { + case *sqlparser.ComparisonExpr: + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if ok { + cols = append(cols, qualifiedName.Name.String()) + } + } + } + return cols +} + func (vx *vexec) addDefaultWheres(where *sqlparser.Where) *sqlparser.Where { + cols := vx.analyzeWhere(where) + var hasDBName, hasWorkflow bool + for _, col := range cols { + if col == "db_name" { + hasDBName = true + } else if col == "workflow" { + hasWorkflow = true + } + } newWhere := where - expr := &sqlparser.ComparisonExpr{ - Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("db_name")}, - Operator: sqlparser.EqualStr, - Right: sqlparser.NewStrVal([]byte(vx.masters[0].DbName())), - } - if newWhere == nil { - newWhere = &sqlparser.Where{ - Type: sqlparser.WhereStr, - Expr: expr, + if !hasDBName { + expr := &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("db_name")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.masters[0].DbName())), + } + if newWhere == nil { + newWhere = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: expr, + } + } else { + newWhere.Expr = &sqlparser.AndExpr{ + Left: newWhere.Expr, + Right: expr, + } + } + } + if !hasWorkflow { + expr := &sqlparser.ComparisonExpr{ + Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("workflow")}, + Operator: sqlparser.EqualStr, + Right: sqlparser.NewStrVal([]byte(vx.workflow)), } - } else { newWhere.Expr = &sqlparser.AndExpr{ Left: newWhere.Expr, Right: expr, } } - expr = &sqlparser.ComparisonExpr{ - Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("workflow")}, - Operator: sqlparser.EqualStr, - Right: sqlparser.NewStrVal([]byte(vx.workflow)), - } - newWhere.Expr = &sqlparser.AndExpr{ - Left: newWhere.Expr, - Right: expr, - } return newWhere } From ae78f6baee83f5bfaf1f5f1ae1a08b3abde6e804 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 13 Jul 2020 20:15:51 +0200 Subject: [PATCH 06/10] vtctl vexec/workflow actions: rename _vexec Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 5fb122a2821..37b9472481d 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -63,7 +63,7 @@ func newVExec(ctx context.Context, workflow, keyspace, query string, wr *Wrangle // VExec executes queries on _vt.vreplication on all masters in the target keyspace of the workflow func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { - results, err := wr._vexec(ctx, workflow, keyspace, query, dryRun) + results, err := wr.runVexec(ctx, workflow, keyspace, query, dryRun) retResults := make(map[*topo.TabletInfo]*sqltypes.Result) for tablet, result := range results { retResults[tablet] = sqltypes.Proto3ToResult(result) @@ -71,7 +71,7 @@ func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, return retResults, err } -func (wr *Wrangler) _vexec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { +func (wr *Wrangler) runVexec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { vx := newVExec(ctx, workflow, keyspace, query, wr) if err := vx.getMasters(); err != nil { return nil, err @@ -315,7 +315,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( rsr.TargetKeyspace = keyspace var results map[*topo.TabletInfo]*querypb.QueryResult query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication" - results, err := wr._vexec(ctx, workflow, keyspace, query, false) + results, err := wr.runVexec(ctx, workflow, keyspace, query, false) if err != nil { return nil, err } From d8ecaf77c82ff7111f7699be4679e87ee3ffcd64 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 15 Jul 2020 14:41:18 +0200 Subject: [PATCH 07/10] vtctl vexec/workflow actions: better result/error reporting Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 44 +++++++++++++++++++++++++++--------- go/vt/wrangler/vexec.go | 19 +++++++++++----- go/vt/wrangler/vexec_plan.go | 1 + 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index e9ff0e9cf4e..d9aac938bee 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2852,31 +2852,42 @@ func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla if len(results) == 0 { wr.Logger().Printf("no result returned\n") } + qr := queryResultFromVexecResults(results) + if *json { + return printJSON(wr.Logger(), qr) + } + + printQueryResult(loggerWriter{wr.Logger()}, qr) + return nil +} + +func queryResultFromVexecResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { var qr *sqltypes.Result = &sqltypes.Result{} qr.RowsAffected = uint64(len(results)) qr.Fields = []*querypb.Field{{ Name: "Tablet", Type: sqltypes.VarBinary, - }} + }, + { + Name: "Rows Affected", + Type: sqltypes.Uint64, + }, + } for _, result := range results { fields := result.Fields qr.Fields = append(qr.Fields, fields...) break } for tablet, result := range results { + var row2 []sqltypes.Value + row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) + row2 = append(row2, sqltypes.NewUint64(qr.RowsAffected)) for _, row := range result.Rows { - var row2 []sqltypes.Value - row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) row2 = append(row2, row...) - qr.Rows = append(qr.Rows, row2) } + qr.Rows = append(qr.Rows, row2) } - if *json { - return printJSON(wr.Logger(), qr) - } - - printQueryResult(loggerWriter{wr.Logger()}, qr) - return nil + return qr } func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { @@ -2897,7 +2908,18 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. } action := subFlags.Arg(1) - return wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun) + results, err := wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun) + if err != nil { + return err + } + if len(results) == 0 { + wr.Logger().Printf("no result returned\n") + } + qr := queryResultFromVexecResults(results) + + printQueryResult(loggerWriter{wr.Logger()}, qr) + return nil + } func commandPanic(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 37b9472481d..e978811e63d 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/log" + "github.com/gogo/protobuf/proto" "github.com/olekukonko/tablewriter" "vitess.io/vitess/go/sqltypes" @@ -124,13 +126,14 @@ func (vx *vexec) exec(query string) (map[*topo.TabletInfo]*querypb.QueryResult, wg.Add(1) go func(ctx context.Context, master *topo.TabletInfo) { defer wg.Done() - fmt.Printf("Running %s on %s\n", query, master.AliasString()) + log.Infof("Running %s on %s\n", query, master.AliasString()) qr, err := vx.wr.VReplicationExec(ctx, master.Alias, query) + log.Infof("Result is %s: %v", master.AliasString(), qr) if err != nil { allErrors.RecordError(err) } else { if qr.RowsAffected == 0 { - allErrors.RecordError(fmt.Errorf("\nNo streams found for workflow %s tablet %s", workflow, master.Alias)) + allErrors.RecordError(fmt.Errorf("\nno matching streams found for workflow %s, tablet %s, query %s", workflow, master.Alias, query)) } else { mu.Lock() results[master] = qr @@ -186,12 +189,16 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { } // WorkflowAction can start/stop/delete or list strams in _vt.vreplication on all masters in the target keyspace of the workflow -func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) error { +func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { if action == "list" { - return wr.listStreams(ctx, workflow, keyspace) + return nil, wr.listStreams(ctx, workflow, keyspace) + } + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) + retResults := make(map[*topo.TabletInfo]*sqltypes.Result) + for tablet, result := range results { + retResults[tablet] = sqltypes.Proto3ToResult(result) } - _, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) - return err + return retResults, err } func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) { diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index 290453bd461..612c7b3aac1 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -53,6 +53,7 @@ func (vx *vexec) buildVExecPlan() (*vexecPlan, error) { default: return nil, fmt.Errorf("query not supported by vexec: %s", sqlparser.String(stmt)) } + if err != nil { return nil, err } From ad0259646772d1506b32f056a80da3fcbab2eb4b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 23 Jul 2020 19:25:57 +0200 Subject: [PATCH 08/10] vtctl vexec/workflow actions: improve report output and fix bug reported in review Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 58 +++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index d9aac938bee..49be08f213c 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2853,45 +2853,61 @@ func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla wr.Logger().Printf("no result returned\n") } qr := queryResultFromVexecResults(results) + if len(qr.Rows) == 0 { + return nil + } if *json { return printJSON(wr.Logger(), qr) } - printQueryResult(loggerWriter{wr.Logger()}, qr) return nil } -func queryResultFromVexecResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { - var qr *sqltypes.Result = &sqltypes.Result{} +// called for workflow stop/start/delete. Only rows affected are reported per tablet +func queryResultFromWorkflowResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { + var qr = &sqltypes.Result{} qr.RowsAffected = uint64(len(results)) qr.Fields = []*querypb.Field{{ Name: "Tablet", Type: sqltypes.VarBinary, - }, - { - Name: "Rows Affected", - Type: sqltypes.Uint64, - }, - } - for _, result := range results { - fields := result.Fields - qr.Fields = append(qr.Fields, fields...) - break - } + }, { + Name: "RowsAffected", + Type: sqltypes.Uint64, + }} + var row2 []sqltypes.Value for tablet, result := range results { - var row2 []sqltypes.Value + row2 = nil row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) - row2 = append(row2, sqltypes.NewUint64(qr.RowsAffected)) + row2 = append(row2, sqltypes.NewUint64(result.RowsAffected)) + qr.Rows = append(qr.Rows, row2) + } + return qr +} + +func queryResultFromVexecResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { + var qr = &sqltypes.Result{} + qr.RowsAffected = uint64(len(results)) + qr.Fields = []*querypb.Field{{ + Name: "Tablet", + Type: sqltypes.VarBinary, + }} + var row2 []sqltypes.Value + for tablet, result := range results { for _, row := range result.Rows { + if len(qr.Fields) == 1 { + qr.Fields = append(qr.Fields, result.Fields...) + } + row2 = nil + row2 = append(row2, sqltypes.NewVarBinary(tablet.AliasString())) row2 = append(row2, row...) + qr.Rows = append(qr.Rows, row2) } - qr.Rows = append(qr.Rows, row2) } return qr } func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { - dryRun := subFlags.Bool("dry_run", false, "Does a dry run of VExec and only reports the final query and list of masters on which it will be applied") + dryRun := subFlags.Bool("dry_run", false, "Does a dry run of Workflow and only reports the final query and list of masters on which the operation will be applied") if err := subFlags.Parse(args); err != nil { return err } @@ -2912,10 +2928,14 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. if err != nil { return err } + if action == "list" { + return nil + } if len(results) == 0 { wr.Logger().Printf("no result returned\n") + return nil } - qr := queryResultFromVexecResults(results) + qr := queryResultFromWorkflowResults(results) printQueryResult(loggerWriter{wr.Logger()}, qr) return nil From 8ecbbe4f941acc063b2f8990369f7879cbfe1299 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 23 Jul 2020 19:52:01 +0200 Subject: [PATCH 09/10] vtctl vexec/workflow actions: remove insert from vexec Signed-off-by: Rohit Nayak --- .../vreplication/vreplication_test.go | 73 ------------------- go/vt/wrangler/vexec_plan.go | 58 +-------------- go/vt/wrangler/vexec_test.go | 15 +--- 3 files changed, 4 insertions(+), 142 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index aa27faa297f..79163546ccc 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -24,8 +24,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" @@ -69,7 +67,6 @@ func TestBasicVreplicationWorkflow(t *testing.T) { shardMerchant(t) materializeProduct(t) - materializeProductUsingVExec(t) materializeMerchantOrders(t) materializeSales(t) @@ -512,76 +509,6 @@ func vdiff(t *testing.T, workflow string) { } } -func materializeProductUsingVExec(t *testing.T) { - workflow := "vproduct" - insertQuery := "insert into _vt.vreplication(workflow, source, state, db_name, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp) values \n" - insertQuery += "('vproduct', 'keyspace:\"product\" shard:\"0\" filter: > ', " - insertQuery += "'Stopped', 'vt_customer', '', 0, 0, '', '', 0, 0)" - customerTablets := vc.getVttabletsInKeyspace(t, cell, "customer", "master") - createVProduct := "create table vproduct(pid bigint, description varchar(128), primary key(pid))" - fmt.Printf("Insert Query:\n%s\n", createVProduct) - for _, tab := range customerTablets { - tab.QueryTabletWithDB(createVProduct, "vt_customer") - } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("VExec", "customer.vproduct", insertQuery); err != nil { - t.Fatalf("%s:%v", output, err) - } - if err := vc.VtctlClient.ExecuteCommand("VExec", "customer.vproduct", "update _vt.vreplication set state = 'Running'"); err != nil { - t.Fatal(err) - } - for _, tab := range customerTablets { - if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_customer", 3*time.Second) != nil { - t.Fatal("Materialize vproduct timed out") - } - } - for _, tab := range customerTablets { - assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 2)) - } - - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "stop"); err != nil { - t.Fatalf("%s:%v", output, err) - } - productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - productTab.QueryTabletWithDB("insert into product(pid, description) values (3, 'mouse')", "vt_product") - time.Sleep(100 * time.Millisecond) - for _, tab := range customerTablets { - assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 2)) - } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "start"); err != nil { - t.Fatalf("%s:%v", output, err) - } - time.Sleep(1 * time.Second) - for _, tab := range customerTablets { - assert.Empty(t, validateCountInTablet(t, tab, "customer", "vproduct", 3)) - } - var row string - for _, tab := range customerTablets { - qr, err := tab.QueryTabletWithDB("select count(*) from vreplication where workflow = 'vproduct'", "_vt") - if err != nil { - t.Fatalf("Error: %s", err) - } - if qr == nil || len(qr.Rows) != 1 { - t.Fatalf("Invalid result") - } - row = fmt.Sprintf("%v", qr.Rows[0]) - require.Equal(t, "[INT64(1)]", row) - } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "customer.vproduct", "delete"); err != nil { - t.Fatalf("%s:%v", output, err) - } - for _, tab := range customerTablets { - qr, err := tab.QueryTabletWithDB("select count(*) from vreplication where workflow = 'vproduct'", "_vt") - if err != nil { - t.Fatalf("Error: %s", err) - } - if qr == nil || len(qr.Rows) != 1 { - t.Fatalf("Invalid result") - } - row = fmt.Sprintf("%v", qr.Rows[0]) - require.Equal(t, "[INT64(0)]", row) - } -} - func materializeProduct(t *testing.T) { workflow := "cproduct" if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "-vschema", materializeProductVSchema, "customer"); err != nil { diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index 612c7b3aac1..a61a986d679 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -29,8 +29,7 @@ type vexecPlan struct { } const ( - insertQuery = iota - updateQuery + updateQuery = iota deleteQuery selectQuery ) @@ -42,8 +41,6 @@ func (vx *vexec) buildVExecPlan() (*vexecPlan, error) { } var plan *vexecPlan switch stmt := stmt.(type) { - case *sqlparser.Insert: - plan, err = vx.buildInsertPlan(stmt) case *sqlparser.Update: plan, err = vx.buildUpdatePlan(stmt) case *sqlparser.Delete: @@ -135,59 +132,6 @@ func (vx *vexec) addDefaultWheres(where *sqlparser.Where) *sqlparser.Where { return newWhere } -func (vx *vexec) buildInsertPlan(ins *sqlparser.Insert) (*vexecPlan, error) { - switch sqlparser.String(ins.Table) { - case vreplicationTableName: - // no-op - default: - return nil, fmt.Errorf("vexec does not support: %v", sqlparser.String(ins.Table)) - } - - if ins.Action != sqlparser.InsertStr { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - if ins.Ignore != "" { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - if ins.Partitions != nil { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - if ins.OnDup != nil { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - rows, ok := ins.Rows.(sqlparser.Values) - if !ok { - return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins)) - } - idPos := 0 - if len(ins.Columns) != 0 { - idPos = -1 - for i, col := range ins.Columns { - if col.EqualString("id") { - idPos = i - break - } - } - } - if idPos >= 0 { - for _, row := range rows { - if idPos >= len(row) { - return nil, fmt.Errorf("malformed statement: %v", sqlparser.String(ins)) - } - if _, ok := row[idPos].(*sqlparser.NullVal); !ok { - return nil, fmt.Errorf("id should not have a value: %v", sqlparser.String(ins)) - } - } - } - buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("%v", ins) - - return &vexecPlan{ - opcode: insertQuery, - parsedQuery: buf.ParsedQuery(), - }, nil -} - func (vx *vexec) buildUpdatePlan(upd *sqlparser.Update) (*vexecPlan, error) { switch sqlparser.String(upd.TableExprs) { case vreplicationTableName: diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 9f2113979b9..f0a15e0841f 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -122,20 +122,11 @@ func TestVExec(t *testing.T) { query: "update _vt.vreplication set state='Stopped', message='for wrangler test'", result: result, }) - result = &sqltypes.Result{ - RowsAffected: 2, - Rows: [][]sqltypes.Value{}, - } - testCases = append(testCases, &TestCase{ - name: "insert", - query: "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", - result: result, - }) - errorString := "id should not have a value" + errorString := "query not supported by vexec" testCases = append(testCases, &TestCase{ - name: "insert invalid-id", - query: "insert into _vt.vreplication(id, state) values (1, 'Running'), (2, 'Stopped')", + name: "insert", + query: "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", errorString: errorString, }) From 11dbdd9305a5fa555eefdbc2dda0fa334e793b41 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 24 Jul 2020 11:25:48 +0200 Subject: [PATCH 10/10] vtctl vexec/workflow actions: show rows affected for vexec dmls Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 49be08f213c..284fd58af32 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2852,7 +2852,17 @@ func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla if len(results) == 0 { wr.Logger().Printf("no result returned\n") } - qr := queryResultFromVexecResults(results) + var qr *sqltypes.Result + var numFields int + for _, result := range results { + numFields = len(result.Fields) + break + } + if numFields != 0 { + qr = queryResultForTabletResults(results) + } else { + qr = queryResultForRowsAffected(results) + } if len(qr.Rows) == 0 { return nil } @@ -2864,7 +2874,7 @@ func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla } // called for workflow stop/start/delete. Only rows affected are reported per tablet -func queryResultFromWorkflowResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { +func queryResultForRowsAffected(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { var qr = &sqltypes.Result{} qr.RowsAffected = uint64(len(results)) qr.Fields = []*querypb.Field{{ @@ -2884,7 +2894,7 @@ func queryResultFromWorkflowResults(results map[*topo.TabletInfo]*sqltypes.Resul return qr } -func queryResultFromVexecResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { +func queryResultForTabletResults(results map[*topo.TabletInfo]*sqltypes.Result) *sqltypes.Result { var qr = &sqltypes.Result{} qr.RowsAffected = uint64(len(results)) qr.Fields = []*querypb.Field{{ @@ -2935,7 +2945,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. wr.Logger().Printf("no result returned\n") return nil } - qr := queryResultFromWorkflowResults(results) + qr := queryResultForRowsAffected(results) printQueryResult(loggerWriter{wr.Logger()}, qr) return nil