From d49e9579b1bdf173606084223c0bf42175a7cb66 Mon Sep 17 00:00:00 2001 From: Rei Date: Fri, 5 Sep 2014 17:14:51 +0200 Subject: [PATCH] Fully functional fragmentation and reassembly implementation for large payloads. Fragmentation is transparent to calling application. TODO: - multicast fragmentation - test with multiple nodes in a larger network - use a circular queue for the frame_queue data structure - use a LRUCache data structure for the frame reassembly - lower memory consumption - arduino porting - cleanup code - remove redundant and unnecessary code - Based on IPv4 fragmentation https://tools.ietf.org/html/rfc791. - Tested only on Raspberry Pi, may not work in Arduinos due to memory usage Tested only with two raspberries using NRF24L01+ 10 meters apart. - The lib is capable to detect if the message payload is bigger than frame_size-sizeof(Header) and split the payload into chucks and send them separately. - This commit introduces two new flag types: - NETWORK_MORE_FRAGMENTS - NETWORK_LAST_FRAGMENT - As of IPv4 fragmentation this flags are used to mark the frames to notify the receiver if more fragments will come or this is the last fragment. - The previous _write() function to transmit the frames is used. - Changes in payload size management were made to allow dynamic payload sizes without the need to define a field in the header. - Introduce a new struct type "RF24NetworkFrame" for better frame handling inside the lib. This type encapsulates the header, the current size of the payload and a (fixed size) array containing the payload. - The frame_buffer data structure was changes in favor of a std::queue for better handling. - Make some changes renaming variables and defining the values. - Use a std::map with double key (header.to_from,header.id) to identify the incoming fragments, allowing the reassembly of multiple transmission simultaneously. - The reassembly of frames happens in the enqueue(frame) function. Based on the fragment_id and header.id the fragments are handled differently and in the end the completely assembled frames are queued in the frame_queue presenting them to the calling application. - Introduce a new debug flag to print fragmentation debug messages. - Use definitions for static values like frame size or payload max size. - Strip trailing spaces and fix some indentation issues. --- RPi/RF24Network/RF24Network.cpp | 149 ++++++++++++++++++--------- RPi/RF24Network/RF24Network.h | 111 ++++++-------------- RPi/RF24Network/RF24Network_config.h | 9 +- 3 files changed, 138 insertions(+), 131 deletions(-) diff --git a/RPi/RF24Network/RF24Network.cpp b/RPi/RF24Network/RF24Network.cpp index 9602eb73..0f248de4 100644 --- a/RPi/RF24Network/RF24Network.cpp +++ b/RPi/RF24Network/RF24Network.cpp @@ -32,7 +32,7 @@ uint32_t nFails = 0, nOK=0; /******************************************************************/ -RF24Network::RF24Network( RF24& _radio ): radio(_radio) +RF24Network::RF24Network( RF24& _radio ): radio(_radio), frame_size(MAX_FRAME_SIZE) {} /******************************************************************/ @@ -53,6 +53,7 @@ void RF24Network::begin(uint8_t _channel, uint16_t _node_address ) { radio.setDataRate(RF24_1MBPS); radio.setCRCLength(RF24_CRC_16); radio.enableDynamicAck(); + radio.enableDynamicPayloads(); //uint8_t retryVar = (node_address % 7) + 5; uint8_t retryVar = (((node_address % 6)+1) *2) + 3; @@ -110,16 +111,20 @@ uint8_t RF24Network::update(void) memcpy(&header,frame_buffer,sizeof(RF24NetworkHeader)); IF_SERIAL_DEBUG(printf_P("%u: MAC Received on %u %s\n\r",millis(),pipe_num,header.toString())); - IF_SERIAL_DEBUG(const uint16_t* i = reinterpret_cast(frame_buffer + sizeof(RF24NetworkHeader));printf("%u: NET message %04x\n\r",millis(),*i)); + if (len) { + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame size %i",millis(),len);); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame ",millis()); const char* charPtr = reinterpret_cast(frame_buffer); for (size_t i = 0; i < len; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r")); + } // Throw it away if it's not a valid address if ( !is_valid_address(header.to_node) ){ continue; } + IF_SERIAL_DEBUG(printf_P("%u: MAC Valid frame from %i with size %i received.\n\r",millis(),header.from_node,len)); + // Build the full frame - size_t payload_size = len-sizeof(RF24NetworkHeader); - RF24NetworkFrame frame = RF24NetworkFrame(header,frame_buffer+sizeof(RF24NetworkHeader),payload_size); + RF24NetworkFrame frame = RF24NetworkFrame(header,frame_buffer+sizeof(RF24NetworkHeader),len-sizeof(RF24NetworkHeader)); uint8_t res = header.type; // Is this for us? @@ -186,27 +191,30 @@ uint8_t RF24Network::update(void) bool RF24Network::enqueue(RF24NetworkFrame frame) { bool result = false; - if (frame.header.fragment_id > 0) { - IF_SERIAL_DEBUG(printf("%u: NET message as fragment %i received\n\r",millis(),header.fragment_id);); - - //if fragment_id > 1 then - if (frame.header.fragment_id > 1) { - //Enqueue the frame with it fragmented payload in the LRUCache - //The cache will assemble all payloads into one big payload - //!frameAssemblyCache.put(frame.header.from_node,frame); - result = true; - } else if (frame.header.fragment_id == 1) { - //if we got a fragment_id == 1, then we got the last fragment - //Enqueue this frame into the frame_queue - //!frame_queue.push( frameAssemblyCache.get(frame.header.from_node) ); - result = true; - } else { - // otherwise discard the message with the large payload - result = false; - } + if (frame.header.fragment_id > 1 && frame.header.type == NETWORK_MORE_FRAGMENTS) { + //Set the more fragments flag to indicate a fragmented frame + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC fragmented payload of size %i Bytes with fragmentID '%i' received.\n\r",millis(),frame.message_size,frame.header.fragment_id);); + + //Append payload + appendFragmentToFrame(frame); + result = true; + + } else if (frame.header.fragment_id == 1 && frame.header.type == NETWORK_LAST_FRAGMENT) { + //Set the last fragment flag to indicate the last fragment + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC Last fragment with size %i Bytes and fragmentID '%i' received.\n\r",millis(),frame.message_size,frame.header.fragment_id);); + + //Append payload + appendFragmentToFrame(frame); + + IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue assembled frame @%x "),millis(),frame_queue.size())); + //Push the assembled frame in the frame_queue and remove it from cache + frame_queue.push( frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ] ); + frameFragmentsCache.erase( std::make_pair(frame.header.from_node,frame.header.id) ); + + result = true; } else { - IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue @%x "),millis(),frame_queue.remain())); + IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue @%x "),millis(),frame_queue.size())); // Copy the current frame into the frame queue frame_queue.push(frame); @@ -224,10 +232,31 @@ bool RF24Network::enqueue(RF24NetworkFrame frame) { /******************************************************************/ +void RF24Network::appendFragmentToFrame(RF24NetworkFrame frame) { + + if (frameFragmentsCache.count(std::make_pair(frame.header.from_node,frame.header.id)) == 0 ) { + //This is the first of many fragments + frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ] = frame; + } else { + //We have at least received one fragments. + + //Append payload + RF24NetworkFrame *f = &(frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ]); + memcpy(f->message_buffer+f->message_size, frame.message_buffer, frame.message_size); + + //Increment message size + f->message_size += frame.message_size; + //Update header + f->header = frame.header; + } +} + +/******************************************************************/ + bool RF24Network::available(void) { // Are there frames on the queue for us? - return (!frame_queue.isEmpty()); + return (!frame_queue.empty()); } /******************************************************************/ @@ -259,17 +288,20 @@ size_t RF24Network::read(RF24NetworkHeader& header,void* message, size_t maxlen) if ( available() ) { - RF24NetworkFrame frame = frame_queue.pop(); + RF24NetworkFrame frame = frame_queue.front(); // How much buffer size should we actually copy? - bufsize = std::min(frame.getPayloadSize(),maxlen); + bufsize = std::min(frame.message_size,maxlen); memcpy(&header,&(frame.header),sizeof(RF24NetworkHeader)); - memcpy(message,frame.getPayloadArray(),bufsize); + memcpy(message,frame.message_buffer,bufsize); - IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Received %s\n\r"),millis(),header.toString())); - } + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message size %i\n",millis(),frame.message_size);); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message ",millis()); const char* charPtr = reinterpret_cast(message); for (size_t i = 0; i < bufsize; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r")); + IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET readed %s\n\r"),millis(),header.toString())); + frame_queue.pop(); + } return bufsize; } @@ -309,25 +341,33 @@ bool RF24Network::write(RF24NetworkHeader& header,const void* message, size_t le bool txSuccess = true; //Check payload size + if (len > MAX_PAYLOAD_SIZE) { + IF_SERIAL_DEBUG(printf("%u: NET write message failed. Given 'len' is bigger than the MAX Payload size of %i\n\r",millis(),MAX_PAYLOAD_SIZE);); + return false; + } + + //If message payload length fits in a single message + //then use the normal _write() function if (len <= max_frame_payload_size) { - //If message payload length fits in a single message - //then use the normal _write() function return _write(header,message,len,writeDirect); } - if (len >= 255*max_frame_payload_size) { - //If the message payload is too big, whe cannot generate enough fragments - //and enumerate them + //If the message payload is too big, whe cannot generate enough fragments + //and enumerate them + if (len > 255*max_frame_payload_size) { + txSuccess = false; return txSuccess; } - //The payload is smaller than 6kBytes. We cann transmit it. + + //The payload is smaller than MAX_PAYLOAD_SIZE and we can enumerate the fragments. + // --> We cann transmit the message. //Divide the message payload into chuncks of max_frame_payload_size uint8_t fragment_id = 1 + ((len - 1) / max_frame_payload_size); //the number of fragments to send = ceil(len/max_frame_payload_size) uint8_t msgCount = 0; - IF_SERIAL_DEBUG(printf("%u: NET total message fragments %i\n\r",millis(),fragment_id);); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET total message fragments %i\n\r",millis(),fragment_id);); //Iterate over the payload chuncks // Assemble a new message, copy and fill out the header @@ -338,34 +378,38 @@ bool RF24Network::write(RF24NetworkHeader& header,const void* message, size_t le while (fragment_id > 0) { //Copy and fill out the header - RF24NetworkHeader fragmentHeader; - fragmentHeader = header; + RF24NetworkHeader fragmentHeader = header; fragmentHeader.fragment_id = fragment_id; + if (fragment_id == 1) { + fragmentHeader.type = NETWORK_LAST_FRAGMENT; //Set the last fragment flag to indicate the last fragment + } else { + fragmentHeader.type = NETWORK_MORE_FRAGMENTS; //Set the more fragments flag to indicate a fragmented frame + } + size_t offset = msgCount*max_frame_payload_size; + size_t fragmentLen = std::min(len-offset,max_frame_payload_size); - IF_SERIAL_DEBUG(printf("%u: NET try to transmit msg with fragment '%i'\n\r",millis(),(msgCount+1));); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET try to transmit fragmented payload of size %i Bytes with fragmentID '%i'\n\r",millis(),fragmentLen,fragment_id);); //Try to send the payload chunk with the copied header - bool ok = _write(fragmentHeader,message+offset,len-offset,writeDirect); + bool ok = _write(fragmentHeader,message+offset,fragmentLen,writeDirect); if (!ok) { - IF_SERIAL_DEBUG(printf("%u: NET message transmission with fragmentID '%i' failed. Abort.\n\r",millis(),(msgCount+1));); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message transmission with fragmentID '%i' failed. Abort.\n\r",millis(),fragment_id);); txSuccess = false; break; } - IF_SERIAL_DEBUG(printf("%u: NET message transmission with fragmentID '%i' sucessfull.\n\r",millis(),(msgCount+1));); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message transmission with fragmentID '%i' sucessfull.\n\r",millis(),fragment_id);); //Message was successful sent //Check and modify counters fragment_id--; msgCount++; - - IF_SERIAL_DEBUG(printf("%u: NET DEBUG: fragmentID=%i msgCount=%i loop_done?=%i\n\r",millis(),fragment_id,msgCount,!(fragment_id > 0));); } //Return true if all the chuncks where sent successfuly //else return false - IF_SERIAL_DEBUG(printf("%u: NET total message fragments sent %i. Status",millis(),msgCount); printf("%s\n\r", ok ? "ok" : "not ok");); + IF_SERIAL_DEBUG(printf("%u: NET total message fragments sent %i. txSuccess ",millis(),msgCount); printf("%s\n\r", txSuccess ? "YES" : "NO");); return txSuccess; } /******************************************************************/ @@ -376,21 +420,26 @@ bool RF24Network::_write(RF24NetworkHeader& header,const void* message, size_t l header.from_node = node_address; // Build the full frame to send - if (len) - memcpy(frame_buffer + sizeof(RF24NetworkHeader),message,std::min(frame_size-sizeof(RF24NetworkHeader),len)); + memcpy(frame_buffer,&header,sizeof(RF24NetworkHeader)); + frame_size = sizeof(RF24NetworkHeader); //Set the current frame size + if (len) { + memcpy(frame_buffer + sizeof(RF24NetworkHeader),message,std::min(MAX_FRAME_SIZE-sizeof(RF24NetworkHeader),len)); + frame_size += len; //Set the current frame size + } IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Sending %s\n\r"),millis(),header.toString())); - if (len) + if (frame_size) { // IF_SERIAL_DEBUG(const uint16_t* i = reinterpret_cast(message);printf_P(PSTR("%u: NET message %04x\n\r"),millis(),*i)); - IF_SERIAL_DEBUG(printf("%u: NET message ",millis());const char* charPtr = reinterpret_cast(message);while(len--){ printf("%02x ",charPtr[len]);} printf("\n\r") ); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame size %i",millis(),frame_size);); + IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame ",millis()); const char* charPtr = reinterpret_cast(frame_buffer); for (size_t i = 0; i < frame_size; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r")); } // If the user is trying to send it to himself if ( header.to_node == node_address ){ // Build the frame to send - RF24NetworkFrame frame = RF24NetworkFrame(header,message,sizeof(message)); + RF24NetworkFrame frame = RF24NetworkFrame(header,message,std::min(MAX_FRAME_SIZE-sizeof(RF24NetworkHeader),len)); // Just queue it in the received queue return enqueue(frame); }else{ diff --git a/RPi/RF24Network/RF24Network.h b/RPi/RF24Network/RF24Network.h index e28454c1..c0d16fd4 100644 --- a/RPi/RF24Network/RF24Network.h +++ b/RPi/RF24Network/RF24Network.h @@ -25,17 +25,23 @@ #include #include #include -//!#include -#include +#include +#include // std::pair +#include #include "RF24Network_config.h" -#include "CircularBuffer.h" -#include "FrameLRUCache.h" #define MAX_FRAME_SIZE 32 #define MAX_FRAME_BUFFER_SIZE 255 -#define MAX_PAYLOAD_SIZE ((MAX_FRAME_SIZE-sizeof(RF24NetworkHeader))*255) +#define MAX_PAYLOAD_SIZE 1500 #define MAX_LRU_CACHE_SIZE 32 +//Network header message types +#define NETWORK_ACK_REQUEST 128 +#define NETWORK_ACK 129 +#define NETWORK_MORE_FRAGMENTS 130 +#define NETWORK_LAST_FRAGMENT 131 + + class RF24; /** @@ -96,8 +102,8 @@ struct RF24NetworkHeader struct RF24NetworkFrame { RF24NetworkHeader header; /**< Header which is sent with each message */ - size_t payload_size; /**< The size in bytes of the payload length */ - std::list payload_buffer; /**< Vector to put the frame payload that will be sent/received over the air */ + size_t message_size; /**< The size in bytes of the payload length */ + uint8_t message_buffer[MAX_PAYLOAD_SIZE]; /**< Vector to put the frame payload that will be sent/received over the air */ /** * Default constructor @@ -110,39 +116,24 @@ struct RF24NetworkFrame * Send constructor * * Use this constructor to create a frame with header and payload and then send a message - * - * @code - * RF24NetworkFrame frame(recipient_address,'t',message,sizeof(message)); - * network.write(frame); - * @endcode - * - * @param _to The logical node address where the message is going. - * @param _type The type of message which follows. Only 0-127 are allowed for - * user messages. - * @param _payload The message content. - * @param _psize Length in bytes of the payload. */ - RF24NetworkFrame(uint16_t _to, unsigned char _type = 0, const void* _payload = NULL, size_t _psize = 0) : header(RF24NetworkHeader(_to,_type)), payload_size(_psize) { - appendToPayload(_payload,payload_size); + RF24NetworkFrame(uint16_t _to, unsigned char _type = 0, const void* _message = NULL, size_t _len = 0) : + header(RF24NetworkHeader(_to,_type)), message_size(_len) { + if (_message && _len) { + memcpy(message_buffer,_message,_len); + } } /** * Send constructor * * Use this constructor to create a frame with header and payload and then send a message - * - * @code - * RF24NetworkHeader header(recipient_address,'t'); - * RF24NetworkFrame frame(header,message,sizeof(message)); - * network.write(frame); - * @endcode - * - * @param _header The header struct of the frame - * @param _payload The message content. - * @param _psize Length in bytes of the payload. */ - RF24NetworkFrame(RF24NetworkHeader& _header, const void* _payload = NULL, size_t _psize = 0) : header(_header), payload_size(_psize) { - appendToPayload(_payload,payload_size); + RF24NetworkFrame(RF24NetworkHeader& _header, const void* _message = NULL, size_t _len = 0) : + header(_header), message_size(_len) { + if (_message && _len) { + memcpy(message_buffer,_message,_len); + } } /** @@ -156,47 +147,6 @@ struct RF24NetworkFrame */ const char* toString(void) const; - /** - * Get the frame payload (message) as array - * - * Internally is the payload a vector. - * @return array pointer to the payload - */ - const void* getPayloadArray(void) { - //!const void* res = &payload_buffer[0]; - //!return res; - uint8_t *arr = new uint8_t[payload_buffer.size()]; - std::copy(payload_buffer.begin(),payload_buffer.end(),arr); - return arr; - } - - /** - * Get the payload size - * - * @return Size in bytes of the current payload - */ - size_t getPayloadSize(void) { - return payload_buffer.size(); - } - - /** - * Append the given payload to the current payload_buffer - * - */ - void appendToPayload(const void* payload, size_t len) { - if (payload && len) { - if ((payload_buffer.size()+len) <= MAX_PAYLOAD_SIZE) { - //Cast the payload as uint8_t - const uint8_t *q = (const uint8_t *)payload; - //Append the payload to the buffer - payload_buffer.insert(payload_buffer.end(), q, q+len); - //Save space in RAM - //!payload_buffer.shrink_to_fit(); - } - } - //Set the new payload_buffer size - payload_size = payload_buffer.size(); - }; }; /** @@ -373,6 +323,7 @@ class RF24Network uint8_t pipe_to_descendant( uint16_t node ); void setup_address(void); bool _write(RF24NetworkHeader& header,const void* message, size_t len, uint16_t writeDirect); + void appendFragmentToFrame(RF24NetworkFrame frame); private: #if defined (RF24NetworkMulticast) @@ -381,17 +332,17 @@ class RF24Network #endif RF24& radio; /**< Underlying radio driver, provides link/physical layers */ uint16_t node_address; /**< Logical node address of this unit, 1 .. UINT_MAX */ - const static unsigned int frame_size = MAX_FRAME_SIZE; /**< How large is each frame over the air */ - const static unsigned int max_frame_payload_size = frame_size-sizeof(RF24NetworkHeader); - uint8_t frame_buffer[frame_size]; /**< Space to put the frame that will be sent/received over the air */ - CircularBuffer frame_queue; /**< RPi can buffer 500 frames (16kB) - Arduino does 5 by default. Space for a small set of frames that need to be delivered to the app layer */ - //!FrameLRUCache frameAssemblyCache; + uint8_t frame_size; /**< How large is each frame over the air */ + const static unsigned int max_frame_payload_size = MAX_FRAME_SIZE-sizeof(RF24NetworkHeader); + uint8_t frame_buffer[MAX_FRAME_SIZE]; /**< Space to put the frame that will be sent/received over the air */ + std::queue frame_queue; + std::map, RF24NetworkFrame> frameFragmentsCache; + uint16_t parent_node; /**< Our parent's node address */ uint8_t parent_pipe; /**< The pipe our parent uses to listen to us */ uint16_t node_mask; /**< The bits which contain signfificant node address information */ - #define NETWORK_ACK_REQUEST 128 - #define NETWORK_ACK 129 + }; /** diff --git a/RPi/RF24Network/RF24Network_config.h b/RPi/RF24Network/RF24Network_config.h index 693e6715..fd3de785 100644 --- a/RPi/RF24Network/RF24Network_config.h +++ b/RPi/RF24Network/RF24Network_config.h @@ -15,8 +15,9 @@ /********** USER CONFIG **************/ //#define RF24NetworkMulticast -//#define SERIAL_DEBUG //Change #undef to #define for debug +#define SERIAL_DEBUG //Change #undef to #define for debug #define SERIAL_DEBUG_ROUTING +//#define SERIAL_DEBUG_FRAGMENTATION /*************************************/ @@ -35,6 +36,12 @@ #define IF_SERIAL_DEBUG(x) #endif +#if defined (SERIAL_DEBUG_FRAGMENTATION) +#define IF_SERIAL_DEBUG_FRAGMENTATION(x) ({x;}) +#else +#define IF_SERIAL_DEBUG_FRAGMENTATION(x) +#endif + // Avoid spurious warnings #if ! defined( NATIVE ) && defined( ARDUINO ) #undef PROGMEM