Skip to content

Commit

Permalink
Fully functional fragmentation and reassembly implementation for larg…
Browse files Browse the repository at this point in the history
…e payloads.

Fragmentation is transparent to calling application.

TODO:
        - multicast fragmentation
        - test with multiple nodes in a larger network
        - use a circular queue for the frame_queue data structure
        - use a LRUCache data structure for the frame reassembly
        - lower memory consumption
        - arduino porting
        - cleanup code
        - remove redundant and unnecessary code

- Based on IPv4 fragmentation https://tools.ietf.org/html/rfc791.
- Tested only on Raspberry Pi, may not work in Arduinos due to memory usage
  Tested only with two raspberries using NRF24L01+ 10 meters apart.
- The lib is capable to detect if the message payload is bigger than frame_size-sizeof(Header)
  and split the payload into chucks and send them separately.
- This commit introduces two new flag types:
        - NETWORK_MORE_FRAGMENTS
        - NETWORK_LAST_FRAGMENT
- As of IPv4 fragmentation this flags are used to mark the frames to notify the receiver if more
  fragments will come or this is the last fragment.
- The previous _write() function to transmit the frames is used.
- Changes in payload size management were made to allow dynamic payload sizes without the need
  to define a field in the header.
- Introduce a new struct type "RF24NetworkFrame" for better frame handling inside the lib.
  This type encapsulates the header, the current size of the payload and a (fixed size) array
  containing the payload.
- The frame_buffer data structure was changes in favor of a std::queue for better handling.
- Make some changes renaming variables and defining the values.
- Use a std::map with double key (header.to_from,header.id) to identify the incoming fragments,
  allowing the reassembly of multiple transmission simultaneously.
- The reassembly of frames happens in the enqueue(frame) function.
  Based on the fragment_id and header.id the fragments are handled differently and in the end
  the completely assembled frames are queued in the frame_queue presenting them to the calling
  application.
- Introduce a new debug flag to print fragmentation debug messages.
- Use definitions for static values like frame size or payload max size.
- Strip trailing spaces and fix some indentation issues.
  • Loading branch information
reixd committed Sep 5, 2014
1 parent 4967084 commit d49e957
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 131 deletions.
149 changes: 99 additions & 50 deletions RPi/RF24Network/RF24Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ uint32_t nFails = 0, nOK=0;

/******************************************************************/

RF24Network::RF24Network( RF24& _radio ): radio(_radio)
RF24Network::RF24Network( RF24& _radio ): radio(_radio), frame_size(MAX_FRAME_SIZE)
{}

