Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix weighted lb #396

Merged
merged 5 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ motan-refer:
test-refer:
group: hello
path: helloService
protocol: motanV1Compatible
protocol: motan2
registry: direct
serialization: breeze
asyncInitConnection: false
Expand All @@ -196,7 +196,7 @@ motan-refer:
test-refer:
registry: local
serialization: breeze
protocol: motanV1Compatible
protocol: motan2
group: hello
path: helloService
requestTimeout: 3000
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestNotFoundProvider(t *testing.T) {
notFoundService := "notFoundService"
request := meshClient.BuildRequest(notFoundService, "test", []interface{}{})
epUrl := &core.URL{
Protocol: endpoint.MotanV1Compatible,
Protocol: endpoint.Motan2,
Host: "127.0.0.1",
Port: 9982,
Path: notFoundService,
Expand Down
57 changes: 57 additions & 0 deletions config/compatible.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#only for unit test!!
motan-agent:
port: 9981
mport: 8002
#log_dir: "/data1/logs/agentlog/"
log_dir: "./logs"
registry: "vintage"
application: "pc-yf-test"

motan-server:
testkey:testv

motan-client:
testkey:testv

#config of registries
motan-registry:
vintage:
protocol: vintage
host: 10.**.**.**
port: 8090
registryRetryPeriod: 30000
registrySessionTimeout: 10000
requestTimeout: 5000
consul:
protocol: consul
host: 10.**.**.**
port: 8090
direct:
protocol: direct
host: 10.**.**.**
port: 8013

#conf of basic refers
motan-basicRefer:
mybasicRefer:
group: basic-group
registry: "vintage"
requestTimeout: 1000
haStrategy: failover
loadbalance: random
filter: "accessLog,metrics"
maxClientConnection: 10
minClientConnection: 1
retries: 0
application: pc

#conf of refers
motan-refer:
status-rpc-json:
path: com.weibo.api.test.service.TestRpc
group: test-group
registry: vintage
serialization: simple
protocol: motanV1Compatible
version: 0.1
basicRefer: mybasicRefer
6 changes: 6 additions & 0 deletions core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,12 @@ func (c *Context) parseSubGroupSuffix(urlMap map[string]*URL) {
func (c *Context) parseRefers() {
referUrls := c.basicConfToURLs(refersSection)
c.parseSubGroupSuffix(referUrls)
//TODO: to compatible with removed protocol: motanV1Compatible
for k := range referUrls {
if referUrls[k].Protocol == "motanV1Compatible" {
referUrls[k].Protocol = "motan"
}
}
c.RefersURLs = referUrls
}

Expand Down
9 changes: 9 additions & 0 deletions core/globalContext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func TestGetContext(t *testing.T) {
assert.Equal(t, "motan-demo-rpc", rs.ServiceURLs["mytest-motan2"].Group, "parse serivce key fail")
}

func TestCompatible(t *testing.T) {
rs := &Context{ConfigFile: "../config/compatible.yaml"}
rs.Initialize()
assert.NotNil(t, rs.RefersURLs)
for _, j := range rs.RefersURLs {
assert.Equal(t, j.Protocol, "motan")
}
}

func TestNewContext(t *testing.T) {
configFile := filepath.Join("testdata", "app.yaml")
pool1Context := NewContext(configFile, "app", "app-idc1")
Expand Down
6 changes: 3 additions & 3 deletions core/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (u *URL) IsMatch(service, group, protocol, version string) bool {
}
// for motan v1 request, parameter protocol should be empty
if protocol != "" {
if u.Protocol == "motanV1Compatible" {
if u.Protocol == "motan2" {
if protocol != "motan2" && protocol != "motan" {
return false
}
Expand Down Expand Up @@ -351,8 +351,8 @@ func (u *URL) CanServe(other *URL) bool {
}

func (u *URL) CanServeProtocol(other *URL) bool {
// motanV1Compatible should compatible with motan2, motan and motanV1Compatible
if other.Protocol == "motanV1Compatible" && (u.Protocol == "motan2" || u.Protocol == "motan" || u.Protocol == "motanV1Compatible") {
// motan2 is compatible with motan
if other.Protocol == "motan" && u.Protocol == "motan2" {
return true
}
return u.Protocol == other.Protocol || u.Protocol == ProtocolLocal
Expand Down
10 changes: 5 additions & 5 deletions core/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,22 @@ func TestCanServe(t *testing.T) {
url1.Path = ""
url2.Path = ""
url1.Protocol = "motan2"
url2.Protocol = "motanV1Compatible"
url2.Protocol = "motan"
if !url1.CanServe(url2) {
t.Fatalf("url CanServe testFail url1: %+v, url2: %+v\n", url1, url2)
}
url1.Protocol = "motan"
url2.Protocol = "motanV1Compatible"
if !url1.CanServe(url2) {
url2.Protocol = "motan2"
if url1.CanServe(url2) {
t.Fatalf("url CanServe testFail url1: %+v, url2: %+v\n", url1, url2)
}
url1.Protocol = "local"
url2.Protocol = "motanV1Compatible"
url2.Protocol = "motan2"
if !url1.CanServe(url2) {
t.Fatalf("url CanServe testFail url1: %+v, url2: %+v\n", url1, url2)
}
url1.Protocol = "abc"
url2.Protocol = "motanV1Compatible"
url2.Protocol = "motan2"
if url1.CanServe(url2) {
t.Fatalf("url CanServe testFail url1: %+v, url2: %+v\n", url1, url2)
}
Expand Down
4 changes: 0 additions & 4 deletions dynamicConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ func (c *DynamicConfigurer) doUnregister(url *core.URL) error {
}

func (c *DynamicConfigurer) Subscribe(url *core.URL) error {
// use motanV1Compatible to compatible with protocol: motan
if url.Protocol == "motan" {
url.Protocol = "motanV1Compatible"
}
err := c.doSubscribe(url)
if err != nil {
return err
Expand Down
13 changes: 7 additions & 6 deletions endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (

// ext name
const (
Grpc = "grpc"
Motan2 = "motan2"
Local = "local"
Mock = "mockEndpoint"
MotanV1Compatible = "motanV1Compatible"
Grpc = "grpc"
Motan2 = "motan2"
// Motan1 endpoint is to support dynamic configuration. Golang cannot build motan1 request
Motan1 = "motan"
Local = "local"
Mock = "mockEndpoint"
)

const (
Expand All @@ -41,7 +42,7 @@ func RegistDefaultEndpoint(extFactory motan.ExtensionFactory) {
return &MockEndpoint{URL: url}
})

extFactory.RegistExtEndpoint(MotanV1Compatible, func(url *motan.URL) motan.EndPoint {
extFactory.RegistExtEndpoint(Motan1, func(url *motan.URL) motan.EndPoint {
return &MotanCommonEndpoint{url: url}
})
}
Expand Down
18 changes: 9 additions & 9 deletions endpoint/motanCommonEndpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestGetV1Name(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.AsyncInitConnection, "false")
ep := &MotanCommonEndpoint{}
Expand All @@ -33,7 +33,7 @@ func TestGetV1Name(t *testing.T) {
}

func TestV1RecordErrEmptyThreshold(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ClientConnectionKey, "1")
url.PutParam(motan.AsyncInitConnection, "false")
Expand All @@ -55,7 +55,7 @@ func TestV1RecordErrEmptyThreshold(t *testing.T) {
}

func TestV1RecordErrWithErrThreshold(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.ClientConnectionKey, "1")
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) {
}

func TestNotFoundProviderCircuitBreaker(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "2000")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.ClientConnectionKey, "10")
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestNotFoundProviderCircuitBreaker(t *testing.T) {
}

func TestMotanCommonEndpoint_SuccessCall(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "2000")
url.PutParam(motan.ErrorCountThresholdKey, "1")
url.PutParam(motan.ClientConnectionKey, "1")
Expand All @@ -158,7 +158,7 @@ func TestMotanCommonEndpoint_SuccessCall(t *testing.T) {
}

func TestMotanCommonEndpoint_ErrorCall(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "1")
url.PutParam(motan.ClientConnectionKey, "1")
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestMotanCommonEndpoint_ErrorCall(t *testing.T) {
}

func TestMotanCommonEndpoint_RequestTimeout(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible"}
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "1")
url.PutParam(motan.ClientConnectionKey, "1")
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestMotanCommonEndpoint_RequestTimeout(t *testing.T) {
}

func TestV1LazyInit(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible", Parameters: map[string]string{"lazyInit": "true"}}
url := &motan.URL{Port: 8989, Protocol: "motan", Parameters: map[string]string{"lazyInit": "true"}}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "1")
url.PutParam(motan.ClientConnectionKey, "1")
Expand All @@ -242,7 +242,7 @@ func TestV1LazyInit(t *testing.T) {
}

func TestV1AsyncInit(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motanV1Compatible", Parameters: map[string]string{"asyncInitConnection": "true"}}
url := &motan.URL{Port: 8989, Protocol: "motan", Parameters: map[string]string{"asyncInitConnection": "true"}}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "1")
url.PutParam(motan.ClientConnectionKey, "1")
Expand Down
12 changes: 11 additions & 1 deletion lb/weightedEpRefresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,17 @@ func refreshDynamicWeight(holders []*WeightedEpHolder, taskTimeout int64) bool {
defer close(finishChan)
go func() {
wg.Wait()
finishChan <- struct{}{}
// the chan might be closed
select {
case _, ok := <-finishChan:
if !ok {
// chan has been closed
return
}
default:
finishChan <- struct{}{}
return
}
}()
select {
case <-timer.C:
Expand Down
13 changes: 9 additions & 4 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
dynamicMeta = core.NewStringMap(30)
envMeta = make(map[string]string)
envPrefix = core.DefaultMetaPrefix
metaEmptyMap = make(map[string]string)
metaCache = cache.New(time.Second*time.Duration(defaultCacheExpireSecond), 30*time.Second)
notSupportCache = cache.New(time.Second*time.Duration(notSupportCacheExpireSecond), 30*time.Second)
ServiceNotSupportError = errors.New(core.ServiceNotSupport)
Expand All @@ -32,9 +33,8 @@ var (
"grpc-pb-json": true,
}
supportProtocols = map[string]bool{
"motan": true,
"motan2": true,
"motanV1Compatible": true,
"motan": true,
"motan2": true,
}
once = sync.Once{}
)
Expand Down Expand Up @@ -152,7 +152,12 @@ func getRemoteDynamicMeta(cacheKey string, endpoint core.EndPoint) (map[string]s
if err != nil {
return nil, err
}
return resp.GetValue().(map[string]string), nil
// multiple serialization might encode empty map into interface{}, not map[string]string
// in this case, return a public empty string map
if res, ok := resp.GetValue().(map[string]string); ok && res != nil {
return res, nil
}
return metaEmptyMap, nil
}

func getMetaServiceRequest() core.Request {
Expand Down
3 changes: 0 additions & 3 deletions provider/motanProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ func (m *MotanProvider) Initialize() {
vlog.Errorln("reverse proxy service port config error!")
return
}
if protocol == "motan2" || protocol == "motan" { // TODO temp compatible with motan1. remove if MotanCommonEndpoint as default Motan endpoint
protocol = "motanV1Compatible"
}
host := m.url.GetParam(ProxyHostKey, DefaultHost)
endpointURL := m.url.Copy()
endpointURL.Protocol = protocol
Expand Down
4 changes: 4 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func (m *MSContext) Initialize() {
}
}

func (m *MSContext) GetContext() *motan.Context {
return m.context
}

// RegisterService register service with serviceId for config ref.
// the type.string will used as serviceId if sid is not set. e.g. 'packageName.structName'
func (m *MSContext) RegisterService(s interface{}, sid string) error {
Expand Down
Loading