-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.h
151 lines (121 loc) · 4.8 KB
/
mqtt.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/
// https://github.com/ioBroker/ioBroker.mqtt/issues/182
/*
Test topics----------------
0_userdata.0.example_data
accuweather.0.Current.Temperature
accuweather.0.Current.Pressure
accuweather.0.Current.WeatherText
accuweather.0.Current.RelativeHumidity
*/
/**
 * Subscribes (QoS 0) to the topic of every component of the current page in
 * the index range [PG_upd.mqttStartIdx, PG_upd.compCount).
 *
 * Returns immediately, without doing anything, while the MQTT client is not
 * connected.
 */
void subscribePage() {
  if (!mqttClient.connected()) {
    return;
  }

  /*
  // Subscribe to "mytopic/test" and display received message to Serial
  client.subscribe("mytopic/test", [](const String & payload) {
    Serial.println(payload);
  });
  */

  // Scratch buffer handed to readAbsoluteTopic() for each component
  // (presumably filled with the component's full topic string).
  char topic[256] = {0};

  log_d("[MQTT] Subscribing to mqtt components on page:");

  int idx = PG_upd.mqttStartIdx;
  while (idx < PG_upd.compCount) {
    const comp_t* comp = &PG_upd.compList[idx];
    PG_upd.readAbsoluteTopic(idx, topic);
    log_d("\t topic '%s' from component '%s'!", topic, comp->ptrName);
    mqttClient.subscribe(topic, 0);
    /*#### use packetIdSub to find the corresponding component?!
    uint16_t packetIdSub = mqttClient.subscribe(PubTopic, 2);
    */
    idx++;
  }

  // A start index equal to MAX_COMPONENT_ITEMS is reported as "0 components".
  log_d("[MQTT] %d components subscribed!", PG_upd.mqttStartIdx==MAX_COMPONENT_ITEMS ? 0 : PG_upd.compCount - PG_upd.mqttStartIdx);
}
/**
 * Unsubscribes from the topic of every component of the current page in the
 * index range [PG_upd.mqttStartIdx, PG_upd.compCount).
 *
 * Returns immediately, without doing anything, while the MQTT client is not
 * connected.
 */
void unsubscribePage() {
  if (!mqttClient.connected()) {
    return;
  }

  // Scratch buffer handed to readAbsoluteTopic() for each component
  // (presumably filled with the component's full topic string).
  char topic[256] = {0};

  log_d("[MQTT] Unsubscribing mqtt components on page:");

  int idx = PG_upd.mqttStartIdx;
  while (idx < PG_upd.compCount) {
    const comp_t* comp = &PG_upd.compList[idx];
    PG_upd.readAbsoluteTopic(idx, topic);
    log_d("\t topic '%s' from component '%s'!", topic, comp->ptrName);
    mqttClient.unsubscribe(topic);
    idx++;
  }

  // A start index equal to MAX_COMPONENT_ITEMS is reported as "0 components".
  log_d("[MQTT] %d components unsubscribed!", PG_upd.mqttStartIdx==MAX_COMPONENT_ITEMS ? 0 : PG_upd.compCount - PG_upd.mqttStartIdx);
}
/**
 * Starts a connection attempt to the MQTT broker.
 *
 * Credentials are applied only when MQTT_USER is defined at compile time, and
 * a client id only when MQTT_CLIENTID is defined.
 */
void connectToMqtt()
{
  log_i("Connecting to MQTT...");

#ifdef MQTT_USER
  // Optional broker authentication.
  mqttClient.setCredentials(MQTT_USER, MQTT_PWD);
#endif

#ifdef MQTT_CLIENTID
  // Optional fixed client id.
  mqttClient.setClientId(MQTT_CLIENTID);
#endif

  mqttClient.connect();
}
/**
 * Handler for a successful broker connection (presumably registered as the
 * client's connect callback): prints the broker host and port to Serial, logs
 * the session flag and subscribes the topics of the current page.
 *
 * @param sessionPresent  session-present flag passed in by the caller; only logged.
 */
void onMqttConnected(bool sessionPresent)
{
  // Emits "Connected to MQTT broker <host>:<port>" followed by a newline.
  Serial.print("Connected to MQTT broker ");
  Serial.print(MQTT_HOST);
  Serial.print(":");
  Serial.println(MQTT_PORT);

  log_d("Session present: %s", sessionPresent ? "true" : "false");

  subscribePage();

  /*
  uint16_t packetIdSub = mqttClient.subscribe(PubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: "); Serial.println(packetIdSub);
  mqttClient.publish(PubTopic, 0, true, "ESP32 Test");
  Serial.println("Publishing at QoS 0");
  uint16_t packetIdPub1 = mqttClient.publish(PubTopic, 1, true, "test 2");
  Serial.print("Publishing at QoS 1, packetId: "); Serial.println(packetIdPub1);
  uint16_t packetIdPub2 = mqttClient.publish(PubTopic, 2, true, "test 3");
  Serial.print("Publishing at QoS 2, packetId: "); Serial.println(packetIdPub2);
  */
} // onMqttConnected()
/**
 * Handler for a lost broker connection (presumably registered as the client's
 * disconnect callback): prints the numeric disconnect reason, attempts to
 * unsubscribe the page topics and, if WiFi is still up, starts the reconnect
 * timer.
 *
 * @param reason  disconnect reason; printed as its underlying integer value.
 */
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  // log_i("Disconnected from MQTT.");
  using ReasonCode = std::underlying_type<AsyncMqttClientDisconnectReason>::type;
  Serial.printf("Disconnected from MQTT: %u\n", static_cast<ReasonCode>(reason));

  // NOTE(review): unsubscribePage() returns early when mqttClient.connected()
  // is false, so this call is possibly a no-op at this point — confirm.
  unsubscribePage();

  // Only start the reconnect timer while WiFi is connected; the 0 is the
  // block time passed to xTimerStart().
  if (WiFi.isConnected()) {
    xTimerStart(mqttReconnectTimer, 0);
  }
}
/**
 * Subscribe-acknowledge handler: writes the packet id and QoS value it was
 * given to the debug log. Has no other effect.
 *
 * @param packetId  packet id to log.
 * @param qos       QoS value to log.
 */
