Skip to content

Commit

Permalink
Merge pull request #226 from YanHeDoki/master
Browse files Browse the repository at this point in the history
修复心跳和新路由不兼容的问题
  • Loading branch information
aceld authored May 8, 2023
2 parents 766f595 + 04599f5 commit 2e55bfe
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 25 deletions.
3 changes: 3 additions & 0 deletions ziface/iheartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ type IHeartbeatChecker interface {
SetHeartbeatMsgFunc(HeartBeatMsgFunc)
SetHeartbeatFunc(HeartBeatFunc)
BindRouter(uint32, IRouter)
BindRouterSlices(uint32, ...RouterHandler)
Start()
Stop()
SendHeartBeatMsg() error
BindConn(IConnection)
Clone() IHeartbeatChecker
MsgID() uint32
Router() IRouter
RouterSlices() []RouterHandler
}

// User-defined method for handling heartbeat detection messages
Expand All @@ -31,6 +33,7 @@ type HeartBeatOption struct {
OnRemoteNotAlive OnRemoteNotAlive // User-defined method for handling remote connections that are not alive(用户自定义的远程连接不存活时的处理方法)
HeadBeatMsgID uint32 // User-defined ID for heartbeat detection messages(用户自定义的心跳检测消息ID)
Router IRouter // User-defined business processing route for heartbeat detection messages(用户自定义的心跳检测消息业务处理路由)
RouterSlices []RouterHandler //新版本的路由处理函数的集合
}

