diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go new file mode 100644 index 00000000000..90ac5e2a1fc --- /dev/null +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -0,0 +1,347 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +func TestConsistentLookup(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn2.Close() + + // Simple insert. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + // check that the lookup query happens in the right connection + assertMatches(t, conn, "select * from t1 where id2 = 4", "[[INT64(1) INT64(4)]]") + exec(t, conn, "commit") + assertMatches(t, conn, "select * from t1", "[[INT64(1) INT64(4)]]") + qr := exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Inserting again should fail. + exec(t, conn, "begin") + _, err = conn.ExecuteFetch("insert into t1(id1, id2) values(1, 4)", 1000, false) + exec(t, conn, "rollback") + require.Error(t, err) + mysqlErr := err.(*mysql.SQLError) + assert.Equal(t, 1062, mysqlErr.Num) + assert.Equal(t, "23000", mysqlErr.State) + assert.Contains(t, mysqlErr.Message, "Duplicate entry") + + // Simple delete. + exec(t, conn, "begin") + exec(t, conn, "delete from t1 where id1=1") + assertMatches(t, conn, "select * from t1 where id2 = 4", "[]") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Autocommit insert. + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select id2 from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Autocommit delete. + exec(t, conn, "delete from t1 where id1=1") + + // Dangling row pointing to existing keyspace id. + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + // Delete the main row only. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Verify the lookup row is still there. + qr = exec(t, conn, "select id2 from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Insert should still succeed. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Lookup row should be unchanged. + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Dangling row not pointing to existing keyspace id. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Update the lookup row with bogus keyspace id. + exec(t, conn, "update t1_id2_idx set keyspace_id='aaa' where id2=4") + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"aaa\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Insert should still succeed. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // lookup row must be updated. + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Update, but don't change anything. This should not deadlock. + exec(t, conn, "begin") + exec(t, conn, "update t1 set id2=4 where id1=1") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Update, and change the lookup value. This should change main and lookup rows. + exec(t, conn, "begin") + exec(t, conn, "update t1 set id2=5 where id1=1") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + exec(t, conn, "delete from t1 where id1=1") +} + +func TestConsistentLookupMultiInsert(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn2.Close() + + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1,4), (2,5)") + exec(t, conn, "commit") + qr := exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)] [INT64(2) INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(2)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Delete one row but leave its lookup dangling. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Insert a bogus lookup row. + exec(t, conn, "insert into t1_id2_idx(id2, keyspace_id) values(6, 'aaa')") + // Insert 3 rows: + // first row will insert without changing lookup. + // second will insert and change lookup. + // third will be a fresh insert for main and lookup. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1,2), (3,6), (4,7)") + exec(t, conn, "commit") + qr = exec(t, conn, "select id1, id2 from t1 order by id1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2)] [INT64(2) INT64(5)] [INT64(3) INT64(6)] [INT64(4) INT64(7)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx where id2=6") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(6) VARBINARY(\"N\\xb1\\x90ɢ\\xfa\\x16\\x9c\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + exec(t, conn, "delete from t1 where id1=1") + exec(t, conn, "delete from t1 where id1=2") + exec(t, conn, "delete from t1 where id1=3") + exec(t, conn, "delete from t1 where id1=4") + exec(t, conn, "delete from t1_id2_idx where id2=4") +} + +func TestHashLookupMultiInsertIgnore(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn2.Close() + + // DB should start out clean + qr := exec(t, conn, "select count(*) from t2_id4_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t2") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Try inserting a bunch of ids at once + exec(t, conn, "begin") + exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)") + exec(t, conn, "commit") + + // Verify + qr = exec(t, conn, "select id3, id4 from t2 order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } +} + +func TestConsistentLookupUpdate(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + /* Simple insert. after this dml, the tables will contain the following: + t4 (id1, id2): + 1 2 + 2 2 + 3 3 + 4 3 + + t4_id2_idx (id2, id1, keyspace_id:id1): + 2 1 1 + 2 2 2 + 3 3 3 + 3 4 4 + */ + exec(t, conn, "insert into t4(id1, id2) values(1, '2'), (2, '2'), (3, '3'), (4, '3')") + qr := exec(t, conn, "select id1, id2 from t4 order by id1") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("2")] [INT64(2) VARCHAR("2")] [INT64(3) VARCHAR("3")] [INT64(4) VARCHAR("3")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + /* Updating a lookup column. after this dml, the tables will contain the following: + t4 (id1, id2): + 1 42 + 2 2 + 3 3 + 4 3 + + t4_id2_idx (id2, id1, keyspace_id:id1): + 42 1 1 + 2 2 2 + 3 3 3 + 3 4 4 + */ + exec(t, conn, "update t4 set id2 = '42' where id1 = 1") + qr = exec(t, conn, "select id1, id2 from t4 order by id1") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("42")] [INT64(2) VARCHAR("2")] [INT64(3) VARCHAR("3")] [INT64(4) VARCHAR("3")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + /* delete one specific keyspace id. after this dml, the tables will contain the following: + t4 (id1, id2): + 2 2 + 3 3 + 4 3 + + t4_id2_idx (id2, id1, keyspace_id:id1): + 2 2 2 + 3 3 3 + 3 4 4 + */ + exec(t, conn, "delete from t4 where id2 = '42'") + qr = exec(t, conn, "select * from t4 where id2 = '42'") + require.Empty(t, qr.Rows) + qr = exec(t, conn, "select * from t4_id2_idx where id2 = '42'") + require.Empty(t, qr.Rows) + + // delete all the rows. + exec(t, conn, "delete from t4") + qr = exec(t, conn, "select * from t4") + require.Empty(t, qr.Rows) + qr = exec(t, conn, "select * from t4_id2_idx") + require.Empty(t, qr.Rows) +} + +func TestSelectNullLookup(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + exec(t, conn, "begin") + exec(t, conn, "insert into t6(id1, id2) values(1, 'a'), (2, 'b'), (3, null)") + exec(t, conn, "commit") + + assertMatches(t, conn, "select id1, id2 from t6 order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")] [INT64(3) NULL]]") + assertIsEmpty(t, conn, "select id1, id2 from t6 where id2 = null") + assertMatches(t, conn, "select id1, id2 from t6 where id2 is null", "[[INT64(3) NULL]]") + assertMatches(t, conn, "select id1, id2 from t6 where id2 is not null order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")]]") + assertIsEmpty(t, conn, "select id1, id2 from t6 where id1 IN (null)") + assertMatches(t, conn, "select id1, id2 from t6 where id1 IN (1,2,null) order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")]]") + assertIsEmpty(t, conn, "select id1, id2 from t6 where id1 NOT IN (1,null) order by id1") + assertMatches(t, conn, "select id1, id2 from t6 where id1 NOT IN (1,3)", "[[INT64(2) VARCHAR(\"b\")]]") + + exec(t, conn, "delete from t6") +} diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 92f529b551f..aec30cd7b6f 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -34,141 +34,6 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" ) -func TestConsistentLookup(t *testing.T) { - defer cluster.PanicHandler(t) - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn.Close() - // conn2 is for queries that target shards. - conn2, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn2.Close() - - // Simple insert. - exec(t, conn, "begin") - exec(t, conn, "insert into t1(id1, id2) values(1, 4)") - exec(t, conn, "commit") - qr := exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Inserting again should fail. - exec(t, conn, "begin") - _, err = conn.ExecuteFetch("insert into t1(id1, id2) values(1, 4)", 1000, false) - exec(t, conn, "rollback") - require.Error(t, err) - mysqlErr := err.(*mysql.SQLError) - assert.Equal(t, 1062, mysqlErr.Num) - assert.Equal(t, "23000", mysqlErr.State) - assert.Contains(t, mysqlErr.Message, "Duplicate entry") - - // Simple delete. - exec(t, conn, "begin") - exec(t, conn, "delete from t1 where id1=1") - exec(t, conn, "commit") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Autocommit insert. - exec(t, conn, "insert into t1(id1, id2) values(1, 4)") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select id2 from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - // Autocommit delete. - exec(t, conn, "delete from t1 where id1=1") - - // Dangling row pointing to existing keyspace id. - exec(t, conn, "insert into t1(id1, id2) values(1, 4)") - // Delete the main row only. - exec(t, conn2, "use `ks:-80`") - exec(t, conn2, "delete from t1 where id1=1") - // Verify the lookup row is still there. - qr = exec(t, conn, "select id2 from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - // Insert should still succeed. - exec(t, conn, "begin") - exec(t, conn, "insert into t1(id1, id2) values(1, 4)") - exec(t, conn, "commit") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - // Lookup row should be unchanged. - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Dangling row not pointing to existing keyspace id. - exec(t, conn2, "use `ks:-80`") - exec(t, conn2, "delete from t1 where id1=1") - // Update the lookup row with bogus keyspace id. - exec(t, conn, "update t1_id2_idx set keyspace_id='aaa' where id2=4") - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"aaa\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - // Insert should still succeed. - exec(t, conn, "begin") - exec(t, conn, "insert into t1(id1, id2) values(1, 4)") - exec(t, conn, "commit") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - // lookup row must be updated. - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Update, but don't change anything. This should not deadlock. - exec(t, conn, "begin") - exec(t, conn, "update t1 set id2=4 where id1=1") - exec(t, conn, "commit") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Update, and change the lookup value. This should change main and lookup rows. - exec(t, conn, "begin") - exec(t, conn, "update t1 set id2=5 where id1=1") - exec(t, conn, "commit") - qr = exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(5)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select * from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - exec(t, conn, "delete from t1 where id1=1") -} - func TestDMLScatter(t *testing.T) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) @@ -349,188 +214,6 @@ func TestDMLIn(t *testing.T) { require.Empty(t, qr.Rows) } -func TestConsistentLookupMultiInsert(t *testing.T) { - defer cluster.PanicHandler(t) - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn.Close() - // conn2 is for queries that target shards. - conn2, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn2.Close() - - exec(t, conn, "begin") - exec(t, conn, "insert into t1(id1, id2) values(1,4), (2,5)") - exec(t, conn, "commit") - qr := exec(t, conn, "select * from t1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)] [INT64(2) INT64(5)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select count(*) from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(2)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Delete one row but leave its lookup dangling. - exec(t, conn2, "use `ks:-80`") - exec(t, conn2, "delete from t1 where id1=1") - // Insert a bogus lookup row. - exec(t, conn, "insert into t1_id2_idx(id2, keyspace_id) values(6, 'aaa')") - // Insert 3 rows: - // first row will insert without changing lookup. - // second will insert and change lookup. - // third will be a fresh insert for main and lookup. - exec(t, conn, "begin") - exec(t, conn, "insert into t1(id1, id2) values(1,2), (3,6), (4,7)") - exec(t, conn, "commit") - qr = exec(t, conn, "select id1, id2 from t1 order by id1") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2)] [INT64(2) INT64(5)] [INT64(3) INT64(6)] [INT64(4) INT64(7)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select * from t1_id2_idx where id2=6") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(6) VARBINARY(\"N\\xb1\\x90ɢ\\xfa\\x16\\x9c\")]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select count(*) from t1_id2_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - exec(t, conn, "delete from t1 where id1=1") - exec(t, conn, "delete from t1 where id1=2") - exec(t, conn, "delete from t1 where id1=3") - exec(t, conn, "delete from t1 where id1=4") - exec(t, conn, "delete from t1_id2_idx where id2=4") -} - -func TestHashLookupMultiInsertIgnore(t *testing.T) { - defer cluster.PanicHandler(t) - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn.Close() - // conn2 is for queries that target shards. - conn2, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) - defer conn2.Close() - - // DB should start out clean - qr := exec(t, conn, "select count(*) from t2_id4_idx") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select count(*) from t2") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - // Try inserting a bunch of ids at once - exec(t, conn, "begin") - exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)") - exec(t, conn, "commit") - - // Verify - qr = exec(t, conn, "select id3, id4 from t2 order by id3") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3") - if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } -} - -func TestConsistentLookupUpdate(t *testing.T) { - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - /* Simple insert. after this dml, the tables will contain the following: - t4 (id1, id2): - 1 2 - 2 2 - 3 3 - 4 3 - - t4_id2_idx (id2, id1, keyspace_id:id1): - 2 1 1 - 2 2 2 - 3 3 3 - 3 4 4 - */ - exec(t, conn, "insert into t4(id1, id2) values(1, '2'), (2, '2'), (3, '3'), (4, '3')") - qr := exec(t, conn, "select id1, id2 from t4 order by id1") - if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("2")] [INT64(2) VARCHAR("2")] [INT64(3) VARCHAR("3")] [INT64(4) VARCHAR("3")]]`; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - /* Updating a lookup column. after this dml, the tables will contain the following: - t4 (id1, id2): - 1 42 - 2 2 - 3 3 - 4 3 - - t4_id2_idx (id2, id1, keyspace_id:id1): - 42 1 1 - 2 2 2 - 3 3 3 - 3 4 4 - */ - exec(t, conn, "update t4 set id2 = '42' where id1 = 1") - qr = exec(t, conn, "select id1, id2 from t4 order by id1") - if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("42")] [INT64(2) VARCHAR("2")] [INT64(3) VARCHAR("3")] [INT64(4) VARCHAR("3")]]`; got != want { - t.Errorf("select:\n%v want\n%v", got, want) - } - - /* delete one specific keyspace id. after this dml, the tables will contain the following: - t4 (id1, id2): - 2 2 - 3 3 - 4 3 - - t4_id2_idx (id2, id1, keyspace_id:id1): - 2 2 2 - 3 3 3 - 3 4 4 - */ - exec(t, conn, "delete from t4 where id2 = '42'") - qr = exec(t, conn, "select * from t4 where id2 = '42'") - require.Empty(t, qr.Rows) - qr = exec(t, conn, "select * from t4_id2_idx where id2 = '42'") - require.Empty(t, qr.Rows) - - // delete all the rows. - exec(t, conn, "delete from t4") - qr = exec(t, conn, "select * from t4") - require.Empty(t, qr.Rows) - qr = exec(t, conn, "select * from t4_id2_idx") - require.Empty(t, qr.Rows) -} - -func TestSelectNullLookup(t *testing.T) { - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - exec(t, conn, "begin") - exec(t, conn, "insert into t6(id1, id2) values(1, 'a'), (2, 'b'), (3, null)") - exec(t, conn, "commit") - - assertMatches(t, conn, "select id1, id2 from t6 order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")] [INT64(3) NULL]]") - assertIsEmpty(t, conn, "select id1, id2 from t6 where id2 = null") - assertMatches(t, conn, "select id1, id2 from t6 where id2 is null", "[[INT64(3) NULL]]") - assertMatches(t, conn, "select id1, id2 from t6 where id2 is not null order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")]]") - assertIsEmpty(t, conn, "select id1, id2 from t6 where id1 IN (null)") - assertMatches(t, conn, "select id1, id2 from t6 where id1 IN (1,2,null) order by id1", "[[INT64(1) VARCHAR(\"a\")] [INT64(2) VARCHAR(\"b\")]]") - assertIsEmpty(t, conn, "select id1, id2 from t6 where id1 NOT IN (1,null) order by id1") - assertMatches(t, conn, "select id1, id2 from t6 where id1 NOT IN (1,3)", "[[INT64(2) VARCHAR(\"b\")]]") - - exec(t, conn, "delete from t6") -} - func TestSelectNull(t *testing.T) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index 14e5f84cc9a..1f58a9f3a7e 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -99,7 +99,7 @@ func (lu *ConsistentLookup) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.De return out, nil } - results, err := lu.lkp.Lookup(vcursor, ids) + results, err := lu.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_PRE) if err != nil { return nil, err } @@ -164,7 +164,7 @@ func (lu *ConsistentLookupUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([] return out, nil } - results, err := lu.lkp.Lookup(vcursor, ids) + results, err := lu.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_PRE) if err != nil { return nil, err } diff --git a/go/vt/vtgate/vindexes/consistent_lookup_test.go b/go/vt/vtgate/vindexes/consistent_lookup_test.go index efbf20da1ec..02ed5ede368 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup_test.go +++ b/go/vt/vtgate/vindexes/consistent_lookup_test.go @@ -87,7 +87,7 @@ func TestConsistentLookupMap(t *testing.T) { t.Errorf("Map(): %#v, want %+v", got, want) } vc.verifyLog(t, []string{ - "Execute select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", + "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) // Test query fail. @@ -132,7 +132,7 @@ func TestConsistentLookupUniqueMap(t *testing.T) { t.Errorf("Map(): %#v, want %+v", got, want) } vc.verifyLog(t, []string{ - "Execute select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", + "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) // More than one result is invalid @@ -177,7 +177,7 @@ func TestConsistentLookupMapAbsent(t *testing.T) { t.Errorf("Map(): %#v, want %+v", got, want) } vc.verifyLog(t, []string{ - "Execute select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", + "ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false", }) } diff --git a/go/vt/vtgate/vindexes/lookup.go b/go/vt/vtgate/vindexes/lookup.go index 6b2b6e0c066..589f5942705 100644 --- a/go/vt/vtgate/vindexes/lookup.go +++ b/go/vt/vtgate/vindexes/lookup.go @@ -84,7 +84,7 @@ func (ln *LookupNonUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.Des return out, nil } - results, err := ln.lkp.Lookup(vcursor, ids) + results, err := ln.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err } @@ -238,7 +238,7 @@ func (lu *LookupUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.Destin } return out, nil } - results, err := lu.lkp.Lookup(vcursor, ids) + results, err := lu.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err } diff --git a/go/vt/vtgate/vindexes/lookup_hash.go b/go/vt/vtgate/vindexes/lookup_hash.go index 4eeb510862f..2f76db4df7a 100644 --- a/go/vt/vtgate/vindexes/lookup_hash.go +++ b/go/vt/vtgate/vindexes/lookup_hash.go @@ -118,7 +118,7 @@ func (lh *LookupHash) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.Destinat return out, nil } - results, err := lh.lkp.Lookup(vcursor, ids) + results, err := lh.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err } @@ -274,7 +274,7 @@ func (lhu *LookupHashUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.D return out, nil } - results, err := lhu.lkp.Lookup(vcursor, ids) + results, err := lhu.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err } diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index 4e808910d78..a139fb187eb 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -68,11 +68,14 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit, } // Lookup performs a lookup for the ids. -func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sqltypes.Result, error) { +func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value, co vtgatepb.CommitOrder) ([]*sqltypes.Result, error) { if vcursor == nil { return nil, fmt.Errorf("cannot perform lookup: no vcursor provided") } results := make([]*sqltypes.Result, 0, len(ids)) + if lkp.Autocommit { + co = vtgatepb.CommitOrder_AUTOCOMMIT + } if !ids[0].IsIntegral() && !ids[0].IsBinary() { // for non integral and binary type, fallback to send query per id for _, id := range ids { @@ -84,10 +87,6 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql lkp.FromColumns[0]: vars, } var result *sqltypes.Result - co := vtgatepb.CommitOrder_NORMAL - if lkp.Autocommit { - co = vtgatepb.CommitOrder_AUTOCOMMIT - } result, err = vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* rollbackOnError */, co) if err != nil { return nil, fmt.Errorf("lookup.Map: %v", err) @@ -109,22 +108,18 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql bindVars := map[string]*querypb.BindVariable{ lkp.FromColumns[0]: vars, } - co := vtgatepb.CommitOrder_NORMAL - if lkp.Autocommit { - co = vtgatepb.CommitOrder_AUTOCOMMIT - } result, err := vcursor.Execute("VindexLookup", lkp.sel, bindVars, false /* rollbackOnError */, co) if err != nil { return nil, fmt.Errorf("lookup.Map: %v", err) } resultMap := make(map[string][][]sqltypes.Value) for _, row := range result.Rows { - resultMap[string(row[0].ToString())] = append(resultMap[string(row[0].ToString())], []sqltypes.Value{row[1]}) + resultMap[row[0].ToString()] = append(resultMap[row[0].ToString()], []sqltypes.Value{row[1]}) } for _, id := range ids { results = append(results, &sqltypes.Result{ - Rows: resultMap[string(id.ToString())], + Rows: resultMap[id.ToString()], }) } } diff --git a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go index f0fc787d8b0..a3a8401fc00 100644 --- a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go +++ b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go @@ -123,7 +123,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Map(vcursor VCursor, ids []sqltypes.Value) if err != nil { return nil, err } - results, err := lh.lkp.Lookup(vcursor, ids) + results, err := lh.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err } @@ -290,7 +290,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Map(vcursor VCursor, ids []sqltypes. if err != nil { return nil, err } - results, err := lhu.lkp.Lookup(vcursor, ids) + results, err := lhu.lkp.Lookup(vcursor, ids, vtgatepb.CommitOrder_NORMAL) if err != nil { return nil, err }