Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][RFC] net: mqtt: Port MQTT library to BSD sockets #5854

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions include/net/async_socket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* @file
* @brief Asynchronous sockets API definitions
*
* An API for adapting synchronous BSD socket APIs to applications
* requiring asynchronous callbacks.
*
* Created to ease adaptation of asynchronous IP protocols from
* net_app/net_context to sockets.
*/

/*
* Copyright (c) 2018, Texas Instruments Incorporated
*
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef __NET_ASYNC_SOCKET_H
#define __NET_ASYNC_SOCKET_H

#ifndef __ZEPHYR__
#include <stddef.h>
#include <netinet/in.h>
#include <sys/socket.h>
#else
#include <net/socket.h>
#endif

#ifdef __cplusplus
extern "C" {
#endif

/* Callbacks, similar in semantics to those of net_context.h */
typedef void (*async_connect_cb_t)(int sock,
int status,
void *cb_data);

typedef void (*async_send_cb_t)(int sock,
int bytes_sent,
void *cb_data);

typedef void (*async_recv_cb_t)(int sock,
void *data,
size_t bytes_received,
void *cb_data);

/*
* Errors are the same as the corresponding POSIX socket functions: i.e.,
* a return value of -1 implicitly sets errno.
*/

/* For now, same semantics as socket() call: */
static inline int async_socket(int family, int type, int proto)
{
return socket(family, type, proto);
}

int async_close(int sock);

int async_bind(int sock, const struct sockaddr *addr, socklen_t addrlen);

int async_connect(int sock, const struct sockaddr *addr, socklen_t addrlen,
async_connect_cb_t cb, void *cb_data);

ssize_t async_send(int sock, const void *buf, size_t len,
async_send_cb_t cb, void *cb_data, int flags);

/* buf must be unique per sock */
ssize_t async_recv(int sock, void *buf, size_t max_len,
async_recv_cb_t cb, void *cb_data);

#ifdef __cplusplus
}
#endif

/**
* @}
*/

#endif /* __NET_ASYNC_SOCKET_H */
81 changes: 47 additions & 34 deletions include/net/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
#define _MQTT_H_

