Skip to content

Commit

Permalink
Fix ComQueue Does Not Deallocate Buffers on Overflow (nasa#2853)
Browse files Browse the repository at this point in the history
* Added Fw.BufferSend deallocate port to `ComQueue`

* Updated FPP to v2.2.0a2

* Change `buffQueueIn` port from `drop` to `hook`

* Added queue overflow hook method implementation

* Added return status to `ComQueue::enqueue()`

* `Fw::Buffer` is now deallocated on queue overflow

* Enabled `UT_AUTO_HELPERS` in `ComQueue` UT build

* Updated `ComQueue` UTs

* Explicitly discard `enqueue()` return status

* Replaced `overflowhook()` call with `deallocate()`

* Fixed comment style

* Renamed UT test case

* Added internal queue overflow UT

* Added assertion on `overflowHook()` argument
  • Loading branch information
DJKessler authored Sep 18, 2024
1 parent 3b6e3c5 commit 6a921a1
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Svc/ComQueue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ set(UT_SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/test/ut/ComQueueTestMain.cpp"
"${CMAKE_CURRENT_LIST_DIR}/test/ut/ComQueueTester.cpp"
)

set(UT_AUTO_HELPERS ON)
register_fprime_ut()
32 changes: 26 additions & 6 deletions Svc/ComQueue/ComQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,19 @@ void ComQueue::configure(QueueConfigurationTable queueConfig,
void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
// Ensure that the port number of comQueueIn is consistent with the expectation
FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
(void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
}

void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
// Ensure that the port number of buffQueueIn is consistent with the expectation
FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
bool status =
this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
if (!status) {
this->deallocate_out(portNum, fwBuffer);
}
}

void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
Expand Down Expand Up @@ -181,29 +185,45 @@ void ComQueue::run_handler(const NATIVE_INT_TYPE portNum, U32 context) {
this->tlmWrite_buffQueueDepth(buffQueueDepth);
}

// ----------------------------------------------------------------------
// Hook implementations for typed async input ports
// ----------------------------------------------------------------------

void ComQueue::buffQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
this->deallocate_out(portNum, fwBuffer);
}

// ----------------------------------------------------------------------
// Private helper methods
// ----------------------------------------------------------------------

void ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
// Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
// set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
bool rvStatus = true;
FW_ASSERT(
expectedSize == size,
static_cast<FwAssertArgType>(size),
static_cast<FwAssertArgType>(expectedSize));
FW_ASSERT(portNum >= 0, portNum);
Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT && !this->m_throttle[queueNum]) {
this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
this->m_throttle[queueNum] = true;
if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
if (!this->m_throttle[queueNum]) {
this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
this->m_throttle[queueNum] = true;
}

rvStatus = false;
}
// When the component is already in READY state process the queue to send out the next available message immediately
if (this->m_state == READY) {
this->processQueue();
}

return rvStatus;
}

