Skip to content

Commit

Permalink
✨ 同步私服部分功能
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed May 22, 2024
1 parent f30ef9b commit 356ab22
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import net.dreamlu.iot.mqtt.core.client.MqttClient;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;

/**
Expand Down Expand Up @@ -57,15 +55,11 @@ public static void main(String[] args) {
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
int LightSwitch = ThreadLocalRandom.current().nextBoolean() ? 0 : 1;
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":" + LightSwitch + "}}";
client.publish("/sys/" + productKey + "/" + deviceName + "/thing/event/property/post", content.getBytes(StandardCharsets.UTF_8));
}
}, 3000, 3000);
client.schedule(() -> {
int LightSwitch = ThreadLocalRandom.current().nextBoolean() ? 0 : 1;
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":" + LightSwitch + "}}";
client.publish("/sys/" + productKey + "/" + deviceName + "/thing/event/property/post", content.getBytes(StandardCharsets.UTF_8));
}, 3000);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import net.dreamlu.iot.mqtt.core.client.MqttClient;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

/**
* 设备 C,每 5 秒上报一个数据
Expand All @@ -38,13 +36,9 @@ public static void main(String[] args) {
.password("123456")
.connectSync();

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/a/door/open", "open".getBytes(StandardCharsets.UTF_8));
}
}, 5000, 5000);
client.schedule(() -> {
client.publish("/a/door/open", "open".getBytes(StandardCharsets.UTF_8));
}, 5000);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.tio.core.ChannelContext;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

/**
* 客户端测试
Expand Down Expand Up @@ -74,12 +72,8 @@ public void onMessage(ChannelContext context, String topic, MqttPublishMessage m
client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8));

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}
}, 1000, 2000);
client.schedule(() -> {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}, 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ public static void main(String[] args) {

// 连接上之后发送消息,注意:连接时出现异常等就不会发出
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 2.3.0 开始支持,可停止
// client.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.tio.core.ChannelContext;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

/**
* 客户端测试
Expand Down Expand Up @@ -76,12 +74,8 @@ public void onMessage(ChannelContext context, String topic, MqttPublishMessage m
client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8));

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}
}, 1000, 2000);
client.schedule(() -> {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}, 2000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import net.dreamlu.iot.mqtt.core.client.MqttClient;

import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

/**
* 客户端测试
Expand Down Expand Up @@ -54,7 +52,7 @@ public static void main(String[] args) {
.connectSync();

// 订阅命令下发topic
String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#";
String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#";

client.subQos0(cmdRequestTopic, (context, topic, message, payload) -> {
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
Expand All @@ -64,13 +62,9 @@ public static void main(String[] args) {
String reportTopic = "$oc/devices/" + deviceId + "/sys/properties/report";
String jsonMsg = "{\"services\":[{\"service_id\":\"Temperature\", \"properties\":{\"value\":57}},{\"service_id\":\"Battery\",\"properties\":{\"level\":88}}]}";

Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish(reportTopic, jsonMsg.getBytes(StandardCharsets.UTF_8));
}
}, 3000, 3000);
client.schedule(() -> {
client.publish(reportTopic, jsonMsg.getBytes(StandardCharsets.UTF_8));
}, 3000);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.timer.TimerTask;
import org.tio.utils.timer.TimerTaskService;

import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -54,6 +57,7 @@ public final class MqttClient {
private final TioClientConfig clientTioConfig;
private final IMqttClientSession clientSession;
private final TimerTaskService taskService;
private final ExecutorService mqttExecutor;
private final IMqttClientMessageIdGenerator messageIdGenerator;
private ClientChannelContext context;

Expand All @@ -66,6 +70,7 @@ public static MqttClientCreator create() {
this.config = config;
this.clientTioConfig = tioClient.getTioClientConfig();
this.taskService = config.getTaskService();
this.mqttExecutor = config.getMqttExecutor();
this.clientSession = config.getClientSession();
this.messageIdGenerator = config.getMessageIdGenerator();
startHeartbeatTask();
Expand Down Expand Up @@ -372,6 +377,81 @@ public boolean publish(String topic, byte[] payload, MqttQoS qos, Consumer<MqttM
return result;
}

/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
* @param command runnable
* @param delay delay
* @return TimerTask
*/
public TimerTask schedule(Runnable command, long delay) {
return schedule(command, delay, null);
}

/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
* @param command runnable
* @param delay delay
* @param executor 用于自定义线程池,处理耗时业务
* @return TimerTask
*/
public TimerTask schedule(Runnable command, long delay, Executor executor) {
return config.getTaskService().addTask((systemTimer -> new org.tio.utils.timer.TimerTask(delay) {
@Override
public void run() {
try {
// 1. 再次添加 任务
systemTimer.add(this);
// 2. 执行任务
if (executor == null) {
command.run();
} else {
executor.execute(command);
}
} catch (Exception e) {
logger.error("Mqtt client schedule error", e);
}
}
}));
}

