Skip to content

Commit

Permalink
Merge pull request #61 from lf-lang/hotfix-TAN
Browse files Browse the repository at this point in the history
Remove TAN messages and record in-transit messages in the RTI
  • Loading branch information
Soroosh129 authored Jun 11, 2022
2 parents ba7d4dd + 47c70ed commit c1eb156
Show file tree
Hide file tree
Showing 13 changed files with 1,129 additions and 697 deletions.
8 changes: 7 additions & 1 deletion core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ include_directories(${CoreLib}/platform)
include_directories(${CoreLib}/federated)

# Declare a new executable target and list all its sources
add_executable(RTI rti.c ${LF_PLATFORM_FILE})
add_executable(
RTI
rti.c
${LF_PLATFORM_FILE}
${CoreLib}/utils/pqueue.c
message_record/message_record.c
)

IF(CMAKE_BUILD_TYPE MATCHES DEBUG)
# Set the LOG_LEVEL to 4 to get DEBUG messages
Expand Down
170 changes: 170 additions & 0 deletions core/federated/RTI/message_record/message_record.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* @file message_record.c
* @author Soroush Bateni (soroush@berkeley.edu)
* @brief Record-keeping for in-transit messages.
* @version 0.1
* @date 2022-06-02
*
* @copyright Copyright (c) 2022, The University of California at Berkeley.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
***************/

#include "message_record.h"
#include <stdlib.h>

/**
* @brief Initialize the in-transit message record queue.
*
* @return in_transit_message_record_q
*/
in_transit_message_record_q_t* initialize_in_transit_message_q() {
in_transit_message_record_q_t* queue =
(in_transit_message_record_q_t*)calloc(
1,
sizeof(in_transit_message_record_q_t)
);
queue->main_queue = pqueue_init(
10,
in_reverse_order,
get_message_record_index,
get_message_record_position,
set_message_record_position,
tags_match,
print_message_record
);

queue->transfer_queue = pqueue_init(
10,
in_reverse_order,
get_message_record_index,
get_message_record_position,
set_message_record_position,
tags_match,
print_message_record
);

return queue;
}

/**
* @brief Free the memory occupied by the `queue`.
*
* @param queue The queue to free.
*/
void free_in_transit_message_q(in_transit_message_record_q_t* queue) {
pqueue_free(queue->main_queue);
pqueue_free(queue->transfer_queue);
free(queue);
}

/**
* @brief Add a record of the in-transit message.
*
* @param queue The queue to add to.
* @param tag The tag of the in-transit message.
* @return 0 on success.
*/
int add_in_transit_message_record(in_transit_message_record_q_t* queue, tag_t tag) {
in_transit_message_record_t* in_transit_record = malloc(sizeof(in_transit_message_record_t));
in_transit_record->tag = tag;
return pqueue_insert(
queue->main_queue,
(void*)in_transit_record
);
}

/**
* @brief Clean the record of in-transit messages up to and including `tag`.
*
* @param queue The queue to clean.
* @param tag Will clean all messages with tags <= tag.
*/
void clean_in_transit_message_record_up_to_tag(in_transit_message_record_q_t* queue, tag_t tag) {
in_transit_message_record_t* head_of_in_transit_messages = (in_transit_message_record_t*)pqueue_peek(queue->main_queue);
while (
head_of_in_transit_messages != NULL && // Queue is not empty
head_of_in_transit_messages->tag.time <= tag.time // The head message record has a time less than or equal to
// `tag.time`.
) {
// Now compare the tags. The message record queue is ordered according to the `time` field, so we need to check
// all records with that `time` and find those that have smaller or equal full tags.
if (lf_tag_compare(
head_of_in_transit_messages->tag,
tag
) <= 0
) {
LF_PRINT_DEBUG(
"RTI: Removed a message with tag (%ld, %u) from the list of in-transit messages.",
head_of_in_transit_messages->tag.time - lf_time_start(),
head_of_in_transit_messages->tag.microstep
);

free(pqueue_pop(queue->main_queue));
} else {
// Add it to the transfer queue
pqueue_insert(queue->transfer_queue, pqueue_pop(queue->main_queue));
}
head_of_in_transit_messages = (in_transit_message_record_t*)pqueue_peek(queue->main_queue);
}
// Empty the transfer queue (which holds messages with equal time but larger microstep) into the main queue.
pqueue_empty_into(&queue->main_queue, &queue->transfer_queue);
}

