From 29d882549ba1b29963d465f85393cf2bb198486f Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 25 Apr 2023 10:27:49 +0800 Subject: [PATCH 1/4] [INLONG-7912][Manager] manager only issues normal dataproxy node information --- .../manager/service/cluster/InlongClusterServiceImpl.java | 6 ++++++ .../service/cluster/node/AbstractClusterNodeOperator.java | 2 ++ 2 files changed, 8 insertions(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index be32bed286e..20a42a439fe 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -39,6 +39,7 @@ import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.enums.UserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -1168,6 +1169,11 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy // TODO consider the data proxy load and re-balance List nodeList = new ArrayList<>(); for (InlongClusterNodeEntity nodeEntity : nodeEntities) { + if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) { + LOGGER.debug("data proxy is heart time out by parentId={}, ip={}", nodeEntity.getParentId(), + nodeEntity.getIp()); + continue; + } DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo(); nodeInfo.setId(nodeEntity.getId()); nodeInfo.setIp(nodeEntity.getIp()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java index b32c9ca7574..85200c0bf26 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java @@ -19,6 +19,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; @@ -49,6 +50,7 @@ public Integer saveOpt(ClusterNodeRequest request, String operator) { entity.setCreator(operator); entity.setModifier(operator); + entity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()); clusterNodeMapper.insert(entity); return entity.getId(); From a265ab58f557074953ca68de9c3839c2b16c2ebc Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 25 Apr 2023 10:37:47 +0800 Subject: [PATCH 2/4] [INLONG-7912][Manager] Add log info --- .../manager/service/cluster/InlongClusterServiceImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 20a42a439fe..06f85fa9b68 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -1170,8 +1170,8 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy List nodeList = new ArrayList<>(); for (InlongClusterNodeEntity nodeEntity : nodeEntities) { if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) { - LOGGER.debug("data proxy is heart time out by parentId={}, ip={}", nodeEntity.getParentId(), - nodeEntity.getIp()); + LOGGER.debug("data proxy is heart time out by parentId={}, ip={}, port={}", nodeEntity.getParentId(), + nodeEntity.getIp(), nodeEntity.getPort()); continue; } DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo(); From fc22aa01296c7c26613b40c98a074b104845497f Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Thu, 27 Apr 2023 15:43:26 +0800 Subject: [PATCH 3/4] [INLONG-7912][Manager] Fix comment --- .../cluster/InlongClusterServiceImpl.java | 2 +- .../cluster/InlongClusterServiceTest.java | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 06f85fa9b68..d8fb62fbcb2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -1170,7 +1170,7 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy List nodeList = new ArrayList<>(); for (InlongClusterNodeEntity nodeEntity : nodeEntities) { if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) { - LOGGER.debug("data proxy is heart time out by parentId={}, ip={}, port={}", nodeEntity.getParentId(), + LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(), nodeEntity.getIp(), nodeEntity.getPort()); continue; } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index c4dabbbeeed..a2d9b0205c2 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -174,12 +174,12 @@ public Boolean updateClusterNode(Integer id, Integer parentId, String clusterTyp return clusterService.updateNode(request, GLOBAL_OPERATOR); } - private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type) { + private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type, String protocolType) { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(ip); heartbeatMsg.setPort(port); heartbeatMsg.setClusterTag("default_cluster"); - heartbeatMsg.setProtocolType(ProtocolType.HTTP); + heartbeatMsg.setProtocolType(protocolType); heartbeatMsg.setLoad(0xFFFF); heartbeatMsg.setComponentType(type); heartbeatMsg.setReportTime(System.currentTimeMillis()); @@ -330,13 +330,6 @@ public void testDataProxyCluster() { Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP); Assertions.assertNotNull(nodeId2); - // report heartbeat - HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), - ComponentTypeEnum.DataProxy.getType()); - heartbeatManager.reportHeartbeat(msg1); - HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), - ComponentTypeEnum.DataProxy.getType()); - heartbeatManager.reportHeartbeat(msg2); // create an inlong group which use the clusterTag String inlongGroupId = "test_cluster_tag_group"; @@ -345,6 +338,13 @@ public void testDataProxyCluster() { updateGroupInfo.setInlongClusterTag(clusterTag); groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR); + // report heartbeat + HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP); + heartbeatManager.reportHeartbeat(msg1); + HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP); + heartbeatManager.reportHeartbeat(msg2); // get the data proxy nodes, the first port should is p1, second port is p2 DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP); List nodeInfoList = nodeResponse.getNodeList(); @@ -353,6 +353,13 @@ public void testDataProxyCluster() { Assertions.assertEquals(port1, nodeInfoList.get(0).getPort()); Assertions.assertEquals(port2, nodeInfoList.get(1).getPort()); + // report heartbeat + HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP); + heartbeatManager.reportHeartbeat(msg3); + HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2), + ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP); + heartbeatManager.reportHeartbeat(msg4); nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP); nodeInfoList = nodeResponse.getNodeList(); nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId)); From d2a7e955a40f985935df9584f085e31f9ab53ca2 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Thu, 4 May 2023 11:09:13 +0800 Subject: [PATCH 4/4] [INLONG-7912][Manager] Fix code style --- .../manager/service/cluster/InlongClusterServiceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index a2d9b0205c2..36aa96e0deb 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -174,7 +174,8 @@ public Boolean updateClusterNode(Integer id, Integer parentId, String clusterTyp return clusterService.updateNode(request, GLOBAL_OPERATOR); } - private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type, String protocolType) { + private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type, + String protocolType) { HeartbeatMsg heartbeatMsg = new HeartbeatMsg(); heartbeatMsg.setIp(ip); heartbeatMsg.setPort(port); @@ -330,7 +331,6 @@ public void testDataProxyCluster() { Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP); Assertions.assertNotNull(nodeId2); - // create an inlong group which use the clusterTag String inlongGroupId = "test_cluster_tag_group"; InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR);