diff --git a/server/core/storage.go b/server/core/storage.go index e647e7ba944..1fc5f6fc212 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -27,6 +27,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/encryptionkm" @@ -501,10 +502,10 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error { return s.Remove(key) } -func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) { +func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) { ssp := &ServiceSafePoint{ ServiceID: gcWorkerServiceSafePointID, - SafePoint: 0, + SafePoint: initialValue, ExpiredAt: math.MaxInt64, } if err := s.SaveServiceGCSafePoint(ssp); err != nil { @@ -522,16 +523,31 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e return nil, err } if len(keys) == 0 { - // There's no service safepoint. Store an initial value for GC worker. - return s.initServiceGCSafePointForGCWorker() + // There's no service safepoint. It may be a new cluster, or upgraded from an older version where all service + // safepoints are missing. For the second case, we have no way to recover it. Store an initial value 0 for + // gc_worker. + return s.initServiceGCSafePointForGCWorker(0) } + hasGCWorker := false min := &ServiceSafePoint{SafePoint: math.MaxUint64} for i, key := range keys { ssp := &ServiceSafePoint{} if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } + if ssp.ServiceID == gcWorkerServiceSafePointID { + hasGCWorker = true + // If gc_worker's expire time is incorrectly set, fix it. + if ssp.ExpiredAt != math.MaxInt64 { + ssp.ExpiredAt = math.MaxInt64 + err = s.SaveServiceGCSafePoint(ssp) + if err != nil { + return nil, errors.Trace(err) + } + } + } + if ssp.ExpiredAt < now.Unix() { s.Remove(key) continue @@ -541,6 +557,18 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e } } + if min.SafePoint == math.MaxUint64 { + // There's no valid safepoints and we have no way to recover it. Just set gc_worker to 0. + log.Info("there are no valid service safepoints. init gc_worker's service safepoint to 0") + return s.initServiceGCSafePointForGCWorker(0) + } + + if !hasGCWorker { + // If there exists some service safepoints but gc_worker is missing, init it with the min value among all + // safepoints (including expired ones) + return s.initServiceGCSafePointForGCWorker(min.SafePoint) + } + return min, nil } diff --git a/server/core/storage_test.go b/server/core/storage_test.go index 8ccbbffda6f..d513da0d037 100644 --- a/server/core/storage_test.go +++ b/server/core/storage_test.go @@ -245,8 +245,21 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) { c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil) } + // gc_worker's safepoint will be automatically inserted when loading service safepoints. Here the returned + // safepoint can be either of "gc_worker" or "2". ssp, err := storage.LoadMinServiceGCSafePoint(time.Now()) c.Assert(err, IsNil) + c.Assert(ssp.SafePoint, Equals, uint64(2)) + + // Advance gc_worker's safepoint + c.Assert(storage.SaveServiceGCSafePoint(&ServiceSafePoint{ + ServiceID: "gc_worker", + ExpiredAt: math.MaxInt64, + SafePoint: 10, + }), IsNil) + + ssp, err = storage.LoadMinServiceGCSafePoint(time.Now()) + c.Assert(err, IsNil) c.Assert(ssp.ServiceID, Equals, "2") c.Assert(ssp.ExpiredAt, Equals, expireAt) c.Assert(ssp.SafePoint, Equals, uint64(2)) diff --git a/server/grpc_service.go b/server/grpc_service.go index dddc5136998..52889037b2e 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -855,7 +855,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd ExpiredAt: now.Unix() + request.TTL, SafePoint: request.SafePoint, } - if request.TTL == math.MaxInt64 { + if math.MaxInt64-now.Unix() <= request.TTL { ssp.ExpiredAt = math.MaxInt64 } if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 35f9a658dad..0403e76c209 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "math" + "path" "path/filepath" "sort" "strconv" @@ -911,6 +912,56 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { _, err = s.client.UpdateServiceGCSafePoint(context.Background(), "", 1000, 15) c.Assert(err, NotNil) + + // Put some other safepoints to test fixing gc_worker's safepoint when there exists other safepoints. + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "a", 1000, 11) + c.Assert(err, IsNil) + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "b", 1000, 12) + c.Assert(err, IsNil) + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "c", 1000, 13) + c.Assert(err, IsNil) + + // Force set invalid ttl to gc_worker + gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker") + { + gcWorkerSsp := &core.ServiceSafePoint{ + ServiceID: "gc_worker", + ExpiredAt: -12345, + SafePoint: 10, + } + value, err := json.Marshal(gcWorkerSsp) + c.Assert(err, IsNil) + err = s.srv.GetStorage().Save(gcWorkerKey, string(value)) + c.Assert(err, IsNil) + } + + minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) + c.Assert(err, IsNil) + c.Assert(minSsp.ServiceID, Equals, "gc_worker") + c.Assert(minSsp.SafePoint, Equals, uint64(10)) + c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64)) + + // Force delete gc_worker, then the min service safepoint is 11 of "a". + err = s.srv.GetStorage().Remove(gcWorkerKey) + c.Assert(err, IsNil) + minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) + c.Assert(err, IsNil) + c.Assert(minSsp.SafePoint, Equals, uint64(11)) + // After calling LoadMinServiceGCS when "gc_worker"'s service safepoint is missing, "gc_worker"'s service safepoint + // will be newly created. + // Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint. + _, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "a", 1000, 14) + c.Assert(err, IsNil) + + minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) + c.Assert(err, IsNil) + c.Assert(minSsp.ServiceID, Equals, "gc_worker") + c.Assert(minSsp.SafePoint, Equals, uint64(11)) + c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64)) } func (s *testClientSuite) TestScatterRegion(c *C) {