From 709317ded82f2d0807036b1aef9d03d737632f63 Mon Sep 17 00:00:00 2001 From: Jianhui Zhao Date: Mon, 29 Jan 2018 16:18:27 +0800 Subject: [PATCH] Debug Signed-off-by: Jianhui Zhao --- example/helloworld.c | 2 +- src/umqtt.c | 501 +++++++++++++++++++++---------------------- src/umqtt.h | 11 +- 3 files changed, 250 insertions(+), 264 deletions(-) diff --git a/example/helloworld.c b/example/helloworld.c index 6e6d814..f9042e0 100755 --- a/example/helloworld.c +++ b/example/helloworld.c @@ -59,7 +59,7 @@ static void on_suback(struct umqtt_client *cl, uint16_t mid, uint8_t *granted_qo ULOG_INFO("on_suback mid(%u), qos(", mid); for (i = 0; i < qos_count; i++) ULOG_INFO("%d ", granted_qos[i]); - ULOG_INFO(")\n"); + ULOG_INFO("\b)\n"); } static void on_publish(struct umqtt_client *cl, struct umqtt_message *msg) diff --git a/src/umqtt.c b/src/umqtt.c index 390b5b4..b71595e 100755 --- a/src/umqtt.c +++ b/src/umqtt.c @@ -28,10 +28,11 @@ #include "log.h" #include "helpers.h" -static void umqtt_message_free(struct umqtt_message *msg) +static void umqtt_message_free(struct umqtt_message *msg, bool out) { - free(msg->topic); - if (msg->direction == UMQTT_MESSAGE_DIR_OUT && msg->payload) + if (msg->topic) + free(msg->topic); + if (out && msg->payload) free(msg->payload); free(msg); } @@ -49,8 +50,10 @@ static void umqtt_free(struct umqtt_client *cl) if (cl->ssl_ops && cl->ssl_ctx) cl->ssl_ops->context_free(cl->ssl_ctx); #endif - avl_remove_all_elements(&cl->msgs, msg, avl, tmp) - umqtt_message_free(msg); + avl_remove_all_elements(&cl->in_queue, msg, avl, tmp) + umqtt_message_free(msg, false); + avl_remove_all_elements(&cl->out_queue, msg, avl, tmp) + umqtt_message_free(msg, true); free(cl); } @@ -61,44 +64,189 @@ static inline void umqtt_error(struct umqtt_client *cl, int error) ustream_state_change(cl->us); } -static inline void send_puback(struct umqtt_client *cl, uint16_t mid) +static void send_pubxx(struct umqtt_client *cl, uint8_t header, uint16_t mid) { - uint8_t buf[4] = {0x40, 0x02}; + uint8_t buf[4] = {header, 0x02}; *((uint16_t *)&buf[2]) = htons(mid); ustream_write(cl->us, (const char *)buf, 4, false); } -static inline void send_pubrec(struct umqtt_client *cl, uint16_t mid) +static inline void send_puback(struct umqtt_client *cl, uint16_t mid) { - uint8_t buf[4] = {0x50, 0x02}; + send_pubxx(cl, 0x40, mid); +} - *((uint16_t *)&buf[2]) = htons(mid); - ustream_write(cl->us, (const char *)buf, 4, false); +static inline void send_pubrec(struct umqtt_client *cl, uint16_t mid) +{ + send_pubxx(cl, 0x50, mid); } static inline void send_pubrel(struct umqtt_client *cl, uint16_t mid) { - uint8_t buf[4] = {0x62, 0x02}; - - *((uint16_t *)&buf[2]) = htons(mid); - ustream_write(cl->us, (const char *)buf, 4, false); + send_pubxx(cl, 0x62, mid); } static inline void send_pubcomp(struct umqtt_client *cl, uint16_t mid) { - uint8_t buf[4] = {0x70, 0x02}; + send_pubxx(cl, 0x70, mid); +} + +static void handle_conack(struct umqtt_client *cl, uint8_t *data) +{ + bool sp = data[0] & 0x01; /* Session Present */ + int return_code = data[1]; - *((uint16_t *)&buf[2]) = htons(mid); - ustream_write(cl->us, (const char *)buf, 4, false); + if (return_code == UMQTT_CONNECTION_ACCEPTED) { + uloop_timeout_set(&cl->ping_timer, UMQTT_PING_INTERVAL * 1000); + uloop_timeout_set(&cl->retry_timer, 1000); + } + + if (cl->on_conack) + cl->on_conack(cl, sp, return_code); +} + +static void handle_pubackcomp(struct umqtt_client *cl, uint8_t *data) +{ + uint16_t mid = (data[0] << 8) | data[1]; + struct umqtt_message *msg; + + msg = avl_find_element(&cl->out_queue, &mid, msg, avl); + if (msg) { + avl_delete(&cl->out_queue, &msg->avl); + umqtt_message_free(msg, true); + } +} + +static void handle_pubrec(struct umqtt_client *cl, uint8_t *data) +{ + uint16_t mid = (data[0] << 8) | data[1]; + struct umqtt_message *msg; + + send_pubrel(cl, mid); + + msg = avl_find_element(&cl->out_queue, &mid, msg, avl); + if (msg) { + + free(msg->topic); + msg->topic = NULL; + free(msg->payload); + msg->payload = NULL; + msg->timestamp = time(NULL); + msg->state = umqtt_ms_wait_for_pubcomp; + } +} + +static void handle_pubrel(struct umqtt_client *cl, uint8_t *data) +{ + uint16_t mid = (data[0] << 8) | data[1]; + struct umqtt_message *msg; + + send_pubcomp(cl, mid); + + msg = avl_find_element(&cl->in_queue, &mid, msg, avl); + if (msg) { + if (cl->on_publish) + cl->on_publish(cl, msg); + + avl_delete(&cl->in_queue, &msg->avl); + umqtt_message_free(msg, false); + } +} + +static void handle_suback(struct umqtt_client *cl, uint8_t *data) +{ + struct umqtt_packet *pkt = &cl->pkt; + if (cl->on_suback) { + uint16_t mid = (data[0] << 8) | data[1]; + cl->on_suback(cl, mid, data + 2, pkt->remlen - 2); + } +} + +static void handle_publish(struct umqtt_client *cl, uint8_t *data) +{ + struct umqtt_packet *pkt = &cl->pkt; + int len = (data[0] << 8) + data[1]; + + data += 2; + pkt->msg->topic = strndup((const char *)data, len); + data += len; + if (pkt->msg->qos > 0) { + struct umqtt_message *msg; + + pkt->msg->mid = (data[0] << 8) + data[1]; + len += 2; + data += 2; + + msg = avl_find_element(&cl->in_queue, &pkt->msg->mid, msg, avl); + if (msg) { + umqtt_log_err("Duplicate PUBLISH received:(q%d, m%d, '%s')\n", pkt->msg->qos, pkt->msg->mid, pkt->msg->topic); + free(pkt->msg); + return; + } + } + pkt->msg->payloadlen = pkt->remlen - len - 2; + pkt->msg->payload = data; + + if (pkt->msg->qos == 2) { + pkt->msg->avl.key = &pkt->msg->mid; + pkt->msg->state = umqtt_ms_wait_for_pubrel; + avl_insert(&cl->in_queue, &pkt->msg->avl); + send_pubrec(cl, pkt->msg->mid); + } else { + if (pkt->msg->qos == 1) + send_puback(cl, pkt->msg->mid); + + if (cl->on_publish) + cl->on_publish(cl, pkt->msg); + umqtt_message_free(pkt->msg, false); + } } -static void parse_fixed_header(struct umqtt_client *cl, uint8_t *data, int len) +static bool handle_packet(struct umqtt_client *cl, uint8_t *data, int len) +{ + struct umqtt_packet *pkt = &cl->pkt; + + if (len < pkt->remlen) + return false; + + switch (pkt->type) { + case UMQTT_CONNACK_PACKET: + handle_conack(cl, data); + break; + case UMQTT_PUBACK_PACKET: + case UMQTT_PUBCOMP_PACKET: + handle_pubackcomp(cl, data); + break; + case UMQTT_PUBREC_PACKET: + handle_pubrec(cl, data); + break; + case UMQTT_PUBREL_PACKET: + handle_pubrel(cl, data); + break; + case UMQTT_SUBACK_PACKET: + handle_suback(cl, data); + break; + case UMQTT_UNSUBACK_PACKET: + break; + case UMQTT_PUBLISH_PACKET: + handle_publish(cl, data); + break; + default: + break; + } + + cl->ps = PARSE_STATE_FH; + ustream_consume(cl->us, len); + return true; +} + +static bool parse_fixed_header(struct umqtt_client *cl, uint8_t *data, int len) { struct umqtt_packet *pkt = &cl->pkt; bool more_remlen; if (len < 2) - return; + return false; memset(pkt, 0, sizeof(*pkt)); @@ -109,7 +257,7 @@ static void parse_fixed_header(struct umqtt_client *cl, uint8_t *data, int len) if (more_remlen) cl->ps = PARSE_STATE_REMLEN; else if (pkt->remlen > 0) - cl->ps = PARSE_STATE_VH; + cl->ps = PARSE_STATE_HANDLE; else cl->ps = PARSE_STATE_FH; @@ -137,9 +285,10 @@ static void parse_fixed_header(struct umqtt_client *cl, uint8_t *data, int len) } ustream_consume(cl->us, 2); + return true; } -static void parse_remaining_ength(struct umqtt_client *cl, uint8_t *data, int len) +static bool parse_remaining_ength(struct umqtt_client *cl, uint8_t *data, int len) { int parsed = 0; struct umqtt_packet *pkt = &cl->pkt; @@ -147,7 +296,7 @@ static void parse_remaining_ength(struct umqtt_client *cl, uint8_t *data, int le while (parsed++ < len) { pkt->remlen = (pkt->remlen << 6) + (*data & 0X7F); if ((*data & 0x80) == 0) { - cl->ps = PARSE_STATE_VH; + cl->ps = PARSE_STATE_HANDLE; break; } @@ -158,206 +307,32 @@ static void parse_remaining_ength(struct umqtt_client *cl, uint8_t *data, int le data++; } ustream_consume(cl->us, parsed); + return true; } -static void parse_variable_header(struct umqtt_client *cl, uint8_t *data, int len) -{ - int parsed = 0; - struct umqtt_packet *pkt = &cl->pkt; - - switch (pkt->type) { - case UMQTT_CONNACK_PACKET: { - int return_code; - bool sp; - - if (len < 2) - return; - parsed = 2; - cl->ps = PARSE_STATE_FH; - sp = data[0] & 0x01; /* Session Present */ - return_code = data[1]; - - if (return_code == UMQTT_CONNECTION_ACCEPTED) { - uloop_timeout_set(&cl->ping_timer, UMQTT_PING_INTERVAL * 1000); - uloop_timeout_set(&cl->retry_timer, 1000); - } - - if (cl->on_conack) - cl->on_conack(cl, sp, return_code); - break; - } - case UMQTT_PUBACK_PACKET: - case UMQTT_PUBCOMP_PACKET: { - uint16_t mid; - struct umqtt_message *msg; - - if (len < 2) - return; - - parsed = 2; - cl->ps = PARSE_STATE_FH; - mid = (data[0] << 8) | data[1]; - - msg = avl_find_element(&cl->msgs, &mid, msg, avl); - if (msg) { - avl_delete(&cl->msgs, &msg->avl); - umqtt_message_free(msg); - } - break; - } - case UMQTT_PUBREC_PACKET: { - uint16_t mid; - struct umqtt_message *msg; - - if (len < 2) - return; - - parsed = 2; - cl->ps = PARSE_STATE_FH; - mid = (data[0] << 8) | data[1]; - msg = avl_find_element(&cl->msgs, &mid, msg, avl); - if (msg) { - send_pubrel(cl, msg->mid); - msg->timestamp = time(NULL); - msg->state = umqtt_ms_wait_for_pubcomp; - } - break; - } - case UMQTT_PUBREL_PACKET: { - uint16_t mid; - struct umqtt_message *msg; - - if (len < 2) - return; - - parsed = 2; - cl->ps = PARSE_STATE_FH; - mid = (data[0] << 8) | data[1]; - - msg = avl_find_element(&cl->msgs, &mid, msg, avl); - if (msg) { - if (cl->on_publish) - cl->on_publish(cl, msg); - send_pubcomp(cl, mid); - avl_delete(&cl->msgs, &msg->avl); - umqtt_message_free(msg); - } - - break; - } - case UMQTT_SUBACK_PACKET: - case UMQTT_UNSUBACK_PACKET: - if (len < 2) - return; - pkt->mid = (data[0] << 8) | data[1]; - parsed = 2; - if (pkt->type == UMQTT_SUBACK_PACKET) { - cl->ps = PARSE_STATE_PAYLOAD; - } else { - cl->ps = PARSE_STATE_FH; - /* TODO */ - } - break; - case UMQTT_PUBLISH_PACKET: - if (len < 2) - return; - parsed = 2 + (data[0] << 8) + data[1]; - if (pkt->msg->qos > 0) - parsed += 2; - if (len < parsed) - return; - - len = (data[0] << 8) + data[1]; - data += 2; - pkt->msg->topic = strndup((const char *)data, len); - data += len; - if (pkt->msg->qos > 0) - pkt->msg->mid = (data[0] << 8) + data[1]; - pkt->msg->payloadlen = pkt->remlen - parsed; - cl->ps = PARSE_STATE_PAYLOAD; - break; - default: - umqtt_log_err("Invalid packet:%d\n", pkt->type); - umqtt_error(cl, UMQTT_INVALID_PACKET); - break; - } - ustream_consume(cl->us, parsed); -} - -static void parse_payload(struct umqtt_client *cl, uint8_t *data, int len) -{ - int parsed = 0; - struct umqtt_packet *pkt = &cl->pkt; - - switch (pkt->type) { - case UMQTT_SUBACK_PACKET: { - if (len < pkt->remlen - 2) - return; - - cl->ps = PARSE_STATE_FH; - parsed = pkt->remlen - 2; - - if (cl->on_suback) - cl->on_suback(cl, pkt->mid, data, pkt->remlen - 2); - break; - } - case UMQTT_PUBLISH_PACKET: - if (len < pkt->msg->payloadlen) - return; - - parsed = pkt->msg->payloadlen; - pkt->msg->payload = data; - cl->ps = PARSE_STATE_FH; - - if (pkt->msg->qos == 2) { - pkt->msg->avl.key = &pkt->msg->mid; - avl_insert(&cl->msgs, &pkt->msg->avl); - send_pubrec(cl, pkt->msg->mid); - } else { - if (pkt->msg->qos == 1) - send_puback(cl, pkt->msg->mid); - - if (cl->on_publish) - cl->on_publish(cl, pkt->msg); - umqtt_message_free(pkt->msg); - } - break; - default: - umqtt_log_err("Invalid packet:%d\n", pkt->type); - exit(1); - break; - } - ustream_consume(cl->us, parsed); -} +typedef bool (*parse_cb_t)(struct umqtt_client *cl, uint8_t *data, int len); +static parse_cb_t parse_cbs[] = { + [PARSE_STATE_FH] = parse_fixed_header, + [PARSE_STATE_REMLEN] = parse_remaining_ength, + [PARSE_STATE_HANDLE] = handle_packet +}; static inline void __umqtt_notify_read(struct umqtt_client *cl, struct ustream *s) { - uint8_t *data; + void *data; int len; - while (!cl->error) { - data = (uint8_t *)ustream_get_read_buf(s, &len); + do { + data = ustream_get_read_buf(s, &len); if (!data || !len) return; - switch (cl->ps) { - case PARSE_STATE_FH: - parse_fixed_header(cl, data, len); - break; - case PARSE_STATE_REMLEN: - parse_remaining_ength(cl, data, len); - break; - case PARSE_STATE_VH: - parse_variable_header(cl, data, len); - break; - case PARSE_STATE_PAYLOAD: - parse_payload(cl, data, len); - break; - default: - umqtt_log_err("Never come here\n"); + if (cl->ps >= ARRAY_SIZE(parse_cbs) || !parse_cbs[cl->ps]) + return; + + if (!parse_cbs[cl->ps](cl, data, len)) break; - } - } + } while(1); if (cl->error) umqtt_error(cl, cl->error); @@ -538,6 +513,19 @@ static int umqtt_connect(struct umqtt_client *cl, struct umqtt_options *opts, st return 0; } +static uint16_t get_unused_mid(struct umqtt_client *cl) +{ + uint16_t mid = 1; + struct umqtt_message *msg; + + avl_for_each_element(&cl->out_queue, msg, avl) { + if (msg->mid == mid) + mid++; + } + + return mid; +} + int umqtt_subscribe(struct umqtt_client *cl, struct umqtt_topic *topics, int num) { uint8_t *buf, *p; @@ -561,8 +549,7 @@ int umqtt_subscribe(struct umqtt_client *cl, struct umqtt_topic *topics, int num *p++ = (UMQTT_SUBSCRIBE_PACKET << 4) | 0x02; umqtt_encode_remlen(remlen, &p); - cl->last_mid++; - UMQTT_PUT_U16(p, cl->last_mid); + UMQTT_PUT_U16(p, get_unused_mid(cl)); for (i = 0; i < num; i++) { UMQTT_PUT_STRING(p, topics[i].len, topics[i].topic); @@ -597,8 +584,7 @@ int umqtt_unsubscribe(struct umqtt_client *cl, struct umqtt_topic *topics, int n *p++ = (UMQTT_UNSUBSCRIBE_PACKET << 4) | 0x02; umqtt_encode_remlen(remlen, &p); - cl->last_mid++; - UMQTT_PUT_U16(p, cl->last_mid); + UMQTT_PUT_U16(p, get_unused_mid(cl)); for (i = 0; i < num; i++) { UMQTT_PUT_STRING(p, topics[i].len, topics[i].topic); @@ -644,39 +630,6 @@ static int __umqtt_publish(struct umqtt_client *cl, uint16_t mid, const char *to return 0; } -static void umqtt_retry_cb(struct uloop_timeout *timeout) -{ - struct umqtt_client *cl = container_of(timeout, struct umqtt_client, retry_timer); - time_t now = time(NULL); - struct umqtt_message *msg; - - avl_for_each_element(&cl->msgs, msg, avl) { - if ((msg->direction == UMQTT_MESSAGE_DIR_OUT) && (now - msg->timestamp > 2 )) { - switch (msg->state) { - case umqtt_ms_wait_for_puback: - case umqtt_ms_wait_for_pubrec: - msg->timestamp = now; - msg->dup = true; - __umqtt_publish(cl, msg->mid, msg->topic, msg->payloadlen, msg->payload, msg->qos, msg->retain, true); - break; - case umqtt_ms_wait_for_pubrel: - msg->timestamp = now; - msg->dup = true; - send_pubrec(cl, msg->mid); - break; - case umqtt_ms_wait_for_pubcomp: - msg->timestamp = now; - msg->dup = true; - send_pubrel(cl, msg->mid); - break; - default: - break; - } - } - } - uloop_timeout_set(&cl->retry_timer, 1000); -} - int umqtt_publish(struct umqtt_client *cl, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain) { @@ -684,7 +637,7 @@ int umqtt_publish(struct umqtt_client *cl, const char *topic, uint32_t payloadle struct umqtt_message *msg; if (qos > 0) - mid = ++(cl->last_mid); + mid = get_unused_mid(cl); if (__umqtt_publish(cl, mid, topic, payloadlen, payload, qos, retain, false) < 0) return -1; @@ -699,13 +652,12 @@ int umqtt_publish(struct umqtt_client *cl, const char *topic, uint32_t payloadle msg->payload = malloc(payloadlen); if (!msg->payload) { umqtt_log_serr("malloc"); - umqtt_message_free(msg); + umqtt_message_free(msg, true); return -1; } msg->payloadlen = payloadlen; memcpy(msg->payload, payload, payloadlen); - msg->direction = UMQTT_MESSAGE_DIR_OUT; msg->timestamp = time(NULL); msg->state = (qos == 1) ? umqtt_ms_wait_for_puback : umqtt_ms_wait_for_pubrec; msg->retain = retain; @@ -713,7 +665,7 @@ int umqtt_publish(struct umqtt_client *cl, const char *topic, uint32_t payloadle msg->mid = mid; msg->topic = strdup(topic); msg->avl.key = &msg->mid; - avl_insert(&cl->msgs, &msg->avl); + avl_insert(&cl->out_queue, &msg->avl); } return 0; @@ -746,7 +698,45 @@ static void umqtt_ping_cb(struct uloop_timeout *timeout) uloop_timeout_set(&cl->ping_timer, 1 * 1000); } -static int msg_cmp(const void *k1, const void *k2, void *ptr) + +static void umqtt_retry(struct umqtt_client *cl, struct avl_tree *queue) +{ + time_t now = time(NULL); + struct umqtt_message *msg; + + avl_for_each_element(queue, msg, avl) { + if (now - msg->timestamp > 2) { + switch (msg->state) { + case umqtt_ms_wait_for_puback: + case umqtt_ms_wait_for_pubrec: + msg->timestamp = now; + __umqtt_publish(cl, msg->mid, msg->topic, msg->payloadlen, msg->payload, msg->qos, msg->retain, true); + break; + case umqtt_ms_wait_for_pubrel: + msg->timestamp = now; + send_pubrec(cl, msg->mid); + break; + case umqtt_ms_wait_for_pubcomp: + msg->timestamp = now; + send_pubrel(cl, msg->mid); + break; + default: + break; + } + } + } +} + +static void umqtt_retry_cb(struct uloop_timeout *timeout) +{ + struct umqtt_client *cl = container_of(timeout, struct umqtt_client, retry_timer); + + umqtt_retry(cl, &cl->in_queue); + umqtt_retry(cl, &cl->out_queue); + uloop_timeout_set(&cl->retry_timer, 1000); +} + +static int avl_pkt_cmp(const void *k1, const void *k2, void *ptr) { return *(uint16_t *)k1 - *(uint16_t *)k2; } @@ -781,7 +771,8 @@ struct umqtt_client *umqtt_new_ssl(const char *host, int port, bool ssl, const c ustream_fd_init(&cl->sfd, sock); - avl_init(&cl->msgs, msg_cmp, false, NULL); + avl_init(&cl->in_queue, avl_pkt_cmp, false, NULL); + avl_init(&cl->out_queue, avl_pkt_cmp, false, NULL); if (ssl) { #if (UMQTT_SSL_SUPPORT) diff --git a/src/umqtt.h b/src/umqtt.h index 6cbfdd5..d42ba45 100755 --- a/src/umqtt.h +++ b/src/umqtt.h @@ -33,9 +33,6 @@ #define UMQTT_MAX_REMLEN 268435455 -#define UMQTT_MESSAGE_DIR_IN 0x00 -#define UMQTT_MESSAGE_DIR_OUT 0x01 - enum umqtt_packet_type { UMQTT_NO_PACKET, UMQTT_CONNECT_PACKET, @@ -76,8 +73,7 @@ enum umqtt_error_code { enum parse_state { PARSE_STATE_FH, /* Fixed header */ PARSE_STATE_REMLEN, /* Remaining Length */ - PARSE_STATE_VH, /* Variable header */ - PARSE_STATE_PAYLOAD /* Payload */ + PARSE_STATE_HANDLE /* handle packet */ }; struct umqtt_topic { @@ -102,7 +98,6 @@ enum umqtt_msg_state { }; struct umqtt_message { - uint8_t direction; time_t timestamp; enum umqtt_msg_state state; bool dup; @@ -144,10 +139,10 @@ struct umqtt_client { struct uloop_timeout ping_timer; struct uloop_timeout retry_timer; enum umqtt_error_code error; - uint16_t last_mid; enum parse_state ps; bool wait_pingresp; - struct avl_tree msgs; + struct avl_tree in_queue; + struct avl_tree out_queue; #if (UMQTT_SSL_SUPPORT) bool ssl_require_validation;