const (
Expand Down
32 changes: 16 additions & 16 deletions ziface/irouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package ziface

/*
IRouter is the interface for message routing. The route is the processing
business method set by the framework user for this connection. The IRequest
in the route includes the connection information and the request data
information for this connection.
(路由接口, 这里面路由是 使用框架者给该链接自定的 处理业务方法
路由里的IRequest 则包含用该链接的链接信息和该链接的请求数据信息)
IRouter is the interface for message routing. The route is the processing
business method set by the framework user for this connection. The IRequest
in the route includes the connection information and the request data
information for this connection.
(路由接口, 这里面路由是 使用框架者给该链接自定的 处理业务方法
路由里的IRequest 则包含用该链接的链接信息和该链接的请求数据信息)
*/
type IRouter interface {
PreHandle(request IRequest) //Hook method before processing conn business(在处理conn业务之前的钩子方法)
Expand All @@ -18,31 +18,31 @@ type IRouter interface {
}

/*
RouterHandler is a method slice collection style router. Unlike the old version,
the new version only saves the router method collection, and the specific execution
is handed over to the IRequest of each request.
(方法切片集合式路路由
不同于旧版 新版本仅保存路由方法集合,具体执行交给每个请求的 IRequest)
RouterHandler is a method slice collection style router. Unlike the old version,
the new version only saves the router method collection, and the specific execution
is handed over to the IRequest of each request.
(方法切片集合式路路由
不同于旧版 新版本仅保存路由方法集合,具体执行交给每个请求的 IRequest)
*/
type RouterHandler func(request IRequest)
type IRouterSlices interface {
// Add global components (添加全局组件)
Use(Handlers ...RouterHandler)

// Add a route
// Add a route (添加业务处理器集合)
AddHandler(msgId uint32, handlers ...RouterHandler)

// Router group management
// Router group management (路由分组管理,并且会返回一个组管理器)
Group(start, end uint32, Handlers ...RouterHandler) IGroupRouterSlices

// Get the method set collection for processing
// Get the method set collection for processing (获得当前的所有注册在MsgId的处理器集合)
GetHandlers(MsgId uint32) ([]RouterHandler, bool)
}

type IGroupRouterSlices interface {
// Add global components
// Add global components (添加全局组件)
Use(Handlers ...RouterHandler)

// Add group routing components
// Add group routing components (添加业务处理器集合)
AddHandler(MsgId uint32, Handlers ...RouterHandler)
}
30 changes: 24 additions & 6 deletions znet/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ type HeartbeatChecker struct {

onRemoteNotAlive ziface.OnRemoteNotAlive // User-defined method for handling remote connections that are not alive (用户自定义的远程连接不存活时的处理方法)

msgID uint32 // Heartbeat message ID(心跳的消息ID)
router ziface.IRouter // User-defined heartbeat message business processing router(用户自定义的心跳检测消息业务处理路由)

conn ziface.IConnection // Bound connection(绑定的链接)
msgID uint32 // Heartbeat message ID(心跳的消息ID)
router ziface.IRouter // User-defined heartbeat message business processing router(用户自定义的心跳检测消息业务处理路由)
routerSlices []ziface.RouterHandler //(用户自定义的心跳检测消息业务处理新路由)
conn ziface.IConnection // Bound connection(绑定的链接)

beatFunc ziface.HeartBeatFunc // // User-defined heartbeat sending function(用户自定义心跳发送函数)
}

/*
Default callback routing business for receiving remote heartbeat messages
(收到remote心跳消息的默认回调路由业务)
Default callback routing business for receiving remote heartbeat messages
(收到remote心跳消息的默认回调路由业务)
*/
type HeatBeatDefaultRouter struct {
BaseRouter
Expand All @@ -36,6 +36,11 @@ func (r *HeatBeatDefaultRouter) Handle(req ziface.IRequest) {
req.GetConnection().RemoteAddr(), req.GetMsgID(), string(req.GetData()))
}

func HeatBeatDefaultHandle(req ziface.IRequest) {
zlog.Ins().InfoF("Recv Heartbeat from %s, MsgID = %+v, Data = %s",
req.GetConnection().RemoteAddr(), req.GetMsgID(), string(req.GetData()))
}

func makeDefaultMsg(conn ziface.IConnection) []byte {
msg := fmt.Sprintf("heartbeat [%s->%s]", conn.LocalAddr(), conn.RemoteAddr())
return []byte(msg)
Expand All @@ -57,6 +62,7 @@ func NewHeartbeatChecker(interval time.Duration) ziface.IHeartbeatChecker {
onRemoteNotAlive: notAliveDefaultFunc,
msgID: ziface.HeartBeatDefaultMsgID,
router: &HeatBeatDefaultRouter{},
routerSlices: []ziface.RouterHandler{HeatBeatDefaultHandle},
beatFunc: nil,
}

Expand Down Expand Up @@ -88,6 +94,13 @@ func (h *HeartbeatChecker) BindRouter(msgID uint32, router ziface.IRouter) {
}
}

func (h *HeartbeatChecker) BindRouterSlices(msgID uint32, handlers ...ziface.RouterHandler) {
if len(handlers) > 0 && msgID != ziface.HeartBeatDefaultMsgID {
h.msgID = msgID
h.routerSlices = append(h.routerSlices, handlers...)
}
}

func (h *HeartbeatChecker) start() {
ticker := time.NewTicker(h.interval)
for {
Expand Down Expand Up @@ -159,6 +172,7 @@ func (h *HeartbeatChecker) Clone() ziface.IHeartbeatChecker {
onRemoteNotAlive: h.onRemoteNotAlive,
msgID: h.msgID,
router: h.router,
routerSlices: h.routerSlices,
conn: nil, // The bound connection needs to be reassigned
}

Expand All @@ -172,3 +186,7 @@ func (h *HeartbeatChecker) MsgID() uint32 {
func (h *HeartbeatChecker) Router() ziface.IRouter {
return h.router
}

func (h *HeartbeatChecker) RouterSlices() []ziface.RouterHandler {
return h.routerSlices
}
21 changes: 18 additions & 3 deletions znet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,12 @@ func (s *Server) StartHeartBeat(interval time.Duration) {
checker := NewHeartbeatChecker(interval)

// Add the heartbeat check router. (添加心跳检测的路由)
s.AddRouter(checker.MsgID(), checker.Router())
//检测当前路由模式
if s.RouterSlicesMode {
s.AddRouterSlices(checker.MsgID(), checker.RouterSlices()...)
} else {
s.AddRouter(checker.MsgID(), checker.Router())
}

// Bind the heartbeat checker to the server. (server绑定心跳检测器)
s.hc = checker
Expand All @@ -514,11 +519,21 @@ func (s *Server) StartHeartBeatWithOption(interval time.Duration, option *ziface
if option != nil {
checker.SetHeartbeatMsgFunc(option.MakeMsg)
checker.SetOnRemoteNotAlive(option.OnRemoteNotAlive)
checker.BindRouter(option.HeadBeatMsgID, option.Router)
//检测当前路由模式
if s.RouterSlicesMode {
checker.BindRouterSlices(option.HeadBeatMsgID, option.RouterSlices...)
} else {
checker.BindRouter(option.HeadBeatMsgID, option.Router)
}
}

// Add the heartbeat checker's router to the server's router (添加心跳检测的路由)
s.AddRouter(checker.MsgID(), checker.Router())
//检测当前路由模式
if s.RouterSlicesMode {
s.AddRouterSlices(checker.MsgID(), checker.RouterSlices()...)
} else {
s.AddRouter(checker.MsgID(), checker.Router())
}

// Bind the server with the heartbeat checker (server绑定心跳检测器)
s.hc = checker
Expand Down

0 comments on commit 2e55bfe

Please sign in to comment.