/**
* 添加定时任务
*
* @param command runnable
* @param delay delay
* @return TimerTask
*/
public TimerTask scheduleOnce(Runnable command, long delay) {
return scheduleOnce(command, delay, null);
}

/**
* 添加定时任务
*
* @param command runnable
* @param delay delay
* @param executor 用于自定义线程池,处理耗时业务
* @return TimerTask
*/
public TimerTask scheduleOnce(Runnable command, long delay, Executor executor) {
return config.getTaskService().addTask((systemTimer -> new TimerTask(delay) {
@Override
public void run() {
try {
if (executor == null) {
command.run();
} else {
executor.execute(command);
}
} catch (Exception e) {
logger.error("Mqtt client schedule once error", e);
}
}
}));
}

/**
* 异步连接
*
Expand Down Expand Up @@ -456,11 +536,7 @@ public boolean reconnect(Node serverNode) {
* @return 是否需要重新订阅
*/
public static boolean isNeedReSub(ChannelContext context) {
if (context.containsKey(MQTT_NEED_RE_SUB)) {
context.remove(MQTT_NEED_RE_SUB);
return true;
}
return false;
return context.getAndRemove(MQTT_NEED_RE_SUB) != null;
}

/**
Expand Down Expand Up @@ -492,6 +568,19 @@ public boolean stop() {
this.disconnect();
// 3. 停止 tio
boolean result = tioClient.stop();
// 4. 停止工作线程
try {
mqttExecutor.shutdown();
} catch (Exception e1) {
logger.error(e1.getMessage(), e1);
}
try {
// 等待线程池中的任务结束,客户端等待 6 秒基本上足够了
result &= mqttExecutor.awaitTermination(6, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(e.getMessage(), e);
}
logger.info("MqttClient stop result:{}", result);
// 4. 清理 session
this.clientSession.clean();
Expand Down Expand Up @@ -568,58 +657,12 @@ public boolean isDisconnected() {
* mqtt 定时任务:发心跳
*/
private void startHeartbeatTask() {
// 先判断用户是否开启心跳检测
final long heartbeatTimeout = TimeUnit.SECONDS.toMillis(config.getKeepAliveSecs());
if (heartbeatTimeout <= 0) {
final int keepAliveSecs = config.getKeepAliveSecs();
if (keepAliveSecs <= 0) {
logger.warn("用户取消了 mica-mqtt 的心跳定时发送功能,请用户自己去完成心跳机制");
return;
}
final ClientGroupStat clientGroupStat = (ClientGroupStat) clientTioConfig.groupStat;
final TioClientHandler clientHandler = clientTioConfig.getTioClientHandler();
final String id = clientTioConfig.getId();
new Thread(() -> {
while (!clientTioConfig.isStopped()) {
try {
Set<ChannelContext> set = clientTioConfig.connecteds;
long currTime = System.currentTimeMillis();
for (ChannelContext entry : set) {
ClientChannelContext channelContext = (ClientChannelContext) entry;
if (channelContext.isClosed() || channelContext.isRemoved()) {
continue;
}
long interval = currTime - channelContext.stat.latestTimeOfSentPacket;
if (interval >= heartbeatTimeout) {
Packet packet = clientHandler.heartbeatPacket(channelContext);
if (packet != null) {
Boolean result = Tio.send(channelContext, packet);
if (clientTioConfig.debug && logger.isInfoEnabled()) {
logger.info("{} 发送心跳包 result:{}", channelContext, result);
}
}
}
}
// 打印连接信息
if (clientTioConfig.debug && logger.isInfoEnabled()) {
if (clientTioConfig.statOn) {
logger.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.sum(),
clientGroupStat.receivedPackets.sum(), clientGroupStat.receivedBytes.sum(), clientGroupStat.handledPackets.sum(),
clientGroupStat.sentPackets.sum(), clientGroupStat.sentBytes.sum());
} else {
logger.info("[{}]: curr:{}, closed:{}", id, set.size(), clientGroupStat.closed.sum());
}
}
} catch (Throwable e) {
logger.error("", e);
} finally {
try {
Thread.sleep(heartbeatTimeout / 3);
} catch (Throwable e) {
Thread.currentThread().interrupt();
logger.error(e.getMessage(), e);
}
}
}
}, "mqtt-heartbeat" + id).start();
taskService.addTask(systemTimer -> new MqttClientHeartbeatTask(systemTimer, clientTioConfig, keepAliveSecs));
}

}
Loading

0 comments on commit 356ab22

Please sign in to comment.