Skip to content

Commit

Permalink
优化zookeeper 中断后,再次重启;临时节点丢失问题
Browse files Browse the repository at this point in the history
  • Loading branch information
gujiachun committed Jan 23, 2022
1 parent 2aa0940 commit 6ccb265
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.rainbow.bridge.core.ResultEnum;
import com.rainbow.bridge.core.exception.BusinessException;
import com.rainbow.bridge.server.listener.ServerZkChildListener;
import com.rainbow.bridge.server.listener.ServerZkConnectionStateListener;
import com.rainbow.bridge.server.listener.ServerZkDataListener;
import com.rainbow.bridge.server.zk.ZkClientExt;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -113,6 +114,8 @@ public ZkClientExt zkClientExt(ZkService zkService) throws Exception {

nodeCache.start(true);

zkc.getClient().getConnectionStateListenable().addListener(new ServerZkConnectionStateListener());

return zkc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,24 @@ private void buildAddTasks(List<TaskDto> addTaskList){

if (taskCanalClientMap != null && taskCanalClientMap.size() > 0) {
logger.info("集群任务发生变化了,去认领任务了哦-----过滤重复任务");

//获取集群节点 下 所有的任务实例
List<String> taskPathChildren = zkClientExt.getChildren(zkClientExt.getClusterPath());

//过滤掉 已经处理的任务taskId
Iterator<TaskDto> iterator = addTaskList.iterator();
while (iterator.hasNext()) {
TaskDto taskDto = iterator.next();
if (taskCanalClientMap.containsKey(taskDto.getTaskId())) {
logger.info("过滤重复任务,已经执行了任务:{} 无需再次执行",taskDto.getTaskId());
iterator.remove();
// 检查此 正在执行的任务,有没有临时节点,如 没有,需要补进去
// 原因很有可能zookeeper 中断重启了
if (!checkExistsTaskHasZkTempPath(taskDto,taskPathChildren)){
logger.info("补偿 新增临时节点,任务:{}",taskDto.getTaskId());
String zkTempPath = addTempZkPath(taskDto);
zkEsTaskMap.put(taskDto.getTaskId(), zkTempPath);
}
}
}
}
Expand All @@ -360,7 +372,7 @@ private void buildAddTasks(List<TaskDto> addTaskList){
}

int actionCount = maxTaskCount - taskCanalClientMap.size();
logger.info("集群任务发生变化了,去认领任务了哦-----过滤后---新增任务");
logger.info("集群任务发生变化了,去认领任务了哦-----过滤后---新增任务数量:{}",actionCount);
//有很多任务没有处理,就随机取
if (addTaskList.size() > actionCount){
//随机置换
Expand All @@ -371,6 +383,52 @@ private void buildAddTasks(List<TaskDto> addTaskList){
}
}

/**
* 判断已经处理的任务,有没有临时节点存在
* 这个场景 会出现在zookeeper中断后,恢复时,临时节点 丢失
* */
private boolean checkExistsTaskHasZkTempPath(TaskDto taskDto,List<String> taskTempPathChildren){

String currentIpPort = IpLocalUtil.getLocalIpByNetcard() + ":" + port;

for (String taskIdTempPath : taskTempPathChildren){
String data = zkClientExt.readData(zkClientExt.getClusterPath() + "/" + taskIdTempPath);
if ( StringUtils.isNotBlank(data)){
logger.info(">>>>>读取临时任务节点:{},数据:{}",zkClientExt.getClusterPath() + "/" + taskIdTempPath,data);
String[] split = data.split(",");
logger.info(">>>>>读取临时任务节点:{},数据:{},split:{}",zkClientExt.getClusterPath() + "/" + taskIdTempPath,data,split.length);
String taskId = split[2];
String ipPort = split[0];

if (currentIpPort.equals(ipPort) && taskDto.getTaskId().equals(taskId)){
return true;
}
}
}
return false;
}

/**
* 添加 任务的 临时节点
* */
private String addTempZkPath(TaskDto taskDto){
//处理成功后 增加临时有序节点
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formatDate = "";
if (taskDto.getUpdateTaskRuleTime() != null){
formatDate = simpleDateFormat.format(taskDto.getUpdateTaskRuleTime());
}
String content = String.format("%s,%s,%s,%s,%s",IpLocalUtil.getLocalIpByNetcard() + ":" + port,
formatDate,taskDto.getTaskId(),taskDto.getTargetType(),taskDto.getInstCount());
//任务临时节点 和 任务id的关联关系
String esTaskPath = zkClientExt.createEphemeralSequential(zkClientExt.getClusterPath() + "/" + taskDto.getTaskId(),content);
logger.info(">>>>>taskId:{}产生了临时节点:{}",taskDto.getTaskId(),esTaskPath);
return esTaskPath;
}




/**
* 构建执行任务
* 1、把rockemq或kafka订阅起来
Expand Down Expand Up @@ -406,16 +464,7 @@ private void buildExecutor(List<TaskDto> activeTasks){
}

//处理成功后 增加临时有序节点
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String formatDate = "";
if (taskDto.getUpdateTaskRuleTime() != null){
formatDate = simpleDateFormat.format(taskDto.getUpdateTaskRuleTime());
}
String content = String.format("%s,%s,%s,%s,%s",IpLocalUtil.getLocalIpByNetcard() + ":" + port,
formatDate,taskDto.getTaskId(),taskDto.getTargetType(),taskDto.getInstCount());
//任务临时节点 和 任务id的关联关系
String esTaskPath = zkClientExt.createEphemeralSequential(zkClientExt.getClusterPath() + "/" + taskDto.getTaskId(),content);
logger.info(">>>>>taskId:{}产生了临时节点:{}",taskDto.getTaskId(),esTaskPath);
String esTaskPath = addTempZkPath(taskDto);
zkEsTaskMap.put(taskDto.getTaskId(),esTaskPath);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.rainbow.bridge.server.listener;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author gujiachun
*/
public class ServerZkConnectionStateListener implements ConnectionStateListener {

private static final Logger logger = LoggerFactory.getLogger(ServerZkConnectionStateListener.class);

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.RECONNECTED){
logger.info(">>>>>>> zk 重连上了");
}else if (newState == ConnectionState.LOST){
logger.warn(">>>>>>> zk 中断了");
}
}
}

0 comments on commit 6ccb265

Please sign in to comment.