Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge dev #417

Merged
merged 23 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
71649f7
fix meta protocol support
Oct 9, 2024
7179c8d
Merge pull request #406 from cocowh/fix_meta
rayzhang0603 Oct 15, 2024
7b0290d
update runtime info
Oct 28, 2024
ee5a90c
Merge pull request #407 from cocowh/runtime_wh
rayzhang0603 Oct 30, 2024
272a654
fix ConvertToReqMessage
Nov 6, 2024
dee077b
Merge pull request #408 from cocowh/fix_ConvertToReqMessage
rayzhang0603 Nov 7, 2024
bbfcc98
opt lb unit test
Nov 7, 2024
f2bfe8e
Additional Notes
Dec 3, 2024
a5f66bf
Merge pull request #409 from cocowh/additional_notes
rayzhang0603 Dec 3, 2024
cf19c6f
support filter MotanCluster.refers before refresh LoadBalance
Dec 24, 2024
bb1f26f
Merge pull request #410 from cocowh/sub_node_filter
rayzhang0603 Dec 26, 2024
63c07e3
MotanCluster refers filter support subnet rule
Dec 27, 2024
f33fd57
Merge pull request #411 from cocowh/sub_node_filter
rayzhang0603 Dec 27, 2024
56f8c7b
dynamic params support regexp
Jan 2, 2025
7218b30
Merge pull request #412 from cocowh/dynamic_regexp_param
rayzhang0603 Jan 6, 2025
270c023
add log
Jan 6, 2025
aca0f05
Merge pull request #413 from cocowh/dynamic_regexp_param
rayzhang0603 Jan 6, 2025
72a9926
bugfix
Jan 6, 2025
4ff233f
Merge pull request #414 from cocowh/fix_refers_filter
rayzhang0603 Jan 6, 2025
9a5f372
fix dynamic regexp param match default
Jan 13, 2025
1526b6c
Merge pull request #415 from cocowh/fix_dynamic_regexp_param
rayzhang0603 Jan 13, 2025
240bd5a
dynamic regexp param rule by prefix
Jan 13, 2025
be689be
Merge pull request #416 from cocowh/fix_dynamic_regexp_param
rayzhang0603 Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
)

var (
startTime = time.Now()
initParamLock sync.Mutex
setAgentLock sync.Mutex
notFoundProviderCount int64 = 0
Expand Down Expand Up @@ -134,6 +135,14 @@ func (a *Agent) OnAfterStart(f func(a *Agent)) {
a.onAfterStart = append(a.onAfterStart, f)
}

func (a *Agent) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeStartTimeKey] = startTime
info[motan.RuntimeCpuPercentKey] = GetCpuPercent()
info[motan.RuntimeRssMemoryKey] = GetRssMemory()
return info
}

func (a *Agent) RegisterCommandHandler(f CommandHandler) {
a.commandHandlers = append(a.commandHandlers, f)
}
Expand Down
167 changes: 160 additions & 7 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
_ "fmt"
"github.com/weibocom/motan-go/cluster"
"github.com/weibocom/motan-go/config"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
Expand All @@ -16,6 +17,7 @@ import (
_ "golang.org/x/net/context"
"io/ioutil"
"math/rand"
"mime/multipart"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -1064,18 +1066,169 @@ func TestRuntimeHandler(t *testing.T) {
err = json.Unmarshal(bodyBytes, &runtimeInfo)
assert.Nil(t, err)

for _, s := range []string{
core.RuntimeInstanceTypeKey,
for _, s := range defaultKeys {
info, ok := runtimeInfo[s]
assert.True(t, ok)
assert.NotNil(t, info)
t.Logf("key: %s", s)
}

// test param keys
keys := []string{
core.RuntimeExportersKey,
core.RuntimeClustersKey,
core.RuntimeHttpClustersKey,
core.RuntimeExtensionFactoryKey,
core.RuntimeServersKey,
core.RuntimeBasicKey,
} {
core.RuntimeHttpClustersKey,
}
resp, err = http.Get("http://127.0.0.1:8002/runtime/info" + "?keys=" + strings.Join(keys, ","))
assert.Nil(t, err)
bodyBytes, err = ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
err = resp.Body.Close()
assert.Nil(t, err)
err = json.Unmarshal(bodyBytes, &runtimeInfo)
assert.Nil(t, err)
for _, s := range keys {
info, ok := runtimeInfo[s]
assert.True(t, ok)
assert.NotNil(t, info)
t.Logf("key: %s", s)
}
}

func TestClusterRefersFilterHandler(t *testing.T) {
completeConfigBody := `[{"service":"com.weibo.api.test1","mode":"exclude","rule":"127.93.0.0/16"},{"service":"com.weibo.api.test2","mode":"exclude","rule":"127.93.0.0/16"}]`
var completeConfig cluster.RefersFilterConfigList
_ = json.Unmarshal([]byte(completeConfigBody), &completeConfig)
completeConfigExpect, _ := json.Marshal(completeConfig)
rewriteConfigBody := `[{"mode":"exclude","rule":"127.93.0.0/16"},{"service":"com.weibo.api.test2","mode":"exclude","rule":"127.93.0.0/16"}]`
var rewriteConfig cluster.RefersFilterConfigList
_ = json.Unmarshal([]byte(rewriteConfigBody), &rewriteConfig)
rewriteConfigExpect, _ := json.Marshal(rewriteConfig)
emptyConfigExpect := `[]`
cases := []struct {
desc string
request *http.Request
assertFunc func(t *testing.T, resp *http.Response, respErr error)
}{
{
desc: "verify initial value",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set empty config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", "[]")
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get empty config",
request: func() *http.Request {
req, _ := http.NewRequest("GET", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, emptyConfigExpect), string(bodyBytes))
},
},
{
desc: "set complete config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", completeConfigBody)
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get complete config",
request: func() *http.Request {
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, completeConfigExpect), string(bodyBytes))
},
},
{
desc: "rewrite config",
request: func() *http.Request {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)
_ = writer.WriteField("config", string(rewriteConfigBody))
_ = writer.Close()
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/set", payload)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, `{"result":"ok","data":"ok"}`, string(bodyBytes))
},
},
{
desc: "get rewrite config",
request: func() *http.Request {
req, _ := http.NewRequest("POST", "http://127.0.0.1:8002/refers/filter/get", nil)
return req
}(),
assertFunc: func(t *testing.T, resp *http.Response, respErr error) {
assert.Nil(t, respErr)
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf(`{"result":"ok","data":%s}`, rewriteConfigExpect), string(bodyBytes))
},
},
}

