Skip to content

Commit

Permalink
Merge pull request #410 from cocowh/sub_node_filter
Browse files Browse the repository at this point in the history
support filter MotanCluster.refers before refresh LoadBalance
  • Loading branch information
rayzhang0603 authored Dec 26, 2024
2 parents a5f66bf + cf19c6f commit bb1f26f
Show file tree
Hide file tree
Showing 9 changed files with 613 additions and 11 deletions.
109 changes: 109 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
_ "fmt"
"github.com/weibocom/motan-go/cluster"
"github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
Expand All @@ -16,6 +17,7 @@ import (
_ "golang.org/x/net/context"
"io/ioutil"
"math/rand"
"mime/multipart"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -1092,3 +1094,110 @@ func TestRuntimeHandler(t *testing.T) {
t.Logf("key: %s", s)
}
}

func TestClusterRefersFilterHandler(t *testing.T) {
completeConfig := cluster.RefersFilterConfigList{
{
Group: "g1",
Service: "s1",
Mode: cluster.FilterModeExclude,
Rule: "127.0",
},
}
completeConfigExpect, _ := json.Marshal(completeConfig)
emptyConfig := cluster.RefersFilterConfigList{}
emptyConfigExpect, _ := json.Marshal(emptyConfig)
cases := []struct {
desc string
request *http.Request
assertFunc func(t *testing.T, resp *http.Response, respErr error)
}{
{
desc: "verify initial value",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set empty config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", "[]")
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get empty config",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set complete config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", string(completeConfigExpect))
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get complete config",
request: func() *http.Request {
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, completeConfigExpect), string(bodyBytes))
},
},
}

for _, c := range cases {
t.Logf("case %s start", c.desc)
resp, err := http.DefaultClient.Do(c.request)
c.assertFunc(t, resp, err)
t.Logf("case %s finish", c.desc)
}
}
129 changes: 129 additions & 0 deletions cluster/RefersFilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package cluster

import (
"fmt"
motan "github.com/weibocom/motan-go/core"
vlog "github.com/weibocom/motan-go/log"
"strings"
)

const (
FilterModeInclude = "include"
FilterModeExclude = "exclude"
)

type RefersFilterConfigList []RefersFilterConfig

type RefersFilterConfig struct {
Group string `json:"group"`
Service string `json:"service"`
Mode string `json:"mode"`
Rule string `json:"rule"`
}

func (list RefersFilterConfigList) Verify() error {
for _, filter := range list {
if filter.Rule == "" {
return fmt.Errorf("exist empty rule in filter config")
}
if filter.Mode != FilterModeInclude && filter.Mode != FilterModeExclude {
return fmt.Errorf("invalid mode(%s) for rule(%s)", filter.Mode, filter.Rule)
}
}
return nil
}

func (list RefersFilterConfigList) ParseRefersFilters(clusterURL *motan.URL) RefersFilter {
var rules []RefersFilterConfig
for _, filter := range list {
if filter.Group != "" && filter.Group != clusterURL.Group {
continue
}
if filter.Service != "" && filter.Service != clusterURL.Path {
continue
}
rules = append(rules, filter)
vlog.Infof("add refer filter for group: %s, service: %s, rule: %s, mode: %s", clusterURL.Group, clusterURL.Path, filter.Rule, filter.Mode)
}
if len(rules) == 0 {
return nil
}
return NewDefaultRefersFilter(rules)
}

type RefersFilter interface {
Filter([]motan.EndPoint) []motan.EndPoint
}

type filterRule struct {
mode string
prefixes []string
}

func (fr filterRule) IsMatch(refer motan.EndPoint) bool {
for _, prefix := range fr.prefixes {
if strings.HasPrefix(refer.GetURL().Host, prefix) {
if fr.mode == FilterModeExclude {
vlog.Infof("filter refer: %s, rule: %s, mode: %s", refer.GetURL().GetIdentity(), prefix, fr.mode)
}
return true
}
}
return false
}

type DefaultRefersFilter struct {
excludeRules []filterRule
includeRules []filterRule
}

func NewDefaultRefersFilter(filterConfig []RefersFilterConfig) *DefaultRefersFilter {
var includeRules []filterRule
var excludeRules []filterRule
for _, config := range filterConfig {
rule := filterRule{
mode: config.Mode,
prefixes: motan.TrimSplit(config.Rule, ","),
}
if config.Mode == FilterModeExclude {
excludeRules = append(excludeRules, rule)
} else {
includeRules = append(includeRules, rule)
}
}
return &DefaultRefersFilter{includeRules: includeRules, excludeRules: excludeRules}
}

func (f *DefaultRefersFilter) Filter(refers []motan.EndPoint) []motan.EndPoint {
var newRefers []motan.EndPoint
for _, refer := range refers {
// discard refer if hit an exclude rule
excludeRefer := false
for _, excludeRule := range f.excludeRules {
excludeRefer = excludeRule.IsMatch(refer)
if excludeRefer {
break
}
}
if excludeRefer {
continue
}
// retained refer if hit an include rule
var includeRefer bool
if len(f.includeRules) == 0 {
includeRefer = true
}
for _, includeRule := range f.includeRules {
includeRefer = includeRule.IsMatch(refer)
if includeRefer {
break
}
}
if includeRefer {
newRefers = append(newRefers, refer)
} else {
vlog.Infof("no include rule hit. filter refer: %s", refer.GetURL().GetIdentity())
}
}
return newRefers
}
Loading

0 comments on commit bb1f26f

Please sign in to comment.