Skip to content

Commit

Permalink
Merge pull request #323 from lf-lang/federated-cleanup
Browse files Browse the repository at this point in the history
Various bugfixes and cleanups in the support for federated programs
  • Loading branch information
edwardalee authored Jan 22, 2024
2 parents a985205 + 6e4af8e commit 36d3249
Show file tree
Hide file tree
Showing 39 changed files with 4,128 additions and 4,092 deletions.
1 change: 0 additions & 1 deletion core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ add_executable(
${CoreLib}/utils/pqueue_base.c
${CoreLib}/utils/pqueue_tag.c
${CoreLib}/utils/pqueue.c
message_record/message_record.c
)

IF(CMAKE_BUILD_TYPE MATCHES DEBUG)
Expand Down
79 changes: 68 additions & 11 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "rti_remote.h"
#include "net_util.h"
#include <signal.h> // To trap ctrl-c and invoke a clean stop to save the trace file, if needed.
#include <string.h>

Expand All @@ -67,16 +68,50 @@ static rti_remote_t rti;
*/
const char *rti_trace_file_name = "rti.lft";

/** Indicator that normal termination of the RTI has occurred. */
bool normal_termination = false;

/**
* @brief A clean termination of the RTI will write the trace file, if tracing is
* enabled, before exiting.
* Send a failed signal to the specified federate.
*/
void termination() {
/*
 * Write a one-byte MSG_TYPE_FAILED message to the socket of the given federate.
 * fed: the federate to notify; its socket and enclave.id fields are read here.
 * Returns nothing; the outcome is reported via LF_PRINT_LOG or lf_print_error.
 */
static void send_failed_signal(federate_info_t* fed) {
// The message consists solely of the one-byte message type.
size_t bytes_to_write = 1;
unsigned char buffer[bytes_to_write];
buffer[0] = MSG_TYPE_FAILED;
if (rti.base.tracing_enabled) {
// NOTE(review): the next two lines look like removed lines of the old termination()
// body that this diff rendering interleaved here -- confirm before relying on them.
stop_trace(rti.base.trace);
lf_print("RTI trace file saved.");
// Presumably records the outgoing FAILED message in the trace -- verify against the tracing API.
tracepoint_rti_to_federate(rti.base.trace, send_FAILED, fed->enclave.id, NULL);
}
// A zero return from write_to_socket is treated as success below.
int failed = write_to_socket(fed->socket, bytes_to_write, &(buffer[0]));
if (failed == 0) {
LF_PRINT_LOG("RTI has sent failed signal to federate %d due to abnormal termination.", fed->enclave.id);
} else {
// Only reported; the failure is not propagated to the caller.
lf_print_error("RTI failed to send failed signal to federate %d on socket ID %d.", fed->enclave.id, fed->socket);
}
}

/**
* @brief Function to run upon termination.
* This function will be invoked both after main() returns and when a signal
* that results in terminating the process, such as SIGINT, is received. In the
* former case, it should do nothing. In the latter case, it will send a MSG_TYPE_FAILED
* signal to each federate and attempt to write the trace file, but without
* acquiring a mutex lock, so the resulting files may be incomplete or even
* corrupted. But this is better than just failing to write the data we have
* collected so far.
*/
void termination() {
// The cleanup below runs only when main() has not set normal_termination.
if (!normal_termination) {
// Notify every federate that has an entry and is not in the NOT_CONNECTED state.
for (int i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
federate_info_t *f = (federate_info_t*)rti.base.scheduling_nodes[i];
// Skip unallocated entries and federates that are not connected.
if (!f || f->enclave.state == NOT_CONNECTED) continue;
send_failed_signal(f);
}
if (rti.base.tracing_enabled) {
// No lock is taken in this function before stopping the trace.
stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}
lf_print("RTI is exiting abnormally.");
}
// NOTE(review): this message is printed on every path as rendered here; it may be a
// removed line of the old implementation left in by the diff view -- confirm.
lf_print("RTI is exiting.");
}

void usage(int argc, const char* argv[]) {
Expand All @@ -86,7 +121,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" -n, --number_of_federates <n>");
lf_print(" The number of federates in the federation that this RTI will control.\n");
lf_print(" -p, --port <n>");
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, STARTING_PORT);
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT);
lf_print(" -c, --clock_sync [off|init|on] [period <n>] [exchanges-per-interval <n>]");
lf_print(" The status of clock synchronization for this federate.");
lf_print(" - off: Clock synchronization is off.");
Expand Down Expand Up @@ -254,6 +289,16 @@ int main(int argc, const char* argv[]) {

// Catch the Ctrl-C signal, for a clean exit that does not lose the trace information
signal(SIGINT, exit);
#ifdef SIGPIPE
// Ignore SIGPIPE errors, which terminate the entire application if
// socket write() fails because the reader has closed the socket.
// Instead, cause an EPIPE error to be set when write() fails.
// NOTE: The reason for a broken socket causing a SIGPIPE signal
// instead of just having write() return an error is to robustly
// handle a foo | bar pipeline where bar crashes. The default behavior
// is for foo to also exit.
signal(SIGPIPE, SIG_IGN);
#endif // SIGPIPE
if (atexit(termination) != 0) {
lf_print_warning("Failed to register termination function!");
}
Expand All @@ -277,16 +322,28 @@ int main(int argc, const char* argv[]) {
// Allocate memory for the federates
rti.base.scheduling_nodes = (scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
federate_info_t *fed_info = (federate_info_t *) malloc(sizeof(federate_info_t));
federate_info_t *fed_info = (federate_info_t *) calloc(1, sizeof(federate_info_t));
initialize_federate(fed_info, i);
rti.base.scheduling_nodes[i] = (scheduling_node_t *) fed_info;
}

int socket_descriptor = start_rti_server(rti.user_specified_port);
wait_for_federates(socket_descriptor);
if (socket_descriptor >= 0) {
wait_for_federates(socket_descriptor);
normal_termination = true;
if (rti.base.tracing_enabled) {
// No need for a mutex lock because all threads have exited.
stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}
}

lf_print("RTI is exiting."); // Do this before freeing scheduling nodes.
free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes);
lf_print("RTI is exiting.");
return 0;

// Even if the RTI is exiting normally, it should report an error code if one of the
// federates has reported an error.
return (int)_lf_federate_reports_error;
}
#endif // STANDALONE_RTI

176 changes: 0 additions & 176 deletions core/federated/RTI/message_record/message_record.c

This file was deleted.

86 changes: 0 additions & 86 deletions core/federated/RTI/message_record/message_record.h

This file was deleted.

Loading

0 comments on commit 36d3249

Please sign in to comment.