@@ -10,19 +10,40 @@ const _ = require('lodash');
10
10
require ( 'source-map-support' ) . install ( ) ;
11
11
12
12
const config = ConfigService . getConfig ( ) ;
13
+ const availabilityTopic = `${ config . mqtt . topic } /availability` ;
14
+
13
15
const client = mqtt . connect ( config . mqtt . connectionUri , {
14
16
username : config . mqtt . username ,
15
17
password : config . mqtt . password ,
16
18
protocolVersion : ConfigService . autoParseEnvVariable ( config . mqtt . protocolVersion ) ,
17
19
connectTimeout : ConfigService . autoParseEnvVariable ( config . mqtt . connectTimeout ) ,
18
20
clientId : config . mqtt . clientId ,
21
+ reconnectPeriod : 5000 ,
22
+ rejectUnauthorized : false ,
23
+ will : {
24
+ topic : availabilityTopic ,
25
+ payload : "offline" ,
26
+ qos : 1 ,
27
+ retain : true
28
+ }
19
29
} ) ;
30
+
20
31
logger . level = ConfigService ?. getConfig ( ) ?. logs ?. level ;
21
32
33
+ // Track connection state
34
+ let isConnected = false ;
35
+ let reconnectCount = 0 ;
36
+ const MAX_RECONNECT_DELAY = ConfigService . autoParseEnvVariable ( config . mqtt . maxReconnectDelay ) * 1000 || 300000 ;
37
+
22
38
export const mqttClient = client ;
23
39
24
40
// Check for new/old containers and publish updates
25
41
const checkAndPublishContainerMessages = async ( ) : Promise < void > => {
42
+ if ( ! isConnected ) {
43
+ logger . warn ( "MQTT client not connected. Skipping container check." ) ;
44
+ return ;
45
+ }
46
+
26
47
logger . info ( "Checking for removed containers..." ) ;
27
48
const containers = await DockerService . listContainers ( ) ;
28
49
const runningContainerIds = containers . map ( container => container . Id ) ;
@@ -69,6 +90,11 @@ const checkAndPublishContainerMessages = async (): Promise<void> => {
69
90
} ;
70
91
71
92
const checkAndPublishImageUpdateMessages = async ( ) : Promise < void > => {
93
+ if ( ! isConnected ) {
94
+ logger . warn ( "MQTT client not connected. Skipping image update check." ) ;
95
+ return ;
96
+ }
97
+
72
98
logger . info ( "Checking for image updates..." ) ;
73
99
await HomeassistantService . publishImageUpdateMessages ( client ) ;
74
100
@@ -93,6 +119,11 @@ const startImageCheckingInterval = async () => {
93
119
// Connected to MQTT broker
94
120
client . on ( 'connect' , async function ( ) {
95
121
logger . info ( 'MQTT client successfully connected' ) ;
122
+ isConnected = true ;
123
+ reconnectCount = 0 ; // Reset reconnect counter on successful connection
124
+
125
+ // Publish availability as online
126
+ await HomeassistantService . publishAvailability ( client , true ) ;
96
127
97
128
if ( config ?. ignore ?. containers == "*" ) {
98
129
logger . warn ( 'Skipping setup of container checking cause all containers is ignored `ignore.containers="*"`.' )
@@ -117,6 +148,28 @@ client.on('error', function (err) {
117
148
logger . error ( 'MQTT client connection error: ' , err ) ;
118
149
} ) ;
119
150
151
+ // Handle disconnection
152
+ client . on ( 'offline' , function ( ) {
153
+ logger . warn ( 'MQTT client disconnected' ) ;
154
+ isConnected = false ;
155
+ } ) ;
156
+
157
+ // Handle reconnection attempts
158
+ client . on ( 'reconnect' , function ( ) {
159
+ reconnectCount ++ ;
160
+ const backoffDelay = Math . min ( Math . pow ( 2 , reconnectCount ) * 1000 , MAX_RECONNECT_DELAY ) ;
161
+ logger . info ( `Attempting to reconnect to MQTT broker (attempt ${ reconnectCount } ). Next retry in ${ backoffDelay / 1000 } seconds.` ) ;
162
+
163
+ // Dynamically adjust reconnect period with exponential backoff
164
+ client . options . reconnectPeriod = backoffDelay ;
165
+ } ) ;
166
+
167
+ // Handle connection close
168
+ client . on ( 'close' , function ( ) {
169
+ logger . warn ( 'MQTT connection closed' ) ;
170
+ isConnected = false ;
171
+ } ) ;
172
+
120
173
// Update-Handler for the /update message from MQTT
121
174
client . on ( "message" , async ( topic : string , message : any ) => {
122
175
if ( topic == `${ config . mqtt . topic } /update` ) {
@@ -263,18 +316,38 @@ const exitHandler = async (exitCode: number, error?: any) => {
263
316
isExiting = true ;
264
317
265
318
try {
266
- await HomeassistantService . publishAvailability ( client , false ) ;
319
+ logger . info ( "Shutting down MqDockerUp..." ) ;
320
+
321
+ if ( isConnected ) {
322
+ await HomeassistantService . publishAvailability ( client , false ) ;
323
+ }
324
+
267
325
const updatingContainers = DockerService . updatingContainers ;
268
326
269
327
if ( updatingContainers . length > 0 ) {
270
328
logger . warn (
271
329
`Stopping MqDockerUp while updating containers: ${ updatingContainers . join ( ", " ) } `
272
330
) ;
273
331
for ( const containerId of updatingContainers ) {
274
- await HomeassistantService . publishAbortUpdateMessage ( containerId , client ) ;
332
+ if ( isConnected ) {
333
+ await HomeassistantService . publishAbortUpdateMessage ( containerId , client ) ;
334
+ }
275
335
}
276
336
}
277
337
338
+ logger . info ( "Closing MQTT connection..." ) ;
339
+ await new Promise < void > ( ( resolve ) => {
340
+ client . end ( false , { } , ( ) => {
341
+ logger . info ( "MQTT connection closed successfully" ) ;
342
+ resolve ( ) ;
343
+ } ) ;
344
+
345
+ setTimeout ( ( ) => {
346
+ logger . warn ( "MQTT connection close timed out" ) ;
347
+ resolve ( ) ;
348
+ } , 2000 ) ;
349
+ } ) ;
350
+
278
351
let message = exitCode === 0 ? `MqDockerUp gracefully stopped` : `MqDockerUp stopped due to an error` ;
279
352
280
353
if ( error ) {
0 commit comments