Skip to content

Commit

Permalink
support filter MotanCluster.refers before refresh LoadBalance
Browse files Browse the repository at this point in the history
support filter MotanCluster.refers before refresh LoadBalance

support filter MotanCluster.refers before refresh LoadBalance

code review
  • Loading branch information
wuhua3 committed Dec 25, 2024
1 parent a5f66bf commit 3d9c4dd
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 11 deletions.
109 changes: 109 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
_ "fmt"
"github.com/weibocom/motan-go/cluster"
"github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
Expand All @@ -16,6 +17,7 @@ import (
_ "golang.org/x/net/context"
"io/ioutil"
"math/rand"
"mime/multipart"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -1092,3 +1094,110 @@ func TestRuntimeHandler(t *testing.T) {
t.Logf("key: %s", s)
}
}

func TestClusterRefersFilterHandler(t *testing.T) {
completeConfig := cluster.RefersFilterConfigList{
{
Group: "g1",
Service: "s1",
Mode: cluster.FilterModeExclude,
Rule: "127.0",
},
}
completeConfigExpect, _ := json.Marshal(completeConfig)
emptyConfig := &cluster.RefersFilterConfig{}
emptyConfigExpect, _ := json.Marshal(emptyConfig)
cases := []struct {
desc string
request *http.Request
assertFunc func(t *testing.T, resp *http.Response, respErr error)
}{
{
desc: "verify initial value",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set empty config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", "[]")
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get empty config",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set complete config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", string(completeConfigExpect))
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get complete config",
request: func() *http.Request {
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
}

for _, c := range cases {
t.Logf("case %s start", c.desc)
resp, err := http.DefaultClient.Do(c.request)
c.assertFunc(t, resp, err)
t.Logf("case %s finish", c.desc)
}
}
129 changes: 129 additions & 0 deletions cluster/RefersFilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package cluster

import (
"fmt"
motan "github.com/weibocom/motan-go/core"
vlog "github.com/weibocom/motan-go/log"
"strings"
)

const (
FilterModeInclude = "include"
FilterModeExclude = "exclude"
)

type RefersFilterConfigList []RefersFilterConfig

type RefersFilterConfig struct {
Group string `json:"group"`
Service string `json:"service"`
Mode string `json:"mode"`
Rule string `json:"rule"`
}

func (list RefersFilterConfigList) Verify() error {
for _, filter := range list {
if filter.Rule == "" {
return fmt.Errorf("exist empty rule in filter config")
}
if filter.Mode != FilterModeInclude && filter.Mode != FilterModeExclude {
return fmt.Errorf("invalid mode(%s) for rule(%s)", filter.Mode, filter.Rule)
}
}
return nil
}

func (list RefersFilterConfigList) ParseRefersFilters(clusterURL *motan.URL) RefersFilter {
var rules []RefersFilterConfig
for _, filter := range list {
if filter.Group != "" && filter.Group != clusterURL.Group {
continue
}
if filter.Service != "" && filter.Service != clusterURL.Path {
continue
}
rules = append(rules, filter)
vlog.Infof("add refer filter for group: %s, service: %s, rule: %s, mode: %s", clusterURL.Group, clusterURL.Path, filter.Rule, filter.Mode)
}
if len(rules) == 0 {
return nil
}
return NewDefaultRefersFilter(rules)
}

type RefersFilter interface {
Filter([]motan.EndPoint) []motan.EndPoint
}

type filterRule struct {
mode string
prefixes []string
}

func (fr filterRule) IsMatch(refer motan.EndPoint) bool {
for _, prefix := range fr.prefixes {
if strings.HasPrefix(refer.GetURL().Host, prefix) {
if fr.mode == FilterModeExclude {
vlog.Infof("filter refer: %s, rule: %s, mode: %s", refer.GetURL().GetIdentity(), prefix, fr.mode)
}
return true
}
}
return false
}

type DefaultRefersFilter struct {
excludeRules []filterRule
includeRules []filterRule
}

func NewDefaultRefersFilter(filterConfig []RefersFilterConfig) *DefaultRefersFilter {
var includeRules []filterRule
var excludeRules []filterRule
for _, config := range filterConfig {
rule := filterRule{
mode: config.Mode,
prefixes: motan.TrimSplit(config.Rule, ","),
}
if config.Mode == FilterModeExclude {
excludeRules = append(excludeRules, rule)
} else {
includeRules = append(includeRules, rule)
}
}
return &DefaultRefersFilter{includeRules: includeRules, excludeRules: excludeRules}
}

func (f *DefaultRefersFilter) Filter(refers []motan.EndPoint) []motan.EndPoint {
var newRefers []motan.EndPoint
for _, refer := range refers {
// discard refer if hit an exclude rule
excludeRefer := false
for _, excludeRule := range f.excludeRules {
if excludeRefer {
break
}
excludeRefer = excludeRule.IsMatch(refer)
}
if excludeRefer {
continue
}
// retained refer if hit an include rule
var includeRefer bool
if len(f.includeRules) == 0 {
includeRefer = true
}
for _, includeRule := range f.includeRules {
if includeRefer {
break
}
includeRefer = includeRule.IsMatch(refer)
}
if includeRefer {
newRefers = append(newRefers, refer)
} else {
vlog.Infof("no include rule hit. filter refer: %s", refer.GetURL().GetIdentity())
}
}
return newRefers
}
Loading

0 comments on commit 3d9c4dd

Please sign in to comment.