From bb69585d7fb4b5cbcc921fb843561e54b9c75a52 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:15:08 +0800 Subject: [PATCH] close #4490 Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 12 +++++ pkg/apiutil/serverapi/middleware.go | 26 ++++++++++ server/api/server.go | 1 + server/self_protection.go | 80 +++++++++++++++++++++++++++++ server/server.go | 8 +++ server/server_test.go | 42 +++++++++++++++ tests/pdctl/global.go | 59 +++++++++++++++++++++ tests/server/api/api_test.go | 32 ++++++++++++ 8 files changed, 260 insertions(+) create mode 100644 server/self_protection.go create mode 100644 tests/pdctl/global.go diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 2c61ed45f288..89c89f39e8da 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -21,6 +21,7 @@ import ( "net/http" "strconv" + "github.com/gorilla/mux" "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -127,3 +128,14 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { rd.JSON(w, http.StatusInternalServerError, err.Error()) } } + +// GetHTTPRouteName return mux route name registered for ServiceName +func GetHTTPRouteName(req *http.Request) (string, bool) { + route := mux.CurrentRoute(req) + if route != nil { + if route.GetName() != "" { + return route.GetName(), true + } + } + return "", false +} diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 973775d1214b..971e1a5d78c8 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -20,6 +20,7 @@ import ( "net/url" "strings" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" @@ -79,6 +80,31 @@ func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool { return false } +type selfProtector struct { + s *server.Server +} + +// NewSelfProtector handle self-protection +func NewSelfProtector(s *server.Server) negroni.Handler { + return &selfProtector{s: s} +} + +func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { + handler := protector.s.GetSelfProtectionHandler() + + failpoint.Inject("addSelfProtectionHTTPHeader", func() { + w.Header().Add("self-protection", "ok") + }) + + if handler == nil || handler.HandleHTTPSelfProtection(r) { + next(w, r) + } else { + // current plan will only deny request when over the speed limit + // todo: support more HTTP Status code + http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) + } +} + type redirector struct { s *server.Server } diff --git a/server/api/server.go b/server/api/server.go index 528c20a08f3e..df2ed9ddf563 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -37,6 +37,7 @@ func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.S router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), + serverapi.NewSelfProtector(svr), negroni.Wrap(r)), ) diff --git a/server/self_protection.go b/server/self_protection.go new file mode 100644 index 000000000000..ceffdd9fdbef --- /dev/null +++ b/server/self_protection.go @@ -0,0 +1,80 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + + "github.com/tikv/pd/pkg/apiutil" +) + +// SelfProtectionHandler is a framework to handle self protection mechanism +// Self-protection granularity is a logical service +type SelfProtectionHandler struct { + // ServiceHandlers is a map to store handler owned by different services + ServiceHandlers map[string]*serviceSelfProtectionHandler +} + +// NewSelfProtectionHandler returns a new SelfProtectionHandler with config +func NewSelfProtectionHandler(server *Server) *SelfProtectionHandler { + handler := &SelfProtectionHandler{ + ServiceHandlers: make(map[string]*serviceSelfProtectionHandler), + } + return handler +} + +// HandleHTTPSelfProtection is used to handle http api self protection +func (h *SelfProtectionHandler) HandleHTTPSelfProtection(req *http.Request) bool { + serviceName, findName := apiutil.GetHTTPRouteName(req) + // if path is not registered in router, go on process + if !findName { + return true + } + + serviceHandler, ok := h.ServiceHandlers[serviceName] + // if there is no service handler, go on process + if !ok { + return true + } + + httpHandler := &HTTPServiceSelfProtectionHandler{ + req: req, + handler: serviceHandler, + } + return httpHandler.Handle() +} + +// ServiceSelfProtectionHandler is a interface for define self-protection handler by service granularity +type ServiceSelfProtectionHandler interface { + Handle() bool +} + +// HTTPServiceSelfProtectionHandler implement ServiceSelfProtectionHandler to handle http +type HTTPServiceSelfProtectionHandler struct { + req *http.Request + handler *serviceSelfProtectionHandler +} + +// Handle implement ServiceSelfProtectionHandler defined function +func (h *HTTPServiceSelfProtectionHandler) Handle() bool { + // to be implemented + return true +} + +// serviceSelfProtectionHandler is a handler which is independent communication mode +type serviceSelfProtectionHandler struct { + // todo APIRateLimiter + // todo AuditLogger +} diff --git a/server/server.go b/server/server.go index b4aec089f278..7156f5dd829b 100644 --- a/server/server.go +++ b/server/server.go @@ -149,6 +149,8 @@ type Server struct { // tsoDispatcher is used to dispatch different TSO requests to // the corresponding forwarding TSO channel. tsoDispatcher sync.Map /* Store as map[string]chan *tsoRequest */ + // SelfProtectionHandler is used for PD self-pretection + selfProtectionHandler *SelfProtectionHandler } // HandlerBuilder builds a server HTTP handler. @@ -238,6 +240,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha } s.handler = newHandler(s) + s.selfProtectionHandler = NewSelfProtectionHandler(s) // Adjust etcd config. etcdCfg, err := s.cfg.GenEmbedEtcdConfig() @@ -668,6 +671,11 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } +// GetSelfProtectionHandler returns the selfProt ectionHandler +func (s *Server) GetSelfProtectionHandler() *SelfProtectionHandler { + return s.selfProtectionHandler +} + // GetAddr returns the server urls for clients. func (s *Server) GetAddr() string { return s.cfg.AdvertiseClientUrls diff --git a/server/server_test.go b/server/server_test.go index 25f63f87ee34..ae71e54efa47 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -21,11 +21,14 @@ import ( "net/http" "testing" + "github.com/gorilla/mux" . "github.com/pingcap/check" + "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server/config" + "github.com/urfave/negroni" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/pkg/types" "go.uber.org/goleak" @@ -235,3 +238,42 @@ func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { bodyString := string(bodyBytes) c.Assert(bodyString, Equals, "Hello World\n") } + +func (s *testServerHandlerSuite) TestMuxRouterName(c *C) { + handler := func(ctx context.Context, s *Server) (http.Handler, ServiceGroup, error) { + r := mux.NewRouter() + r.HandleFunc("/pd/apis/mok/v1/router", func(w http.ResponseWriter, r *http.Request) { + RouterName, _ := apiutil.GetHTTPRouteName(r) + fmt.Fprintln(w, RouterName) + }).Name("Mux Router") + info := ServiceGroup{ + Name: "mok", + Version: "v1", + } + router := mux.NewRouter() + router.PathPrefix("/pd").Handler(negroni.New( + negroni.Wrap(r)), + ) + return router, info, nil + } + cfg := NewTestSingleConfig(checkerWithNilAssert(c)) + ctx, cancel := context.WithCancel(context.Background()) + svr, err := CreateServer(ctx, cfg, handler) + c.Assert(err, IsNil) + defer func() { + cancel() + svr.Close() + testutil.CleanServer(svr.cfg.DataDir) + }() + err = svr.Run() + c.Assert(err, IsNil) + resp, err := http.Get(fmt.Sprintf("%s/pd/apis/mok/v1/router", svr.GetAddr())) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(err, IsNil) + bodyBytes, err := io.ReadAll(resp.Body) + resp.Body.Close() + c.Assert(err, IsNil) + bodyString := string(bodyBytes) + c.Assert(bodyString, Equals, "Mux Router\n") +} diff --git a/tests/pdctl/global.go b/tests/pdctl/global.go new file mode 100644 index 000000000000..4e2c15f26476 --- /dev/null +++ b/tests/pdctl/global.go @@ -0,0 +1,59 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdctl + +import ( + "context" + "encoding/json" + "testing" + + . "github.com/pingcap/check" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/api" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tools/pd-ctl/pdctl" + cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&globalTestSuite{}) + +type globalTestSuite struct{} + +func (s *globalTestSuite) SetUpSuite(c *C) { + server.EnableZap = true +} + +func (s *globalTestSuite) TestHealth(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1) + c.Assert(err, IsNil) + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + pdAddr := cluster.GetConfig().GetClientURL() + cmd := cmd.GetRootCmd() + + args := []string{"-u", pdAddr, "health"} + output, err := pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + h := make([]api.Health, len(healths)) + c.Assert(json.Unmarshal(output, &h), IsNil) + c.Assert(err, IsNil) + c.Assert(h, DeepEquals, healths) +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 736063ae0a58..ea8fe8ecd7fe 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -22,6 +22,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/tikv/pd/pkg/apiutil/serverapi" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/pkg/typeutil" @@ -110,6 +111,37 @@ func (s *serverTestSuite) TestReconnect(c *C) { } } +var _ = Suite(&testSelfProtectorSuite{}) + +type testSelfProtectorSuite struct { + cleanup func() + cluster *tests.TestCluster +} + +func (s *testSelfProtectorSuite) SetUpSuite(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + server.EnableZap = true + s.cleanup = cancel + cluster, err := tests.NewTestCluster(ctx, 3) + c.Assert(err, IsNil) + c.Assert(cluster.RunInitialServers(), IsNil) + c.Assert(cluster.WaitLeader(), Not(HasLen), 0) + s.cluster = cluster +} + +func (s *testSelfProtectorSuite) TearDownSuite(c *C) { + s.cleanup() + s.cluster.Destroy() +} + +func (s *testSelfProtectorSuite) TestSelfProtect(c *C) { + c.Assert(failpoint.Enable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader", "return(true)"), IsNil) + leader := s.cluster.GetServer(s.cluster.GetLeader()) + header := mustRequestSuccess(c, leader.GetServer()) + c.Assert(header.Get("self-protection"), Equals, "ok") + c.Assert(failpoint.Disable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader"), IsNil) +} + var _ = Suite(&testRedirectorSuite{}) type testRedirectorSuite struct {