Skip to content

Commit

Permalink
servivebus:uses the golang context instead of the stop channel
Browse files Browse the repository at this point in the history
Signed-off-by: zhangjie <iamkadisi@163.com>
  • Loading branch information
kadisi committed Nov 12, 2019
1 parent b2e92da commit 3592901
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 148 deletions.
51 changes: 0 additions & 51 deletions edge/pkg/edgehub/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,57 +45,6 @@ func (eh *EdgeHub) initial() (err error) {
return nil
}

//Start will start EdgeHub
func (eh *EdgeHub) start(ctx context.Context) {
config.InitEdgehubConfig()
for {
select {
case <-ctx.Done():
klog.Warning("EdgeHub stop")
return
default:

}
err := eh.initial()
if err != nil {
klog.Fatalf("failed to init controller: %v", err)
return
}
err = eh.chClient.Init()
if err != nil {
klog.Errorf("connection error, try again after 60s: %v", err)
time.Sleep(waitConnectionPeriod)
continue
}
// execute hook func after connect
eh.pubConnectInfo(true)
go eh.routeToEdge(ctx)
go eh.routeToCloud(ctx)
go eh.keepalive(ctx)

// wait the stop singal
// stop authinfo manager/websocket connection
<-eh.retryChan
eh.chClient.Uninit()

// execute hook fun after disconnect
eh.pubConnectInfo(false)

// sleep one period of heartbeat, then try to connect cloud hub again
time.Sleep(eh.config.HeartbeatPeriod * 2)

// clean channel
clean:
for {
select {
case <-eh.retryChan:
default:
break clean
}
}
}
}

