Skip to content

Commit

Permalink
Fix MQTT reconnect during proxy attach (#1078)
Browse files Browse the repository at this point in the history
  • Loading branch information
MertCingoz authored Jan 30, 2025
1 parent badb23d commit 607df33
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions pubber/src/main/java/udmi/lib/base/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class MqttPublisher implements Publisher {
private static final Map<String, AtomicInteger> EVENT_SERIAL = new HashMap<>();
private static final String GCP_CLIENT_PREFIX = "projects/";
private static final Integer DEFAULT_MQTT_PORT = 8883;
private static final long ATTACH_DELAY_MS = 1000;
private static final long RETRY_DELAY_MS = 1000;
private static final String LOCAL_MQTT_PREFIX = "/r/";

private final Semaphore connectionLock = new Semaphore(1);
Expand Down Expand Up @@ -205,7 +205,7 @@ private void publishCore(String deviceId, String topicSuffix, Object data, Runna
debug(format("Sending message to %s", sendTopic));
if (!sendMessage(deviceId, sendTopic, payload.getBytes())) {
debug(format("Queue message for retry %s %s", topicSuffix, deviceId));
safeSleep(ATTACH_DELAY_MS);
safeSleep(RETRY_DELAY_MS);
if (isActive()) {
publisherExecutor.submit(() -> publishCore(deviceId, topicSuffix, data, callback));
}
Expand Down Expand Up @@ -321,20 +321,16 @@ private MqttClient newProxyClient(String deviceId) {
String gatewayId = getGatewayId();
info(format("Connecting device %s through gateway %s", deviceId, gatewayId));
final MqttClient mqttClient = getConnectedClient(gatewayId);
long timeToWait = mqttClient.getTimeToWait();
try {
startupLatchWait(connectionLatch, "gateway startup exchange");
String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC);
info(format("Publishing attach message %s", topic));
byte[] mqttMessage = EMPTY_STRING.getBytes(StandardCharsets.UTF_8);
mqttClient.setTimeToWait(ATTACH_DELAY_MS);
mqttClientPublish(mqttClient, topic, mqttMessage);
subscribeToUpdates(mqttClient, deviceId);
return mqttClient;
} catch (Exception e) {
throw new RuntimeException(format("While binding client %s", deviceId), e);
} finally {
mqttClient.setTimeToWait(timeToWait);
}
}

Expand Down

0 comments on commit 607df33

Please sign in to comment.