Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7912][Manager] Only issue normal DataProxy nodes #7913

Merged
merged 4 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
Expand Down Expand Up @@ -1168,6 +1169,11 @@ public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolTy
// TODO consider the data proxy load and re-balance
List<DataProxyNodeInfo> nodeList = new ArrayList<>();
for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
if (Objects.equals(nodeEntity.getStatus(), NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) {
LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}", nodeEntity.getParentId(),
nodeEntity.getIp(), nodeEntity.getPort());
continue;
}
DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
nodeInfo.setId(nodeEntity.getId());
nodeInfo.setIp(nodeEntity.getIp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
Expand Down Expand Up @@ -49,6 +50,7 @@ public Integer saveOpt(ClusterNodeRequest request, String operator) {

entity.setCreator(operator);
entity.setModifier(operator);
entity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus());
clusterNodeMapper.insert(entity);

return entity.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ public Boolean updateClusterNode(Integer id, Integer parentId, String clusterTyp
return clusterService.updateNode(request, GLOBAL_OPERATOR);
}

private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type) {
private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, String port, String type,
String protocolType) {
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(ip);
heartbeatMsg.setPort(port);
heartbeatMsg.setClusterTag("default_cluster");
heartbeatMsg.setProtocolType(ProtocolType.HTTP);
heartbeatMsg.setProtocolType(protocolType);
heartbeatMsg.setLoad(0xFFFF);
heartbeatMsg.setComponentType(type);
heartbeatMsg.setReportTime(System.currentTimeMillis());
Expand Down Expand Up @@ -330,21 +331,20 @@ public void testDataProxyCluster() {
Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2, ProtocolType.TCP);
Assertions.assertNotNull(nodeId2);

// report heartbeat
HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg1);
HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg2);

// create an inlong group which use the clusterTag
String inlongGroupId = "test_cluster_tag_group";
InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR);
InlongGroupInfo updateGroupInfo = groupService.get(inlongGroupId);
updateGroupInfo.setInlongClusterTag(clusterTag);
groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);

// report heartbeat
HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
heartbeatManager.reportHeartbeat(msg1);
HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
heartbeatManager.reportHeartbeat(msg2);
// get the data proxy nodes, the first port should is p1, second port is p2
DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
Expand All @@ -353,6 +353,13 @@ public void testDataProxyCluster() {
Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());

// report heartbeat
HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
heartbeatManager.reportHeartbeat(msg3);
HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
heartbeatManager.reportHeartbeat(msg4);
nodeResponse = clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.HTTP);
nodeInfoList = nodeResponse.getNodeList();
nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
Expand Down