Skip to content

Commit

Permalink
tm init: publish displayState
Browse files Browse the repository at this point in the history
Signed-off-by: deepthi <deepthi@planetscale.com>
  • Loading branch information
deepthi committed Sep 9, 2020
1 parent db4837a commit 2996c15
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 8 deletions.
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti
return nil
}

tm.tmState.Open(tm.BatchCtx)
tm.tmState.Open()
return nil
}

Expand Down Expand Up @@ -473,7 +473,7 @@ func (tm *TabletManager) checkMastership(ctx context.Context, si *topo.ShardInfo
tablet.MasterTermStartTime = oldTablet.MasterTermStartTime
})
} else {
log.Warningf("Shard master alias matches, but existing tablet is not master. Switching to master with the shard's master term start time: %v", oldTablet.MasterTermStartTime)
log.Warningf("Shard master alias matches, but existing tablet is not master. Switching from %v to master with the shard's master term start time: %v", oldTablet.Type, si.MasterTermStartTime)
tm.tmState.UpdateTablet(func(tablet *topodatapb.Tablet) {
tablet.Type = topodatapb.TabletType_MASTER
tablet.MasterTermStartTime = si.MasterTermStartTime
Expand Down Expand Up @@ -599,7 +599,7 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
if *restoreFromBackup {
go func() {
// Open the state manager after restore is done.
defer tm.tmState.Open(ctx)
defer tm.tmState.Open()

// restoreFromBackup will just be a regular action
// (same as if it was triggered remotely)
Expand Down
65 changes: 65 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,71 @@ func TestStartDoesNotUpdateReplicationDataForTabletInWrongShard(t *testing.T) {
assert.Equal(t, 0, len(tablets))
}

func TestCheckTabletTypeResets(t *testing.T) {
defer func(saved time.Duration) { rebuildKeyspaceRetryInterval = saved }(rebuildKeyspaceRetryInterval)
rebuildKeyspaceRetryInterval = 10 * time.Millisecond

ctx := context.Background()
cell := "cell1"
ts := memorytopo.NewServer(cell)
alias := &topodatapb.TabletAlias{
Cell: "cell1",
Uid: 1,
}

// 1. Initialize the tablet as REPLICA.
// This will create the respective topology records.
tm := newTestTM(t, ts, 1, "ks", "0")
tablet := tm.Tablet()
ensureSrvKeyspace(t, ts, cell, "ks")
ti, err := ts.GetTablet(ctx, alias)
require.NoError(t, err)
assert.Equal(t, topodatapb.TabletType_REPLICA, ti.Type)
tm.Stop()

// 2. Update tablet record with tabletType RESTORE
_, err = ts.UpdateTabletFields(ctx, alias, func(t *topodatapb.Tablet) error {
t.Type = topodatapb.TabletType_RESTORE
return nil
})
require.NoError(t, err)
err = tm.Start(tablet, 0)
require.NoError(t, err)
assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
ti, err = ts.GetTablet(ctx, alias)
require.NoError(t, err)
// Verify that it changes back to initTabletType
assert.Equal(t, topodatapb.TabletType_REPLICA, ti.Type)

// 3. Update shard's master to our alias, then try to init again.
// (This simulates the case where the MasterAlias in the shard record says
// that we are the master but the tablet record says otherwise. In that case,
// we become master by inheriting the shard record's timestamp.)
now := time.Now()
_, err = ts.UpdateShardFields(ctx, "ks", "0", func(si *topo.ShardInfo) error {
si.MasterAlias = alias
si.MasterTermStartTime = logutil.TimeToProto(now)
// Reassign to now for easier comparison.
now = si.GetMasterTermStartTime()
return nil
})
require.NoError(t, err)
si, err := tm.createKeyspaceShard(ctx)
require.NoError(t, err)
err = tm.checkMastership(ctx, si)
require.NoError(t, err)
assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
err = tm.initTablet(ctx)
require.NoError(t, err)
assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
ti, err = ts.GetTablet(ctx, alias)
require.NoError(t, err)
assert.Equal(t, topodatapb.TabletType_MASTER, ti.Type)
ter0 := ti.GetMasterTermStartTime()
assert.Equal(t, now, ter0)
tm.Stop()
}

func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) *TabletManager {
t.Helper()
ctx := context.Background()
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletmanager/tm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,28 @@ type tmState struct {
}

func newTMState(tm *TabletManager, tablet *topodatapb.Tablet) *tmState {
ctx, cancel := context.WithCancel(tm.BatchCtx)
return &tmState{
tm: tm,
displayState: displayState{
tablet: proto.Clone(tablet).(*topodatapb.Tablet),
},
tablet: tablet,
ctx: ctx,
cancel: cancel,
}
}

func (ts *tmState) Open(ctx context.Context) {
func (ts *tmState) Open() {
ts.mu.Lock()
defer ts.mu.Unlock()
if ts.isOpen {
return
}

ts.ctx, ts.cancel = context.WithCancel(ctx)
ts.isOpen = true
ts.updateLocked(ts.ctx)
ts.publishStateLocked(ctx)
ts.publishStateLocked(ts.ctx)
}

func (ts *tmState) Close() {
Expand Down Expand Up @@ -192,18 +194,19 @@ func (ts *tmState) UpdateTablet(update func(tablet *topodatapb.Tablet)) {
ts.mu.Lock()
defer ts.mu.Unlock()
update(ts.tablet)
ts.publishForDisplay()
}

func (ts *tmState) updateLocked(ctx context.Context) {
span, ctx := trace.NewSpan(ctx, "tmState.update")
defer span.Finish()
ts.publishForDisplay()

if !ts.isOpen {
return
}

terTime := logutil.ProtoToTime(ts.tablet.MasterTermStartTime)
ts.publishForDisplay()

// Disable TabletServer first so the nonserving state gets advertised
// before other services are shutdown.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/tm_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestStateOpenClose(t *testing.T) {
savedCtx := tm.tmState.ctx
tm.tmState.mu.Unlock()

tm.tmState.Open(context.Background())
tm.tmState.Open()

tm.tmState.mu.Lock()
assert.Equal(t, savedCtx, tm.tmState.ctx)
Expand Down

0 comments on commit 2996c15

Please sign in to comment.