-
Notifications
You must be signed in to change notification settings - Fork 441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Collected other pull request and Fixbug memory leaks and weakness argument pointer for tcp/ip callback function #173
base: master
Are you sure you want to change the base?
Changes from all commits
66c82d6
74420db
83f855b
6125471
f8faa06
1b85959
9298a07
552d17c
de1ddda
9af997f
5bbad55
4589401
f7d543b
5c8789a
eba332d
1256267
2b5ed28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,12 @@ extern "C"{ | |
} | ||
#include "esp_task_wdt.h" | ||
|
||
#define CONFIG_ASYNC_TCP_STACK 2*8192 | ||
#define CONFIG_ASYNC_TCP_PRIORITY 3 | ||
#define CONFIG_ASYNC_TCP_QUEUE_SIZE 128 | ||
|
||
#define ASYNC_TCP_PRINTFLN(f_, ...) Serial.printf_P(PSTR(f_ "\r\n"), ##__VA_ARGS__) | ||
#define ASYNC_TCP_TAG_CONSOLE(...) ASYNC_TCP_PRINTFLN("[AsyncTCP]" __VA_ARGS__) | ||
/* | ||
* TCP/IP Event Task | ||
* */ | ||
|
@@ -44,7 +50,7 @@ typedef struct { | |
void *arg; | ||
union { | ||
struct { | ||
void * pcb; | ||
tcp_pcb * pcb; | ||
int8_t err; | ||
} connected; | ||
struct { | ||
|
@@ -92,10 +98,9 @@ static uint32_t _closed_index = []() { | |
return 1; | ||
}(); | ||
|
||
|
||
static inline bool _init_async_event_queue(){ | ||
if(!_async_queue){ | ||
_async_queue = xQueueCreate(32, sizeof(lwip_event_packet_t *)); | ||
_async_queue = xQueueCreate(CONFIG_ASYNC_TCP_QUEUE_SIZE, sizeof(lwip_event_packet_t *)); | ||
if(!_async_queue){ | ||
return false; | ||
} | ||
|
@@ -218,7 +223,7 @@ static bool _start_async_task(){ | |
return false; | ||
} | ||
if(!_async_service_task_handle){ | ||
xTaskCreateUniversal(_async_service_task, "async_tcp", 8192 * 2, NULL, 3, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE); | ||
xTaskCreateUniversal(_async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE); | ||
if(!_async_service_task_handle){ | ||
return false; | ||
} | ||
|
@@ -417,7 +422,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data, | |
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ | ||
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; | ||
msg->err = ERR_CONN; | ||
if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { | ||
if(msg->closed_slot != -1 || !_closed_slots[msg->closed_slot]) { | ||
msg->err = 0; | ||
tcp_recved(msg->pcb, msg->received); | ||
} | ||
|
@@ -534,8 +539,6 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) { | |
return msg.pcb; | ||
} | ||
|
||
|
||
|
||
/* | ||
Async TCP Client | ||
*/ | ||
|
@@ -568,13 +571,15 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) | |
_pcb = pcb; | ||
_closed_slot = -1; | ||
if(_pcb){ | ||
_allocate_closed_slot(); | ||
_rx_last_packet = millis(); | ||
tcp_arg(_pcb, this); | ||
tcp_recv(_pcb, &_tcp_recv); | ||
tcp_sent(_pcb, &_tcp_sent); | ||
tcp_err(_pcb, &_tcp_error); | ||
tcp_poll(_pcb, &_tcp_poll, 1); | ||
if(!_allocate_closed_slot()) { | ||
_close(); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -684,6 +689,11 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ | |
return false; | ||
} | ||
|
||
if(!_allocate_closed_slot()) { | ||
log_e("failed to closed slot full"); | ||
return false; | ||
} | ||
|
||
ip_addr_t addr; | ||
addr.type = IPADDR_TYPE_V4; | ||
addr.u_addr.ip4.addr = ip; | ||
|
@@ -796,7 +806,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){ | |
* */ | ||
|
||
int8_t AsyncClient::_close(){ | ||
//ets_printf("X: 0x%08x\n", (uint32_t)this); | ||
// ets_printf("X: 0x%08x\n", (uint32_t)this); | ||
int8_t err = ERR_OK; | ||
if(_pcb) { | ||
//log_i(""); | ||
|
@@ -810,6 +820,7 @@ int8_t AsyncClient::_close(){ | |
if(err != ERR_OK) { | ||
err = abort(); | ||
} | ||
_free_closed_slot(); | ||
_pcb = NULL; | ||
if(_discard_cb) { | ||
_discard_cb(_discard_cb_arg, this); | ||
|
@@ -818,7 +829,10 @@ int8_t AsyncClient::_close(){ | |
return err; | ||
} | ||
|
||
void AsyncClient::_allocate_closed_slot(){ | ||
boolean AsyncClient::_allocate_closed_slot(){ | ||
if (_closed_slot != -1) { | ||
return true; | ||
} | ||
xSemaphoreTake(_slots_lock, portMAX_DELAY); | ||
uint32_t closed_slot_min_index = 0; | ||
for (int i = 0; i < _number_of_closed_slots; ++ i) { | ||
|
@@ -831,28 +845,28 @@ void AsyncClient::_allocate_closed_slot(){ | |
_closed_slots[_closed_slot] = 0; | ||
} | ||
xSemaphoreGive(_slots_lock); | ||
return (_closed_slot != -1); | ||
} | ||
|
||
void AsyncClient::_free_closed_slot(){ | ||
xSemaphoreTake(_slots_lock, portMAX_DELAY); | ||
if (_closed_slot != -1) { | ||
_closed_slots[_closed_slot] = _closed_index; | ||
_closed_slot = -1; | ||
++ _closed_index; | ||
} | ||
xSemaphoreGive(_slots_lock); | ||
} | ||
|
||
/* | ||
* Private Callbacks | ||
* */ | ||
|
||
int8_t AsyncClient::_connected(void* pcb, int8_t err){ | ||
int8_t AsyncClient::_connected(tcp_pcb* pcb, int8_t err){ | ||
_pcb = reinterpret_cast<tcp_pcb*>(pcb); | ||
if(_pcb){ | ||
_rx_last_packet = millis(); | ||
_pcb_busy = false; | ||
// tcp_recv(_pcb, &_tcp_recv); | ||
// tcp_sent(_pcb, &_tcp_sent); | ||
// tcp_poll(_pcb, &_tcp_poll, 1); | ||
} | ||
if(_connect_cb) { | ||
_connect_cb(_connect_cb_arg, this); | ||
|
@@ -869,6 +883,7 @@ void AsyncClient::_error(int8_t err) { | |
tcp_err(_pcb, NULL); | ||
tcp_poll(_pcb, NULL, 0); | ||
} | ||
_free_closed_slot(); | ||
_pcb = NULL; | ||
} | ||
if(_error_cb) { | ||
|
@@ -920,13 +935,19 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { | |
} | ||
|
||
int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { | ||
while(pb != NULL) { | ||
if(!_pcb || pcb != _pcb){ | ||
log_e("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb); | ||
return ERR_OK; | ||
} | ||
size_t total = 0; | ||
while((pb != NULL) && (ERR_OK == err)) { | ||
_rx_last_packet = millis(); | ||
//we should not ack before we assimilate the data | ||
_ack_pcb = true; | ||
pbuf *b = pb; | ||
pb = b->next; | ||
b->next = NULL; | ||
total += b->len; | ||
if(_pb_cb){ | ||
_pb_cb(_pb_cb_arg, this, b); | ||
} else { | ||
|
@@ -935,13 +956,12 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { | |
} | ||
if(!_ack_pcb) { | ||
_rx_ack_len += b->len; | ||
} else if(_pcb) { | ||
_tcp_recved(_pcb, _closed_slot, b->len); | ||
} | ||
pbuf_free(b); | ||
} | ||
pbuf_free(b); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pbuf_free change is OK but the remaining changes no. This causes a drastic slowdown of the readings. |
||
} | ||
return ERR_OK; | ||
err = _tcp_recved(pcb, _closed_slot, total); | ||
return err; | ||
} | ||
|
||
int8_t AsyncClient::_poll(tcp_pcb* pcb){ | ||
|
@@ -1017,9 +1037,12 @@ size_t AsyncClient::write(const char* data) { | |
|
||
size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) { | ||
size_t will_send = add(data, size, apiflags); | ||
if(!will_send || !send()) { | ||
if (!will_send) { | ||
return 0; | ||
} | ||
while (connected() && !send()) { | ||
taskYIELD(); | ||
} | ||
Comment on lines
+1043
to
+1045
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @TienHuyIoT: when using SSE events in ESPAsycnWebServer, I think this busy loop can wait forever in case of a WIFi disconnection because the client is still seen there and connected |
||
return will_send; | ||
} | ||
|
||
|
@@ -1057,6 +1080,18 @@ bool AsyncClient::getNoDelay(){ | |
return tcp_nagle_disabled(_pcb); | ||
} | ||
|
||
void AsyncClient::setKeepAlive(uint32_t ms, uint8_t cnt){ | ||
if(ms!=0) { | ||
_pcb->so_options |= SOF_KEEPALIVE; //Turn on TCP Keepalive for the given pcb | ||
// Set the time between keepalive messages in milli-seconds | ||
_pcb->keep_idle = ms; | ||
_pcb->keep_intvl = ms; | ||
_pcb->keep_cnt = cnt; //The number of unanswered probes required to force closure of the socket | ||
} else { | ||
_pcb->so_options &= ~SOF_KEEPALIVE; //Turn off TCP Keepalive for the given pcb | ||
} | ||
} | ||
|
||
uint16_t AsyncClient::getMss(){ | ||
if(!_pcb) { | ||
return 0; | ||
|
@@ -1226,7 +1261,7 @@ void AsyncClient::_s_error(void * arg, int8_t err) { | |
reinterpret_cast<AsyncClient*>(arg)->_error(err); | ||
} | ||
|
||
int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){ | ||
int8_t AsyncClient::_s_connected(void * arg, struct tcp_pcb * pcb, int8_t err){ | ||
return reinterpret_cast<AsyncClient*>(arg)->_connected(pcb, err); | ||
} | ||
|
||
|
@@ -1261,6 +1296,10 @@ void AsyncServer::onClient(AcConnectHandler cb, void* arg){ | |
_connect_cb_arg = arg; | ||
} | ||
|
||
void AsyncServer::port(uint16_t port){ | ||
_port = port; | ||
} | ||
|
||
void AsyncServer::begin(){ | ||
if(_pcb) { | ||
return; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this condition does not make any sense anymore: is it tested ? If
msg->closed_slot == -1
, second expression is evaluated with an index being-1
. If not -1, second expression is not evaluated.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TienHuyIoT : ^^ ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closed_slot
is int8_t so -1 means 255, but_closed_slots
is an array of 16.