void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
Expand Down
7 changes: 5 additions & 2 deletions Svc/ComQueue/ComQueue.fpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Svc {

@ Array of queue depths for Fw::Com types
array ComQueueDepth = [ComQueueComPorts] U32

@ Array of queue depths for Fw::Buffer types
array BuffQueueDepth = [ComQueueBufferPorts] U32

Expand All @@ -22,14 +22,17 @@ module Svc {
@ Fw::Buffer output port
output port buffQueueSend: Fw.BufferSend

@ Port for deallocating Fw::Buffer on queue overflow
output port deallocate: Fw.BufferSend

@ Port for receiving the status signal
async input port comStatusIn: Fw.SuccessCondition

@ Port array for receiving Fw::ComBuffers
async input port comQueueIn: [ComQueueComPorts] Fw.Com drop

@ Port array for receiving Fw::Buffers
async input port buffQueueIn: [ComQueueBufferPorts] Fw.BufferSend drop
async input port buffQueueIn: [ComQueueBufferPorts] Fw.BufferSend hook

@ Port for scheduling telemetry output
async input port run: Svc.Sched drop
Expand Down
12 changes: 11 additions & 1 deletion Svc/ComQueue/ComQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,23 @@ class ComQueue : public ComQueueComponentBase {
U32 context /*!<The call order*/
);

// ----------------------------------------------------------------------
// Hook implementations for typed async input ports
// ----------------------------------------------------------------------

//! Queue overflow hook method that deallocates the fwBuffer
//!
void buffQueueIn_overflowHook(FwIndexType portNum, //!< The port number
Fw::Buffer& fwBuffer //!< The buffer
);

// ----------------------------------------------------------------------
// Helper Functions
// ----------------------------------------------------------------------

//! Enqueues a message on the appropriate queue
//!
void enqueue(const FwIndexType queueNum, //!< Index of the queue to enqueue the message
bool enqueue(const FwIndexType queueNum, //!< Index of the queue to enqueue the message
QueueType queueType, //!< Type of the queue and message data
const U8* data, //!< Pointer to the message data
const FwSizeType size //!< Size of the message
Expand Down
1 change: 1 addition & 0 deletions Svc/ComQueue/docs/sdd.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The diagram below shows the `Svc::ComQueue` component.
|---------------|-------------------|---------------------------------------|--------------------------------------------------------|
| `output` | `comQueueSend` | `Fw.Com` | Fw::ComBuffer output port |
| `output` | `buffQueueSend` | `Fw.BufferSend` | Fw::Buffer output port |
| `output` | `deallocate` | `Fw.BufferSend` | Port for deallocating Fw::Buffer on queue overflow |
| `async input` | `comStatusIn` | `Fw.SuccessCondition` | Port for receiving the status signal |
| `async input` | `comQueueIn` | `[ComQueueComPorts] Fw.Com` | Port array for receiving Fw::ComBuffers |
| `async input` | `buffQueueIn` | `[ComQueueBufferPorts] Fw.BufferSend` | Port array for receiving Fw::Buffers |
Expand Down
9 changes: 7 additions & 2 deletions Svc/ComQueue/test/ut/ComQueueTestMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ TEST(Nominal, Priority) {
tester.testPrioritySend();
}

TEST(Nominal, Full) {
TEST(Nominal, ExternalQueueOverflow) {
Svc::ComQueueTester tester;
tester.testQueueOverflow();
tester.testExternalQueueOverflow();
}

TEST(Nominal, InternalQueueOverflow) {
Svc::ComQueueTester tester;
tester.testInternalQueueOverflow();
}

TEST(Nominal, ReadyFirst) {
Expand Down
108 changes: 57 additions & 51 deletions Svc/ComQueue/test/ut/ComQueueTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ void ComQueueTester ::configure() {
component.configure(configurationTable, 0, mallocAllocator);
}

void ComQueueTester ::sendByQueueNumber(NATIVE_INT_TYPE queueNum, NATIVE_INT_TYPE& portNum, QueueType& queueType) {
U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::ComBuffer comBuffer(&data[0], sizeof(data));
Fw::Buffer buffer(&data[0], sizeof(data));
void ComQueueTester ::sendByQueueNumber(Fw::Buffer& buffer,
NATIVE_INT_TYPE queueNum,
NATIVE_INT_TYPE& portNum,
QueueType& queueType) {
if (queueNum < ComQueue::COM_PORT_COUNT) {
Fw::ComBuffer comBuffer(buffer.getData(), buffer.getSize());
portNum = queueNum;
queueType = QueueType::COM_QUEUE;
invoke_to_comQueueIn(portNum, comBuffer, 0);
Expand Down Expand Up @@ -186,12 +187,12 @@ void ComQueueTester ::testPrioritySend() {
component.cleanup();
}

void ComQueueTester::testQueueOverflow(){
void ComQueueTester::testExternalQueueOverflow() {
ComQueue::QueueConfigurationTable configurationTable;
ComQueueDepth expectedComDepth;
BuffQueueDepth expectedBuffDepth;

for (NATIVE_UINT_TYPE i = 0; i < ComQueue::TOTAL_PORT_COUNT; i++){
for (NATIVE_UINT_TYPE i = 0; i < ComQueue::TOTAL_PORT_COUNT; i++) {
configurationTable.entries[i].priority = i;
configurationTable.entries[i].depth = 2;

Expand All @@ -205,25 +206,40 @@ void ComQueueTester::testQueueOverflow(){

component.configure(configurationTable, 0, mallocAllocator);

for(NATIVE_INT_TYPE queueNum = 0; queueNum < ComQueue::TOTAL_PORT_COUNT; queueNum++) {
U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::Buffer buffer(&data[0], sizeof(data));

for (NATIVE_INT_TYPE queueNum = 0; queueNum < ComQueue::TOTAL_PORT_COUNT; queueNum++) {
QueueType overflow_type;
NATIVE_INT_TYPE portNum;
// queue[portNum].depth + 2 to deliberately cause overflow and check throttle of exactly 1
for (NATIVE_UINT_TYPE msgCount = 0; msgCount < configurationTable.entries[queueNum].depth + 2; msgCount++) {
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
dispatchAll();
}

if (QueueType::BUFFER_QUEUE == overflow_type) {
ASSERT_from_deallocate_SIZE(2);
ASSERT_from_deallocate(0, buffer);
ASSERT_from_deallocate(1, buffer);
}

ASSERT_EVENTS_QueueOverflow_SIZE(1);
ASSERT_EVENTS_QueueOverflow(0, overflow_type, portNum);

// Drain a message, and see if throttle resets
emitOne();

// Force another overflow by filling then deliberately overflowing the queue
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
dispatchAll();

if (QueueType::BUFFER_QUEUE == overflow_type) {
ASSERT_from_deallocate_SIZE(3);
ASSERT_from_deallocate(2, buffer);
}

ASSERT_EVENTS_QueueOverflow_SIZE(2);
ASSERT_EVENTS_QueueOverflow(1, overflow_type, portNum);

Expand All @@ -243,6 +259,37 @@ void ComQueueTester::testQueueOverflow(){
component.cleanup();
}

void ComQueueTester::testInternalQueueOverflow() {
U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::Buffer buffer(data, sizeof(data));

const NATIVE_INT_TYPE queueNum = ComQueue::COM_PORT_COUNT;
const NATIVE_INT_TYPE msgCountMax = this->component.m_queue.getQueueSize();
QueueType overflow_type;
NATIVE_INT_TYPE portNum;

// fill the queue
for (NATIVE_INT_TYPE msgCount = 0; msgCount < msgCountMax; msgCount++) {
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
ASSERT_EQ(overflow_type, QueueType::BUFFER_QUEUE);
}

// send one more to overflow the queue
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);

ASSERT_from_deallocate_SIZE(1);
ASSERT_from_deallocate(0, buffer);

// send another
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);

ASSERT_from_deallocate_SIZE(2);
ASSERT_from_deallocate(0, buffer);
ASSERT_from_deallocate(1, buffer);

component.cleanup();
}

void ComQueueTester ::testReadyFirst() {
U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::ComBuffer comBuffer(&data[0], sizeof(data));
Expand Down Expand Up @@ -283,45 +330,4 @@ void ComQueueTester ::from_comQueueSend_handler(const NATIVE_INT_TYPE portNum, F
// Helper methods
// ----------------------------------------------------------------------

void ComQueueTester ::connectPorts() {
// buffQueueIn
for (NATIVE_INT_TYPE i = 0; i < ComQueue::BUFFER_PORT_COUNT; ++i) {
this->connect_to_buffQueueIn(i, this->component.get_buffQueueIn_InputPort(i));
}

// comQueueIn
for (NATIVE_INT_TYPE i = 0; i < ComQueue::COM_PORT_COUNT; ++i) {
this->connect_to_comQueueIn(i, this->component.get_comQueueIn_InputPort(i));
}

// comStatusIn
this->connect_to_comStatusIn(0, this->component.get_comStatusIn_InputPort(0));

// run
this->connect_to_run(0, this->component.get_run_InputPort(0));

// Log
this->component.set_Log_OutputPort(0, this->get_from_Log(0));

// LogText
this->component.set_LogText_OutputPort(0, this->get_from_LogText(0));

// Time
this->component.set_Time_OutputPort(0, this->get_from_Time(0));

// Tlm
this->component.set_Tlm_OutputPort(0, this->get_from_Tlm(0));

// buffQueueSend
this->component.set_buffQueueSend_OutputPort(0, this->get_from_buffQueueSend(0));

// comQueueSend
this->component.set_comQueueSend_OutputPort(0, this->get_from_comQueueSend(0));
}

void ComQueueTester ::initComponents() {
this->init();
this->component.init(QUEUE_DEPTH, INSTANCE);
}

} // end namespace Svc
22 changes: 20 additions & 2 deletions Svc/ComQueue/test/ut/ComQueueTester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
namespace Svc {

class ComQueueTester : public ComQueueGTestBase {

public:

// ----------------------------------------------------------------------
// Constants
// ----------------------------------------------------------------------

// Instance ID supplied to the component instance under test
static const NATIVE_INT_TYPE TEST_INSTANCE_ID = 0;

// Queue depth supplied to the component instance under test
static const NATIVE_INT_TYPE TEST_INSTANCE_QUEUE_DEPTH = 10;

private:
// ----------------------------------------------------------------------
// Construction and destruction
Expand All @@ -38,7 +51,10 @@ class ComQueueTester : public ComQueueGTestBase {
// ----------------------------------------------------------------------
void configure();

void sendByQueueNumber(NATIVE_INT_TYPE queueNumber, NATIVE_INT_TYPE& portNum, QueueType& queueType);
void sendByQueueNumber(Fw::Buffer& buffer,
NATIVE_INT_TYPE queueNumber,
NATIVE_INT_TYPE& portNum,
QueueType& queueType);

void emitOne();

Expand All @@ -57,7 +73,9 @@ class ComQueueTester : public ComQueueGTestBase {

void testPrioritySend();

void testQueueOverflow();
void testExternalQueueOverflow();

void testInternalQueueOverflow();

void testReadyFirst();

Expand Down
26 changes: 13 additions & 13 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ fprime-fpl-convert-xml==1.0.3
fprime-fpl-extract-xml==1.0.3
fprime-fpl-layout==1.0.3
fprime-fpl-write-pic==1.0.3
fprime-fpp-check==2.2.0a1
fprime-fpp-depend==2.2.0a1
fprime-fpp-filenames==2.2.0a1
fprime-fpp-format==2.2.0a1
fprime-fpp-from-xml==2.2.0a1
fprime-fpp-locate-defs==2.2.0a1
fprime-fpp-locate-uses==2.2.0a1
fprime-fpp-syntax==2.2.0a1
fprime-fpp-to-cpp==2.2.0a1
fprime-fpp-to-dict==2.2.0a1
fprime-fpp-to-json==2.2.0a1
fprime-fpp-to-xml==2.2.0a1
fprime-fpp-check==2.2.0a2
fprime-fpp-depend==2.2.0a2
fprime-fpp-filenames==2.2.0a2
fprime-fpp-format==2.2.0a2
fprime-fpp-from-xml==2.2.0a2
fprime-fpp-locate-defs==2.2.0a2
fprime-fpp-locate-uses==2.2.0a2
fprime-fpp-syntax==2.2.0a2
fprime-fpp-to-cpp==2.2.0a2
fprime-fpp-to-dict==2.2.0a2
fprime-fpp-to-json==2.2.0a2
fprime-fpp-to-xml==2.2.0a2
fprime-gds==3.4.4a3
fprime-tools==3.4.4
fprime-tools==v3.4.5a1
fprime-visual==1.0.2
gcovr==6.0
idna==3.4
Expand Down

0 comments on commit 6a921a1

Please sign in to comment.