Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vttablet: fast and reliable state transitions #7011

Merged
merged 18 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/tablet/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ healthcheck:
unhealthyThresholdSeconds: 7200 # unhealthy_threshold

gracePeriods:
transactionShutdownSeconds: 0 # transaction_shutdown_grace_period
transitionSeconds: 0 # serving_state_grace_period
shutdownSeconds: 0 # shutdown_grace_period
transitionSeconds: 0 # serving_state_grace_period

replicationTracker:
mode: disable # enable_replication_reporter
Expand Down
3 changes: 1 addition & 2 deletions go/cmd/vttablet/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var (
<a href="/debug/tablet_plans">Schema&nbsp;Query&nbsp;Plans</a></br>
<a href="/debug/query_stats">Schema&nbsp;Query&nbsp;Stats</a></br>
<a href="/queryz">Query&nbsp;Stats</a></br>
<a href="/streamqueryz">Streaming&nbsp;Query&nbsp;Stats</a></br>
</td>
<td width="25%" border="">
<a href="/debug/consolidations">Consolidations</a></br>
Expand All @@ -76,7 +75,7 @@ var (
<td width="25%" border="">
<a href="/healthz">Health Check</a></br>
<a href="/debug/health">Query Service Health Check</a></br>
<a href="/streamqueryz">Current Stream Queries</a></br>
<a href="/livequeryz/">Real-time Queries</a></br>
<a href="/debug/status_details">JSON Status Details</a></br>
</td>
</tr>
Expand Down
18 changes: 18 additions & 0 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,24 @@ func (nu *Numbered) GetAll() (vals []interface{}) {
return vals
}

// GetByFilter returns a list of resources that match the filter.
// It does not return any resources that are already locked.
func (nu *Numbered) GetByFilter(purpose string, match func(val interface{}) bool) (vals []interface{}) {
nu.mu.Lock()
defer nu.mu.Unlock()
for _, nw := range nu.resources {
if nw.inUse || !nw.enforceTimeout {
continue
}
if match(nw.val) {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
return vals
}

// GetOutdated returns a list of resources that are older than age, and locks them.
// It does not return any resources that are already locked.
func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []interface{}) {
Expand Down
62 changes: 38 additions & 24 deletions go/pools/numbered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,48 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNumbered(t *testing.T) {
func TestNumberedGeneral(t *testing.T) {
id := int64(0)
p := NewNumbered()

var err error
if err = p.Register(id, id, true); err != nil {
t.Errorf("Error %v", err)
}
if err = p.Register(id, id, true); err.Error() != "already present" {
t.Errorf("want 'already present', got '%v'", err)
}
err := p.Register(id, id, true)
require.NoError(t, err)

err = p.Register(id, id, true)
assert.Contains(t, "already present", err.Error())

var v interface{}
if v, err = p.Get(id, "test"); err != nil {
t.Errorf("Error %v", err)
}
if v.(int64) != id {
t.Errorf("want %v, got %v", id, v.(int64))
}
if _, err = p.Get(id, "test1"); err.Error() != "in use: test" {
t.Errorf("want 'in use: test', got '%v'", err)
}
v, err = p.Get(id, "test")
require.NoError(t, err)
assert.Equal(t, id, v.(int64))

_, err = p.Get(id, "test1")
assert.Contains(t, "in use: test", err.Error())

p.Put(id, true)
if _, err = p.Get(1, "test2"); err.Error() != "not found" {
t.Errorf("want 'not found', got '%v'", err)
}
_, err = p.Get(1, "test2")
assert.Contains(t, "not found", err.Error())
p.Unregister(1, "test") // Should not fail
p.Unregister(0, "test")
// p is now empty

if _, err = p.Get(0, "test3"); !(strings.HasPrefix(err.Error(), "ended at") && strings.HasSuffix(err.Error(), "(test)")) {
t.Errorf("want prefix 'ended at' and suffix '(test'), got '%v'", err)
t.Errorf("want prefix 'ended at' and suffix '(test)', got '%v'", err)
}

id = 0
p.Register(id, id, true)
id++
id = 1
p.Register(id, id, true)
id++
id = 2
p.Register(id, id, false)
time.Sleep(300 * time.Millisecond)
id++
id = 3
p.Register(id, id, true)
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -105,6 +105,20 @@ func TestNumbered(t *testing.T) {
p.WaitForEmpty()
}

func TestNumberedGetByFilter(t *testing.T) {
p := NewNumbered()
p.Register(1, 1, true)
p.Register(2, 2, true)
p.Register(3, 3, true)
p.Get(1, "locked")

vals := p.GetByFilter("filtered", func(v interface{}) bool {
return v.(int) <= 2
})
want := []interface{}{2}
assert.Equal(t, want, vals)
}

/*
go test --test.run=XXX --test.bench=. --test.benchtime=10s

Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/tabletmanager/tablet_security_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestFallbackSecurityPolicy(t *testing.T) {
require.NoError(t, err)

// It should deny ADMIN role.
url := fmt.Sprintf("http://localhost:%d/streamqueryz/terminate", mTablet.HTTPPort)
url := fmt.Sprintf("http://localhost:%d/livequeryz/terminate", mTablet.HTTPPort)
assertNotAllowedURLTest(t, url)

// It should deny MONITORING role.
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestDenyAllSecurityPolicy(t *testing.T) {
require.NoError(t, err)

// It should deny ADMIN role.
url := fmt.Sprintf("http://localhost:%d/streamqueryz/terminate", mTablet.HTTPPort)
url := fmt.Sprintf("http://localhost:%d/livequeryz/terminate", mTablet.HTTPPort)
assertNotAllowedURLTest(t, url)

// It should deny MONITORING role.
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestReadOnlySecurityPolicy(t *testing.T) {
require.NoError(t, err)

// It should deny ADMIN role.
url := fmt.Sprintf("http://localhost:%d/streamqueryz/terminate", mTablet.HTTPPort)
url := fmt.Sprintf("http://localhost:%d/livequeryz/terminate", mTablet.HTTPPort)
assertNotAllowedURLTest(t, url)

// It should deny MONITORING role.
Expand Down
35 changes: 35 additions & 0 deletions go/timer/sleep_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timer

import (
"context"
"time"
)

// SleepContext sleeps for the specified time period.
// If the context expires early, it returns an error.
func SleepContext(ctx context.Context, duration time.Duration) error {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-timer.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
43 changes: 43 additions & 0 deletions go/timer/sleep_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timer

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSleepContext(t *testing.T) {
ctx := context.Background()
start := time.Now()
err := SleepContext(ctx, 10*time.Millisecond)
require.NoError(t, err)
assert.True(t, time.Since(start) > 10*time.Millisecond, time.Since(start))
assert.True(t, time.Since(start) < 100*time.Millisecond, time.Since(start))

ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
start = time.Now()
err = SleepContext(ctx, 100*time.Millisecond)
require.Error(t, err)
assert.True(t, time.Since(start) > 10*time.Millisecond, time.Since(start))
assert.True(t, time.Since(start) < 100*time.Millisecond, time.Since(start))
}
2 changes: 1 addition & 1 deletion go/vt/logz/logz_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ limitations under the License.
// a sortable table on a webpage.
//
// It is used by many internal vttablet pages e.g. /queryz, /querylogz, /schemaz
// /streamqueryz or /txlogz.
// /livequeryz or /txlogz.
//
// See tabletserver/querylogz.go for an example how to use it.
package logz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"time"
)

// StreamQuery contains the streaming query info.
type StreamQuery struct {
// LiveQuery contains the streaming query info.
type LiveQuery struct {
Type string
Query string
ContextHTML string
Start time.Time
Expand All @@ -34,11 +35,11 @@ type StreamQuery struct {
ShowTerminateLink bool
}

// StreamQueryz returns the contents of /streamqueryz?format=json.
// as a []StreamQuery. The function returns an empty list on error.
func StreamQueryz() []StreamQuery {
var out []StreamQuery
response, err := http.Get(fmt.Sprintf("%s/streamqueryz?format=json", ServerAddress))
// OLAPQueryz returns the contents of /livequeryz?format=json.
// as a []LiveQuery. The function returns an empty list on error.
func LiveQueryz() []LiveQuery {
var out []LiveQuery
response, err := http.Get(fmt.Sprintf("%s/livequeryz?format=json", ServerAddress))
if err != nil {
return out
}
Expand All @@ -49,7 +50,7 @@ func StreamQueryz() []StreamQuery {

// StreamTerminate terminates the specified streaming query.
func StreamTerminate(connID int) error {
response, err := http.Get(fmt.Sprintf("%s/streamqueryz/terminate?format=json&connID=%d", ServerAddress, connID))
response, err := http.Get(fmt.Sprintf("%s/livequeryz/terminate?format=json&connID=%d", ServerAddress, connID))
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/endtoend/framework/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"net/http"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/yaml2"

"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -111,6 +113,9 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string)
config.TwoPCCoordinatorAddress = "fake"
config.HotRowProtection.Mode = tabletenv.Enable
config.TrackSchemaVersions = true
config.GracePeriods.ShutdownSeconds = 2
gotBytes, _ := yaml2.Marshal(config)
log.Infof("Config:\n%s", gotBytes)
Comment on lines +117 to +118
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this adding for debugging or want to publish this on startup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good to publish this on startup. VTTablet does the same thing. It's useful for knowing what the default values are for each flag. But it doesn't spam the output because it shows up only if you -alsologtostderr.

return StartCustomServer(connParams, connAppDebugParams, dbName, config)
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/endtoend/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func TestStreamTerminate(t *testing.T) {
nil,
func(*sqltypes.Result) error {
if !called {
queries := framework.StreamQueryz()
queries := framework.LiveQueryz()
if l := len(queries); l != 1 {
t.Errorf("len(queries): %d, want 1", l)
return errors.New("no queries from StreamQueryz")
return errors.New("no queries from LiveQueryz")
}
err := framework.StreamTerminate(queries[0].ConnID)
if err != nil {
Expand Down
Loading