for _, c := range cases {
t.Logf("case %s start", c.desc)
resp, err := http.DefaultClient.Do(c.request)
c.assertFunc(t, resp, err)
t.Logf("case %s finish", c.desc)
}
}
161 changes: 161 additions & 0 deletions cluster/RefersFilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package cluster

import (
"fmt"
motan "github.com/weibocom/motan-go/core"
vlog "github.com/weibocom/motan-go/log"
"net"
"strings"
)

const (
FilterModeInclude = "include"
FilterModeExclude = "exclude"
)

type RefersFilterConfigList []RefersFilterConfig

type RefersFilterConfig struct {
Group string `json:"group"`
Service string `json:"service"`
Mode string `json:"mode"`
Rule string `json:"rule"`
}

func (list RefersFilterConfigList) Verify() error {
for _, filter := range list {
if filter.Rule == "" {
return fmt.Errorf("exist empty rule in filter config")
}
if filter.Mode != FilterModeInclude && filter.Mode != FilterModeExclude {
return fmt.Errorf("invalid mode(%s) for rule(%s)", filter.Mode, filter.Rule)
}
}
return nil
}

func (list RefersFilterConfigList) ParseRefersFilters(clusterURL *motan.URL) RefersFilter {
var rules []RefersFilterConfig
for _, filter := range list {
if filter.Group != "" && filter.Group != clusterURL.Group {
continue
}
if filter.Service != "" && filter.Service != clusterURL.Path {
continue
}
rules = append(rules, filter)
vlog.Infof("add refer filter for group: %s, service: %s, rule: %s, mode: %s, filter group: %s, filter service: %s", clusterURL.Group, clusterURL.Path, filter.Rule, filter.Mode, filter.Group, filter.Service)
}
if len(rules) == 0 {
return nil
}
return NewDefaultRefersFilter(rules)
}

type RefersFilter interface {
Filter([]motan.EndPoint) []motan.EndPoint
}

type filterRule struct {
mode string
prefixes []string
subnets []*net.IPNet
}

func (fr filterRule) IsMatch(refer motan.EndPoint) bool {
for _, prefix := range fr.prefixes {
if strings.HasPrefix(refer.GetURL().Host, prefix) {
if fr.mode == FilterModeExclude {
vlog.Infof("filter refer: %s, prefix rule: %s, mode: %s", refer.GetURL().GetIdentity(), prefix, fr.mode)
}
return true
}
}
if len(fr.subnets) == 0 {
return false
}
ip := net.ParseIP(refer.GetURL().Host)
if ip == nil {
vlog.Errorf("invalid refer ip: %s", refer.GetURL().Host)
return false
}
for _, ipNet := range fr.subnets {
if ipNet.Contains(ip) {
if fr.mode == FilterModeExclude {
vlog.Infof("filter refer: %s, subnet rule: %s, mode: %s", refer.GetURL().GetIdentity(), ipNet.String(), fr.mode)
}
return true
}
}
return false
}

type DefaultRefersFilter struct {
excludeRules []filterRule
includeRules []filterRule
}

func NewDefaultRefersFilter(filterConfig []RefersFilterConfig) *DefaultRefersFilter {
var includeRules []filterRule
var excludeRules []filterRule
for _, config := range filterConfig {
rules := motan.TrimSplit(config.Rule, ",")
fr := filterRule{
mode: config.Mode,
prefixes: []string{},
subnets: []*net.IPNet{},
}
for _, item := range rules {
if strings.Contains(item, "/") {
_, subnet, err := net.ParseCIDR(item)
if err != nil {
vlog.Errorf("invalid subnet rule: %s", item)
continue
}
fr.subnets = append(fr.subnets, subnet)
} else {
fr.prefixes = append(fr.prefixes, item)
}
}
if config.Mode == FilterModeExclude {
excludeRules = append(excludeRules, fr)
} else {
includeRules = append(includeRules, fr)
}
}
return &DefaultRefersFilter{includeRules: includeRules, excludeRules: excludeRules}
}

func (f *DefaultRefersFilter) Filter(refers []motan.EndPoint) []motan.EndPoint {
var newRefers []motan.EndPoint
for _, refer := range refers {
// discard refer if hit an exclude rule
excludeRefer := false
for _, excludeRule := range f.excludeRules {
excludeRefer = excludeRule.IsMatch(refer)
if excludeRefer {
break
}
}
if excludeRefer {
continue
}
// retained refer if hit an include rule
var includeRefer bool
if len(f.includeRules) == 0 {
includeRefer = true
}
for _, includeRule := range f.includeRules {
includeRefer = includeRule.IsMatch(refer)
if includeRefer {
break
}
}
if includeRefer {
newRefers = append(newRefers, refer)
} else {
vlog.Infof("no include rule hit. filter refer: %s", refer.GetURL().GetIdentity())
}
}
return newRefers
}
Loading
Loading