/******************************************************************/
Expand All @@ -53,6 +53,7 @@ void RF24Network::begin(uint8_t _channel, uint16_t _node_address ) {
radio.setDataRate(RF24_1MBPS);
radio.setCRCLength(RF24_CRC_16);
radio.enableDynamicAck();
radio.enableDynamicPayloads();

//uint8_t retryVar = (node_address % 7) + 5;
uint8_t retryVar = (((node_address % 6)+1) *2) + 3;
Expand Down Expand Up @@ -110,16 +111,20 @@ uint8_t RF24Network::update(void)
memcpy(&header,frame_buffer,sizeof(RF24NetworkHeader));

IF_SERIAL_DEBUG(printf_P("%u: MAC Received on %u %s\n\r",millis(),pipe_num,header.toString()));
IF_SERIAL_DEBUG(const uint16_t* i = reinterpret_cast<const uint16_t*>(frame_buffer + sizeof(RF24NetworkHeader));printf("%u: NET message %04x\n\r",millis(),*i));
if (len) {
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame size %i",millis(),len););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame ",millis()); const char* charPtr = reinterpret_cast<const char*>(frame_buffer); for (size_t i = 0; i < len; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r"));
}

// Throw it away if it's not a valid address
if ( !is_valid_address(header.to_node) ){
continue;
}

IF_SERIAL_DEBUG(printf_P("%u: MAC Valid frame from %i with size %i received.\n\r",millis(),header.from_node,len));

// Build the full frame
size_t payload_size = len-sizeof(RF24NetworkHeader);
RF24NetworkFrame frame = RF24NetworkFrame(header,frame_buffer+sizeof(RF24NetworkHeader),payload_size);
RF24NetworkFrame frame = RF24NetworkFrame(header,frame_buffer+sizeof(RF24NetworkHeader),len-sizeof(RF24NetworkHeader));

uint8_t res = header.type;
// Is this for us?
Expand Down Expand Up @@ -186,27 +191,30 @@ uint8_t RF24Network::update(void)
bool RF24Network::enqueue(RF24NetworkFrame frame) {
bool result = false;

if (frame.header.fragment_id > 0) {
IF_SERIAL_DEBUG(printf("%u: NET message as fragment %i received\n\r",millis(),header.fragment_id););

//if fragment_id > 1 then
if (frame.header.fragment_id > 1) {
//Enqueue the frame with it fragmented payload in the LRUCache
//The cache will assemble all payloads into one big payload
//!frameAssemblyCache.put(frame.header.from_node,frame);
result = true;
} else if (frame.header.fragment_id == 1) {
//if we got a fragment_id == 1, then we got the last fragment
//Enqueue this frame into the frame_queue
//!frame_queue.push( frameAssemblyCache.get(frame.header.from_node) );
result = true;
} else {
// otherwise discard the message with the large payload
result = false;
}
if (frame.header.fragment_id > 1 && frame.header.type == NETWORK_MORE_FRAGMENTS) {
//Set the more fragments flag to indicate a fragmented frame
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC fragmented payload of size %i Bytes with fragmentID '%i' received.\n\r",millis(),frame.message_size,frame.header.fragment_id););

//Append payload
appendFragmentToFrame(frame);
result = true;

} else if (frame.header.fragment_id == 1 && frame.header.type == NETWORK_LAST_FRAGMENT) {
//Set the last fragment flag to indicate the last fragment
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC Last fragment with size %i Bytes and fragmentID '%i' received.\n\r",millis(),frame.message_size,frame.header.fragment_id););

//Append payload
appendFragmentToFrame(frame);

IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue assembled frame @%x "),millis(),frame_queue.size()));
//Push the assembled frame in the frame_queue and remove it from cache
frame_queue.push( frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ] );
frameFragmentsCache.erase( std::make_pair(frame.header.from_node,frame.header.id) );

result = true;

} else {
IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue @%x "),millis(),frame_queue.remain()));
IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Enqueue @%x "),millis(),frame_queue.size()));

// Copy the current frame into the frame queue
frame_queue.push(frame);
Expand All @@ -224,10 +232,31 @@ bool RF24Network::enqueue(RF24NetworkFrame frame) {

/******************************************************************/

void RF24Network::appendFragmentToFrame(RF24NetworkFrame frame) {

if (frameFragmentsCache.count(std::make_pair(frame.header.from_node,frame.header.id)) == 0 ) {
//This is the first of many fragments
frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ] = frame;
} else {
//We have at least received one fragments.

//Append payload
RF24NetworkFrame *f = &(frameFragmentsCache[ std::make_pair(frame.header.from_node,frame.header.id) ]);
memcpy(f->message_buffer+f->message_size, frame.message_buffer, frame.message_size);

//Increment message size
f->message_size += frame.message_size;
//Update header
f->header = frame.header;
}
}

/******************************************************************/

bool RF24Network::available(void)
{
// Are there frames on the queue for us?
return (!frame_queue.isEmpty());
return (!frame_queue.empty());
}

/******************************************************************/
Expand Down Expand Up @@ -259,17 +288,20 @@ size_t RF24Network::read(RF24NetworkHeader& header,void* message, size_t maxlen)

if ( available() )
{
RF24NetworkFrame frame = frame_queue.pop();
RF24NetworkFrame frame = frame_queue.front();

// How much buffer size should we actually copy?
bufsize = std::min(frame.getPayloadSize(),maxlen);
bufsize = std::min(frame.message_size,maxlen);

memcpy(&header,&(frame.header),sizeof(RF24NetworkHeader));
memcpy(message,frame.getPayloadArray(),bufsize);
memcpy(message,frame.message_buffer,bufsize);

IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Received %s\n\r"),millis(),header.toString()));
}
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message size %i\n",millis(),frame.message_size););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message ",millis()); const char* charPtr = reinterpret_cast<const char*>(message); for (size_t i = 0; i < bufsize; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r"));
IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET readed %s\n\r"),millis(),header.toString()));

frame_queue.pop();
}
return bufsize;
}

Expand Down Expand Up @@ -309,25 +341,33 @@ bool RF24Network::write(RF24NetworkHeader& header,const void* message, size_t le
bool txSuccess = true;

//Check payload size
if (len > MAX_PAYLOAD_SIZE) {
IF_SERIAL_DEBUG(printf("%u: NET write message failed. Given 'len' is bigger than the MAX Payload size of %i\n\r",millis(),MAX_PAYLOAD_SIZE););
return false;
}

//If message payload length fits in a single message
//then use the normal _write() function
if (len <= max_frame_payload_size) {
//If message payload length fits in a single message
//then use the normal _write() function
return _write(header,message,len,writeDirect);
}

if (len >= 255*max_frame_payload_size) {
//If the message payload is too big, whe cannot generate enough fragments
//and enumerate them
//If the message payload is too big, whe cannot generate enough fragments
//and enumerate them
if (len > 255*max_frame_payload_size) {

txSuccess = false;
return txSuccess;
}
//The payload is smaller than 6kBytes. We cann transmit it.

//The payload is smaller than MAX_PAYLOAD_SIZE and we can enumerate the fragments.
// --> We cann transmit the message.

//Divide the message payload into chuncks of max_frame_payload_size
uint8_t fragment_id = 1 + ((len - 1) / max_frame_payload_size); //the number of fragments to send = ceil(len/max_frame_payload_size)
uint8_t msgCount = 0;

IF_SERIAL_DEBUG(printf("%u: NET total message fragments %i\n\r",millis(),fragment_id););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET total message fragments %i\n\r",millis(),fragment_id););

//Iterate over the payload chuncks
// Assemble a new message, copy and fill out the header
Expand All @@ -338,34 +378,38 @@ bool RF24Network::write(RF24NetworkHeader& header,const void* message, size_t le
while (fragment_id > 0) {

//Copy and fill out the header
RF24NetworkHeader fragmentHeader;
fragmentHeader = header;
RF24NetworkHeader fragmentHeader = header;
fragmentHeader.fragment_id = fragment_id;

if (fragment_id == 1) {
fragmentHeader.type = NETWORK_LAST_FRAGMENT; //Set the last fragment flag to indicate the last fragment
} else {
fragmentHeader.type = NETWORK_MORE_FRAGMENTS; //Set the more fragments flag to indicate a fragmented frame
}

size_t offset = msgCount*max_frame_payload_size;
size_t fragmentLen = std::min(len-offset,max_frame_payload_size);

IF_SERIAL_DEBUG(printf("%u: NET try to transmit msg with fragment '%i'\n\r",millis(),(msgCount+1)););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET try to transmit fragmented payload of size %i Bytes with fragmentID '%i'\n\r",millis(),fragmentLen,fragment_id););

//Try to send the payload chunk with the copied header
bool ok = _write(fragmentHeader,message+offset,len-offset,writeDirect);
bool ok = _write(fragmentHeader,message+offset,fragmentLen,writeDirect);
if (!ok) {
IF_SERIAL_DEBUG(printf("%u: NET message transmission with fragmentID '%i' failed. Abort.\n\r",millis(),(msgCount+1)););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message transmission with fragmentID '%i' failed. Abort.\n\r",millis(),fragment_id););
txSuccess = false;
break;
}
IF_SERIAL_DEBUG(printf("%u: NET message transmission with fragmentID '%i' sucessfull.\n\r",millis(),(msgCount+1)););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: NET message transmission with fragmentID '%i' sucessfull.\n\r",millis(),fragment_id););

//Message was successful sent
//Check and modify counters
fragment_id--;
msgCount++;

IF_SERIAL_DEBUG(printf("%u: NET DEBUG: fragmentID=%i msgCount=%i loop_done?=%i\n\r",millis(),fragment_id,msgCount,!(fragment_id > 0)););
}

//Return true if all the chuncks where sent successfuly
//else return false
IF_SERIAL_DEBUG(printf("%u: NET total message fragments sent %i. Status",millis(),msgCount); printf("%s\n\r", ok ? "ok" : "not ok"););
IF_SERIAL_DEBUG(printf("%u: NET total message fragments sent %i. txSuccess ",millis(),msgCount); printf("%s\n\r", txSuccess ? "YES" : "NO"););
return txSuccess;
}
/******************************************************************/
Expand All @@ -376,21 +420,26 @@ bool RF24Network::_write(RF24NetworkHeader& header,const void* message, size_t l
header.from_node = node_address;

// Build the full frame to send
if (len)
memcpy(frame_buffer + sizeof(RF24NetworkHeader),message,std::min(frame_size-sizeof(RF24NetworkHeader),len));
memcpy(frame_buffer,&header,sizeof(RF24NetworkHeader));
frame_size = sizeof(RF24NetworkHeader); //Set the current frame size
if (len) {
memcpy(frame_buffer + sizeof(RF24NetworkHeader),message,std::min(MAX_FRAME_SIZE-sizeof(RF24NetworkHeader),len));
frame_size += len; //Set the current frame size
}

IF_SERIAL_DEBUG(printf_P(PSTR("%u: NET Sending %s\n\r"),millis(),header.toString()));
if (len)
if (frame_size)
{
// IF_SERIAL_DEBUG(const uint16_t* i = reinterpret_cast<const uint16_t*>(message);printf_P(PSTR("%u: NET message %04x\n\r"),millis(),*i));
IF_SERIAL_DEBUG(printf("%u: NET message ",millis());const char* charPtr = reinterpret_cast<const char*>(message);while(len--){ printf("%02x ",charPtr[len]);} printf("\n\r") );
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame size %i",millis(),frame_size););
IF_SERIAL_DEBUG_FRAGMENTATION(printf("%u: MAC frame ",millis()); const char* charPtr = reinterpret_cast<const char*>(frame_buffer); for (size_t i = 0; i < frame_size; i++) { printf("%02X ", charPtr[i]); }; printf("\n\r"));
}


// If the user is trying to send it to himself
if ( header.to_node == node_address ){
// Build the frame to send
RF24NetworkFrame frame = RF24NetworkFrame(header,message,sizeof(message));
RF24NetworkFrame frame = RF24NetworkFrame(header,message,std::min(MAX_FRAME_SIZE-sizeof(RF24NetworkHeader),len));
// Just queue it in the received queue
return enqueue(frame);
}else{
Expand Down
Loading

0 comments on commit d49e957

Please sign in to comment.