diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index f2914df54..c8bc6adf3 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A } AsyncEventSourceClient::~AsyncEventSourceClient(){ - _messageQueue.free(); + _lockmq.lock(); + _messageQueue.free(); + _lockmq.unlock(); close(); } @@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) delete dataMessage; return; } + //length() is not thread-safe, thus acquiring the lock before this call.. + _lockmq.lock(); if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ - ets_printf("ERROR: Too many messages queued\n"); - delete dataMessage; + ets_printf("ERROR: Too many messages queued\n"); + delete dataMessage; } else { - _messageQueue.add(dataMessage); + _messageQueue.add(dataMessage); + // runqueue trigger when new messages added + if(_client->canSend()) { + _runQueue(); + } } - if(_client->canSend()) - _runQueue(); + _lockmq.unlock(); } void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ + // Same here, acquiring the lock early + _lockmq.lock(); while(len && !_messageQueue.isEmpty()){ len = _messageQueue.front()->ack(len, time); if(_messageQueue.front()->finished()) _messageQueue.remove(_messageQueue.front()); } - _runQueue(); + _lockmq.unlock(); } void AsyncEventSourceClient::_onPoll(){ + _lockmq.lock(); if(!_messageQueue.isEmpty()){ _runQueue(); } + _lockmq.unlock(); } - void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){ _client->close(true); } @@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){ _client->close(); } -void AsyncEventSourceClient::write(const char * message, size_t len){ +void AsyncEventSourceClient::_write(const char * message, size_t len){ _queueMessage(new AsyncEventSourceMessage(message, len)); } @@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32 _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length())); } -void AsyncEventSourceClient::_runQueue(){ - while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ - _messageQueue.remove(_messageQueue.front()); - } +size_t AsyncEventSourceClient::packetsWaiting() const { + size_t len; + _lockmq.lock(); + len = _messageQueue.length(); + _lockmq.unlock(); + return len; +} - for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) - { - if(!(*i)->sent()) +void AsyncEventSourceClient::_runQueue() { + // Calls to this private method now already protected by _lockmq acquisition + // so no extra call of _lockmq.lock() here.. + for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { + // If it crashes here, iterator (i) has been invalidated as _messageQueue + // has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) ) + if (!(*i)->sent()) { (*i)->send(_client); + } } } @@ -276,17 +294,22 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ client->write((const char *)temp, 2053); free(temp); }*/ - + AsyncWebLockGuard l(_client_queue_lock); _clients.add(client); if(_connectcb) _connectcb(client); } void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ + AsyncWebLockGuard l(_client_queue_lock); _clients.remove(client); } void AsyncEventSource::close(){ + // While the whole loop is not done, the linked list is locked and so the + // iterator should remain valid even when AsyncEventSource::_handleDisconnect() + // is called very early + AsyncWebLockGuard l(_client_queue_lock); for(const auto &c: _clients){ if(c->connected()) c->close(); @@ -295,37 +318,39 @@ void AsyncEventSource::close(){ // pmb fix size_t AsyncEventSource::avgPacketsWaiting() const { - if(_clients.isEmpty()) + size_t aql = 0; + uint32_t nConnectedClients = 0; + AsyncWebLockGuard l(_client_queue_lock); + if (_clients.isEmpty()) { return 0; - - size_t aql=0; - uint32_t nConnectedClients=0; - + } for(const auto &c: _clients){ if(c->connected()) { - aql+=c->packetsWaiting(); + aql += c->packetsWaiting(); ++nConnectedClients; } } -// return aql / nConnectedClients; - return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up + return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up } -void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ - - +void AsyncEventSource::send( + const char *message, const char *event, uint32_t id, uint32_t reconnect){ String ev = generateEventMessage(message, event, id, reconnect); + AsyncWebLockGuard l(_client_queue_lock); for(const auto &c: _clients){ if(c->connected()) { - c->write(ev.c_str(), ev.length()); + c->_write(ev.c_str(), ev.length()); } } } size_t AsyncEventSource::count() const { - return _clients.count_if([](AsyncEventSourceClient *c){ - return c->connected(); - }); + size_t n_clients; + AsyncWebLockGuard l(_client_queue_lock); + n_clients = _clients.count_if([](AsyncEventSourceClient *c){ + return c->connected(); + }); + return n_clients; } bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){ diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index b097fa623..6c8fad7b0 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -72,6 +72,8 @@ class AsyncEventSourceClient { AsyncEventSource *_server; uint32_t _lastId; LinkedList _messageQueue; + // ArFi 2020-08-27 for protecting/serializing _messageQueue + AsyncPlainLock _lockmq; void _queueMessage(AsyncEventSourceMessage *dataMessage); void _runQueue(); @@ -82,12 +84,12 @@ class AsyncEventSourceClient { AsyncClient* client(){ return _client; } void close(); - void write(const char * message, size_t len); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); bool connected() const { return (_client != NULL) && _client->connected(); } uint32_t lastId() const { return _lastId; } - size_t packetsWaiting() const { return _messageQueue.length(); } + size_t packetsWaiting() const; + void _write(const char * message, size_t len); //system callbacks (do not call) void _onAck(size_t len, uint32_t time); void _onPoll(); @@ -99,7 +101,11 @@ class AsyncEventSource: public AsyncWebHandler { private: String _url; LinkedList _clients; + // Same as for individual messages, protect mutations of _clients list + // since simultaneous access from different tasks is possible + AsyncWebLock _client_queue_lock; ArEventHandlerFunction _connectcb; + public: AsyncEventSource(const String& url); ~AsyncEventSource(); @@ -108,7 +114,7 @@ class AsyncEventSource: public AsyncWebHandler { void close(); void onConnect(ArEventHandlerFunction cb); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); - size_t count() const; //number clinets connected + size_t count() const; //number clients connected size_t avgPacketsWaiting() const; //system callbacks (do not call) diff --git a/src/AsyncWebSynchronization.h b/src/AsyncWebSynchronization.h index f36c52dcf..02ad2dc61 100644 --- a/src/AsyncWebSynchronization.h +++ b/src/AsyncWebSynchronization.h @@ -7,6 +7,38 @@ #ifdef ESP32 +// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore +// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not +// always available. According to example by Arjan Filius, changed name, +// added unimplemented version for ESP8266 +class AsyncPlainLock +{ +private: + SemaphoreHandle_t _lock; + +public: + AsyncPlainLock() { + _lock = xSemaphoreCreateBinary(); + // In this fails, the system is likely that much out of memory that + // we should abort anyways. If assertions are disabled, nothing is lost.. + assert(_lock); + xSemaphoreGive(_lock); + } + + ~AsyncPlainLock() { + vSemaphoreDelete(_lock); + } + + bool lock() const { + xSemaphoreTake(_lock, portMAX_DELAY); + return true; + } + + void unlock() const { + xSemaphoreGive(_lock); + } +}; + // This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore class AsyncWebLock { @@ -17,6 +49,9 @@ class AsyncWebLock public: AsyncWebLock() { _lock = xSemaphoreCreateBinary(); + // In this fails, the system is likely that much out of memory that + // we should abort anyways. If assertions are disabled, nothing is lost.. + assert(_lock); _lockedBy = NULL; xSemaphoreGive(_lock); } @@ -61,6 +96,10 @@ class AsyncWebLock void unlock() const { } }; + +// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above. +using AsyncPlainLock = AsyncWebLock; + #endif class AsyncWebLockGuard @@ -84,4 +123,4 @@ class AsyncWebLockGuard } }; -#endif // ASYNCWEBSYNCHRONIZATION_H_ \ No newline at end of file +#endif // ASYNCWEBSYNCHRONIZATION_H_