func (eh *EdgeHub) addKeepChannel(msgID string) chan model.Message {
eh.keeperLock.Lock()
defer eh.keeperLock.Unlock()
Expand Down
53 changes: 52 additions & 1 deletion edge/pkg/edgehub/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package edgehub
import (
"context"
"sync"
"time"

"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/core"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
Expand Down Expand Up @@ -52,7 +55,55 @@ func (eh *EdgeHub) Start(c *beehiveContext.Context) {
var ctx context.Context
eh.context = c
ctx, eh.cancel = context.WithCancel(context.Background())
eh.start(ctx)

config.InitEdgehubConfig()

for {
select {
case <-ctx.Done():
klog.Warning("EdgeHub stop")
return
default:

}
err := eh.initial()
if err != nil {
klog.Fatalf("failed to init controller: %v", err)
return
}
err = eh.chClient.Init()
if err != nil {
klog.Errorf("connection error, try again after 60s: %v", err)
time.Sleep(waitConnectionPeriod)
continue
}
// execute hook func after connect
eh.pubConnectInfo(true)
go eh.routeToEdge(ctx)
go eh.routeToCloud(ctx)
go eh.keepalive(ctx)

// wait the stop singal
// stop authinfo manager/websocket connection
<-eh.retryChan
eh.chClient.Uninit()

// execute hook fun after disconnect
eh.pubConnectInfo(false)

// sleep one period of heartbeat, then try to connect cloud hub again
time.Sleep(eh.config.HeartbeatPeriod * 2)

// clean channel
clean:
for {
select {
case <-eh.retryChan:
default:
break clean
}
}
}
}

//Cleanup sets up context cleanup through Edgehub name
Expand Down
151 changes: 83 additions & 68 deletions edge/pkg/servicebus/servicebus.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicebus

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -11,7 +12,7 @@ import (
"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/core"
"github.com/kubeedge/beehive/pkg/core/context"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/edge/pkg/common/modules"
"github.com/kubeedge/kubeedge/edge/pkg/servicebus/util"
Expand All @@ -24,7 +25,8 @@ const (

// servicebus struct
type servicebus struct {
context *context.Context
context *beehiveContext.Context
cancel context.CancelFunc
}

// Register register servicebus
Expand All @@ -41,9 +43,11 @@ func (*servicebus) Group() string {
return modules.BusGroup
}

func (sb *servicebus) Start(c *context.Context) {
func (sb *servicebus) Start(c *beehiveContext.Context) {
// no need to call TopicInit now, we have fixed topic
var ctx context.Context
sb.context = c
ctx, sb.cancel = context.WithCancel(context.Background())
var htc = new(http.Client)
htc.Timeout = time.Second * 10

Expand All @@ -52,82 +56,93 @@ func (sb *servicebus) Start(c *context.Context) {

//Get message from channel
for {
if msg, ok := sb.context.Receive("servicebus"); ok == nil {
go func() {
klog.Infof("ServiceBus receive msg")
source := msg.GetSource()
if source != sourceType {
return
select {
case <-ctx.Done():
klog.Warning("ServiceBus stop")
return
default:

}
msg, err := sb.context.Receive("servicebus")
if err != nil {
klog.Warningf("servicebus receive msg error %v", err)
continue
}
go func() {
klog.Infof("ServiceBus receive msg")
source := msg.GetSource()
if source != sourceType {
return
}
resource := msg.GetResource()
r := strings.Split(resource, ":")
if len(r) != 2 {
m := "the format of resource " + resource + " is incorrect"
klog.Warningf(m)
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
resource := msg.GetResource()
r := strings.Split(resource, ":")
if len(r) != 2 {
m := "the format of resource " + resource + " is incorrect"
klog.Warningf(m)
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
return
}
content, err := json.Marshal(msg.GetContent())
if err != nil {
klog.Errorf("marshall message content failed %v", err)
m := "error to marshal request msg content"
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
content, err := json.Marshal(msg.GetContent())
if err != nil {
klog.Errorf("marshall message content failed %v", err)
m := "error to marshal request msg content"
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
return
}
var httpRequest util.HTTPRequest
if err := json.Unmarshal(content, &httpRequest); err != nil {
m := "error to parse http request"
code := http.StatusBadRequest
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
var httpRequest util.HTTPRequest
if err := json.Unmarshal(content, &httpRequest); err != nil {
m := "error to parse http request"
code := http.StatusBadRequest
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
return
}
operation := msg.GetOperation()
targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]
resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)
if err != nil {
m := "error to call service"
code := http.StatusNotFound
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
operation := msg.GetOperation()
targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]
resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)
if err != nil {
m := "error to call service"
code := http.StatusNotFound
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
return
}
resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err.Error() == "http: request body too large" {
err = fmt.Errorf("response body too large")
}
resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err.Error() == "http: request body too large" {
err = fmt.Errorf("response body too large")
}
m := "error to receive response, err: " + err.Error()
code := http.StatusInternalServerError
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
m := "error to receive response, err: " + err.Error()
code := http.StatusInternalServerError
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}

response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}
responseMsg := model.NewMessage(msg.GetID())
responseMsg.Content = response
responseMsg.SetRoute("servicebus", modules.UserGroup)
sb.context.SendToGroup(modules.HubGroup, *responseMsg)
}()
}
response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}
responseMsg := model.NewMessage(msg.GetID())
responseMsg.Content = response
responseMsg.SetRoute("servicebus", modules.UserGroup)
sb.context.SendToGroup(modules.HubGroup, *responseMsg)
}()
}
}

func (sb *servicebus) Cleanup() {
sb.cancel()
sb.context.Cleanup(sb.Name())
}

Expand Down
6 changes: 3 additions & 3 deletions edge/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/core"
"github.com/kubeedge/beehive/pkg/core/context"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/edge/pkg/common/message"
"github.com/kubeedge/kubeedge/edge/pkg/common/modules"
Expand All @@ -29,7 +29,7 @@ func Register() {
}

type testManager struct {
context *context.Context
context *beehiveContext.Context
moduleWait *sync.WaitGroup
}

Expand Down Expand Up @@ -217,7 +217,7 @@ func (tm *testManager) configmapHandler(w http.ResponseWriter, req *http.Request
}
}

func (tm *testManager) Start(c *context.Context) {
func (tm *testManager) Start(c *beehiveContext.Context) {
tm.context = c
defer tm.Cleanup()

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/go-chassis/go-archaius v0.20.0
github.com/go-chassis/go-chassis v1.7.1
github.com/go-chassis/paas-lager v1.1.0 // indirect
github.com/go-mesh/openlogging v1.0.1-0.20181205082104-3d418c478b2d
github.com/godbus/dbus v0.0.0-20181101234600-2ff6f7ffd60f // indirect
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/cadvisor v0.33.2-0.20190411163913-9db8c7dee20a
github.com/google/go-cmp v0.3.1 // indirect
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.0 // indirect
github.com/gophercloud/gophercloud v0.1.0 // indirect
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand All @@ -52,7 +53,6 @@ require (
github.com/imdario/mergo v0.3.7 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.7 // indirect
github.com/kadisi/test v0.0.0-20191111013547-b3fc1244c581 // indirect
github.com/karrick/godirwalk v1.10.12 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kubeedge/beehive v0.0.0
Expand Down
Loading

0 comments on commit 3592901

Please sign in to comment.