Skip to content
This repository has been archived by the owner on Jan 20, 2025. It is now read-only.

Commit

Permalink
Imrovement
Browse files Browse the repository at this point in the history
- Running async tcp by Core0
- Disable watchdog handler async tcp task
- _tcp_recved() should be called when it has processed the data
  • Loading branch information
TienHuyIoT committed Nov 4, 2023
1 parent 9af997f commit fd6447f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
68 changes: 43 additions & 25 deletions src/AsyncTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ extern "C"{
#include "esp_task_wdt.h"

#define CONFIG_ASYNC_TCP_STACK 2*8192
#define CONFIG_ASYNC_TCP_PRIORITY 3
#define CONFIG_ASYNC_TCP_QUEUE_SIZE 128
#define CONFIG_ASYNC_TCP_PRIORITY 10
#define CONFIG_ASYNC_TCP_QUEUE_SIZE 64

#define ASYNC_TCP_PRINTFLN(f_, ...) Serial.printf_P(PSTR(f_ "\r\n"), ##__VA_ARGS__)
#define ASYNC_TCP_TAG_CONSOLE(...) ASYNC_TCP_PRINTFLN("[AsyncTCP]" __VA_ARGS__)
/*
* TCP/IP Event Task
* */
Expand All @@ -48,7 +50,7 @@ typedef struct {
void *arg;
union {
struct {
void * pcb;
tcp_pcb * pcb;
int8_t err;
} connected;
struct {
Expand Down Expand Up @@ -387,7 +389,7 @@ static esp_err_t _tcp_output(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){
return ERR_CONN;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data*)&msg);
Expand All @@ -407,7 +409,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data,
if(!pcb){
return ERR_CONN;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.write.data = data;
Expand All @@ -420,7 +422,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data,
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){
tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg;
msg->err = ERR_CONN;
if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) {
if(msg->closed_slot != -1 || !_closed_slots[msg->closed_slot]) {
msg->err = 0;
tcp_recved(msg->pcb, msg->received);
}
Expand All @@ -431,7 +433,7 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, int8_t closed_slot, size_t len) {
if(!pcb){
return ERR_CONN;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.received = len;
Expand All @@ -452,7 +454,7 @@ static esp_err_t _tcp_close(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){
return ERR_CONN;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data*)&msg);
Expand All @@ -472,7 +474,7 @@ static esp_err_t _tcp_abort(tcp_pcb * pcb, int8_t closed_slot) {
if(!pcb){
return ERR_CONN;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data*)&msg);
Expand All @@ -489,7 +491,7 @@ static esp_err_t _tcp_connect(tcp_pcb * pcb, int8_t closed_slot, ip_addr_t * add
if(!pcb){
return ESP_FAIL;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = closed_slot;
msg.connect.addr = addr;
Expand All @@ -509,7 +511,7 @@ static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) {
if(!pcb){
return ESP_FAIL;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = -1;
msg.bind.addr = addr;
Expand All @@ -529,7 +531,7 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) {
if(!pcb){
return NULL;
}
static tcp_api_call_t msg;
tcp_api_call_t msg;
msg.pcb = pcb;
msg.closed_slot = -1;
msg.backlog = backlog?backlog:0xFF;
Expand Down Expand Up @@ -569,13 +571,15 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
_pcb = pcb;
_closed_slot = -1;
if(_pcb){
_allocate_closed_slot();
_rx_last_packet = millis();
tcp_arg(_pcb, this);
tcp_recv(_pcb, &_tcp_recv);
tcp_sent(_pcb, &_tcp_sent);
tcp_err(_pcb, &_tcp_error);
tcp_poll(_pcb, &_tcp_poll, 1);
if(!_allocate_closed_slot()) {
_close();
}
}
}

Expand Down Expand Up @@ -685,6 +689,11 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
return false;
}

if(!_allocate_closed_slot()) {
log_e("failed to closed slot full");
return false;
}

ip_addr_t addr;
addr.type = IPADDR_TYPE_V4;
addr.u_addr.ip4.addr = ip;
Expand All @@ -700,7 +709,6 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){
tcp_recv(pcb, &_tcp_recv);
tcp_sent(pcb, &_tcp_sent);
tcp_poll(pcb, &_tcp_poll, 1);
//_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
_tcp_connect(pcb, _closed_slot, &addr, port,(tcp_connected_fn)&_tcp_connected);
return true;
}
Expand Down Expand Up @@ -797,7 +805,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){
* */

int8_t AsyncClient::_close(){
//ets_printf("X: 0x%08x\n", (uint32_t)this);
// ets_printf("X: 0x%08x\n", (uint32_t)this);
int8_t err = ERR_OK;
if(_pcb) {
//log_i("");
Expand All @@ -811,6 +819,7 @@ int8_t AsyncClient::_close(){
if(err != ERR_OK) {
err = abort();
}
_free_closed_slot();
_pcb = NULL;
if(_discard_cb) {
_discard_cb(_discard_cb_arg, this);
Expand All @@ -819,7 +828,10 @@ int8_t AsyncClient::_close(){
return err;
}

void AsyncClient::_allocate_closed_slot(){
boolean AsyncClient::_allocate_closed_slot(){
if (_closed_slot != -1) {
return true;
}
xSemaphoreTake(_slots_lock, portMAX_DELAY);
uint32_t closed_slot_min_index = 0;
for (int i = 0; i < _number_of_closed_slots; ++ i) {
Expand All @@ -832,28 +844,28 @@ void AsyncClient::_allocate_closed_slot(){
_closed_slots[_closed_slot] = 0;
}
xSemaphoreGive(_slots_lock);
return (_closed_slot != -1);
}

void AsyncClient::_free_closed_slot(){
xSemaphoreTake(_slots_lock, portMAX_DELAY);
if (_closed_slot != -1) {
_closed_slots[_closed_slot] = _closed_index;
_closed_slot = -1;
++ _closed_index;
}
xSemaphoreGive(_slots_lock);
}

/*
* Private Callbacks
* */

int8_t AsyncClient::_connected(void* pcb, int8_t err){
int8_t AsyncClient::_connected(tcp_pcb* pcb, int8_t err){
_pcb = reinterpret_cast<tcp_pcb*>(pcb);
if(_pcb){
_rx_last_packet = millis();
_pcb_busy = false;
// tcp_recv(_pcb, &_tcp_recv);
// tcp_sent(_pcb, &_tcp_sent);
// tcp_poll(_pcb, &_tcp_poll, 1);
}
if(_connect_cb) {
_connect_cb(_connect_cb_arg, this);
Expand All @@ -870,6 +882,7 @@ void AsyncClient::_error(int8_t err) {
tcp_err(_pcb, NULL);
tcp_poll(_pcb, NULL, 0);
}
_free_closed_slot();
_pcb = NULL;
}
if(_error_cb) {
Expand Down Expand Up @@ -921,13 +934,19 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) {
}

int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
while(pb != NULL) {
if(!_pcb || pcb != _pcb){
log_e("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb);
return ERR_OK;
}
size_t total = 0;
while((pb != NULL) && (ERR_OK == err)) {
_rx_last_packet = millis();
//we should not ack before we assimilate the data
_ack_pcb = true;
pbuf *b = pb;
pb = b->next;
b->next = NULL;
total += b->len;
if(_pb_cb){
_pb_cb(_pb_cb_arg, this, b);
} else {
Expand All @@ -936,13 +955,12 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) {
}
if(!_ack_pcb) {
_rx_ack_len += b->len;
} else if(_pcb) {
_tcp_recved(_pcb, _closed_slot, b->len);
}
}
pbuf_free(b);
}
return ERR_OK;
err = _tcp_recved(pcb, _closed_slot, total);
return err;
}

int8_t AsyncClient::_poll(tcp_pcb* pcb){
Expand Down Expand Up @@ -1242,7 +1260,7 @@ void AsyncClient::_s_error(void * arg, int8_t err) {
reinterpret_cast<AsyncClient*>(arg)->_error(err);
}

int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){
int8_t AsyncClient::_s_connected(void * arg, struct tcp_pcb * pcb, int8_t err){
return reinterpret_cast<AsyncClient*>(arg)->_connected(pcb, err);
}

Expand Down
10 changes: 5 additions & 5 deletions src/AsyncTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ extern "C" {

//If core is not defined, then we are running in Arduino or PIO
#ifndef CONFIG_ASYNC_TCP_RUNNING_CORE
#define CONFIG_ASYNC_TCP_RUNNING_CORE -1 //any available core
#define CONFIG_ASYNC_TCP_USE_WDT 1 //if enabled, adds between 33us and 200us per event
#define CONFIG_ASYNC_TCP_RUNNING_CORE 0 //any available core
#define CONFIG_ASYNC_TCP_USE_WDT 0 //if enabled, adds between 33us and 200us per event
#endif

class AsyncClient;
Expand Down Expand Up @@ -135,7 +135,7 @@ class AsyncClient {
static int8_t _s_lwip_fin(void *arg, struct tcp_pcb *tpcb, int8_t err);
static void _s_error(void *arg, int8_t err);
static int8_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len);
static int8_t _s_connected(void* arg, void* tpcb, int8_t err);
static int8_t _s_connected(void* arg, struct tcp_pcb *tpcb, int8_t err);
static void _s_dns_found(const char *name, struct ip_addr *ipaddr, void *arg);

int8_t _recv(tcp_pcb* pcb, pbuf* pb, int8_t err);
Expand Down Expand Up @@ -173,8 +173,8 @@ class AsyncClient {

int8_t _close();
void _free_closed_slot();
void _allocate_closed_slot();
int8_t _connected(void* pcb, int8_t err);
boolean _allocate_closed_slot();
int8_t _connected(tcp_pcb* pcb, int8_t err);
void _error(int8_t err);
int8_t _poll(tcp_pcb* pcb);
int8_t _sent(tcp_pcb* pcb, uint16_t len);
Expand Down

0 comments on commit fd6447f

Please sign in to comment.