#include <net/mqtt_types.h>
#if defined(CONFIG_MQTT_LIB_TLS)
#include <net/net_context.h>
#include <net/net_app.h>
#endif

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -63,20 +65,19 @@ enum mqtt_app {
* the state of the received and sent messages.</b>
*/
struct mqtt_ctx {
/** Net app context structure */
struct net_app_ctx net_app_ctx;
s32_t net_init_timeout;
s32_t net_timeout;
int sock;
int32_t net_init_timeout;
int32_t net_timeout;

/** Connectivity */
char *peer_addr_str;
u16_t peer_port;
uint16_t peer_port;

#if defined(CONFIG_MQTT_LIB_TLS)
/** TLS parameters */
u8_t *request_buf;
uint8_t *request_buf;
size_t request_buf_len;
u8_t *personalization_data;
uint8_t *personalization_data;
size_t personalization_data_len;
char *cert_host;

Expand All @@ -91,7 +92,7 @@ struct mqtt_ctx {

/** TLS handshake */
struct k_sem tls_hs_wait;
s32_t tls_hs_timeout;
int32_t tls_hs_timeout;
#endif

/** Callback executed when a MQTT CONNACK msg is received and validated.
Expand Down Expand Up @@ -120,7 +121,7 @@ struct mqtt_ctx {
* @param [in] pkt_id Packet Identifier for the input MQTT msg
* @param [in] type Packet type
*/
int (*publish_tx)(struct mqtt_ctx *ctx, u16_t pkt_id,
int (*publish_tx)(struct mqtt_ctx *ctx, uint16_t pkt_id,
enum mqtt_packet type);

/** Callback executed when a MQTT_APP_SUBSCRIBER,
Expand All @@ -140,7 +141,7 @@ struct mqtt_ctx {
* @param [in] type Packet type
*/
int (*publish_rx)(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg,
u16_t pkt_id, enum mqtt_packet type);
uint16_t pkt_id, enum mqtt_packet type);

/** Callback executed when a MQTT_APP_SUBSCRIBER or
* MQTT_APP_PUBLISHER_SUBSCRIBER receives the MQTT SUBACK message
Expand All @@ -154,8 +155,8 @@ struct mqtt_ctx {
* @param [in] items Number of elements in the qos array
* @param [in] qos Array of QoS values
*/
int (*subscribe)(struct mqtt_ctx *ctx, u16_t pkt_id,
u8_t items, enum mqtt_qos qos[]);
int (*subscribe)(struct mqtt_ctx *ctx, uint16_t pkt_id,
uint8_t items, enum mqtt_qos qos[]);

/** Callback executed when a MQTT_APP_SUBSCRIBER or
* MQTT_APP_PUBLISHER_SUBSCRIBER receives the MQTT UNSUBACK message
Expand All @@ -167,7 +168,7 @@ struct mqtt_ctx {
* @param [in] ctx MQTT context
* @param [in] pkt_id Packet Identifier for the MQTT SUBACK msg
*/
int (*unsubscribe)(struct mqtt_ctx *ctx, u16_t pkt_id);
int (*unsubscribe)(struct mqtt_ctx *ctx, uint16_t pkt_id);

/** Callback executed when an incoming message doesn't pass the
* validation stage. This callback may be NULL.
Expand All @@ -177,30 +178,33 @@ struct mqtt_ctx {
* @param [in] ctx MQTT context
* @param [in] pkt_type MQTT Packet type
*/
void (*malformed)(struct mqtt_ctx *ctx, u16_t pkt_type);
void (*malformed)(struct mqtt_ctx *ctx, uint16_t pkt_type);

/* Internal use only */
int (*rcv)(struct mqtt_ctx *ctx, struct net_pkt *);
int (*rcv)(struct mqtt_ctx *ctx, void *buf, size_t len);

/* Receive buffer for async receive callbacks */
void *rcv_buf;

/** Application type, see: enum mqtt_app */
u8_t app_type;
uint8_t app_type;

/* Clean session is also part of the MQTT CONNECT msg, however app
* behavior is influenced by this parameter, so we keep a copy here
*/
/** MQTT Clean Session parameter */
u8_t clean_session:1;
uint8_t clean_session:1;

/** 1 if the MQTT application is connected and 0 otherwise */
u8_t connected:1;
uint8_t connected:1;
};

/**
* Initializes the MQTT context structure
*
* @param ctx MQTT context structure
* @param app_type See enum mqtt_app
* @retval 0 always
* @retval 0 on success, and <0 if error
*/
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type);

Expand Down Expand Up @@ -257,7 +261,7 @@ int mqtt_tx_disconnect(struct mqtt_ctx *ctx);
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_puback(struct mqtt_ctx *ctx, u16_t id);
int mqtt_tx_puback(struct mqtt_ctx *ctx, uint16_t id);

/**
* Sends the MQTT PUBCOMP message with the given packet id
Expand All @@ -270,7 +274,7 @@ int mqtt_tx_puback(struct mqtt_ctx *ctx, u16_t id);
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, u16_t id);
int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, uint16_t id);

/**
* Sends the MQTT PUBREC message with the given packet id
Expand All @@ -282,7 +286,7 @@ int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, u16_t id);
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_pubrec(struct mqtt_ctx *ctx, u16_t id);
int mqtt_tx_pubrec(struct mqtt_ctx *ctx, uint16_t id);

/**
* Sends the MQTT PUBREL message with the given packet id
Expand All @@ -295,7 +299,7 @@ int mqtt_tx_pubrec(struct mqtt_ctx *ctx, u16_t id);
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_pubrel(struct mqtt_ctx *ctx, u16_t id);
int mqtt_tx_pubrel(struct mqtt_ctx *ctx, uint16_t id);

/**
* Sends the MQTT PUBLISH message
Expand Down Expand Up @@ -341,7 +345,7 @@ int mqtt_tx_pingreq(struct mqtt_ctx *ctx);
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
int mqtt_tx_subscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, uint8_t items,
const char *topics[], const enum mqtt_qos qos[]);

/**
Expand All @@ -357,7 +361,7 @@ int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
* @retval -ENOMEM
* @retval -EIO
*/
int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, uint16_t pkt_id, uint8_t items,
const char *topics[]);


Expand All @@ -366,102 +370,111 @@ int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
* @param [in] clean_session MQTT clean session parameter
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx,
int mqtt_rx_connack(struct mqtt_ctx *ctx, void *rx, size_t len,
int clean_session);

/**
* Parses and validates the MQTT PUBACK message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_puback(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses and validates the MQTT PUBCOMP message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses and validates the MQTT PUBREC message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_pubrec(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses and validates the MQTT PUBREL message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_pubrel(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses the MQTT PINGRESP message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses the MQTT SUBACK message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_suback(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses the MQTT UNSUBACK message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
*/
int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_unsuback(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* Parses the MQTT PUBLISH message
*
* @param [in] ctx MQTT context structure
* @param [in] rx Data buffer
* @param [in] len Length of data
*
* @retval 0 on success
* @retval -EINVAL
* @retval -ENOMEM
*/
int mqtt_rx_publish(struct mqtt_ctx *ctx, struct net_buf *rx);
int mqtt_rx_publish(struct mqtt_ctx *ctx, void *rx, size_t len);

/**
* @}
Expand Down
Loading