From 6254172b2e5b5ced7ce10ec01f1b9cfd618e2754 Mon Sep 17 00:00:00 2001 From: Aceld Date: Mon, 27 Mar 2023 18:52:12 +0800 Subject: [PATCH] fix send updateActive --- examples/zinx_heartbeat/client.go | 40 +++++++++++++++------ examples/zinx_heartbeat/client_default.go | 10 ++---- examples/zinx_heartbeat/server.go | 42 ++++++++++++++++++----- znet/connection.go | 11 ++++-- 4 files changed, 74 insertions(+), 29 deletions(-) diff --git a/examples/zinx_heartbeat/client.go b/examples/zinx_heartbeat/client.go index 4aeeeee1..585a579b 100644 --- a/examples/zinx_heartbeat/client.go +++ b/examples/zinx_heartbeat/client.go @@ -2,29 +2,49 @@ package main import ( "fmt" + "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/znet" - "os" - "os/signal" "time" ) -func wait() { - // close - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - sig := <-c - fmt.Println("===exit===", sig) +// 用户自定义的心跳检测消息处理方法 +func myClientHeartBeatMsg(conn ziface.IConnection) []byte { + return []byte("heartbeat, I am Client, I am alive") +} + +// 用户自定义的远程连接不存活时的处理方法 +func myClientOnRemoteNotAlive(conn ziface.IConnection) { + fmt.Println("myClientOnRemoteNotAlive is Called, connID=", conn.GetConnID(), "remoteAddr = ", conn.RemoteAddr()) + //关闭链接 + conn.Stop() +} + +// 用户自定义的心跳检测消息处理方法 +type myClientHeartBeatRouter struct { + znet.BaseRouter +} + +func (r *myClientHeartBeatRouter) Handle(request ziface.IRequest) { + // 业务处理 + fmt.Println("in myClientHeartBeatRouter Handle, recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData())) } func main() { //创建一个Client句柄,使用Zinx的API client := znet.NewClient("127.0.0.1", 8999) + myHeartBeatMsgID := 88888 + //启动心跳检测 - client.StartHeartBeat(3 * time.Second) + client.StartHeartBeatWithOption(3*time.Second, &ziface.HeartBeatOption{ + MakeMsg: myClientHeartBeatMsg, + OnRemoteNotAlive: myClientOnRemoteNotAlive, + Router: &myClientHeartBeatRouter{}, + HeadBeatMsgID: uint32(myHeartBeatMsgID), + }) //启动客户端client client.Start() - wait() + select {} } diff --git a/examples/zinx_heartbeat/client_default.go b/examples/zinx_heartbeat/client_default.go index f75b1748..3e8dbdcf 100644 --- a/examples/zinx_heartbeat/client_default.go +++ b/examples/zinx_heartbeat/client_default.go @@ -1,10 +1,7 @@ package main import ( - "fmt" "github.com/aceld/zinx/znet" - "os" - "os/signal" "time" ) @@ -18,9 +15,6 @@ func main() { //启动客户端client client.Start() - // close - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - sig := <-c - fmt.Println("===exit===", sig) + // wait + select {} } diff --git a/examples/zinx_heartbeat/server.go b/examples/zinx_heartbeat/server.go index a2658e1f..f3860dcc 100644 --- a/examples/zinx_heartbeat/server.go +++ b/examples/zinx_heartbeat/server.go @@ -1,20 +1,46 @@ package main import ( + "fmt" + "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/znet" "time" ) +// 用户自定义的心跳检测消息处理方法 +func myHeartBeatMsg(conn ziface.IConnection) []byte { + return []byte("heartbeat, I am server, I am alive") +} + +// 用户自定义的远程连接不存活时的处理方法 +func myOnRemoteNotAlive(conn ziface.IConnection) { + fmt.Println("myOnRemoteNotAlive is Called, connID=", conn.GetConnID(), "remoteAddr = ", conn.RemoteAddr()) + //关闭链接 + conn.Stop() +} + +// 用户自定义的心跳检测消息处理方法 +type myHeartBeatRouter struct { + znet.BaseRouter +} + +func (r *myHeartBeatRouter) Handle(request ziface.IRequest) { + // 业务处理 + fmt.Println("in MyHeartBeatRouter Handle, recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData())) +} + func main() { - //创建Client客户端 - client := znet.NewClient("127.0.0.1", 8999) + s := znet.NewServer() - //设置心跳检测 - client.StartHeartBeat(3 * time.Second) + myHeartBeatMsgID := 88888 - //启动客户端 - client.Start() + //启动心跳检测 + s.StartHeartBeatWithOption(1*time.Second, &ziface.HeartBeatOption{ + MakeMsg: myHeartBeatMsg, + OnRemoteNotAlive: myOnRemoteNotAlive, + Router: &myHeartBeatRouter{}, + HeadBeatMsgID: uint32(myHeartBeatMsgID), + }) - //防止进程退出,等待中断信号 - select {} + s.Serve() } diff --git a/znet/connection.go b/znet/connection.go index facd6958..b014d184 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -130,7 +130,7 @@ func (c *Connection) StartWriter() { } //写对端成功, 更新链接活动时间 - c.updateActivity() + //c.updateActivity() } else { zlog.Ins().ErrorF("msgBuffChan is Closed") break @@ -162,6 +162,11 @@ func (c *Connection) StartReader() { } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) + //正常读取到对端数据,更新心跳检测Active状态 + if n > 0 && c.hc != nil { + c.updateActivity() + } + //处理自定义协议断粘包问题 add by uuxia 2023-03-21 if c.lengthFieldDecoder != nil { //为读取到的0-n个字节的数据进行解码 @@ -247,7 +252,7 @@ func (c *Connection) Send(data []byte) error { } //写对端成功, 更新链接活动时间 - c.updateActivity() + //c.updateActivity() return nil } @@ -307,7 +312,7 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { } //写对端成功, 更新链接活动时间 - c.updateActivity() + //c.updateActivity() return nil }