This repository has been archived by the owner on Jan 20, 2025. It is now read-only.

Fix #884, protect list concurrent access with mutex #887

Open
wants to merge 4 commits into
base: master
Changes from 2 commits
96 changes: 64 additions & 32 deletions src/AsyncEventSource.cpp
@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
}

AsyncEventSourceClient::~AsyncEventSourceClient(){
_messageQueue.free();
_lockmq.lock();
_messageQueue.free();
_lockmq.unlock();
close();
}

@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
delete dataMessage;
return;
}
//length() is not thread-safe, thus acquiring the lock before this call..
_lockmq.lock();
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
ets_printf("ERROR: Too many messages queued\n");
delete dataMessage;
ets_printf("ERROR: Too many messages queued\n");
delete dataMessage;
} else {
_messageQueue.add(dataMessage);
_messageQueue.add(dataMessage);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
}
if(_client->canSend())
_runQueue();
_lockmq.unlock();
}

void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
// Same here, acquiring the lock early
_lockmq.lock();
while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front());
}

_runQueue();
_lockmq.unlock();
}

void AsyncEventSourceClient::_onPoll(){
_lockmq.lock();
if(!_messageQueue.isEmpty()){
_runQueue();
}
_lockmq.unlock();
}


void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
_client->close(true);
}
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
_client->close();
}

void AsyncEventSourceClient::write(const char * message, size_t len){
void AsyncEventSourceClient::_write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len));
}

@@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
}

void AsyncEventSourceClient::_runQueue(){
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
_messageQueue.remove(_messageQueue.front());
}
size_t AsyncEventSourceClient::packetsWaiting() const {
size_t len;
_lockmq.lock();
len = _messageQueue.length();
_lockmq.unlock();
return len;
}

for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
{
if(!(*i)->sent())
void AsyncEventSourceClient::_runQueue() {
// Calls to this private method now already protected by _lockmq acquisition
// so no extra call of _lockmq.lock() here..
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
// If it crashes here, iterator (i) has been invalidated as _messageQueue
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
if (!(*i)->sent()) {
(*i)->send(_client);
}
}
}

@@ -276,56 +294,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
client->write((const char *)temp, 2053);
free(temp);
}*/

_client_queue_lock.lock();
Contributor

By calling the connect callback with the client queue lock held, this code introduces a self-deadlock if the connect callback, in turn, tries to broadcast a message to the event source using AsyncEventSource::send (or tries to use any other public API that tries to take that very same lock).

I had this exact scenario in my project, and I had to fix it as shown in commit yubox-node-org@646eab7 of yuboxfixes branch of the ESPAsyncWebServer fork at yubox-node-org.

Author

I just now realized that the extern void *pxCurrentTCB used by class AsyncWebLock does /not/ point to a TCP TCB struct (TCB as in "transmission control block") but to the FreeRTOS task control block of the currently running task, and that this is how the AsyncWebLockGuard works.
I only did minimal testing, but I assume this is safe to pull into this PR for completeness.

I hope that the original author finds some time in the near future to go through all these forks and PRs...
Thank you very much for the notice!
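
One possible way to avoid the self-deadlock described in this thread is to hold the client list lock only while the list is modified and to release it before the connect callback runs. The following is a minimal sketch under stated assumptions, not the code from the referenced commit: `EventSourceSketch` is a hypothetical stand-in for `AsyncEventSource`, `std::mutex` replaces `AsyncPlainLock` so the example compiles off-target, and clients are plain integers.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for AsyncEventSource; std::mutex replaces AsyncPlainLock.
class EventSourceSketch {
public:
    using ConnectCallback = std::function<void(int)>;

    void onConnect(ConnectCallback cb) { _connectcb = std::move(cb); }

    // Register the client under the lock, then release it before the callback runs.
    void addClient(int clientId) {
        {
            std::lock_guard<std::mutex> guard(_clientsLock);
            _clients.push_back(clientId);
        }  // lock released here
        if (_connectcb) {
            _connectcb(clientId);  // may now call send() without deadlocking
        }
    }

    // Public API that takes the same lock; returns the number of clients reached.
    std::size_t send(const char *message) {
        std::lock_guard<std::mutex> guard(_clientsLock);
        (void)message;  // a real implementation would queue the message per client
        return _clients.size();
    }

private:
    std::mutex _clientsLock;
    std::vector<int> _clients;
    ConnectCallback _connectcb;
};
```

The trade-off is that the client could be removed by another task between the release of the lock and the callback, so a real implementation would still have to make sure the client object outlives the callback.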

_clients.add(client);
if(_connectcb)
_connectcb(client);
_client_queue_lock.unlock();
}

void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
_client_queue_lock.lock();
_clients.remove(client);
_client_queue_lock.unlock();
}

void AsyncEventSource::close(){
// While the whole loop is not done, the linked list is locked and so the
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
// is called very early
_client_queue_lock.lock();
for(const auto &c: _clients){
if(c->connected())
c->close();
}
_client_queue_lock.unlock();
}

// pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const {
if(_clients.isEmpty())
size_t aql = 0;
uint32_t nConnectedClients = 0;
_client_queue_lock.lock();
if (_clients.isEmpty()) {
_client_queue_lock.unlock();
return 0;

size_t aql=0;
uint32_t nConnectedClients=0;

}
for(const auto &c: _clients){
if(c->connected()) {
aql+=c->packetsWaiting();
aql += c->packetsWaiting();
++nConnectedClients;
}
}
// return aql / nConnectedClients;
return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
_client_queue_lock.unlock();
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
}
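
A note on the rounding expression in avgPacketsWaiting(): `(aql + n/2) / n` rounds to the nearest integer rather than up, and it divides by zero when clients are registered but none of them is connected. The sketch below shows one way to guard that case and to let a scoped guard release the lock on every return path. It is an illustration only: `ClientSketch` and `avgPacketsWaitingSketch` are hypothetical names, and `std::mutex` with `std::lock_guard` stands in for `AsyncPlainLock`.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical, simplified view of a client: connection state and queue length.
struct ClientSketch {
    bool connected;
    std::size_t packetsWaiting;
};

// Average queue length over connected clients, rounded to the nearest integer.
// Returns 0 when no client is connected.
std::size_t avgPacketsWaitingSketch(const std::vector<ClientSketch> &clients,
                                    std::mutex &lock) {
    std::lock_guard<std::mutex> guard(lock);  // released on every return path
    std::size_t total = 0;
    std::size_t nConnected = 0;
    for (const auto &c : clients) {
        if (c.connected) {
            total += c.packetsWaiting;
            ++nConnected;
        }
    }
    if (nConnected == 0) {
        return 0;  // avoids dividing by zero when nobody is connected
    }
    return (total + nConnected / 2) / nConnected;  // round to nearest
}
```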

void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){


void AsyncEventSource::send(
const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
_client_queue_lock.lock();
for(const auto &c: _clients){
if(c->connected()) {
c->write(ev.c_str(), ev.length());
c->_write(ev.c_str(), ev.length());
}
}
_client_queue_lock.unlock();
}

size_t AsyncEventSource::count() const {
return _clients.count_if([](AsyncEventSourceClient *c){
return c->connected();
});
size_t n_clients;
_client_queue_lock.lock();
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
return c->connected();
});
_client_queue_lock.unlock();
return n_clients;
}

bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
12 changes: 9 additions & 3 deletions src/AsyncEventSource.h
@@ -72,6 +72,8 @@ class AsyncEventSourceClient {
AsyncEventSource *_server;
uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
AsyncPlainLock _lockmq;
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue();

@@ -82,12 +84,12 @@ class AsyncEventSourceClient {

AsyncClient* client(){ return _client; }
void close();
void write(const char * message, size_t len);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool connected() const { return (_client != NULL) && _client->connected(); }
uint32_t lastId() const { return _lastId; }
size_t packetsWaiting() const { return _messageQueue.length(); }
size_t packetsWaiting() const;

void _write(const char * message, size_t len);
//system callbacks (do not call)
void _onAck(size_t len, uint32_t time);
void _onPoll();
@@ -99,7 +101,11 @@ class AsyncEventSource: public AsyncWebHandler {
private:
String _url;
LinkedList<AsyncEventSourceClient *> _clients;
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
AsyncPlainLock _client_queue_lock;
ArEventHandlerFunction _connectcb;

public:
AsyncEventSource(const String& url);
~AsyncEventSource();
@@ -108,7 +114,7 @@ class AsyncEventSource: public AsyncWebHandler {
void close();
void onConnect(ArEventHandlerFunction cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
size_t count() const; //number clinets connected
size_t count() const; //number clients connected
size_t avgPacketsWaiting() const;

//system callbacks (do not call)
41 changes: 40 additions & 1 deletion src/AsyncWebSynchronization.h
@@ -7,6 +7,38 @@

#ifdef ESP32

// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not
// always available. According to example by Arjan Filius, changed name,
// added unimplemented version for ESP8266
class AsyncPlainLock
{
private:
SemaphoreHandle_t _lock;

public:
AsyncPlainLock() {
_lock = xSemaphoreCreateBinary();

xSemaphoreCreateBinary() can fail when memory is low.

Author

Suggested change
_lock = xSemaphoreCreateBinary();
_lock = xSemaphoreCreateBinary();
// If this fails, the system is likely so much out of memory that we should abort
assert(_lock);

Author

The original implementation of the AsyncWebLock (below the new class) does the same without an assert etc.

What would you suggest, put an abort() or assert() there? If yes, this should go in both places.

assert() is fine and gives the developer enough information, I think.

// If this fails, the system is likely so far out of memory that
// we should abort anyway. If assertions are disabled, nothing is lost.
assert(_lock);
xSemaphoreGive(_lock);
}

~AsyncPlainLock() {
vSemaphoreDelete(_lock);
}

bool lock() const {
xSemaphoreTake(_lock, portMAX_DELAY);

xSemaphoreTake() can fail

Author (@ul-gh, Nov 16, 2020)

(edited) No, actually in this case, xSemaphoreTake, which is an alias for xQueueGenericReceive, would only ever have a pdFALSE return value if an effective timeout was supplied as an argument. But the portMAX_DELAY in this case means the timeout is disabled. xSemaphoreTake either blocks indefinitely or returns pdTRUE.

Author (@ul-gh, Nov 16, 2020)

xQueueGenericReceive, I meant it /can/ only return "pdFALSE" when invoked with a non-infinite timeout.

Yes, you are right. I expected a (void)xSemaphoreTake(...), but this may depend on the coding style.
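
The point made in this thread, that only a take with a finite timeout has a failure result to report, can be illustrated off-target. The sketch below is an analogy only and does not use FreeRTOS: `BinarySemaphoreSketch` is a hypothetical class built on `std::mutex` and `std::condition_variable`, where `take()` plays the role of a take with `portMAX_DELAY` and `takeFor()` plays the role of a take with a finite timeout. On FreeRTOS itself, `portMAX_DELAY` blocks indefinitely only when `INCLUDE_vTaskSuspend` is set to 1.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical binary semaphore; starts in the "given" state, like the
// semaphore after the xSemaphoreGive() call in the lock's constructor.
class BinarySemaphoreSketch {
public:
    void give() {
        {
            std::lock_guard<std::mutex> guard(_m);
            _available = true;
        }
        _cv.notify_one();
    }

    // Untimed take: waits as long as needed, so there is no failure to report.
    void take() {
        std::unique_lock<std::mutex> guard(_m);
        _cv.wait(guard, [this] { return _available; });
        _available = false;
    }

    // Timed take: returns false if the semaphore was not given within timeout.
    bool takeFor(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(_m);
        if (!_cv.wait_for(guard, timeout, [this] { return _available; })) {
            return false;
        }
        _available = false;
        return true;
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    bool _available = true;
};
```

With this shape, a `lock()` built on the untimed take can return `void` (or always `true`, as in the PR), while a caller of the timed variant has to check the result.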

return true;
}

void unlock() const {
xSemaphoreGive(_lock);
}
};

// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore

It's the ESP8266 version, not ESP32 ;-)

Author

No, the define is OK and for ESP8266, but still I noticed a bug there, as I accidentally left the "mutable void *_lockedBy;" member in for the supposedly "plain" lock class.

Oh yes, you're right here. Looks like I read the comment and scrolled down too fast.

class AsyncWebLock
{
@@ -17,6 +49,9 @@ class AsyncWebLock
public:
AsyncWebLock() {
_lock = xSemaphoreCreateBinary();
// If this fails, the system is likely so far out of memory that
// we should abort anyway. If assertions are disabled, nothing is lost.
assert(_lock);
_lockedBy = NULL;
xSemaphoreGive(_lock);
}
@@ -61,6 +96,10 @@ class AsyncWebLock
void unlock() const {
}
};

// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
using AsyncPlainLock = AsyncWebLock;

#endif

class AsyncWebLockGuard
@@ -84,4 +123,4 @@ class AsyncWebLockGuard
}
};

#endif // ASYNCWEBSYNCHRONIZATION_H_
#endif // ASYNCWEBSYNCHRONIZATION_H_