Skip to content
This repository has been archived by the owner on Jan 20, 2025. It is now read-only.

Collected other pull request and Fixbug memory leaks and weakness argument pointer for tcp/ip callback function #173

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Kconfig.projbuild
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,13 @@ config ASYNC_TCP_USE_WDT
default "y"
help
Enable WDT for the AsyncTCP task, so it will trigger if a handler is locking the thread.

config ASYNC_TCP_STACK
int "Stack size for the AsyncTCP task"
default 16384

config ASYNC_TCP_QUEUE_SIZE
int "Events queue size"
default 32

endmenu
82 changes: 60 additions & 22 deletions src/AsyncTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ extern "C"{
}
#include "esp_task_wdt.h"

#define CONFIG_ASYNC_TCP_STACK 2*8192
#define CONFIG_ASYNC_TCP_PRIORITY 10
#define CONFIG_ASYNC_TCP_QUEUE_SIZE 64

#define ASYNC_TCP_PRINTFLN(f_, ...) Serial.printf_P(PSTR(f_ "\r\n"), ##__VA_ARGS__)
#define ASYNC_TCP_TAG_CONSOLE(...) ASYNC_TCP_PRINTFLN("[AsyncTCP]" __VA_ARGS__)
/*
* TCP/IP Event Task
* */
Expand All @@ -44,7 +50,7 @@ typedef struct {
void *arg;
union {
struct {
void * pcb;
tcp_pcb * pcb;
int8_t err;
} connected;
struct {
Expand Down Expand Up @@ -92,10 +98,9 @@ static uint32_t _closed_index = []() {
return 1;
}();


static inline bool _init_async_event_queue(){
if(!_async_queue){
_async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *));
_async_queue = xQueueCreate(CONFIG_ASYNC_TCP_QUEUE_SIZE, sizeof(lwip_event_packet_t *));
if(!_async_queue){
return false;
}
Expand Down Expand Up @@ -218,7 +223,7 @@ static bool _start_async_task(){
return false;
}
if(!_async_service_task_handle){
xTaskCreateUniversal(_async_service_task, "async_tcp", 8192 * 2, NULL, 3, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE);
xTaskCreateUniversal(_async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE);
if(!_async_service_task_handle){
return false;
}
Expand Down Expand Up @@ -417,7 +422,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data,
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN;
if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
if(msg->closed_slot != -1 || !_closed_slots[msg->closed_slot]) {
Copy link
Contributor

@mathieucarbou mathieucarbou Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this condition does not make any sense anymore: is it tested ? If msg->closed_slot == -1, second expression is evaluated with an index being -1. If not -1, second expression is not evaluated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TienHuyIoT : ^^ ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

closed_slot is int8_t so -1 means 255, but _closed_slots is an array of 16.

msg->err = 0;
tcp_recved(msg->pcb, msg->received);
}
Expand Down Expand Up @@ -534,8 +539,6 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) {
return msg.pcb;
}



/*
Async TCP Client
*/
Expand Down Expand Up @@ -568,13 +571,15 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
_pcb = pcb;
_closed_slot = -1;
if(_pcb){
_allocate_closed_slot();
_rx_last_packet = millis();
tcp_arg(_pcb, this);
tcp_recv(_pcb, &_tcp_recv);
tcp_sent(_pcb, &_tcp_sent);
tcp_err(_pcb, &_tcp_error);
tcp_poll(_pcb, &_tcp_poll, 1);
if(!_allocate_closed_slot()) {
_close();
}
}
}

Expand Down Expand Up @@ -684,6 +689,11 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
return false;
}

if(!_allocate_closed_slot()) {
log_e("failed to closed slot full");
return false;
}

ip_addr_t addr;
addr.type = IPADDR_TYPE_V4;
addr.u_addr.ip4.addr = ip;
Expand All @@ -699,7 +709,6 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
tcp_recv(pcb, &_tcp_recv);
tcp_sent(pcb, &_tcp_sent);
tcp_poll(pcb, &_tcp_poll, 1);
//_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
_tcp_connect(pcb, _closed_slot, &addr, port,(tcp_connected_fn)&_tcp_connected);
return true;
}
Expand Down Expand Up @@ -796,7 +805,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){
* */

int8_t AsyncClient::_close(){
//ets_printf("X: 0x%08x\n", (uint32_t)this);
// ets_printf("X: 0x%08x\n", (uint32_t)this);
int8_t err = ERR_OK;
if(_pcb) {
//log_i("");
Expand All @@ -810,6 +819,7 @@ int8_t AsyncClient::_close(){
if(err != ERR_OK) {
err = abort();
}
_free_closed_slot();
_pcb = NULL;
if(_discard_cb) {
_discard_cb(_discard_cb_arg, this);
Expand All @@ -818,7 +828,10 @@ int8_t AsyncClient::_close(){
return err;
}

void AsyncClient::_allocate_closed_slot(){
boolean AsyncClient::_allocate_closed_slot(){
if (_closed_slot != -1) {
return true;
}
xSemaphoreTake(_slots_lock, portMAX_DELAY);
uint32_t closed_slot_min_index = 0;
for (int i = 0; i < _number_of_closed_slots; ++ i) {
Expand All @@ -831,28 +844,28 @@ void AsyncClient::_allocate_closed_slot(){
_closed_slots[_closed_slot] = 0;
}
xSemaphoreGive(_slots_lock);
return (_closed_slot != -1);
}

void AsyncClient::_free_closed_slot(){
xSemaphoreTake(_slots_lock, portMAX_DELAY);
if (_closed_slot != -1) {
_closed_slots[_closed_slot] = _closed_index;
_closed_slot = -1;
++ _closed_index;
}
xSemaphoreGive(_slots_lock);
}

/*
* Private Callbacks
* */

int8_t AsyncClient::_connected(void* pcb, int8_t err){
int8_t AsyncClient::_connected(tcp_pcb* pcb, int8_t err){
_pcb = reinterpret_cast<tcp_pcb*>(pcb);
if(_pcb){
_rx_last_packet = millis();
_pcb_busy = false;
// tcp_recv(_pcb, &_tcp_recv);
// tcp_sent(_pcb, &_tcp_sent);
// tcp_poll(_pcb, &_tcp_poll, 1);
}
if(_connect_cb) {
_connect_cb(_connect_cb_arg, this);
Expand All @@ -869,6 +882,7 @@ void AsyncClient::_error(int8_t err) {
tcp_err(_pcb, NULL);
tcp_poll(_pcb, NULL, 0);
}
_free_closed_slot();
_pcb = NULL;
}
if(_error_cb) {
Expand Down Expand Up @@ -920,13 +934,19 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
}

int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
while(pb != NULL) {
if(!_pcb || pcb != _pcb){
log_e("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb);
return ERR_OK;
}
size_t total = 0;
while((pb != NULL) && (ERR_OK == err)) {
_rx_last_packet = millis();
//we should not ack before we assimilate the data
_ack_pcb = true;
pbuf *b = pb;
pb = b->next;
b->next = NULL;
total += b->len;
if(_pb_cb){
_pb_cb(_pb_cb_arg, this, b);
} else {
Expand All @@ -935,13 +955,12 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
}
if(!_ack_pcb) {
_rx_ack_len += b->len;
} else if(_pcb) {
_tcp_recved(_pcb, _closed_slot, b->len);
}
pbuf_free(b);
}
pbuf_free(b);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pbuf_free change is OK but the remaining changes no.
See here for more details: tbnobody/OpenDTU#2326 (comment)

This causes a drastic slowdown of the readings.

}
return ERR_OK;
err = _tcp_recved(pcb, _closed_slot, total);
return err;
}

int8_t AsyncClient::_poll(tcp_pcb* pcb){
Expand Down Expand Up @@ -1017,9 +1036,12 @@ size_t AsyncClient::write(const char* data) {

size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) {
size_t will_send = add(data, size, apiflags);
if(!will_send || !send()) {
if (!will_send) {
return 0;
}
while (connected() && !send()) {
taskYIELD();
}
Comment on lines +1043 to +1045
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TienHuyIoT: when using SSE events in ESPAsycnWebServer, I think this busy loop can wait forever in case of a WIFi disconnection because the client is still seen there and connected

return will_send;
}

Expand Down Expand Up @@ -1057,6 +1079,18 @@ bool AsyncClient::getNoDelay(){
return tcp_nagle_disabled(_pcb);
}

void AsyncClient::setKeepAlive(uint32_t ms, uint8_t cnt){
if(ms!=0) {
_pcb->so_options |= SOF_KEEPALIVE; //Turn on TCP Keepalive for the given pcb
// Set the time between keepalive messages in milli-seconds
_pcb->keep_idle = ms;
_pcb->keep_intvl = ms;
_pcb->keep_cnt = cnt; //The number of unanswered probes required to force closure of the socket
} else {
_pcb->so_options &= ~SOF_KEEPALIVE; //Turn off TCP Keepalive for the given pcb
}
}

uint16_t AsyncClient::getMss(){
if(!_pcb) {
return 0;
Expand Down Expand Up @@ -1226,7 +1260,7 @@ void AsyncClient::_s_error(void * arg, int8_t err) {
reinterpret_cast<AsyncClient*>(arg)->_error(err);
}

int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){
int8_t AsyncClient::_s_connected(void * arg, struct tcp_pcb * pcb, int8_t err){
return reinterpret_cast<AsyncClient*>(arg)->_connected(pcb, err);
}

Expand Down Expand Up @@ -1261,6 +1295,10 @@ void AsyncServer::onClient(AcConnectHandler cb, void* arg){
_connect_cb_arg = arg;
}

void AsyncServer::port(uint16_t port){
_port = port;
}

void AsyncServer::begin(){
if(_pcb) {
return;
Expand Down
11 changes: 7 additions & 4 deletions src/AsyncTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
#include "sdkconfig.h"
#include <functional>
extern "C" {
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"
#include "lwip/pbuf.h"
}

//If core is not defined, then we are running in Arduino or PIO
#ifndef CONFIG_ASYNC_TCP_RUNNING_CORE
#define CONFIG_ASYNC_TCP_RUNNING_CORE -1 //any available core
#define CONFIG_ASYNC_TCP_USE_WDT 1 //if enabled, adds between 33us and 200us per event
#define CONFIG_ASYNC_TCP_USE_WDT 1 //if enabled, adds between 33us and 200us per event
#endif

class AsyncClient;
Expand Down Expand Up @@ -99,6 +100,7 @@ class AsyncClient {
void setNoDelay(bool nodelay);
bool getNoDelay();

void setKeepAlive(uint32_t ms, uint8_t cnt);
uint32_t getRemoteAddress();
uint16_t getRemotePort();
uint32_t getLocalAddress();
Expand Down Expand Up @@ -133,7 +135,7 @@ class AsyncClient {
static int8_t _s_lwip_fin(void *arg, struct tcp_pcb *tpcb, int8_t err);
static void _s_error(void *arg, int8_t err);
static int8_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len);
static int8_t _s_connected(void* arg, void* tpcb, int8_t err);
static int8_t _s_connected(void* arg, struct tcp_pcb *tpcb, int8_t err);
static void _s_dns_found(const char *name, struct ip_addr *ipaddr, void *arg);

int8_t _recv(tcp_pcb* pcb, pbuf* pb, int8_t err);
Expand Down Expand Up @@ -171,8 +173,8 @@ class AsyncClient {

int8_t _close();
void _free_closed_slot();
void _allocate_closed_slot();
int8_t _connected(void* pcb, int8_t err);
boolean _allocate_closed_slot();
int8_t _connected(tcp_pcb* pcb, int8_t err);
void _error(int8_t err);
int8_t _poll(tcp_pcb* pcb);
int8_t _sent(tcp_pcb* pcb, uint16_t len);
Expand All @@ -191,6 +193,7 @@ class AsyncServer {
AsyncServer(uint16_t port);
~AsyncServer();
void onClient(AcConnectHandler cb, void* arg);
void port(uint16_t port);
void begin();
void end();
void setNoDelay(bool nodelay);
Expand Down