/**
* @brief Get the minimum tag of all currently recorded in-transit messages.
*
* @param queue The queue to search in (of type `in_transit_message_record_q`).
* @return tag_t The minimum tag of all currently recorded in-transit messages. Return `FOREVER_TAG` if the queue is empty.
*/
tag_t get_minimum_in_transit_message_tag(in_transit_message_record_q_t* queue) {
tag_t minimum_tag = FOREVER_TAG;

in_transit_message_record_t* head_of_in_transit_messages = (in_transit_message_record_t*)pqueue_peek(queue->main_queue);
while (head_of_in_transit_messages != NULL) { // Queue is not empty
// The message record queue is ordered according to the `time` field, so we need to check
// all records with the minimum `time` and find those that have the smallest tag.
if (lf_tag_compare(
head_of_in_transit_messages->tag,
minimum_tag
) <= 0
) {
minimum_tag = head_of_in_transit_messages->tag;
} else if (head_of_in_transit_messages->tag.time > minimum_tag.time) {
break;
}

// Add the head to the transfer queue.
pqueue_insert(queue->transfer_queue, pqueue_pop(queue->main_queue));

head_of_in_transit_messages = (in_transit_message_record_t*)pqueue_peek(queue->main_queue);
}
// Empty the transfer queue (which holds messages with equal time but larger microstep) into the main queue.
pqueue_empty_into(&queue->main_queue, &queue->transfer_queue);

LF_PRINT_DEBUG(
"RTI: Minimum tag of all in-transit messages: (%ld, %u).",
head_of_in_transit_messages->tag.time - lf_time_start(),
head_of_in_transit_messages->tag.microstep
);

return minimum_tag;
}
84 changes: 84 additions & 0 deletions core/federated/RTI/message_record/message_record.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* @file message_record.h
* @author Soroush Bateni (soroush@berkeley.edu)
* @brief Record-keeping for in-transit messages.
* @version 0.1
* @date 2022-06-02
*
* @copyright Copyright (c) 2022, The University of California at Berkeley.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
***************/

#ifndef RTI_MESSAGE_RECORD_H
#define RTI_MESSAGE_RECORD_H

#include "rti_pqueue_support.h"

/**
* @brief Queue to keep a record of in-transit messages.
*
*/
typedef struct {
pqueue_t* main_queue; // The primary queue.
pqueue_t* transfer_queue; // Queue used for housekeeping.
} in_transit_message_record_q_t;

/**
* @brief Initialize the in-transit message record queue.
*
* @return in_transit_message_record_q
*/
in_transit_message_record_q_t* initialize_in_transit_message_q();

/**
* @brief Free the memory occupied by the `queue`.
*
* @param queue The queue to free.
*/
void free_in_transit_message_q(in_transit_message_record_q_t* queue);

/**
* @brief Add a record of the in-transit message.
*
* @param queue The queue to add to (of type `in_transit_message_record_q`).
* @param tag The tag of the in-transit message.
* @return 0 on success.
*/
int add_in_transit_message_record(in_transit_message_record_q_t* queue, tag_t tag);

/**
* @brief Clean the record of in-transit messages up to and including `tag`.
*
* @param queue The queue to clean (of type `in_transit_message_record_q`).
* @param tag Will clean all messages with tags <= tag.
*/
void clean_in_transit_message_record_up_to_tag(in_transit_message_record_q_t* queue, tag_t tag);

/**
* @brief Get the minimum tag of all currently recorded in-transit messages.
*
* @param queue The queue to search in (of type `in_transit_message_record_q`).
* @return tag_t The minimum tag of all currently recorded in-transit messages. Return `FOREVER_TAG` if the queue is empty.
*/
tag_t get_minimum_in_transit_message_tag(in_transit_message_record_q_t* queue);

#endif // RTI_MESSAGE_RECORD_H
112 changes: 112 additions & 0 deletions core/federated/RTI/message_record/rti_pqueue_support.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* @file rti_pqueue_support.h
* @author Soroush Bateni (soroush@berkeley.edu)
* @brief Header-only support functions for pqueue (in the RTI).
* @version 0.1
* @date 2022-06-02
*
* @copyright Copyright (c) 2022, The University of California at Berkeley.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
***************/

#ifndef RTI_PQUEUE_SUPPORT_H
#define RTI_PQUEUE_SUPPORT_H

#include "tag.h"
#include "utils/pqueue.h"
#include "utils/util.h"

// ********** Priority Queue Support Start
/**
* @brief Represent an in-transit message.
*
*/
typedef struct in_transit_message_record {
tag_t tag; // Tag of the in-transit message.
size_t pos; // Position in the priority queue.
} in_transit_message_record_t;


/**
* Return whether the first and second argument are given in reverse order.
*/
static int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) {
return (thiz > that);
}

/**
* Return false (0) regardless of tag order.
*/
static int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) {
return false;
}

/**
* Return whether or not the given `in_transit_message_record_t` types have the same tag.
*/
static int tags_match(void* next, void* curr) {
return (lf_tag_compare(
((in_transit_message_record_t*)next)->tag,
((in_transit_message_record_t*)curr)->tag
) == 0);
}

/**
* Report a priority equal to the time of the given in-transit message.
* Used for sorting pointers to in_transit_message_record_t structs.
*/
static pqueue_pri_t get_message_record_index(void *a) {
return (pqueue_pri_t)(((in_transit_message_record_t*) a)->tag.time);
}

/**
* Return the given in_transit_message_record_t's position in the queue.
*/
static size_t get_message_record_position(void *a) {
return ((in_transit_message_record_t*) a)->pos;
}

/**
* Set the given in_transit_message_record_t's position in the queue.
*/
static void set_message_record_position(void *a, size_t pos) {
((in_transit_message_record_t*) a)->pos = pos;
}

/**
* Print some information about the given in-transit message.
*
* DEBUG function only.
*/
static void print_message_record(void *message) {
in_transit_message_record_t *r = (in_transit_message_record_t*)message;
LF_PRINT_DEBUG(
"Tag of the in_transit_message_record_t: (%ld, %u). "
"Its position in the priority queue: %u",
r->tag.time - lf_time_start(),
r->tag.microstep,
r->pos
);
}

// ********** Priority Queue Support End
#endif
Loading

0 comments on commit c1eb156

Please sign in to comment.