void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  log_d("Subscribe acknowledged. packetId: %d qos: %d", packetId, qos);
}
/**
 * Unsubscribe-acknowledge handler: writes the packet id it was given to the
 * debug log. Has no other effect.
 *
 * @param packetId  packet id to log.
 */
void onMqttUnsubscribe(const uint16_t& packetId)
{
  log_d("Unsubscribe acknowledged. packetId: %d", packetId);
}
/**
 * Publish-acknowledge handler: writes the packet id it was given to the info
 * log. Has no other effect.
 *
 * @param packetId  packet id to log.
 */
void onMqttPublish(const uint16_t& packetId)
{
  log_i("Publish acknowledged. packetId: %d", packetId);
}
/**
 * Handler for incoming MQTT message data: logs the message metadata, logs a
 * human-readable dump of the payload and forwards the raw payload to
 * Page_updateMQTT().
 *
 * Dump format: runs of printable ASCII bytes (32..126) are wrapped in single
 * quotes, every other byte is written as "~XX" (upper-case hex), and a
 * continuation marker is inserted after every 30 payload bytes.
 *
 * NOTE: According to the original author, the payload buffer is NOT terminated
 * by a null character and is only `len` bytes long, so `payload[len] = '\0';`
 * would write outside the buffer. The payload also seems to be UTF-8 encoded.
 *
 * @param topic       topic string of the message (logged with "%s").
 * @param payload     raw payload bytes; only payload[0..len-1] is read here.
 * @param properties  message properties; qos, dup and retain are logged.
 * @param len         number of payload bytes in this call.
 * @param index       offset value for this call; index > 0 is passed on to
 *                    Page_updateMQTT() as its last ("append") argument.
 * @param total       total size value; only logged.
 */
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  // size_t is cast to unsigned long so that the arguments match "%lu" exactly.
  log_d("[MQTT] ***** message received. Topic:'%s' payload-len=%lu append=%d\n\tqos%d, dup=%d, retain=%d, index=%lu, total=%lu",
        topic, static_cast<unsigned long>(len), (index>0), properties.qos, properties.dup, properties.retain,
        static_cast<unsigned long>(index), static_cast<unsigned long>(total));

  static const char kLineBreak[] = "\n -->";
  static const char kHexDigits[] = "0123456789ABCDEF";
  const size_t kBytesPerLine = 30;

  char plmsg[2000] = {0};
  size_t used = 0;     // characters written to plmsg so far (excluding the terminator)
  bool chr = false;    // true while a quoted run of printable characters is open
  size_t dumped = 0;   // payload bytes represented in plmsg

  for (; dumped < len; dumped++) {
    const uint8_t c = static_cast<uint8_t>(payload[dumped]);
    const bool printable = (c >= 32 && c < 127);

    // Build the text for this byte in a small scratch buffer first, so it is
    // either appended completely or not at all. Worst case: quote + line
    // break + quote + quote + "~XX".
    char piece[sizeof(kLineBreak) + 8];
    size_t n = 0;

    if (dumped % kBytesPerLine == 0 && dumped != 0) {
      // Close the open quote before the line break and reopen it afterwards.
      if (chr) piece[n++] = '\'';
      memcpy(piece + n, kLineBreak, sizeof(kLineBreak) - 1);
      n += sizeof(kLineBreak) - 1;
      if (chr) piece[n++] = '\'';
    }

    if (printable) {
      if (!chr) piece[n++] = '\'';
      piece[n++] = static_cast<char>(c);
    } else {
      if (chr) piece[n++] = '\'';
      piece[n++] = '~';
      piece[n++] = kHexDigits[c >> 4];
      piece[n++] = kHexDigits[c & 0x0F];
    }

    // One payload byte can expand to several characters, so the limit must be
    // checked against the characters written, not against the payload index.
    if (used + n >= sizeof(plmsg)) {
      break;
    }
    memcpy(plmsg + used, piece, n);
    used += n;
    plmsg[used] = '\0';
    chr = printable;
  }

  log_d(" payload:%s%s", plmsg, chr ? "'" : " ");
  if (dumped < len) {
    log_d(" payload dump truncated: %lu of %lu bytes shown",
          static_cast<unsigned long>(dumped), static_cast<unsigned long>(len));
  }

  // all payloads seem to be in string UTF-8 format or UTF-16?
  Page_updateMQTT(topic, payload, len, index>0);
} // onMqttMessage()