diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 558dd0c01d..29272ddcc3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -95,6 +95,14 @@ jobs: # vcpkgGitCommitId: $(vcpkgGitRef) # vcpkgGitURL: https://github.com/microsoft/vcpkg/ # if: runner.os == 'Windows' + - name: Install RTI; + run: | + cd org.lflang/src/lib/core/federated/RTI + mkdir build + cd build + cmake ../ + make + sudo make install - name: Run C tests; run: | ./gradlew test --tests org.lflang.tests.runtime.CTest.* diff --git a/org.lflang/src/lib/core/federated/RTI/CMakeLists.txt b/org.lflang/src/lib/core/federated/RTI/CMakeLists.txt new file mode 100644 index 0000000000..7f39a21ca8 --- /dev/null +++ b/org.lflang/src/lib/core/federated/RTI/CMakeLists.txt @@ -0,0 +1,63 @@ +# This is a cmake build script providing a solution for compiling +# the RTI in this directory.. +# +# Usage: +# +# To compile with cmake, run the following commands: +# +# $> mkdir build && cd build +# $> cmake ../ +# $> make +# $> sudo make install +# +# This create a binary RTI in the current working directory. Please put this in +# a directory that is on the path. +# +# To enable DEBUG messages, use the following build commands instead: +# +# $> mkdir build && cd build +# $> cmake -DCMAKE_BUILD_TYPE=DEBUG ../ +# $> make +# $> sudo make install +# +# If you would like to go back to non-DEBUG mode, you would have to remove all +# contents of the `build` folder. + +cmake_minimum_required(VERSION 3.12) +project(RTI VERSION 1.0.0 LANGUAGES C) + +set(CoreLib ../../../core) + +# Check which system we are running on to select the correct platform support +# file and assign the file's path to LF_PLATFORM_FILE +if(${CMAKE_SYSTEM_NAME} STREQUAL "Linux") + set(LF_PLATFORM_FILE ${CoreLib}/platform/lf_linux_support.c) +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + set(LF_PLATFORM_FILE ${CoreLib}/platform/lf_macos_support.c) +else() + message(FATAL_ERROR "Your platform is not supported! RTI supports Linux and MacOS.") +endif() + +include_directories(${CoreLib}) +include_directories(${CoreLib}/platform) +include_directories(${CoreLib}/federated) + +# Declare a new executable target and list all its sources +add_executable(RTI rti.c ${LF_PLATFORM_FILE}) + +IF(CMAKE_BUILD_TYPE MATCHES DEBUG) + # Set the LOG_LEVEL to 4 to get DEBUG messages + target_compile_definitions(RTI PUBLIC LOG_LEVEL=4) +ENDIF(CMAKE_BUILD_TYPE MATCHES DEBUG) + +# Set the number of workers to enable threading +target_compile_definitions(RTI PUBLIC NUMBER_OF_WORKERS) + +# Find pthreads and link to it +find_package(Threads REQUIRED) +target_link_libraries(RTI Threads::Threads) + +install( + TARGETS RTI + DESTINATION bin +) diff --git a/org.lflang/src/lib/core/federated/RTI/README.md b/org.lflang/src/lib/core/federated/RTI/README.md new file mode 100644 index 0000000000..61d2a101da --- /dev/null +++ b/org.lflang/src/lib/core/federated/RTI/README.md @@ -0,0 +1,20 @@ +This folder contains the source code for the Run-Time Infrastructure (RTI) that +is necessary for federated Lingua Franca programs. To compile and install, do: + +```bash +mkdir build && cd build +cmake ../ +make +sudo make install +``` + +**Note:** To enable DEBUG messages, use the following build commands instead: + +```bash +mkdir build && cd build +cmake -DCMAKE_BUILD_TYPE=DEBUG ../ +make +sudo make install +``` + +If you would like to go back to the non-DEBUG mode, you would have to remove all contents of the `build` folder. \ No newline at end of file diff --git a/org.lflang/src/lib/core/rti.c b/org.lflang/src/lib/core/federated/RTI/rti.c similarity index 71% rename from org.lflang/src/lib/core/rti.c rename to org.lflang/src/lib/core/federated/RTI/rti.c index 45ccb07208..765e7e4678 100644 --- a/org.lflang/src/lib/core/rti.c +++ b/org.lflang/src/lib/core/federated/RTI/rti.c @@ -54,76 +54,44 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include // Defines bzero(). #include #include // Defines wait() for process to change state. -#include "util.c" // Defines network functions. +#include "platform.h" // Platform-specific types and functions +#include "util.c" // Defines network functions. #include "net_util.c" // Defines network functions. -#include "rti.h" // Defines message types, etc. Includes and "reactor.h". +#include "net_common.h" // Defines message types, etc. Includes and "reactor.h". #include "tag.c" // Time-related types and functions. - -// The main mutex lock. -pthread_mutex_t rti_mutex = PTHREAD_MUTEX_INITIALIZER; - -// Condition variable used to signal receipt of all proposed start times. -pthread_cond_t received_start_times = PTHREAD_COND_INITIALIZER; - -// Condition variable used to signal that a start time has been sent to a federate. -pthread_cond_t sent_start_time = PTHREAD_COND_INITIALIZER; - -// RTI's decided stop tag for federates -tag_t max_stop_tag = NEVER_TAG; - -// The federates. -federate_t federates[NUMBER_OF_FEDERATES]; - -// Maximum start time seen so far from the federates. -int64_t max_start_time = 0LL; - -// Number of federates that have proposed start times. -int num_feds_proposed_start = 0; - -// Number of federates handling stop -int num_feds_handling_stop = 0; - -/** - * Boolean indicating that all federates have exited. - * This gets set to true exactly once before the program exits. - * It is marked volatile because the write is not guarded by a mutex. - * The main thread makes this true, then calls shutdown and close on - * the socket, which will cause accept() to return with an error code - * in respond_to_erroneous_connections(). - */ -volatile bool all_federates_exited = false; +#include "rti.h" /** - * The ID of the federation that this RTI will supervise. - * This should be overridden with a command-line -i option to ensure - * that each federate only joins its assigned federation. + * The state of this RTI instance. */ -char* federation_id = "Unidentified Federation"; - -/************* TCP server information *************/ -/** The final port number that the TCP socket server ends up using. */ -uint16_t final_port_TCP = 0; - -/** The TCP socket descriptor for the socket server. */ -int socket_descriptor_TCP = -1; - -/************* UDP server information *************/ -/** The final port number that the UDP socket server ends up using. */ -uint16_t final_port_UDP = USHRT_MAX; - -/** The UDP socket descriptor for the socket server. */ -int socket_descriptor_UDP = -1; +RTI_instance_t _RTI = { + .rti_mutex = PTHREAD_MUTEX_INITIALIZER, + .received_start_times = PTHREAD_COND_INITIALIZER, + .sent_start_time = PTHREAD_COND_INITIALIZER, + .max_stop_tag = NEVER_TAG, + .max_start_time = 0LL, + .number_of_federates = 0, + .num_feds_proposed_start = 0, + .num_feds_handling_stop = 0, + .all_federates_exited = false, + .federation_id = "Unidentified Federation", + .user_specified_port = 0, + .final_port_TCP = 0, + .socket_descriptor_TCP = -1, + .final_port_UDP = UINT16_MAX, + .socket_descriptor_UDP = -1, + .clock_sync_global_status = clock_sync_init, + .clock_sync_period_ns = MSEC(10), + .clock_sync_exchanges_per_interval = 10 +}; -#ifdef _LF_CLOCK_SYNC_ON -pthread_t clock_thread; // Thread performing PTP clock sync sessions periodically. -#endif // _LF_CLOCK_SYNC_ON /** * Mark a federate requesting stop. * * If the number of federates handling stop reaches the * NUM_OF_FEDERATES, broadcast MSG_TYPE_STOP_GRANTED to every federate. * - * This function assumes the rti_mutex is already locked. + * This function assumes the _RTI.rti_mutex is already locked. * * @param fed The federate that has requested a stop or has suddenly * stopped (disconnected). @@ -230,7 +198,7 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty if (result != 0) { if (specified_port == 0) { error_print_and_exit("Failed to bind the RTI socket. Cannot find a usable port. " - "Consider increasing PORT_RANGE_LIMIT in rti.h."); + "Consider increasing PORT_RANGE_LIMIT in net_common.h."); } else { error_print_and_exit("Failed to bind the RTI socket. Specified port is not available. " "Consider leaving the port unspecified"); @@ -240,16 +208,16 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty if (socket_type == UDP) { type = "UDP"; } - info_print("RTI using %s port %d for federation %s.", type, port, federation_id); + info_print("RTI using %s port %d for federation %s.", type, port, _RTI.federation_id); if (socket_type == TCP) { - final_port_TCP = port; + _RTI.final_port_TCP = port; // Enable listening for socket connections. // The second argument is the maximum number of queued socket requests, // which according to the Mac man page is limited to 128. listen(socket_descriptor, 128); } else if (socket_type == UDP) { - final_port_UDP = port; + _RTI.final_port_UDP = port; // No need to listen on the UDP socket } @@ -271,15 +239,15 @@ void handle_port_absent_message(federate_t* sending_federate, unsigned char* buf // Need to acquire the mutex lock to ensure that the thread handling // messages coming from the socket connected to the destination does not // issue a TAG before this message has been forwarded. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); uint16_t reactor_port_id = extract_uint16(&(buffer[1])); uint16_t federate_id = extract_uint16(&(buffer[1 + sizeof(uint16_t)])); // If the destination federate is no longer connected, issue a warning // and return. - if (federates[federate_id].state == NOT_CONNECTED) { - pthread_mutex_unlock(&rti_mutex); + if (_RTI.federates[federate_id].state == NOT_CONNECTED) { + pthread_mutex_unlock(&_RTI.rti_mutex); warning_print("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id); return; @@ -291,17 +259,17 @@ void handle_port_absent_message(federate_t* sending_federate, unsigned char* buf // Need to make sure that the destination federate's thread has already // sent the starting MSG_TYPE_TIMESTAMP message. - while (federates[federate_id].state == PENDING) { + while (_RTI.federates[federate_id].state == PENDING) { // Need to wait here. - pthread_cond_wait(&sent_start_time, &rti_mutex); + pthread_cond_wait(&_RTI.sent_start_time, &_RTI.rti_mutex); } // Forward the message. - int destination_socket = federates[federate_id].socket; + int destination_socket = _RTI.federates[federate_id].socket; write_to_socket_errexit(destination_socket, message_size + 1, buffer, "RTI failed to forward message to federate %d.", federate_id); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -350,12 +318,12 @@ void handle_timed_message(federate_t* sending_federate, unsigned char* buffer) { // Need to acquire the mutex lock to ensure that the thread handling // messages coming from the socket connected to the destination does not // issue a TAG before this message has been forwarded. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); // If the destination federate is no longer connected, issue a warning // and return. - if (federates[federate_id].state == NOT_CONNECTED) { - pthread_mutex_unlock(&rti_mutex); + if (_RTI.federates[federate_id].state == NOT_CONNECTED) { + pthread_mutex_unlock(&_RTI.rti_mutex); warning_print("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id); return; @@ -367,12 +335,12 @@ void handle_timed_message(federate_t* sending_federate, unsigned char* buffer) { // is a promise that is valid only in the absence of network inputs, // and now there is a network input. Hence, the promise needs to be // updated. - if (compare_tags(federates[federate_id].next_event, intended_tag) > 0) { - federates[federate_id].next_event = intended_tag; + if (compare_tags(_RTI.federates[federate_id].next_event, intended_tag) > 0) { + _RTI.federates[federate_id].next_event = intended_tag; } // Forward the message or message chunk. - int destination_socket = federates[federate_id].socket; + int destination_socket = _RTI.federates[federate_id].socket; DEBUG_PRINT( "RTI forwarding message to port %d of federate %d of length %d.", @@ -383,9 +351,9 @@ void handle_timed_message(federate_t* sending_federate, unsigned char* buffer) { // Need to make sure that the destination federate's thread has already // sent the starting MSG_TYPE_TIMESTAMP message. - while (federates[federate_id].state == PENDING) { + while (_RTI.federates[federate_id].state == PENDING) { // Need to wait here. - pthread_cond_wait(&sent_start_time, &rti_mutex); + pthread_cond_wait(&_RTI.sent_start_time, &_RTI.rti_mutex); } write_to_socket_errexit(destination_socket, bytes_read, buffer, "RTI failed to forward message to federate %d.", federate_id); @@ -405,12 +373,12 @@ void handle_timed_message(federate_t* sending_federate, unsigned char* buffer) { // FIXME: a mutex needs to be held for this so that other threads // do not write to destination_socket and cause interleaving. However, - // holding the rti_mutex might be very expensive. Instead, each outgoing + // holding the _RTI.rti_mutex might be very expensive. Instead, each outgoing // socket should probably have its own mutex. write_to_socket_errexit(destination_socket, bytes_to_read, buffer, "RTI failed to send message chunks."); } - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -477,7 +445,7 @@ void send_tag_advance_grant(federate_t* fed, tag_t tag) { * this should be fed->next_event). * @param visited An array of booleans indicating which federates * have been visited (for the first invocation, this should be - * an array of falses of size NUMBER_OF_FEDERATES). + * an array of falses of size _RTI.number_of_federates). */ tag_t transitive_next_event(federate_t* fed, tag_t candidate, bool visited[]) { if (visited[fed->id] || fed->state == NOT_CONNECTED) { @@ -505,7 +473,7 @@ tag_t transitive_next_event(federate_t* fed, tag_t candidate, bool visited[]) { // an event that would result in an earlier next event. for (int i = 0; i < fed->num_upstream; i++) { tag_t upstream_result = transitive_next_event( - &federates[fed->upstream[i]], result, visited); + &_RTI.federates[fed->upstream[i]], result, visited); // Add the "after" delay of the connection to the result. upstream_result = delay_tag(upstream_result, fed->upstream_delay[i]); @@ -569,14 +537,14 @@ void send_provisional_tag_advance_grant(federate_t* fed, tag_t tag) { // we have an available encoding of causality interfaces. // That might be more efficient. for (int j = 0; j < fed->num_upstream; j++) { - federate_t* upstream = &federates[fed->upstream[j]]; + federate_t* upstream = &_RTI.federates[fed->upstream[j]]; // Ignore this federate if it has resigned. if (upstream->state == NOT_CONNECTED) continue; // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool visited[NUMBER_OF_FEDERATES] = { }; // Empty initializer initializes to 0. + bool* visited = (bool*)calloc(_RTI.number_of_federates, sizeof(bool)); // Initializes to 0. // Find the (transitive) next event tag upstream. tag_t upstream_next_event = transitive_next_event( @@ -628,7 +596,7 @@ bool send_tag_advance_if_appropriate(federate_t* fed) { tag_t min_upstream_completed = FOREVER_TAG; for (int j = 0; j < fed->num_upstream; j++) { - federate_t* upstream = &federates[fed->upstream[j]]; + federate_t* upstream = &_RTI.federates[fed->upstream[j]]; // Ignore this federate if it has resigned. if (upstream->state == NOT_CONNECTED) continue; @@ -657,7 +625,7 @@ bool send_tag_advance_if_appropriate(federate_t* fed) { // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool visited[NUMBER_OF_FEDERATES] = { }; // Empty initializer initializes to 0. + bool* visited = (bool*)calloc(_RTI.number_of_federates, sizeof(bool)); // Initializes to 0. // Find the tag of the earliest possible incoming message from // upstream federates. @@ -667,7 +635,7 @@ bool send_tag_advance_if_appropriate(federate_t* fed) { NEVER - start_time, 0u); for (int j = 0; j < fed->num_upstream; j++) { - federate_t* upstream = &federates[fed->upstream[j]]; + federate_t* upstream = &_RTI.federates[fed->upstream[j]]; // Ignore this federate if it has resigned. if (upstream->state == NOT_CONNECTED) continue; @@ -734,7 +702,7 @@ void handle_logical_tag_complete(federate_t* fed) { // FIXME: Consolidate this message with NET to get NMR (Next Message Request). // Careful with handling startup and shutdown. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); fed->completed = extract_tag(buffer); @@ -743,11 +711,11 @@ void handle_logical_tag_complete(federate_t* fed) { // Check downstream federates to see whether they should now be granted a TAG. for (int i = 0; i < fed->num_downstream; i++) { - federate_t* downstream = &federates[fed->downstream[i]]; + federate_t* downstream = &_RTI.federates[fed->downstream[i]]; send_tag_advance_if_appropriate(downstream); } - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -763,7 +731,7 @@ void handle_logical_tag_complete(federate_t* fed) { void transitive_send_TAG_if_appropriate(federate_t* fed, bool visited[]) { visited[fed->id] = true; for (int i = 0; i < fed->num_downstream; i++) { - federate_t* downstream = &federates[fed->downstream[i]]; + federate_t* downstream = &_RTI.federates[fed->downstream[i]]; if (visited[downstream->id]) continue; send_tag_advance_if_appropriate(downstream); transitive_send_TAG_if_appropriate(downstream, visited); @@ -784,7 +752,7 @@ void handle_next_event_tag(federate_t* fed) { // Acquire a mutex lock to ensure that this state does change while a // message is in transport or being used to determine a TAG. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); fed->next_event = extract_tag(buffer); @@ -801,10 +769,10 @@ void handle_next_event_tag(federate_t* fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool visited[NUMBER_OF_FEDERATES] = { }; // Empty initializer initializes to 0. + bool* visited = (bool*)calloc(_RTI.number_of_federates, sizeof(bool)); // Initializes to 0. transitive_send_TAG_if_appropriate(fed, visited); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -821,7 +789,7 @@ void handle_time_advance_notice(federate_t* fed) { // Acquire a mutex lock to ensure that this state does change while a // message is in transport or being used to determine a TAG. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); fed->time_advance = extract_int64(buffer); LOG_PRINT("RTI received from federate %d the Time Advance Notice (TAN) %lld.", @@ -844,10 +812,10 @@ void handle_time_advance_notice(federate_t* fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool visited[NUMBER_OF_FEDERATES] = { }; // Empty initializer initializes to 0. + bool* visited = (bool*)calloc(_RTI.number_of_federates, sizeof(bool)); // Initializes to 0. transitive_send_TAG_if_appropriate(fed, visited); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /////////////////// STOP functions //////////////////// @@ -859,11 +827,11 @@ bool _lf_rti_stop_granted_already_sent_to_federates = false; /** * Once the RTI has seen proposed tags from all connected federates, - * it will broadcast a MSG_TYPE_STOP_GRANTED carrying the max_stop_tag. + * it will broadcast a MSG_TYPE_STOP_GRANTED carrying the _RTI.max_stop_tag. * This function also checks the most recently received NET from - * each federate and resets that be no greater than the max_stop_tag. + * each federate and resets that be no greater than the _RTI.max_stop_tag. * - * This function assumes the caller holds the rti_mutex lock. + * This function assumes the caller holds the _RTI.rti_mutex lock. */ void _lf_rti_broadcast_stop_time_to_federates_already_locked() { if (_lf_rti_stop_granted_already_sent_to_federates == true) { @@ -871,24 +839,24 @@ void _lf_rti_broadcast_stop_time_to_federates_already_locked() { } // Reply with a stop granted to all federates unsigned char outgoing_buffer[MSG_TYPE_STOP_GRANTED_LENGTH]; - ENCODE_STOP_GRANTED(outgoing_buffer, max_stop_tag.time, max_stop_tag.microstep); + ENCODE_STOP_GRANTED(outgoing_buffer, _RTI.max_stop_tag.time, _RTI.max_stop_tag.microstep); // Iterate over federates and send each the message. - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - if (federates[i].state == NOT_CONNECTED) { + for (int i = 0; i < _RTI.number_of_federates; i++) { + if (_RTI.federates[i].state == NOT_CONNECTED) { continue; } - if (compare_tags(federates[i].next_event, max_stop_tag) >= 0) { + if (compare_tags(_RTI.federates[i].next_event, _RTI.max_stop_tag) >= 0) { // Need the next_event to be no greater than the stop tag. - federates[i].next_event = max_stop_tag; + _RTI.federates[i].next_event = _RTI.max_stop_tag; } - write_to_socket_errexit(federates[i].socket, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, - "RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", federates[i].id); + write_to_socket_errexit(_RTI.federates[i].socket, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, + "RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", _RTI.federates[i].id); } LOG_PRINT("RTI sent to federates MSG_TYPE_STOP_GRANTED with tag (%lld, %u).", - max_stop_tag.time - start_time, - max_stop_tag.microstep); + _RTI.max_stop_tag.time - start_time, + _RTI.max_stop_tag.microstep); _lf_rti_stop_granted_already_sent_to_federates = true; } @@ -898,7 +866,7 @@ void _lf_rti_broadcast_stop_time_to_federates_already_locked() { * If the number of federates handling stop reaches the * NUM_OF_FEDERATES, broadcast MSG_TYPE_STOP_GRANTED to every federate. * - * This function assumes the rti_mutex is already locked. + * This function assumes the _RTI.rti_mutex is already locked. * * @param fed The federate that has requested a stop or has suddenly * stopped (disconnected). @@ -907,10 +875,10 @@ void mark_federate_requesting_stop(federate_t* fed) { if (!fed->requested_stop) { // Assume that the federate // has requested stop - num_feds_handling_stop++; + _RTI.num_feds_handling_stop++; fed->requested_stop = true; } - if (num_feds_handling_stop == NUMBER_OF_FEDERATES) { + if (_RTI.num_feds_handling_stop == _RTI.number_of_federates) { // We now have information about the stop time of all // federates. _lf_rti_broadcast_stop_time_to_federates_already_locked(); @@ -934,13 +902,13 @@ void handle_stop_request_message(federate_t* fed) { // Acquire a mutex lock to ensure that this state does change while a // message is in transport or being used to determine a TAG. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); // Check whether we have already received a stop_tag // from this federate - if (federates[fed->id].requested_stop) { + if (_RTI.federates[fed->id].requested_stop) { // Ignore this request - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); return; } @@ -948,8 +916,8 @@ void handle_stop_request_message(federate_t* fed) { tag_t proposed_stop_tag = extract_tag(buffer); // Update the maximum stop tag received from federates - if (compare_tags(proposed_stop_tag, max_stop_tag) > 0) { - max_stop_tag = proposed_stop_tag; + if (compare_tags(proposed_stop_tag, _RTI.max_stop_tag) > 0) { + _RTI.max_stop_tag = proposed_stop_tag; } LOG_PRINT("RTI received from federate %d a MSG_TYPE_STOP_REQUEST message with tag (%lld, %u).", @@ -959,34 +927,34 @@ void handle_stop_request_message(federate_t* fed) { // for a stop, add it to the tally. mark_federate_requesting_stop(fed); - if (num_feds_handling_stop == NUMBER_OF_FEDERATES) { + if (_RTI.num_feds_handling_stop == _RTI.number_of_federates) { // We now have information about the stop time of all // federates. This is extremely unlikely, but it can occur // all federates call request_stop() at the same tag. - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); return; } // Forward the stop request to all other federates that have not // also issued a stop request. unsigned char stop_request_buffer[MSG_TYPE_STOP_REQUEST_LENGTH]; - ENCODE_STOP_REQUEST(stop_request_buffer, max_stop_tag.time, max_stop_tag.microstep); + ENCODE_STOP_REQUEST(stop_request_buffer, _RTI.max_stop_tag.time, _RTI.max_stop_tag.microstep); // Iterate over federates and send each the MSG_TYPE_STOP_REQUEST message // if we do not have a stop_time already for them. - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - if (federates[i].id != fed->id && federates[i].requested_stop == false) { - if (federates[i].state == NOT_CONNECTED) { - mark_federate_requesting_stop(&federates[i]); + for (int i = 0; i < _RTI.number_of_federates; i++) { + if (_RTI.federates[i].id != fed->id && _RTI.federates[i].requested_stop == false) { + if (_RTI.federates[i].state == NOT_CONNECTED) { + mark_federate_requesting_stop(&_RTI.federates[i]); continue; } - write_to_socket_errexit(federates[i].socket, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, - "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", federates[i].id); + write_to_socket_errexit(_RTI.federates[i].socket, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, + "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", _RTI.federates[i].id); } } LOG_PRINT("RTI forwarded to federates MSG_TYPE_STOP_REQUEST with tag (%lld, %u).", - max_stop_tag.time - start_time, - max_stop_tag.microstep); - pthread_mutex_unlock(&rti_mutex); + _RTI.max_stop_tag.time - start_time, + _RTI.max_stop_tag.microstep); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -1009,20 +977,20 @@ void handle_stop_request_reply(federate_t* fed) { federate_stop_tag.microstep); // Acquire the mutex lock so that we can change the state of the RTI - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); // If the federate has not requested stop before, count the reply - if (compare_tags(federate_stop_tag, max_stop_tag) > 0) { - max_stop_tag = federate_stop_tag; + if (compare_tags(federate_stop_tag, _RTI.max_stop_tag) > 0) { + _RTI.max_stop_tag = federate_stop_tag; } mark_federate_requesting_stop(fed); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } ////////////////////////////////////////////////// /** * Handle address query messages. - * This function reads the body of a MSG_TYPE_ADDRESS_QUERY (@see rti.h) message + * This function reads the body of a MSG_TYPE_ADDRESS_QUERY (@see net_common.h) message * which is the requested destination federate ID and replies with the stored * port value for the socket server of that federate. The port values * are initialized to -1. If no MSG_TYPE_ADDRESS_ADVERTISEMENT message has been received from @@ -1035,7 +1003,7 @@ void handle_address_query(uint16_t fed_id) { // Use buffer both for reading and constructing the reply. // The length is what is needed for the reply. unsigned char buffer[sizeof(int32_t)]; - ssize_t bytes_read = read_from_socket(federates[fed_id].socket, sizeof(uint16_t), (unsigned char*)buffer); + ssize_t bytes_read = read_from_socket(_RTI.federates[fed_id].socket, sizeof(uint16_t), (unsigned char*)buffer); if (bytes_read == 0) { error_print_and_exit("Failed to read address query."); } @@ -1048,27 +1016,27 @@ void handle_address_query(uint16_t fed_id) { // from this federate. In that case, it will respond by sending -1. // Encode the port number. - encode_int32(federates[remote_fed_id].server_port, (unsigned char*)buffer); + encode_int32(_RTI.federates[remote_fed_id].server_port, (unsigned char*)buffer); // Send the port number (which could be -1). - write_to_socket_errexit(federates[fed_id].socket, sizeof(int32_t), (unsigned char*)buffer, + write_to_socket_errexit(_RTI.federates[fed_id].socket, sizeof(int32_t), (unsigned char*)buffer, "Failed to write port number to socket of federate %d.", fed_id); // Send the server IP address to federate. - write_to_socket_errexit(federates[fed_id].socket, sizeof(federates[remote_fed_id].server_ip_addr), - (unsigned char *)&federates[remote_fed_id].server_ip_addr, + write_to_socket_errexit(_RTI.federates[fed_id].socket, sizeof(_RTI.federates[remote_fed_id].server_ip_addr), + (unsigned char *)&_RTI.federates[remote_fed_id].server_ip_addr, "Failed to write ip address to socket of federate %d.", fed_id); - if (federates[remote_fed_id].server_port != -1) { + if (_RTI.federates[remote_fed_id].server_port != -1) { DEBUG_PRINT("Replied to address query from federate %d with address %s:%d.", - fed_id, federates[remote_fed_id].server_hostname, federates[remote_fed_id].server_port); + fed_id, _RTI.federates[remote_fed_id].server_hostname, _RTI.federates[remote_fed_id].server_port); } } /** - * Handle address advertisement messages (@see MSG_TYPE_ADDRESS_ADVERTISEMENT in rti.h). + * Handle address advertisement messages (@see MSG_TYPE_ADDRESS_ADVERTISEMENT in net_common.h). * The federate is expected to send its server port number as the next * byte. The RTI will keep a record of this number in the .server_port - * field of the federates[federate_id] array of structs. + * field of the _RTI.federates[federate_id] array of structs. * * The server_hostname and server_ip_addr fields are assigned * in connect_to_federates() upon accepting the socket @@ -1084,10 +1052,10 @@ void handle_address_ad(uint16_t federate_id) { // connections to other federates int32_t server_port = -1; unsigned char buffer[sizeof(int32_t)]; - ssize_t bytes_read = read_from_socket(federates[federate_id].socket, sizeof(int32_t), (unsigned char *)buffer); + ssize_t bytes_read = read_from_socket(_RTI.federates[federate_id].socket, sizeof(int32_t), (unsigned char *)buffer); if (bytes_read < (ssize_t)sizeof(int32_t)) { - DEBUG_PRINT("Error reading port data from federate %d.", federates[federate_id].id); + DEBUG_PRINT("Error reading port data from federate %d.", _RTI.federates[federate_id].id); // Leave the server port at -1, which means "I don't know". return; } @@ -1096,10 +1064,10 @@ void handle_address_ad(uint16_t federate_id) { assert(server_port < 65536); - pthread_mutex_lock(&rti_mutex); - federates[federate_id].server_port = server_port; + pthread_mutex_lock(&_RTI.rti_mutex); + _RTI.federates[federate_id].server_port = server_port; LOG_PRINT("Received address advertisement from federate %d.", federate_id); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -1117,31 +1085,31 @@ void handle_timestamp(federate_t *my_fed) { int64_t timestamp = swap_bytes_if_big_endian_int64(*((int64_t *)(&buffer))); LOG_PRINT("RTI received timestamp message: %lld.", timestamp); - pthread_mutex_lock(&rti_mutex); - num_feds_proposed_start++; - if (timestamp > max_start_time) { - max_start_time = timestamp; + pthread_mutex_lock(&_RTI.rti_mutex); + _RTI.num_feds_proposed_start++; + if (timestamp > _RTI.max_start_time) { + _RTI.max_start_time = timestamp; } - if (num_feds_proposed_start == NUMBER_OF_FEDERATES) { + if (_RTI.num_feds_proposed_start == _RTI.number_of_federates) { // All federates have proposed a start time. - pthread_cond_broadcast(&received_start_times); + pthread_cond_broadcast(&_RTI.received_start_times); } else { // Some federates have not yet proposed a start time. // wait for a notification. - while (num_feds_proposed_start < NUMBER_OF_FEDERATES) { + while (_RTI.num_feds_proposed_start < _RTI.number_of_federates) { // FIXME: Should have a timeout here? - pthread_cond_wait(&received_start_times, &rti_mutex); + pthread_cond_wait(&_RTI.received_start_times, &_RTI.rti_mutex); } } - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); // Send back to the federate the maximum time plus an offset on a TIMESTAMP // message. unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH]; start_time_buffer[0] = MSG_TYPE_TIMESTAMP; // Add an offset to this start time to get everyone starting together. - start_time = max_start_time + DELAY_START; + start_time = _RTI.max_start_time + DELAY_START; encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]); ssize_t bytes_written = write_to_socket( @@ -1152,15 +1120,15 @@ void handle_timestamp(federate_t *my_fed) { error_print("Failed to send the starting time to federate %d.", my_fed->id); } - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to // the federate to the start time. my_fed->state = GRANTED; // FIXME: re-acquire the lock. - pthread_cond_broadcast(&sent_start_time); + pthread_cond_broadcast(&_RTI.sent_start_time); LOG_PRINT("RTI sent start time %lld to federate %d.", start_time, my_fed->id); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -1169,7 +1137,7 @@ void handle_timestamp(federate_t *my_fed) { * * This version assumes the caller holds the mutex lock. * - * @param message_type The type of the clock sync message (see rti.h). + * @param message_type The type of the clock sync message (see net_common.h). * @param fed The federate to send the physical time to. * @param socket_type The socket type (TCP or UDP). */ @@ -1188,7 +1156,7 @@ void send_physical_clock(unsigned char message_type, federate_t* fed, socket_typ if (socket_type == UDP) { // FIXME: UDP_addr is never initialized. DEBUG_PRINT("Clock sync: RTI sending UDP message type %u.", buffer[0]); - ssize_t bytes_written = sendto(socket_descriptor_UDP, buffer, 1 + sizeof(int64_t), 0, + ssize_t bytes_written = sendto(_RTI.socket_descriptor_UDP, buffer, 1 + sizeof(int64_t), 0, (struct sockaddr*)&fed->UDP_addr, sizeof(fed->UDP_addr)); if (bytes_written < (ssize_t)sizeof(int64_t) + 1) { warning_print("Clock sync: RTI failed to send physical time to federate %d: %s\n", @@ -1224,7 +1192,7 @@ void send_physical_clock(unsigned char message_type, federate_t* fed, socket_typ void handle_physical_clock_sync_message(federate_t* my_fed, socket_type_t socket_type) { // Lock the mutex to prevent interference between sending the two // coded probe messages. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); // Reply with a T4 type message send_physical_clock(MSG_TYPE_CLOCK_SYNC_T4, my_fed, socket_type); // Send the corresponding coded probe immediately after, @@ -1232,13 +1200,12 @@ void handle_physical_clock_sync_message(federate_t* my_fed, socket_type_t socket if (socket_type == UDP) { send_physical_clock(MSG_TYPE_CLOCK_SYNC_CODED_PROBE, my_fed, socket_type); } - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } -#ifdef _LF_CLOCK_SYNC_ON /** * A (quasi-)periodic thread that performs clock synchronization with each - * federate. It starts by waiting a time given by _LF_CLOCK_SYNC_PERIOD_NS + * federate. It starts by waiting a time given by _RTI.clock_sync_period_ns * and then iterates over the federates, performing a complete clock synchronization * interaction with each federate before proceeding to the next federate. * The interaction starts with this RTI sending a snapshot of its physical clock @@ -1251,11 +1218,11 @@ void* clock_synchronization_thread(void* noargs) { // Wait until all federates have been notified of the start time. // FIXME: Use lf_ version of this when merged with master. - pthread_mutex_lock(&rti_mutex); - while (num_feds_proposed_start < NUMBER_OF_FEDERATES) { - pthread_cond_wait(&received_start_times, &rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); + while (_RTI.num_feds_proposed_start < _RTI.number_of_federates) { + pthread_cond_wait(&_RTI.received_start_times, &_RTI.rti_mutex); } - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); // Wait until the start time before starting clock synchronization. // The above wait ensures that start_time has been set. @@ -1267,9 +1234,9 @@ void* clock_synchronization_thread(void* noargs) { nanosleep(&wait_time, &rem_time); } - // Initiate a clock synchronization every _LF_CLOCK_SYNC_PERIOD_NS - struct timespec sleep_time = {(time_t) _LF_CLOCK_SYNC_PERIOD_NS / BILLION, - _LF_CLOCK_SYNC_PERIOD_NS % BILLION}; + // Initiate a clock synchronization every _RTI.clock_sync_period_ns + struct timespec sleep_time = {(time_t) _RTI.clock_sync_period_ns / BILLION, + _RTI.clock_sync_period_ns % BILLION}; struct timespec remaining_time; bool any_federates_connected = true; @@ -1277,20 +1244,20 @@ void* clock_synchronization_thread(void* noargs) { // Sleep nanosleep(&sleep_time, &remaining_time); // Can be interrupted any_federates_connected = false; - for (int fed = 0; fed < NUMBER_OF_FEDERATES; fed++) { - if (federates[fed].state == NOT_CONNECTED) { + for (int fed = 0; fed < _RTI.number_of_federates; fed++) { + if (_RTI.federates[fed].state == NOT_CONNECTED) { // FIXME: We need better error handling here, but clock sync failure // should not stop execution. - error_print("Clock sync failed with federate %d. Not connected.", federates[fed].id); - // mark_federate_requesting_stop(&federates[fed]); + error_print("Clock sync failed with federate %d. Not connected.", _RTI.federates[fed].id); + // mark_federate_requesting_stop(&_RTI.federates[fed]); continue; - } else if (!federates[fed].clock_synchronization_enabled) { + } else if (!_RTI.federates[fed].clock_synchronization_enabled) { continue; } // Send the RTI's current physical time to the federate // Send on UDP. DEBUG_PRINT("RTI sending T1 message to initiate clock sync round."); - send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, &federates[fed], UDP); + send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, &_RTI.federates[fed], UDP); // Listen for reply message, which should be T3. size_t message_size = 1 + sizeof(int32_t); @@ -1301,21 +1268,21 @@ void* clock_synchronization_thread(void* noargs) { int remaining_attempts = 5; while (remaining_attempts > 0) { remaining_attempts--; - int bytes_read = read_from_socket(socket_descriptor_UDP, message_size, buffer); + int bytes_read = read_from_socket(_RTI.socket_descriptor_UDP, message_size, buffer); // If any errors occur, either discard the message or the clock sync round. if (bytes_read == message_size) { if (buffer[0] == MSG_TYPE_CLOCK_SYNC_T3) { int32_t fed_id = extract_int32(&(buffer[1])); // Check that this message came from the correct federate. - if (fed_id != federates[fed].id) { + if (fed_id != _RTI.federates[fed].id) { // Message is from the wrong federate. Discard the message. warning_print("Clock sync: Received T3 message from federate %d, " "but expected one from %d. Discarding message.", - fed_id, federates[fed].id); + fed_id, _RTI.federates[fed].id); continue; } DEBUG_PRINT("Clock sync: RTI received T3 message from federate %d.", fed_id); - handle_physical_clock_sync_message(&federates[fed_id], UDP); + handle_physical_clock_sync_message(&_RTI.federates[fed_id], UDP); break; } else { // The message is not a T3 message. Discard the message and @@ -1325,14 +1292,14 @@ void* clock_synchronization_thread(void* noargs) { "Discarding message.", buffer[0], MSG_TYPE_CLOCK_SYNC_T3, - federates[fed].id); + _RTI.federates[fed].id); continue; } } else { warning_print("Clock sync: Read from UDP socket failed: %s. " "Skipping clock sync round for federate %d.", strerror(errno), - federates[fed].id); + _RTI.federates[fed].id); remaining_attempts = -1; } } @@ -1343,7 +1310,6 @@ void* clock_synchronization_thread(void* noargs) { } return NULL; } -#endif // _LF_CLOCK_SYNC_ON /** * A function to handle messages labeled @@ -1367,7 +1333,7 @@ void* clock_synchronization_thread(void* noargs) { **/ void handle_federate_resign(federate_t *my_fed) { // Nothing more to do. Close the socket and exit. - pthread_mutex_lock(&rti_mutex); + pthread_mutex_lock(&_RTI.rti_mutex); my_fed->state = NOT_CONNECTED; // FIXME: The following results in spurious error messages. // mark_federate_requesting_stop(my_fed); @@ -1390,10 +1356,10 @@ void handle_federate_resign(federate_t *my_fed) { // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. - bool visited[NUMBER_OF_FEDERATES] = { }; // Empty initializer initializes to 0. + bool* visited = (bool*)calloc(_RTI.number_of_federates, sizeof(bool)); // Initializes to 0. transitive_send_TAG_if_appropriate(my_fed, visited); - pthread_mutex_unlock(&rti_mutex); + pthread_mutex_unlock(&_RTI.rti_mutex); } /** @@ -1504,7 +1470,7 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie // FIXME: This should not exit with error but rather should just reject the connection. read_from_socket_errexit(socket_id, length, buffer, "RTI failed to read from accepted socket."); - uint16_t fed_id = NUMBER_OF_FEDERATES; // Initialize to an invalid value. + uint16_t fed_id = _RTI.number_of_federates; // Initialize to an invalid value. // First byte received is the message type. if (buffer[0] != MSG_TYPE_FED_IDS) { @@ -1520,7 +1486,7 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie } else { send_reject(socket_id, UNEXPECTED_MESSAGE); } - error_print("RTI expected a MSG_TYPE_FED_IDS message. Got %u (see rti.h).", buffer[0]); + error_print("RTI expected a MSG_TYPE_FED_IDS message. Got %u (see net_common.h).", buffer[0]); return -1; } else { // Received federate ID. @@ -1542,21 +1508,21 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie DEBUG_PRINT("RTI received federation ID: %s.", federation_id_received); // Compare the received federation ID to mine. - if (strncmp(federation_id, federation_id_received, federation_id_length) != 0) { + if (strncmp(_RTI.federation_id, federation_id_received, federation_id_length) != 0) { // Federation IDs do not match. Send back a MSG_TYPE_REJECT message. error_print("WARNING: Federate from another federation %s attempted to connect to RTI in federation %s.\n", federation_id_received, - federation_id); + _RTI.federation_id); send_reject(socket_id, FEDERATION_ID_DOES_NOT_MATCH); return -1; } else { - if (fed_id >= NUMBER_OF_FEDERATES) { + if (fed_id >= _RTI.number_of_federates) { // Federate ID is out of range. error_print("RTI received federate ID %d, which is out of range.", fed_id); send_reject(socket_id, FEDERATE_ID_OUT_OF_RANGE); return -1; } else { - if (federates[fed_id].state != NOT_CONNECTED) { + if (_RTI.federates[fed_id].state != NOT_CONNECTED) { error_print("RTI received duplicate federate ID: %d.", fed_id); send_reject(socket_id, FEDERATE_ID_IN_USE); return -1; @@ -1572,23 +1538,23 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie // First, convert the sockaddr structure into a sockaddr_in that contains an internet address. struct sockaddr_in* pV4_addr = client_fd; // Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server - federates[fed_id].server_ip_addr = pV4_addr->sin_addr; + _RTI.federates[fed_id].server_ip_addr = pV4_addr->sin_addr; #if LOG_LEVEL >= LOG_LEVEL_DEBUG // Create the human readable format and copy that into // the .server_hostname field of the federate. char str[INET_ADDRSTRLEN]; - inet_ntop( AF_INET, &federates[fed_id].server_ip_addr, str, INET_ADDRSTRLEN ); - strncpy (federates[fed_id].server_hostname, str, INET_ADDRSTRLEN); + inet_ntop( AF_INET, &_RTI.federates[fed_id].server_ip_addr, str, INET_ADDRSTRLEN ); + strncpy (_RTI.federates[fed_id].server_hostname, str, INET_ADDRSTRLEN); - DEBUG_PRINT("RTI got address %s from federate %d.", federates[fed_id].server_hostname, fed_id); + DEBUG_PRINT("RTI got address %s from federate %d.", _RTI.federates[fed_id].server_hostname, fed_id); #endif - federates[fed_id].socket = socket_id; + _RTI.federates[fed_id].socket = socket_id; // Set the federate's state as pending // because it is waiting for the start time to be // sent by the RTI before beginning its execution. - federates[fed_id].state = PENDING; + _RTI.federates[fed_id].state = PENDING; DEBUG_PRINT("RTI responding with MSG_TYPE_ACK to federate %d.", fed_id); // Send an MSG_TYPE_ACK message. @@ -1599,11 +1565,83 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie return (int32_t)fed_id; } +/** + * Listen for a MSG_TYPE_NEIGHBOR_STRUCTURE message, and upon receiving it, fill + * out the relevant information in the federate's struct. + */ +int receive_connection_information(int socket_id, uint16_t fed_id) { + DEBUG_PRINT("RTI waiting for MSG_TYPE_NEIGHBOR_STRUCTURE from federate %d.", fed_id); + unsigned char connection_info_header[MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE]; + read_from_socket_errexit( + socket_id, + MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE, + connection_info_header, + "RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message header from federate %d.", + fed_id + ); + + if (connection_info_header[0] != MSG_TYPE_NEIGHBOR_STRUCTURE) { + error_print("RTI was expecting a MSG_TYPE_UDP_PORT message from federate %d. Got %u instead. " + "Rejecting federate.", fed_id, connection_info_header[0]); + send_reject(socket_id, UNEXPECTED_MESSAGE); + return 0; + } else { + // Read the number of upstream and downstream connections + _RTI.federates[fed_id].num_upstream = extract_int32(&(connection_info_header[1])); + _RTI.federates[fed_id].num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); + DEBUG_PRINT( + "RTI got %d upstreams and %d downstreams from federate %d.", + _RTI.federates[fed_id].num_upstream, + _RTI.federates[fed_id].num_downstream, + fed_id); + + // Allocate memory for the upstream and downstream pointers + _RTI.federates[fed_id].upstream = (int*)malloc(sizeof(federate_t*) * _RTI.federates[fed_id].num_upstream); + _RTI.federates[fed_id].downstream = (int*)malloc(sizeof(federate_t*) * _RTI.federates[fed_id].num_downstream); + + // Allocate memory for the upstream delay pointers + _RTI.federates[fed_id].upstream_delay = + (interval_t*)malloc( + sizeof(interval_t) * _RTI.federates[fed_id].num_upstream + ); + + size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * + _RTI.federates[fed_id].num_upstream) + (sizeof(uint16_t) * _RTI.federates[fed_id].num_downstream); + unsigned char* connections_info_body = (unsigned char*)malloc(connections_info_body_size); + read_from_socket_errexit( + socket_id, + connections_info_body_size, + connections_info_body, + "RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message body from federate %d.", + fed_id + ); + + // Keep track of where we are in the buffer + size_t message_head = 0; + // First, read the info about upstream federates + for (int i=0; i<_RTI.federates[fed_id].num_upstream; i++) { + _RTI.federates[fed_id].upstream[i] = extract_uint16(&(connections_info_body[message_head])); + message_head += sizeof(uint16_t); + _RTI.federates[fed_id].upstream_delay[i] = extract_int64(&(connections_info_body[message_head])); + message_head += sizeof(int64_t); + } + + // Next, read the info about downstream federates + for (int i=0; i<_RTI.federates[fed_id].num_downstream; i++) { + _RTI.federates[fed_id].downstream[i] = extract_uint16(&(connections_info_body[message_head])); + message_head += sizeof(uint16_t); + } + + free(connections_info_body); + return 1; + } +} + /** * Listen for a MSG_TYPE_UDP_PORT message, and upon receiving it, set up * clock synchronization and perform the initial clock synchronization. * Initial clock synchronization is performed only if the MSG_TYPE_UDP_PORT message - * payload is not USHRT_MAX. If it is also not 0, then this function sets + * payload is not UINT16_MAX. If it is also not 0, then this function sets * up to perform runtime clock synchronization using the UDP port number * specified in the payload to communicate with the federate's clock * synchronization logic. @@ -1625,53 +1663,53 @@ int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) { send_reject(socket_id, UNEXPECTED_MESSAGE); return 0; } else { -#ifdef _LF_CLOCK_SYNC_INITIAL // If no initial clock sync, no need perform initial clock sync. - uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); - - // A port number of USHRT_MAX means initial clock sync should not be performed. - if (federate_UDP_port_number != USHRT_MAX) { - // Perform the initialization clock synchronization with the federate. - // Send the required number of messages for clock synchronization - for (int i=0; i < _LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL; i++) { - // Send the RTI's current physical time T1 to the federate. - send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, &federates[fed_id], TCP); - - // Listen for reply message, which should be T3. - size_t message_size = 1 + sizeof(int32_t); - unsigned char buffer[message_size]; - read_from_socket_errexit(socket_id, message_size, buffer, - "Socket to federate %d unexpectedly closed.", fed_id); - if (buffer[0] == MSG_TYPE_CLOCK_SYNC_T3) { - int32_t fed_id = extract_int32(&(buffer[1])); - assert(fed_id > -1); - assert(fed_id < 65536); - DEBUG_PRINT("RTI received T3 clock sync message from federate %d.", fed_id); - handle_physical_clock_sync_message(&federates[fed_id], TCP); - } else { - error_print("Unexpected message %u from federate %d.", buffer[0], fed_id); - send_reject(socket_id, UNEXPECTED_MESSAGE); - return 0; + if (_RTI.clock_sync_global_status >= clock_sync_init) {// If no initial clock sync, no need perform initial clock sync. + uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); + + // A port number of UINT16_MAX means initial clock sync should not be performed. + if (federate_UDP_port_number != UINT16_MAX) { + // Perform the initialization clock synchronization with the federate. + // Send the required number of messages for clock synchronization + for (int i=0; i < _RTI.clock_sync_exchanges_per_interval; i++) { + // Send the RTI's current physical time T1 to the federate. + send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, &_RTI.federates[fed_id], TCP); + + // Listen for reply message, which should be T3. + size_t message_size = 1 + sizeof(int32_t); + unsigned char buffer[message_size]; + read_from_socket_errexit(socket_id, message_size, buffer, + "Socket to federate %d unexpectedly closed.", fed_id); + if (buffer[0] == MSG_TYPE_CLOCK_SYNC_T3) { + int32_t fed_id = extract_int32(&(buffer[1])); + assert(fed_id > -1); + assert(fed_id < 65536); + DEBUG_PRINT("RTI received T3 clock sync message from federate %d.", fed_id); + handle_physical_clock_sync_message(&_RTI.federates[fed_id], TCP); + } else { + error_print("Unexpected message %u from federate %d.", buffer[0], fed_id); + send_reject(socket_id, UNEXPECTED_MESSAGE); + return 0; + } } + DEBUG_PRINT("RTI finished initial clock synchronization with federate %d.", fed_id); } - DEBUG_PRINT("RTI finished initial clock synchronization with federate %d.", fed_id); - } -#ifdef _LF_CLOCK_SYNC_ON // If no runtime clock sync, no need to set up the UDP port. - if (federate_UDP_port_number > 0) { - // Initialize the UDP_addr field of the federate struct - federates[fed_id].UDP_addr.sin_family = AF_INET; - federates[fed_id].UDP_addr.sin_port = htons(federate_UDP_port_number); - federates[fed_id].UDP_addr.sin_addr = federates[fed_id].server_ip_addr; + if (_RTI.clock_sync_global_status >= clock_sync_on) { // If no runtime clock sync, no need to set up the UDP port. + if (federate_UDP_port_number > 0) { + // Initialize the UDP_addr field of the federate struct + _RTI.federates[fed_id].UDP_addr.sin_family = AF_INET; + _RTI.federates[fed_id].UDP_addr.sin_port = htons(federate_UDP_port_number); + _RTI.federates[fed_id].UDP_addr.sin_addr = _RTI.federates[fed_id].server_ip_addr; + } + } else { + // Disable clock sync after initial round. + _RTI.federates[fed_id].clock_synchronization_enabled = false; + } + } else { // No clock synchronization at all. + // Clock synchronization is universally disabled via the clock-sync command-line parameter + // (-c off was passed to the RTI). + // Note that the federates are still going to send a MSG_TYPE_UDP_PORT message but with a payload (port) of -1. + _RTI.federates[fed_id].clock_synchronization_enabled = false; } -#else - // Disable clock sync after initial round. - federates[fed_id].clock_synchronization_enabled = false; -#endif -#else // No clock synchronization at all. - // Clock synchronization is universally disabled via the clock-sync target parameter - // (#define _LF_CLOCK_SYNC was not generated for the RTI). - // Note that the federates are still going to send a MSG_TYPE_UDP_PORT message but with a payload (port) of -1. - federates[fed_id].clock_synchronization_enabled = false; -#endif } return 1; } @@ -1684,14 +1722,14 @@ int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) { * @param socket_descriptor The socket on which to accept connections. */ void connect_to_federates(int socket_descriptor) { - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { + for (int i = 0; i < _RTI.number_of_federates; i++) { // Wait for an incoming connection request. struct sockaddr client_fd; uint32_t client_length = sizeof(client_fd); // The following blocks until a federate connects. int socket_id = -1; while(1) { - socket_id = accept(socket_descriptor_TCP, &client_fd, &client_length); + socket_id = accept(_RTI.socket_descriptor_TCP, &client_fd, &client_length); if (socket_id >= 0) { // Got a socket break; @@ -1707,13 +1745,14 @@ void connect_to_federates(int socket_descriptor) { // The first message from the federate should contain its ID and the federation ID. int32_t fed_id = receive_and_check_fed_id_message(socket_id, (struct sockaddr_in*)&client_fd); if (fed_id >= 0 + && receive_connection_information(socket_id, (uint16_t)fed_id) && receive_udp_message_and_set_up_clock_sync(socket_id, (uint16_t)fed_id)) { // Create a thread to communicate with the federate. // This has to be done after clock synchronization is finished // or that thread may end up attempting to handle incoming clock // synchronization messages. - pthread_create(&(federates[fed_id].thread_id), NULL, federate_thread_TCP, &(federates[fed_id])); + pthread_create(&(_RTI.federates[fed_id].thread_id), NULL, federate_thread_TCP, &(_RTI.federates[fed_id])); } else { // Received message was rejected. Try again. @@ -1723,21 +1762,21 @@ void connect_to_federates(int socket_descriptor) { // All federates have connected. DEBUG_PRINT("All federates have connected to RTI."); -#ifdef _LF_CLOCK_SYNC_ON - // Create the thread that performs periodic PTP clock synchronization sessions - // over the UDP channel, but only if the UDP channel is open and at least one - // federate is performing runtime clock synchronization. - bool clock_sync_enabled = false; - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - if (federates[i].clock_synchronization_enabled) { - clock_sync_enabled = true; - break; - } - } - if (final_port_UDP != USHRT_MAX && clock_sync_enabled) { - pthread_create(&clock_thread, NULL, clock_synchronization_thread, NULL); - } -#endif // _LF_CLOCK_SYNC_ON + if (_RTI.clock_sync_global_status >= clock_sync_on) { + // Create the thread that performs periodic PTP clock synchronization sessions + // over the UDP channel, but only if the UDP channel is open and at least one + // federate is performing runtime clock synchronization. + bool clock_sync_enabled = false; + for (int i = 0; i < _RTI.number_of_federates; i++) { + if (_RTI.federates[i].clock_synchronization_enabled) { + clock_sync_enabled = true; + break; + } + } + if (_RTI.final_port_UDP != UINT16_MAX && clock_sync_enabled) { + pthread_create(&_RTI.clock_thread, NULL, clock_synchronization_thread, NULL); + } + } } /** @@ -1751,11 +1790,11 @@ void* respond_to_erroneous_connections(void* nothing) { struct sockaddr client_fd; uint32_t client_length = sizeof(client_fd); // The following will block until either a federate attempts to connect - // or close(socket_descriptor_TCP) is called. - int socket_id = accept(socket_descriptor_TCP, &client_fd, &client_length); + // or close(_RTI.socket_descriptor_TCP) is called. + int socket_id = accept(_RTI.socket_descriptor_TCP, &client_fd, &client_length); if (socket_id < 0) return NULL; - if (all_federates_exited) { + if (_RTI.all_federates_exited) { return NULL; } @@ -1776,46 +1815,25 @@ void* respond_to_erroneous_connections(void* nothing) { * @param id The federate ID. */ void initialize_federate(uint16_t id) { - federates[id].id = id; - federates[id].socket = -1; // No socket. - federates[id].clock_synchronization_enabled = true; - federates[id].completed = NEVER_TAG; - federates[id].last_granted = NEVER_TAG; - federates[id].last_provisionally_granted = NEVER_TAG; - federates[id].next_event = NEVER_TAG; - federates[id].time_advance = NEVER; - federates[id].state = NOT_CONNECTED; - federates[id].upstream = NULL; - federates[id].upstream_delay = NULL; - federates[id].num_upstream = 0; - federates[id].downstream = NULL; - federates[id].num_downstream = 0; - federates[id].mode = REALTIME; - strncpy(federates[id].server_hostname ,"localhost", INET_ADDRSTRLEN); - federates[id].server_ip_addr.s_addr = 0; - federates[id].server_port = -1; - federates[id].requested_stop = false; -} - -/** - * Initialize logical time to match the physical clock. - */ -void initialize_clock() { - // Initialize logical time to match physical clock. - struct timespec actualStartTime; - clock_gettime(_LF_CLOCK, &actualStartTime); - physical_start_time = actualStartTime.tv_sec * BILLION + actualStartTime.tv_nsec; - - // Set the epoch offset to zero (see tag.h) - _lf_epoch_offset = 0LL; - if (_LF_CLOCK != CLOCK_REALTIME) { - struct timespec real_time_start; - clock_gettime(CLOCK_REALTIME, &real_time_start); - int64_t real_time_start_ns = real_time_start.tv_sec * BILLION + real_time_start.tv_nsec; - // If the clock is not CLOCK_REALTIME, find the necessary epoch offset - _lf_epoch_offset = real_time_start_ns - physical_start_time; - DEBUG_PRINT("Setting epoch offset to %lld.", _lf_epoch_offset); - } + _RTI.federates[id].id = id; + _RTI.federates[id].socket = -1; // No socket. + _RTI.federates[id].clock_synchronization_enabled = true; + _RTI.federates[id].completed = NEVER_TAG; + _RTI.federates[id].last_granted = NEVER_TAG; + _RTI.federates[id].last_provisionally_granted = NEVER_TAG; + _RTI.federates[id].next_event = NEVER_TAG; + _RTI.federates[id].time_advance = NEVER; + _RTI.federates[id].state = NOT_CONNECTED; + _RTI.federates[id].upstream = NULL; + _RTI.federates[id].upstream_delay = NULL; + _RTI.federates[id].num_upstream = 0; + _RTI.federates[id].downstream = NULL; + _RTI.federates[id].num_downstream = 0; + _RTI.federates[id].mode = REALTIME; + strncpy(_RTI.federates[id].server_hostname ,"localhost", INET_ADDRSTRLEN); + _RTI.federates[id].server_ip_addr.s_addr = 0; + _RTI.federates[id].server_port = -1; + _RTI.federates[id].requested_stop = false; } /** @@ -1831,16 +1849,16 @@ int32_t start_rti_server(uint16_t port) { // Use the default starting port. port = STARTING_PORT; } - initialize_clock(); + lf_initialize_clock(); // Create the TCP socket server - socket_descriptor_TCP = create_server(specified_port, port, TCP); + _RTI.socket_descriptor_TCP = create_server(specified_port, port, TCP); info_print("RTI: Listening for federates."); // Create the UDP socket server - // Try to get the final_port_TCP + 1 port -#ifdef _LF_CLOCK_SYNC_ON - socket_descriptor_UDP = create_server(specified_port, final_port_TCP + 1, UDP); -#endif // _LF_CLOCK_SYNC_ON - return socket_descriptor_TCP; + // Try to get the _RTI.final_port_TCP + 1 port + if (_RTI.clock_sync_global_status >= clock_sync_on) { + _RTI.socket_descriptor_UDP = create_server(specified_port, _RTI.final_port_TCP + 1, UDP); + } + return _RTI.socket_descriptor_TCP; } /** @@ -1864,17 +1882,17 @@ void wait_for_federates(int socket_descriptor) { // Wait for federate threads to exit. void* thread_exit_status; - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - info_print("RTI: Waiting for thread handling federate %d.", federates[i].id); - pthread_join(federates[i].thread_id, &thread_exit_status); - info_print("RTI: Federate %d thread exited.", federates[i].id); + for (int i = 0; i < _RTI.number_of_federates; i++) { + info_print("RTI: Waiting for thread handling federate %d.", _RTI.federates[i].id); + pthread_join(_RTI.federates[i].thread_id, &thread_exit_status); + info_print("RTI: Federate %d thread exited.", _RTI.federates[i].id); } - all_federates_exited = true; + _RTI.all_federates_exited = true; // Shutdown and close the socket so that the accept() call in // respond_to_erroneous_connections returns. That thread should then - // check all_federates_exited and it should exit. + // check _RTI.all_federates_exited and it should exit. if (shutdown(socket_descriptor, SHUT_RDWR)) { LOG_PRINT("On shut down TCP socket, received reply: %s", strerror(errno)); } @@ -1885,7 +1903,7 @@ void wait_for_federates(int socket_descriptor) { // NOTE: Apparently, closing the socket will not necessarily // cause the respond_to_erroneous_connections accept() call to return, - // so instead, we connect here so that it can check the all_federates_exited + // so instead, we connect here so that it can check the _RTI.all_federates_exited // variable. // Create an IPv4 socket for TCP (not UDP) communication over IP (0). @@ -1904,7 +1922,7 @@ void wait_for_federates(int socket_descriptor) { (char *)&server_fd.sin_addr.s_addr, server->h_length); // Convert the port number from host byte order to network byte order. - server_fd.sin_port = htons(final_port_TCP); + server_fd.sin_port = htons(_RTI.final_port_TCP); connect( tmp_socket, (struct sockaddr *)&server_fd, @@ -1921,11 +1939,11 @@ void wait_for_federates(int socket_descriptor) { close(socket_descriptor); */ - if (socket_descriptor_UDP > 0) { - if (shutdown(socket_descriptor_UDP, SHUT_RDWR)) { + if (_RTI.socket_descriptor_UDP > 0) { + if (shutdown(_RTI.socket_descriptor_UDP, SHUT_RDWR)) { LOG_PRINT("On shut down UDP socket, received reply: %s", strerror(errno)); } - close(socket_descriptor_UDP); + close(_RTI.socket_descriptor_UDP); } } @@ -1936,6 +1954,20 @@ void usage(int argc, char* argv[]) { printf("\nCommand-line arguments: \n\n"); printf(" -i, --id \n"); printf(" The ID of the federation that this RTI will control.\n\n"); + printf(" -n, --number_of_federates \n"); + printf(" The number of federates in the federation that this RTI will control.\n\n"); + printf(" -p, --port \n"); + printf(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n\n", UINT16_MAX, STARTING_PORT); + printf(" -c, --clock_sync [off|init|on] [period ] [exchanges-per-interval ]\n"); + printf(" The status of clock synchronization for this federate.\n"); + printf(" - off: Clock synchronization is off.\n"); + printf(" - init (default): Clock synchronization is done only during startup.\n"); + printf(" - on: Clock synchronization is done both at startup and during the execution.\n"); + printf(" Relevant parameters that can be set: \n"); + printf(" - period (in nanoseconds): Controls how often a clock synchronization attempt is made\n"); + printf(" (period in nanoseconds, default is 5 msec). Only applies to 'on'.\n"); + printf(" - exchanges-per-interval : Controls the number of messages that are exchanged for each\n"); + printf(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n\n"); printf("Command given:\n"); for (int i = 0; i < argc; i++) { @@ -1944,6 +1976,77 @@ void usage(int argc, char* argv[]) { printf("\n\n"); } +/** + * Process command-line arguments related to clock synchronization. Will return + * the last read position of argv if all related arguments are parsed or an + * invalid argument is read. + * + * @param argc: Number of arguments in the list + * @param argv: The list of arguments as a string + * @return Current position (head) of argv; + */ +int process_clock_sync_args(int argc, char* argv[]) { + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "off") == 0) { + _RTI.clock_sync_global_status = clock_sync_off; + printf("RTI: Clock sync: off\n"); + } else if (strcmp(argv[i], "init") == 0 || strcmp(argv[i], "initial") == 0) { + _RTI.clock_sync_global_status = clock_sync_init; + printf("RTI: Clock sync: init\n"); + } else if (strcmp(argv[i], "on") == 0) { + _RTI.clock_sync_global_status = clock_sync_on; + printf("RTI: Clock sync: on\n"); + } else if (strcmp(argv[i], "period") == 0) { + if (_RTI.clock_sync_global_status != clock_sync_on) { + fprintf(stderr, "Error: clock sync period can only be set if --clock-sync is set to on.\n"); + usage(argc, argv); + i++; + continue; // Try to parse the rest of the arguments as clock sync args. + } else if (argc < i + 2) { + fprintf(stderr, "Error: clock sync period needs a time (in nanoseconds) argument.\n"); + usage(argc, argv); + continue; + } + i++; + long long period_ns = strtoll(argv[i], NULL, 10); + if (period_ns == 0LL || period_ns == LLONG_MAX || period_ns == LLONG_MIN) { + fprintf(stderr, "Error: clock sync period value is invalid.\n"); + continue; // Try to parse the rest of the arguments as clock sync args. + } + _RTI.clock_sync_period_ns = (int64_t)period_ns; + printf("RTI: Clock sync period: %lld\n", (long long int)_RTI.clock_sync_period_ns); + } else if (strcmp(argv[i], "exchanges-per-interval") == 0) { + if (_RTI.clock_sync_global_status != clock_sync_on && _RTI.clock_sync_global_status != clock_sync_init) { + fprintf(stderr, "Error: clock sync exchanges-per-interval can only be set if\n"); + fprintf(stderr, "--clock-sync is set to on or init.\n"); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } else if (argc < i + 2) { + fprintf(stderr, "Error: clock sync exchanges-per-interval needs an integer argument.\n"); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } + i++; + long exchanges = (long)strtol(argv[i], NULL, 10); + if (exchanges == 0L || exchanges == LONG_MAX || exchanges == LONG_MIN) { + fprintf(stderr, "Error: clock sync exchanges-per-interval value is invalid.\n"); + continue; // Try to parse the rest of the arguments as clock sync args. + } + _RTI.clock_sync_exchanges_per_interval = (int32_t)exchanges; // FIXME: Loses numbers on 64-bit machines + printf("RTI: Clock sync exchanges per interval: %d\n", _RTI.clock_sync_exchanges_per_interval); + } else if (strcmp(argv[i], " ") == 0) { + // Tolerate spaces + continue; + } else { + // Either done with the clock sync args or there is an invalid + // character. In either case, let the parent function deal with + // the rest of the characters; + return i; + } + } + return argc; +} + /** * Process the command-line arguments. If the command line arguments are not * understood, then print a usage message and return 0. Otherwise, return 1. @@ -1951,20 +2054,90 @@ void usage(int argc, char* argv[]) { */ int process_args(int argc, char* argv[]) { for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { - if (argc < i + 2) { - fprintf(stderr, "Error: --id needs a string argument.\n"); + if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { + if (argc < i + 2) { + fprintf(stderr, "Error: --id needs a string argument.\n"); + usage(argc, argv); + return 0; + } + i++; + printf("RTI: Federation ID: %s\n", argv[i]); + _RTI.federation_id = argv[i]; + } else if (strcmp(argv[i], "-n") == 0 || strcmp(argv[i], "--number_of_federates") == 0) { + if (argc < i + 2) { + fprintf(stderr, "Error: --number_of_federates needs an integer argument.\n"); + usage(argc, argv); + return 0; + } + i++; + long num_federates = strtol(argv[i], NULL, 10); + if (num_federates == 0L || num_federates == LONG_MAX || num_federates == LONG_MIN) { + fprintf(stderr, "Error: --number_of_federates needs a valid positive integer argument.\n"); + usage(argc, argv); + return 0; + } + _RTI.number_of_federates = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines + printf("RTI: Number of federates: %d\n", _RTI.number_of_federates); + } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { + if (argc < i + 2) { + fprintf( + stderr, + "Error: --port needs a short unsigned integer argument ( > 0 and < %d).\n", + UINT16_MAX + ); + usage(argc, argv); + return 0; + } + i++; + uint32_t RTI_port = (uint32_t)strtoul(argv[i], NULL, 10); + if (RTI_port <= 0 || RTI_port >= UINT16_MAX) { + fprintf( + stderr, + "Error: --port needs a short unsigned integer argument ( > 0 and < %d).\n", + UINT16_MAX + ); + usage(argc, argv); + return 0; + } + _RTI.user_specified_port = (uint16_t)RTI_port; + } else if (strcmp(argv[i], "-c") == 0 || strcmp(argv[i], "--clock_sync") == 0) { + if (argc < i + 2) { + fprintf(stderr, "Error: --clock-sync needs off|init|on.\n"); usage(argc, argv); return 0; } i++; - printf("Federation ID at RTI: %s\n", argv[i]); - federation_id = argv[i++]; - } else { + i += process_clock_sync_args((argc-i), &argv[i]); + } else if (strcmp(argv[i], " ") == 0) { + // Tolerate spaces + continue; + } else { fprintf(stderr, "Error: Unrecognized command-line argument: %s\n", argv[i]); usage(argc, argv); return 0; } } + if (_RTI.number_of_federates == 0) { + fprintf(stderr, "Error: --number_of_federates needs a valid positive integer argument.\n"); + usage(argc, argv); + return 0; + } return 1; } + +int main(int argc, char* argv[]) { + if (!process_args(argc, argv)) { + // Processing command-line arguments failed. + return -1; + } + printf("Starting RTI for %d federates in federation ID %s\n", _RTI.number_of_federates, _RTI.federation_id); + assert(_RTI.number_of_federates < UINT16_MAX); + _RTI.federates = (federate_t*)calloc(_RTI.number_of_federates, sizeof(federate_t)); + for (uint16_t i = 0; i < _RTI.number_of_federates; i++) { + initialize_federate(i); + } + int socket_descriptor = start_rti_server(_RTI.user_specified_port); + wait_for_federates(socket_descriptor); + printf("RTI is exiting.\n"); + return 0; +} \ No newline at end of file diff --git a/org.lflang/src/lib/core/federated/RTI/rti.h b/org.lflang/src/lib/core/federated/RTI/rti.h new file mode 100644 index 0000000000..006658affd --- /dev/null +++ b/org.lflang/src/lib/core/federated/RTI/rti.h @@ -0,0 +1,196 @@ +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * @author Soroush Bateni (soroush@utdallas.edu) + * + * @section LICENSE +Copyright (c) 2020, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + * @section DESCRIPTION + * Header file for the runtime infrastructure for distributed Lingua Franca programs. + * +*/ + +#ifndef RTI_H +#define RTI_H + +#include "reactor.h" + +///////////////////////////////////////////// +//// Data structures + +typedef enum socket_type_t { + TCP, + UDP +} socket_type_t; + +/** Mode of execution of a federate. */ +typedef enum execution_mode_t { + FAST, + REALTIME +} execution_mode_t; + +/** State of a federate during execution. */ +typedef enum fed_state_t { + NOT_CONNECTED, // The federate has not connected. + GRANTED, // Most recent MSG_TYPE_NEXT_EVENT_TAG has been granted. + PENDING // Waiting for upstream federates. +} fed_state_t; + +/** + * Information about a federate known to the RTI, including its runtime state, + * mode of execution, and connectivity with other federates. + * The list of upstream and downstream federates does not include + * those that are connected via a "physical" connection (one + * denoted with ~>) because those connections do not impose + * any scheduling constraints. + */ +typedef struct federate_t { + uint16_t id; // ID of this federate. + pthread_t thread_id; // The ID of the thread handling communication with this federate. + int socket; // The TCP socket descriptor for communicating with this federate. + struct sockaddr_in UDP_addr; // The UDP address for the federate. + bool clock_synchronization_enabled; // Indicates the status of clock synchronization + // for this federate. Enabled by default. + tag_t completed; // The largest logical tag completed by the federate (or NEVER if no LTC has been received). + tag_t last_granted; // The maximum TAG that has been granted so far (or NEVER if none granted) + tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted) + tag_t next_event; // Most recent NET received from the federate (or NEVER if none received). + instant_t time_advance; // Most recent TAN received from the federate (or NEVER if none received). + fed_state_t state; // State of the federate. + int* upstream; // Array of upstream federate ids. + interval_t* upstream_delay; // Minimum delay on connections from upstream federates. + // Here, NEVER encodes no delay. 0LL is a microstep delay. + int num_upstream; // Size of the array of upstream federates and delays. + int* downstream; // Array of downstream federate ids. + int num_downstream; // Size of the array of downstream federates. + execution_mode_t mode; // FAST or REALTIME. + char server_hostname[INET_ADDRSTRLEN]; // Human-readable IP address and + int32_t server_port; // port number of the socket server of the federate + // if it has any incoming direct connections from other federates. + // The port number will be -1 if there is no server or if the + // RTI has not been informed of the port number. + struct in_addr server_ip_addr; // Information about the IP address of the socket + // server of the federate. + bool requested_stop; // Indicates that the federate has requested stop or has replied + // to a request for stop from the RTI. Used to prevent double-counting + // a federate when handling request_stop(). +} federate_t; + +/** + * The status of clock synchronization. + */ +typedef enum clock_sync_stat { + clock_sync_off, + clock_sync_init, + clock_sync_on +} clock_sync_stat; + +/** + * Structure that an RTI instance uses to keep track of its own and its + * corresponding federates' state. + */ +typedef struct RTI_instance_t { + // The main mutex lock. + pthread_mutex_t rti_mutex; + + // Condition variable used to signal receipt of all proposed start times. + pthread_cond_t received_start_times; + + // Condition variable used to signal that a start time has been sent to a federate. + pthread_cond_t sent_start_time; + + // RTI's decided stop tag for federates + tag_t max_stop_tag; + + // Number of federates in the federation + int32_t number_of_federates; + + // The federates. + federate_t* federates; + + // Maximum start time seen so far from the federates. + int64_t max_start_time; + + // Number of federates that have proposed start times. + int num_feds_proposed_start; + + // Number of federates handling stop + int num_feds_handling_stop; + + /** + * Boolean indicating that all federates have exited. + * This gets set to true exactly once before the program exits. + * It is marked volatile because the write is not guarded by a mutex. + * The main thread makes this true, then calls shutdown and close on + * the socket, which will cause accept() to return with an error code + * in respond_to_erroneous_connections(). + */ + volatile bool all_federates_exited; + + /** + * The ID of the federation that this RTI will supervise. + * This should be overridden with a command-line -i option to ensure + * that each federate only joins its assigned federation. + */ + char* federation_id; + + /************* TCP server information *************/ + /** The desired port specified by the user on the command line. */ + uint16_t user_specified_port; + + /** The final port number that the TCP socket server ends up using. */ + uint16_t final_port_TCP; + + /** The TCP socket descriptor for the socket server. */ + int socket_descriptor_TCP; + + /************* UDP server information *************/ + /** The final port number that the UDP socket server ends up using. */ + uint16_t final_port_UDP; + + /** The UDP socket descriptor for the socket server. */ + int socket_descriptor_UDP; + + /************* Clock synchronization information *************/ + /* Thread performing PTP clock sync sessions periodically. */ + pthread_t clock_thread; + + /** + * Indicates whether clock sync is globally on for the federation. Federates + * can still selectively disable clock synchronization if they wanted to. + */ + clock_sync_stat clock_sync_global_status; + + /** + * Frequency (period in nanoseconds) between clock sync attempts. + */ + uint64_t clock_sync_period_ns; + + /** + * Number of messages exchanged for each clock sync attempt. + */ + int32_t clock_sync_exchanges_per_interval; +} RTI_instance_t; + +#endif // RTI_H \ No newline at end of file diff --git a/org.lflang/src/lib/core/clock-sync.c b/org.lflang/src/lib/core/federated/clock-sync.c similarity index 99% rename from org.lflang/src/lib/core/clock-sync.c rename to org.lflang/src/lib/core/federated/clock-sync.c index 59ffa3032b..9c2774db1d 100644 --- a/org.lflang/src/lib/core/clock-sync.c +++ b/org.lflang/src/lib/core/federated/clock-sync.c @@ -31,7 +31,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "clock-sync.h" -#include "rti.h" +#include "net_common.h" /** * Keep a record of connection statistics diff --git a/org.lflang/src/lib/core/clock-sync.h b/org.lflang/src/lib/core/federated/clock-sync.h similarity index 100% rename from org.lflang/src/lib/core/clock-sync.h rename to org.lflang/src/lib/core/federated/clock-sync.h diff --git a/org.lflang/src/lib/core/federate.c b/org.lflang/src/lib/core/federated/federate.c similarity index 98% rename from org.lflang/src/lib/core/federate.c rename to org.lflang/src/lib/core/federated/federate.c index 2918284893..78a0f6e714 100644 --- a/org.lflang/src/lib/core/federate.c +++ b/org.lflang/src/lib/core/federated/federate.c @@ -42,11 +42,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include // Defines sigaction. #include "net_util.c" // Defines network functions. -#include "rti.h" // Defines message types, etc. -#include "reactor.h" // Defines instant_t. -#include "platform.h" -#include "clock-sync.c" // Defines clock synchronization functions. -#include "federate.h" // Defines federate_instance_t +#include "net_common.h" // Defines message types, etc. +#include "../reactor.h" // Defines instant_t. +#include "../platform.h" +#include "clock-sync.c" // Defines clock synchronization functions. +#include "federate.h" // Defines federate_instance_t // Error messages. char* ERROR_SENDING_HEADER = "ERROR sending header information to federate via RTI"; @@ -95,19 +95,29 @@ federate_instance_t _fed = { */ void* listen_to_federates(void* args); + +/** + * Generated function that sends information about connections between this federate and + * other federates where messages are routed through the RTI. Currently, this + * only includes logical connections when the coordination is centralized. This + * information is needed for the RTI to perform the centralized coordination. + * @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h + */ +void send_neighbor_structure_to_RTI(int rti_socket); + /** * Create a server to listen to incoming physical * connections from remote federates. This function * only handles the creation of the server socket. * The reserved port for the server socket is then * sent to the RTI by sending an MSG_TYPE_ADDRESS_ADVERTISEMENT message - * (@see rti.h). This function expects no response + * (@see net_common.h). This function expects no response * from the RTI. * * If a port is specified by the user, that will be used * as the only possibility for the server. This function * will fail if that port is not available. If a port is not - * specified, the STARTING_PORT (@see rti.h) will be used. + * specified, the STARTING_PORT (@see net_common.h) will be used. * The function will keep incrementing the port in this case * until the number of tries reaches PORT_RANGE_LIMIT. * @@ -190,7 +200,7 @@ void create_server(int specified_port) { _fed.server_port = port; // Send the server port number to the RTI - // on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see rti.h). + // on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see net_common.h). unsigned char buffer[sizeof(int32_t) + 1]; buffer[0] = MSG_TYPE_ADDRESS_ADVERTISEMENT; encode_int32(_fed.server_port, &(buffer[1])); @@ -759,9 +769,9 @@ void connect_to_federate(uint16_t remote_federate_id) { size_t buffer_length = 1 + sizeof(uint16_t) + 1; unsigned char buffer[buffer_length]; buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; - if (_lf_my_fed_id > USHRT_MAX) { + if (_lf_my_fed_id > UINT16_MAX) { // This error is very unlikely to occur. - error_print_and_exit("Too many federates! More than %d.", USHRT_MAX); + error_print_and_exit("Too many federates! More than %d.", UINT16_MAX); } encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); unsigned char federation_id_length = (unsigned char)strnlen(federation_id, 255); @@ -922,8 +932,8 @@ void connect_to_rti(char* hostname, int port) { // Send the message type first. buffer[0] = MSG_TYPE_FED_IDS; // Next send the federate ID. - if (_lf_my_fed_id > USHRT_MAX) { - error_print_and_exit("Too many federates! More than %d.", USHRT_MAX); + if (_lf_my_fed_id > UINT16_MAX) { + error_print_and_exit("Too many federates! More than %d.", UINT16_MAX); } encode_uint16((uint16_t)_lf_my_fed_id, &buffer[1]); // Next send the federation ID length. @@ -957,10 +967,17 @@ void connect_to_rti(char* hostname, int port) { result = -1; continue; } - error_print_and_exit("RTI Rejected MSG_TYPE_FED_IDS message with response (see rti.h): " + error_print_and_exit("RTI Rejected MSG_TYPE_FED_IDS message with response (see net_common.h): " "%d. Error code: %d. Federate quits.\n", response, cause); } else if (response == MSG_TYPE_ACK) { LOG_PRINT("Received acknowledgment from the RTI."); + + // Call a generated (external) function that sends information + // about connections between this federate and other federates + // where messages are routed through the RTI. + // @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h + send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI); + uint16_t udp_port = setup_clock_synchronization_with_rti(); // Write the returned port number to the RTI @@ -970,7 +987,7 @@ void connect_to_rti(char* hostname, int port) { write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), UDP_port_number, "Failed to send the UDP port number to the RTI."); } else { - error_print_and_exit("Received unexpected response %u from the RTI (see rti.h).", + error_print_and_exit("Received unexpected response %u from the RTI (see net_common.h).", response); } info_print("Connected to RTI at %s:%d.", hostname, uport); @@ -1002,7 +1019,7 @@ instant_t get_start_time_from_rti(instant_t my_physical_time) { // First byte received is the message ID. if (buffer[0] != MSG_TYPE_TIMESTAMP) { - error_print_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see rti.h).", + error_print_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see net_common.h).", buffer[0]); } @@ -2178,7 +2195,7 @@ void terminate_execution() { /** * Thread that listens for inputs from other federates. * This thread listens for messages of type MSG_TYPE_P2P_MESSAGE, - * MSG_TYPE_P2P_TAGGED_MESSAGE, or MSG_TYPE_PORT_ABSENT (@see rti.h) from the specified + * MSG_TYPE_P2P_TAGGED_MESSAGE, or MSG_TYPE_PORT_ABSENT (@see net_common.h) from the specified * peer federate and calls the appropriate handling function for * each message type. If an error occurs or an EOF is received * from the peer, then this procedure sets the corresponding diff --git a/org.lflang/src/lib/core/federate.h b/org.lflang/src/lib/core/federated/federate.h similarity index 100% rename from org.lflang/src/lib/core/federate.h rename to org.lflang/src/lib/core/federated/federate.h diff --git a/org.lflang/src/lib/core/rti.h b/org.lflang/src/lib/core/federated/net_common.h similarity index 88% rename from org.lflang/src/lib/core/rti.h rename to org.lflang/src/lib/core/federated/net_common.h index 49138335c8..d2d3f2e1a5 100644 --- a/org.lflang/src/lib/core/rti.h +++ b/org.lflang/src/lib/core/federated/net_common.h @@ -1,6 +1,7 @@ /** * @file * @author Edward A. Lee (eal@berkeley.edu) + * @author Soroush Bateni (soroush@utdallas.edu) * * @section LICENSE Copyright (c) 2020, The University of California at Berkeley. @@ -24,9 +25,10 @@ PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR B INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +*/ +/** * @section DESCRIPTION - * Header file for the runtime infrastructure for distributed Lingua Franca programs. + * Header file for common message types and definitions for federated Lingua Franca programs. * * This file defines the message types for the federate to communicate with the RTI. * Each message type has a unique one-byte ID. @@ -62,8 +64,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * When the federation IDs match, the RTI will respond with an * MSG_TYPE_ACK. + * + * The next message to the RTI will be a MSG_TYPE_NEIGHBOR_STRUCTURE message + * that informs the RTI about connections between this federate and other + * federates where messages are routed through the RTI. Currently, this only + * includes logical connections when the coordination is centralized. This + * information is needed for the RTI to perform the centralized coordination. + * The burden is on the federates to inform the RTI about relevant connections. * - * The next message to the RTI will a MSG_TYPE_UDP_PORT message, which has + * The next message to the RTI will be a MSG_TYPE_UDP_PORT message, which has * payload USHRT_MAX if clock synchronization is disabled altogether, 0 if * only initial clock synchronization is enabled, and a port number for * UDP communication if runtime clock synchronization is enabled. @@ -105,7 +114,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * If clock synchronization is enabled, then the federate will also * start a thread to listen for incoming UDP messages from the RTI. - * With period given by _LF_CLOCK_SYNC_PERIOD_NS, the RTI + * With period given by the `-c on period ` command-line argument, the RTI * will initiate a clock synchronization round by sending to the * federate a MSG_TYPE_CLOCK_SYNC_T1 message. A similar * protocol to that above is followed to estimate the average clock @@ -163,11 +172,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -#ifndef RTI_H -#define RTI_H +#ifndef NET_COMMON_H +#define NET_COMMON_H #include -#include "reactor.h" /** * The timeout time in ns for TCP operations. @@ -592,6 +600,35 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MSG_TYPE_PORT_ABSENT 23 + +/** + * A message that informs the RTI about connections between this federate and + * other federates where messages are routed through the RTI. Currently, this + * only includes logical connections when the coordination is centralized. This + * information is needed for the RTI to perform the centralized coordination. + * + * @note Only information about the immediate neighbors is required. The RTI can + * transitively obtain the structure of the federation based on each federate's + * immediate neighbor information. + * + * The next 4 bytes is the number of upstream federates. + * The next 4 bytes is the number of downstream federates. + * + * Depending on the first four bytes, the next bytes are pairs of (fed ID (2 + * bytes), delay (8 bytes)) for this federate's connection to upstream federates + * (by direct connection). The delay is the minimum "after" delay of all + * connections from the upstream federate. + * + * Depending on the second four bytes, the next bytes are fed IDs (2 + * bytes each), of this federate's downstream federates (by direct connection). + * + * @note The upstream and downstream connections are transmitted on the same + * message to prevent (at least to some degree) the scenario where the RTI has + * information about one, but not the other (which is a critical error). + */ +#define MSG_TYPE_NEIGHBOR_STRUCTURE 24 +#define MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE 9 + ///////////////////////////////////////////// //// Rejection codes @@ -615,66 +652,4 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /** Connected to the wrong server. */ #define WRONG_SERVER 5 -///////////////////////////////////////////// -//// Data structures - -typedef enum socket_type_t { - TCP, - UDP -} socket_type_t; - -/** Mode of execution of a federate. */ -typedef enum execution_mode_t { - FAST, - REALTIME -} execution_mode_t; - -/** State of a federate during execution. */ -typedef enum fed_state_t { - NOT_CONNECTED, // The federate has not connected. - GRANTED, // Most recent MSG_TYPE_NEXT_EVENT_TAG has been granted. - PENDING // Waiting for upstream federates. -} fed_state_t; - -/** - * Information about a federate known to the RTI, including its runtime state, - * mode of execution, and connectivity with other federates. - * The list of upstream and downstream federates does not include - * those that are connected via a "physical" connection (one - * denoted with ~>) because those connections do not impose - * any scheduling constraints. - */ -typedef struct federate_t { - uint16_t id; // ID of this federate. - pthread_t thread_id; // The ID of the thread handling communication with this federate. - int socket; // The TCP socket descriptor for communicating with this federate. - struct sockaddr_in UDP_addr; // The UDP address for the federate. - bool clock_synchronization_enabled; // Indicates the status of clock synchronization - // for this federate. Enabled by default. - tag_t completed; // The largest logical tag completed by the federate (or NEVER if no LTC has been received). - tag_t last_granted; // The maximum TAG that has been granted so far (or NEVER if none granted) - tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted) - tag_t next_event; // Most recent NET received from the federate (or NEVER if none received). - instant_t time_advance; // Most recent TAN received from the federate (or NEVER if none received). - fed_state_t state; // State of the federate. - int* upstream; // Array of upstream federate ids. - interval_t* upstream_delay; // Minimum delay on connections from upstream federates. - // Here, NEVER encodes no delay. 0LL is a microstep delay. - int num_upstream; // Size of the array of upstream federates and delays. - int* downstream; // Array of downstream federate ids. - int num_downstream; // Size of the array of downstream federates. - execution_mode_t mode; // FAST or REALTIME. - char server_hostname[INET_ADDRSTRLEN]; // Human-readable IP address and - int32_t server_port; // port number of the socket server of the federate - // if it has any incoming direct connections from other federates. - // The port number will be -1 if there is no server or if the - // RTI has not been informed of the port number. - struct in_addr server_ip_addr; // Information about the IP address of the socket - // server of the federate. - bool requested_stop; // Indicates that the federate has requested stop or has replied - // to a request for stop from the RTI. Used to prevent double-counting - // a federate when handling request_stop(). -} federate_t; - - -#endif /* RTI_H */ +#endif /* NET_COMMON_H */ diff --git a/org.lflang/src/lib/core/net_util.c b/org.lflang/src/lib/core/federated/net_util.c similarity index 99% rename from org.lflang/src/lib/core/net_util.c rename to org.lflang/src/lib/core/federated/net_util.c index d57cf5e697..30556eff60 100644 --- a/org.lflang/src/lib/core/net_util.c +++ b/org.lflang/src/lib/core/federated/net_util.c @@ -30,7 +30,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * Utility functions for a federate in a federated execution. */ -#include "util.h" +#include "../util.h" #include "net_util.h" #include #include @@ -489,8 +489,7 @@ void extract_header( *federate_id = extract_uint16(&(buffer[sizeof(uint16_t)])); // printf("DEBUG: Message for port %d of federate %d.\n", *port_id, *federate_id); - // FIXME: Better error handling needed here. - assert(*federate_id < NUMBER_OF_FEDERATES); + // The next four bytes are the message length. int32_t local_length_signed = extract_int32(&(buffer[sizeof(uint16_t) + sizeof(uint16_t)])); if (local_length_signed < 0) { diff --git a/org.lflang/src/lib/core/net_util.h b/org.lflang/src/lib/core/federated/net_util.h similarity index 99% rename from org.lflang/src/lib/core/net_util.h rename to org.lflang/src/lib/core/federated/net_util.h index 575bb04242..3fb037da01 100644 --- a/org.lflang/src/lib/core/net_util.h +++ b/org.lflang/src/lib/core/federated/net_util.h @@ -38,8 +38,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef NET_UTIL_H #define NET_UTIL_H -#include "platform.h" // defines lf_mutex_t -#include "tag.h" // Defines tag_t +#include "../platform.h" // defines lf_mutex_t +#include "../tag.h" // Defines tag_t #define HOST_LITTLE_ENDIAN 1 #define HOST_BIG_ENDIAN 2 diff --git a/org.lflang/src/lib/core/tag.h b/org.lflang/src/lib/core/tag.h index 23342997a2..54d2314788 100644 --- a/org.lflang/src/lib/core/tag.h +++ b/org.lflang/src/lib/core/tag.h @@ -34,6 +34,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define TAG_H #include "platform.h" +#include "limits.h" /* Conversion of time to nanoseconds. */ #define NSEC(t) (t * 1LL) diff --git a/org.lflang/src/org/lflang/generator/CGenerator.xtend b/org.lflang/src/org/lflang/generator/CGenerator.xtend index f421d250a6..38c1dc4e68 100644 --- a/org.lflang/src/org/lflang/generator/CGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/CGenerator.xtend @@ -394,7 +394,21 @@ class CGenerator extends GeneratorBase { // Note that net_util.h/c are not used by the infrastructure // unless the program is federated, but they are often useful for user code, // so we include them anyway. - var coreFiles = newArrayList("net_util.c", "net_util.h", "reactor_common.c", "reactor.h", "pqueue.c", "pqueue.h", "tag.h", "tag.c", "trace.h", "trace.c", "util.h", "util.c", "platform.h") + var coreFiles = newArrayList( + "federated" + File.separator + "net_util.c", + "federated" + File.separator + "net_util.h", + "reactor_common.c", + "reactor.h", + "pqueue.c", + "pqueue.h", + "tag.h", + "tag.c", + "trace.h", + "trace.c", + "util.h", + "util.c", + "platform.h" + ); if (targetConfig.threads === 0) { coreFiles.add("reactor.c") } else { @@ -408,41 +422,47 @@ class CGenerator extends GeneratorBase { // for more detail. if ((OS.indexOf("mac") >= 0) || (OS.indexOf("darwin") >= 0)) { // Mac support - coreFiles.add("platform/lf_POSIX_threads_support.c") - coreFiles.add("platform/lf_C11_threads_support.c") - coreFiles.add("platform/lf_POSIX_threads_support.h") - coreFiles.add("platform/lf_C11_threads_support.h") - coreFiles.add("platform/lf_macos_support.c") - coreFiles.add("platform/lf_macos_support.h") - coreFiles.add("platform/lf_unix_clock_support.c") + coreFiles.add("platform" + File.separator + "lf_POSIX_threads_support.c") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.c") + coreFiles.add("platform" + File.separator + "lf_POSIX_threads_support.h") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.h") + coreFiles.add("platform" + File.separator + "lf_macos_support.c") + coreFiles.add("platform" + File.separator + "lf_macos_support.h") + coreFiles.add("platform" + File.separator + "lf_unix_clock_support.c") // If there is no main reactor, then compilation will produce a .o file requiring further linking. if (mainDef !== null) { - targetConfig.compileAdditionalSources.add(fileConfig.getSrcGenPath + File.separator + "core/platform/lf_macos_support.c") + targetConfig.compileAdditionalSources.add( + "core" + File.separator + "platform" + File.separator + "lf_macos_support.c" + ); } } else if (OS.indexOf("win") >= 0) { // Windows support - coreFiles.add("platform/lf_C11_threads_support.c") - coreFiles.add("platform/lf_C11_threads_support.h") - coreFiles.add("platform/lf_windows_support.c") - coreFiles.add("platform/lf_windows_support.h") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.c") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.h") + coreFiles.add("platform" + File.separator + "lf_windows_support.c") + coreFiles.add("platform" + File.separator + "lf_windows_support.h") // For 64-bit epoch time - coreFiles.add("platform/lf_unix_clock_support.c") + coreFiles.add("platform" + File.separator + "lf_unix_clock_support.c") // If there is no main reactor, then compilation will produce a .o file requiring further linking. if (mainDef !== null) { - targetConfig.compileAdditionalSources.add(fileConfig.getSrcGenPath + File.separator + "core/platform/lf_windows_support.c") + targetConfig.compileAdditionalSources.add( + "core" + File.separator + "platform" + File.separator + "lf_windows_support.c" + ) } } else if (OS.indexOf("nux") >= 0) { // Linux support - coreFiles.add("platform/lf_POSIX_threads_support.c") - coreFiles.add("platform/lf_C11_threads_support.c") - coreFiles.add("platform/lf_POSIX_threads_support.h") - coreFiles.add("platform/lf_C11_threads_support.h") - coreFiles.add("platform/lf_linux_support.c") - coreFiles.add("platform/lf_linux_support.h") - coreFiles.add("platform/lf_unix_clock_support.c") + coreFiles.add("platform" + File.separator + "lf_POSIX_threads_support.c") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.c") + coreFiles.add("platform" + File.separator + "lf_POSIX_threads_support.h") + coreFiles.add("platform" + File.separator + "lf_C11_threads_support.h") + coreFiles.add("platform" + File.separator + "lf_linux_support.c") + coreFiles.add("platform" + File.separator + "lf_linux_support.h") + coreFiles.add("platform" + File.separator + "lf_unix_clock_support.c") // If there is no main reactor, then compilation will produce a .o file requiring further linking. if (mainDef !== null) { - targetConfig.compileAdditionalSources.add(fileConfig.getSrcGenPath + File.separator + "core/platform/lf_linux_support.c") + targetConfig.compileAdditionalSources.add( + "core" + File.separator + "platform" + File.separator + "lf_linux_support.c" + ) } } else { errorReporter.reportError("Platform " + OS + " is not supported") @@ -452,8 +472,13 @@ class CGenerator extends GeneratorBase { // If there are federates, copy the required files for that. // Also, create the RTI C file and the launcher script. if (isFederated) { - coreFiles.addAll("rti.c", "rti.h", "federate.c", "federate.h", "clock-sync.h", "clock-sync.c") - createFederateRTI() + coreFiles.addAll( + "federated" + File.separator + "net_common.h", + "federated" + File.separator + "federate.c", + "federated" + File.separator + "federate.h", + "federated" + File.separator + "clock-sync.h", + "federated" + File.separator + "clock-sync.c" + ); createLauncher(coreFiles) } @@ -726,6 +751,10 @@ class CGenerator extends GeneratorBase { «ENDIF» } ''') + + if (isFederated) { + pr(generateFederateNeighborStructure(federate).toString()); + } // Generate function to schedule shutdown reactions if any // reactors have reactions to shutdown. @@ -774,9 +803,6 @@ class CGenerator extends GeneratorBase { if (!targetConfig.noCompile) { if (!targetConfig.buildCommands.nullOrEmpty) { runBuildCommand() - } else if (isFederated) { - // Compile the RTI files if there is more than one federate. - compileRTI() } } @@ -856,7 +882,7 @@ class CGenerator extends GeneratorBase { // Insert the #defines at the beginning code.insert(0, ''' #define _LF_CLOCK_SYNC_INITIAL - #define _LF_CLOCK_SYNC_PERIOD_NS «targetConfig.clockSyncOptions.period» + #define _LF_CLOCK_SYNC_PERIOD_NS «targetConfig.clockSyncOptions.period.timeInTargetLanguage» #define _LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL «targetConfig.clockSyncOptions.trials» #define _LF_CLOCK_SYNC_ATTENUATION «targetConfig.clockSyncOptions.attenuation» ''') @@ -1009,189 +1035,127 @@ class CGenerator extends GeneratorBase { //////////////////////////////////////////// //// Code generators. - /** Create the runtime infrastructure (RTI) source file. + /** + * Generate code that sends the neighbor structure message to the RTI. + * @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h + * + * @param federate The federate that is sending its neighbor structure */ - override createFederateRTI() { - // Derive target filename from the .lf filename. - var cFilename = getTargetFileName(fileConfig.RTIBinName) - - - - // Delete source previously produced by the LF compiler. - var file = fileConfig.getSrcGenPath.resolve(cFilename).toFile - if (file.exists) { - file.delete - } + def generateFederateNeighborStructure(FederateInstance federate) { - // Delete binary previously produced by the C compiler. - file = fileConfig.binPath.resolve(topLevelName).toFile - if (file.exists) { - file.delete - } - - val rtiCode = new StringBuilder() - pr(rtiCode, this.defineLogLevel) - - if (targetConfig.clockSync == ClockSyncMode.INITIAL) { - pr(rtiCode, ''' - #define _LF_CLOCK_SYNC_INITIAL - #define _LF_CLOCK_SYNC_PERIOD_NS «targetConfig.clockSyncOptions.period.toNanoSeconds» - #define _LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL «targetConfig.clockSyncOptions.trials» - ''') - } else if (targetConfig.clockSync == ClockSyncMode.ON) { - pr(rtiCode, ''' - #define _LF_CLOCK_SYNC_INITIAL - #define _LF_CLOCK_SYNC_ON - #define _LF_CLOCK_SYNC_PERIOD_NS «targetConfig.clockSyncOptions.period.toNanoSeconds» - #define _LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL «targetConfig.clockSyncOptions.trials» - ''') - } + val rtiCode = new StringBuilder(); pr(rtiCode, ''' - #ifdef NUMBER_OF_FEDERATES - #undefine NUMBER_OF_FEDERATES - #endif - #define NUMBER_OF_FEDERATES «federates.size» - #include "core/rti.c" - int main(int argc, char* argv[]) { + /** + * Generated function that sends information about connections between this federate and + * other federates where messages are routed through the RTI. Currently, this + * only includes logical connections when the coordination is centralized. This + * information is needed for the RTI to perform the centralized coordination. + * @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h + */ + void send_neighbor_structure_to_RTI(int rti_socket) { ''') - indent(rtiCode) - - // Initialize the array of information that the RTI has about the - // federates. - // FIXME: No support below for some federates to be FAST and some REALTIME. + + indent(rtiCode); + + // Initialize the array of information about the federate's immediate upstream + // and downstream relayed (through the RTI) logical connections, to send to the + // RTI. pr(rtiCode, ''' - if (!process_args(argc, argv)) { - // Processing command-line arguments failed. - return -1; - } - printf("Starting RTI for %d federates in federation ID %s\n", NUMBER_OF_FEDERATES, federation_id); - assert(NUMBER_OF_FEDERATES < UINT16_MAX); - for (uint16_t i = 0; i < NUMBER_OF_FEDERATES; i++) { - initialize_federate(i); - «IF targetConfig.fastMode» - federates[i].mode = FAST; - «ENDIF» - } - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wunused-variable" interval_t candidate_tmp; - #pragma GCC diagnostic pop + size_t buffer_size = 1 + 8 + + «federate.dependsOn.keySet.size» * ( sizeof(uint16_t) + sizeof(int64_t) ) + + «federate.sendsTo.keySet.size» * sizeof(uint16_t); + unsigned char buffer_to_send[buffer_size]; + + size_t message_head = 0; + buffer_to_send[message_head] = MSG_TYPE_NEIGHBOR_STRUCTURE; + message_head++; + encode_int32((int32_t)«federate.dependsOn.keySet.size», &(buffer_to_send[message_head])); + message_head+=sizeof(int32_t); + encode_int32((int32_t)«federate.sendsTo.keySet.size», &(buffer_to_send[message_head])); + message_head+=sizeof(int32_t); ''') - // Initialize the arrays indicating connectivity to upstream and downstream federates. - for(federate : federates) { - if (!federate.dependsOn.keySet.isEmpty) { - // Federate receives non-physical messages from other federates. - // Initialize the upstream and upstream_delay arrays. - val numUpstream = federate.dependsOn.keySet.size - // Allocate memory for the arrays storing the connectivity information. + + if (!federate.dependsOn.keySet.isEmpty) { + // Next, populate these arrays. + // Find the minimum delay in the process. + // FIXME: Zero delay is not really the same as a microstep delay. + for (upstreamFederate : federate.dependsOn.keySet) { pr(rtiCode, ''' - federates[«federate.id»].upstream = (int*)malloc(sizeof(federate_t*) * «numUpstream»); - federates[«federate.id»].upstream_delay = (interval_t*)malloc(sizeof(interval_t*) * «numUpstream»); - federates[«federate.id»].num_upstream = «numUpstream»; + encode_uint16((uint16_t)«upstreamFederate.id», &(buffer_to_send[message_head])); + message_head += sizeof(uint16_t); ''') - // Next, populate these arrays. - // Find the minimum delay in the process. - // FIXME: Zero delay is not really the same as a microstep delay. - var count = 0; - for (upstreamFederate : federate.dependsOn.keySet) { + // The minimum delay calculation needs to be made in the C code because it + // may depend on parameter values. + // FIXME: These would have to be top-level parameters, which don't really + // have any support yet. Ideally, they could be overridden on the command line. + // When that is done, they will need to be in scope here. + val delays = federate.dependsOn.get(upstreamFederate) + if (delays !== null) { + // There is at least one delay, so find the minimum. + // If there is no delay at all, this is encoded as NEVER. pr(rtiCode, ''' - federates[«federate.id»].upstream[«count»] = «upstreamFederate.id»; + candidate_tmp = FOREVER; ''') - // The minimum delay calculation needs to be made in the C code because it - // may depend on parameter values. - // FIXME: These would have to be top-level parameters, which don't really - // have any support yet. Ideally, they could be overridden on the command line. - // When that is done, they will need to be in scope here. - val delays = federate.dependsOn.get(upstreamFederate) - if (delays !== null) { - // There is at least one delay, so find the minimum. - // If there is no delay at all, this is encoded as NEVER. - pr(rtiCode, ''' - federates[«federate.id»].upstream_delay[«count»] = NEVER; - candidate_tmp = FOREVER; - ''') - for (delay : delays) { - if (delay === null) { - // Use NEVER to encode no delay at all. - pr(rtiCode, ''' - candidate_tmp = NEVER; - ''') - } else { - var delayTime = delay.getTargetTime - if (delay.parameter !== null) { - // The delay is given as a parameter reference. Find its value. - delayTime = ASTUtils.getInitialTimeValue(delay.parameter).timeInTargetLanguage - } - pr(rtiCode, ''' - if («delayTime» < candidate_tmp) { - candidate_tmp = «delayTime»; - } - ''') + for (delay : delays) { + if (delay === null) { + // Use NEVER to encode no delay at all. + pr(rtiCode, ''' + candidate_tmp = NEVER; + ''') + } else { + var delayTime = delay.getTargetTime + if (delay.parameter !== null) { + // The delay is given as a parameter reference. Find its value. + delayTime = ASTUtils.getInitialTimeValue(delay.parameter).timeInTargetLanguage } + pr(rtiCode, ''' + if («delayTime» < candidate_tmp) { + candidate_tmp = «delayTime»; + } + ''') } - pr(rtiCode, ''' - if (candidate_tmp < FOREVER) { - federates[«federate.id»].upstream_delay[«count»] = candidate_tmp; - } - ''') - } else { - // Use NEVER to encode no delay at all. - pr(rtiCode, ''' - federates[«federate.id»].upstream_delay[«count»] = NEVER; - ''') } - count++; - } - } - // Next, set up the downstream array. - if (!federate.sendsTo.keySet.isEmpty) { - // Federate sends non-physical messages to other federates. - // Initialize the downstream array. - val numDownstream = federate.sendsTo.keySet.size - // Allocate memory for the array. - pr(rtiCode, ''' - federates[«federate.id»].downstream = (int*)malloc(sizeof(federate_t*) * «numDownstream»); - federates[«federate.id»].num_downstream = «numDownstream»; - ''') - // Next, populate the array. - // Find the minimum delay in the process. - // FIXME: Zero delay is not really the same as a microstep delay. - var count = 0; - for (downstreamFederate : federate.sendsTo.keySet) { + pr(rtiCode, ''' + encode_int64((int64_t)candidate_tmp, &(buffer_to_send[message_head])); + message_head += sizeof(int64_t); + ''') + } else { + // Use NEVER to encode no delay at all. pr(rtiCode, ''' - federates[«federate.id»].downstream[«count»] = «downstreamFederate.id»; + encode_int64(NEVER, &(buffer_to_send[message_head])); + message_head += sizeof(int64_t); ''') - count++; } } } - // Start the RTI server before launching the federates because if it - // fails, e.g. because the port is not available, then we don't want to - // launch the federates. - // Also, generate code that blocks until the federates resign. - pr(rtiCode, ''' - int socket_descriptor = start_rti_server(«federationRTIProperties.get('port')»); - wait_for_federates(socket_descriptor); - ''') + // Next, set up the downstream array. + if (!federate.sendsTo.keySet.isEmpty) { + // Next, populate the array. + // Find the minimum delay in the process. + // FIXME: Zero delay is not really the same as a microstep delay. + for (downstreamFederate : federate.sendsTo.keySet) { + pr(rtiCode, ''' + encode_uint16(«downstreamFederate.id», &(buffer_to_send[message_head])); + message_head += sizeof(uint16_t); + ''') + } + } - // Handle RTI's exit pr(rtiCode, ''' - printf("RTI is exiting.\n"); - return 0; + write_to_socket_errexit( + rti_socket, + buffer_size, + buffer_to_send, + "Failed to send the neighbor structure message to the RTI." + ); ''') unindent(rtiCode) pr(rtiCode, "}") - var fOut = new FileOutputStream(fileConfig.getSrcGenPath.resolve(cFilename).toFile); - fOut.write(rtiCode.toString().getBytes()) - fOut.close() - - // Write a Dockerfile for the RTI. - if (targetConfig.dockerOptions !== null) { - writeDockerFile(fileConfig.RTIBinName) - } + return rtiCode; } /** @@ -1284,7 +1248,7 @@ class CGenerator extends GeneratorBase { # Create a random 48-byte text ID for this federation. # The likelihood of two federations having the same ID is 1/16,777,216 (1/2^24). FEDERATION_ID=`openssl rand -hex 24` - echo "Federate «topLevelName» in Federation ID "$FEDERATION_ID + echo "Federate «topLevelName» in Federation ID '$FEDERATION_ID'" # Launch the federates: ''') val distHeader = ''' @@ -1297,23 +1261,40 @@ class CGenerator extends GeneratorBase { var target = host var path = federationRTIProperties.get('dir') - if(path === null) path = 'LinguaFrancaRemote' + if(path === null) path = '''LinguaFrancaRemote''' var user = federationRTIProperties.get('user') if (user !== null) { target = user + '@' + host } + + var RTILaunchString = ''' + RTI -i ${FEDERATION_ID} \ + -n «federates.size» \ + -c «targetConfig.clockSync.toString()» «IF targetConfig.clockSync == ClockSyncMode.ON» \ + period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ + exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ + & + ''' + // Launch the RTI in the foreground. if (host == 'localhost' || host == '0.0.0.0') { // FIXME: the paths below will not work on Windows pr(shCode, ''' echo "#### Launching the runtime infrastructure (RTI)." + # First, check if the RTI is on the PATH + if ! command -v RTI &> /dev/null + then + echo "RTI could not be found." + echo "The source code can be found in org.lflang/src/lib/core/federated/RTI" + exit + fi # The RTI is started first to allow proper boot-up # before federates will try to connect. # The RTI will be brought back to foreground # to be responsive to user inputs after all federates # are launched. - «fileConfig.binPath.resolve(topLevelName) + FileConfig.RTI_BIN_SUFFIX» -i $FEDERATION_ID & + «RTILaunchString» # Store the PID of the RTI RTI=$! # Wait for the RTI to boot up before @@ -1328,32 +1309,6 @@ class CGenerator extends GeneratorBase { if (distCode.length === 0) pr(distCode, distHeader) val logFileName = '''log/«topLevelName»_RTI.log''' - val compileCommand = '''«this.targetConfig.compiler» «targetConfig.compilerFlags.join(" ")» src-gen/«topLevelName»_RTI.c -o bin/«topLevelName»_RTI -pthread''' - - // The mkdir -p flag below creates intermediate directories if needed. - pr(distCode, ''' - cd «path» - echo "Making directory «path» and subdirectories src-gen and path on host «target»" - ssh «target» '\ - mkdir -p «path»/src-gen «path»/bin «path»/log «path»/src-gen/core; \ - echo "--------------" >> «path»/«logFileName»; \ - date >> «path»/«logFileName»; \ - ' - pushd src-gen/core > /dev/null - echo "Copying LF core files for RTI to host «target»" - scp rti.c rti.h tag.c tag.h util.h util.c net_util.h net_util.c reactor.h pqueue.h trace.c trace.h «target»:«path»/src-gen/core - popd > /dev/null - pushd src-gen > /dev/null - echo "Copying source files for RTI to host «target»" - scp «topLevelName»_RTI.c ctarget.h «target»:«path»/src-gen - popd > /dev/null - echo "Compiling on host «target» using: «targetConfig.compiler» «targetConfig.compilerFlags.join(" ")» «path»/src-gen/«topLevelName»_RTI.c -o «path»/bin/«fileConfig.RTIBinName» -pthread" - ssh «target» ' \ - cd «path»; \ - echo "In «path» compiling RTI with: «compileCommand»" >> «logFileName» 2>&1; \ - # Capture the output in the log file and stdout. - «compileCommand» 2>&1 | tee -a «logFileName»;' - ''') // Launch the RTI on the remote machine using ssh and screen. // The -t argument to ssh creates a virtual terminal, which is needed by screen. @@ -1368,18 +1323,33 @@ class CGenerator extends GeneratorBase { // The cryptic 2>&1 reroutes stderr to stdout so that both are returned. // The sleep at the end prevents screen from exiting before outgoing messages from // the federate have had time to go out to the RTI through the socket. - val executeCommand = '''bin/«fileConfig.RTIBinName» -i '$FEDERATION_ID' ''' + RTILaunchString = ''' + RTI -i '${FEDERATION_ID}' \ + -n «federates.size» \ + -c «targetConfig.clockSync.toString()» «IF targetConfig.clockSync == ClockSyncMode.ON» \ + period «targetConfig.clockSyncOptions.period.toNanoSeconds» «ENDIF» \ + exchanges-per-interval «targetConfig.clockSyncOptions.trials» \ + & + ''' + pr(shCode, ''' echo "#### Launching the runtime infrastructure (RTI) on remote host «host»." # FIXME: Killing this ssh does not kill the remote process. # A double -t -t option to ssh forces creation of a virtual terminal, which # fixes the problem, but then the ssh command does not execute. The remote # federate does not start! - ssh «target» 'cd «path»; \ + ssh «target» 'mkdir -p log; \ echo "-------------- Federation ID: "'$FEDERATION_ID' >> «logFileName»; \ date >> «logFileName»; \ - echo "In «path», executing RTI: «executeCommand»" 2>&1 | tee -a «logFileName»; \ - «executeCommand» 2>&1 | tee -a «logFileName»' & + echo "Executing RTI: «RTILaunchString»" 2>&1 | tee -a «logFileName»; \ + # First, check if the RTI is on the PATH + if ! command -v RTI &> /dev/null + then + echo "RTI could not be found." + echo "The source code can be found in org.lflang/src/lib/core/federated/RTI" + exit + fi + «RTILaunchString» 2>&1 | tee -a «logFileName»' & # Store the PID of the channel to RTI RTI=$! # Wait for the RTI to boot up before @@ -1395,29 +1365,27 @@ class CGenerator extends GeneratorBase { if (federate.host !== null && federate.host != 'localhost' && federate.host != '0.0.0.0') { if(distCode.length === 0) pr(distCode, distHeader) val logFileName = '''log/«topLevelName»_«federate.name».log''' - val compileCommand = '''«targetConfig.compiler» src-gen/«topLevelName»_«federate.name».c -o bin/«topLevelName»_«federate.name» -pthread «targetConfig.compilerFlags.join(" ")»''' + val compileCommand = compileCCommand('''«topLevelName»_«federate.name»''', false).command.join(" ") + //'''«targetConfig.compiler» src-gen/«topLevelName»_«federate.name».c -o bin/«topLevelName»_«federate.name» -pthread «targetConfig.compilerFlags.join(" ")»''' // FIXME: Should $FEDERATION_ID be used to ensure unique directories, executables, on the remote host? pr(distCode, ''' - echo "Making directory «path» and subdirectories src-gen, src-gen/core, and log on host «federate.host»" + echo "Making directory «path» and subdirectories src-gen, bin, and log on host «federate.host»" # The >> syntax appends stdout to a file. The 2>&1 appends stderr to the same file. ssh «federate.host» '\ - mkdir -p «path»/src-gen «path»/bin «path»/log «path»/src-gen/core; \ + mkdir -p «path»/src-gen/federated/«topLevelName»/core «path»/bin «path»/log «path»/src-gen/core/federated; \ echo "--------------" >> «path»/«logFileName»; \ date >> «path»/«logFileName»; ' - pushd src-gen/core > /dev/null + pushd «fileConfig.srcGenPath» > /dev/null echo "Copying LF core files to host «federate.host»" - scp «coreFiles.join(" ")» «federate.host»:«path»/src-gen/core - popd > /dev/null - pushd src-gen > /dev/null + scp -r core «federate.host»:«path»/src-gen/federated/«topLevelName» echo "Copying source files to host «federate.host»" - scp «topLevelName»_«federate.name».c «FOR file:targetConfig.filesNamesWithoutPath SEPARATOR " "»«file»«ENDFOR» ctarget.h «federate.host»:«path»/src-gen + scp «topLevelName»_«federate.name».c «FOR file:targetConfig.filesNamesWithoutPath SEPARATOR " "»«file»«ENDFOR» ctarget.h «federate.host»:«path»/src-gen/federated/«topLevelName» popd > /dev/null echo "Compiling on host «federate.host» using: «compileCommand»" - ssh «federate.host» '\ - cd «path»; \ + ssh «federate.host» 'cd «path»; \ echo "In «path» compiling with: «compileCommand»" >> «logFileName» 2>&1; \ - # Capture the output in the log file and stdout. + # Capture the output in the log file and stdout. \ «compileCommand» 2>&1 | tee -a «logFileName»;' ''') val executeCommand = '''bin/«topLevelName»_«federate.name» -i '$FEDERATION_ID' ''' @@ -4680,7 +4648,7 @@ class CGenerator extends GeneratorBase { pr("#include \"core/reactor.c\"") } if (isFederated) { - pr("#include \"core/federate.c\"") + pr("#include \"core/federated/federate.c\"") } } diff --git a/org.lflang/src/org/lflang/generator/GeneratorBase.xtend b/org.lflang/src/org/lflang/generator/GeneratorBase.xtend index bd583cec6f..35f0f7d868 100644 --- a/org.lflang/src/org/lflang/generator/GeneratorBase.xtend +++ b/org.lflang/src/org/lflang/generator/GeneratorBase.xtend @@ -661,149 +661,6 @@ abstract class GeneratorBase extends AbstractLFValidator { return "0" // FIXME: do this or throw exception? } - // ////////////////////////////////////////// - // Protected methods for code generation - // of the RTI. - // FIXME: Allow target code generators to specify the directory - // structure for the generated C RTI? - /** Create the runtime infrastructure (RTI) source file. - */ - def createFederateRTI() { - // Derive target filename from the .lf filename. - var cFilename = fileConfig.name + "_RTI.c" - - // Delete source previously produced by the LF compiler. - // - var file = fileConfig.RTISrcPath.resolve(cFilename).toFile - if (file.exists) { - file.delete - } - - // Also make sure the directory exists. - if (!file.parentFile.exists || !file.parentFile.isDirectory) { - file.mkdirs - } - - // Delete binary previously produced by the C compiler. - file = fileConfig.RTIBinPath.resolve(fileConfig.name).toFile - if (file.exists) { - file.delete - } - - val rtiCode = new StringBuilder() - pr(rtiCode, ''' - #ifdef NUMBER_OF_FEDERATES - #undefine NUMBER_OF_FEDERATES - #endif - #define NUMBER_OF_FEDERATES «federates.size» - #include "rti.c" - int main(int argc, char* argv[]) { - ''') - indent(rtiCode) - - // Initialize the array of information that the RTI has about the - // federates. - // FIXME: No support below for some federates to be FAST and some REALTIME. - pr(rtiCode, ''' - for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - initialize_federate(i); - «IF targetConfig.fastMode» - federates[i].mode = FAST; - «ENDIF» - } - ''') - // Initialize the arrays indicating connectivity to upstream and downstream federates. - for (federate : federates) { - if (federate.dependsOn.size > 0) { - // Federate receives non-physical messages from other federates. - // Initialize the upstream and upstream_delay arrays. - val numUpstream = federate.dependsOn.size - // Allocate memory for the arrays storing the connectivity information. - pr(rtiCode, ''' - federates[«federate.id»].upstream = (int*)malloc(sizeof(federate_t*) * «numUpstream»); - federates[«federate.id»].upstream_delay = (interval_t*)malloc(sizeof(interval_t*) * «numUpstream»); - federates[«federate.id»].num_upstream = «numUpstream»; - ''') - // Next, populate these arrays. - // Find the minimum delay in the process. - // No delay is encoded as NEVER. - var count = 0; - for (upstreamFederate : federate.dependsOn.keySet) { - pr(rtiCode, ''' - federates[«federate.id»].upstream[«count»] = «upstreamFederate.id»; - federates[«federate.id»].upstream_delay[«count»] = NEVER; - ''') - // The minimum delay calculation needs to be made in the C code because it - // may depend on parameter values. - // FIXME: These would have to be top-level parameters, which don't really - // have any support yet. Ideally, they could be overridden on the command line. - // When that is done, they will need to be in scope here. - val delays = federate.dependsOn.get(upstreamFederate) - if (delays !== null) { - for (delay : delays) { - // If delay is null, use the default, NEVER. Otherwise, override if less than seen. - if (delay !== null) { - pr(rtiCode, ''' - if (federates[«federate.id»].upstream_delay[«count»] < «delay.getRTITime») { - federates[«federate.id»].upstream_delay[«count»] = «delay.getRTITime»; - } - ''') - } - } - } - count++; - } - } - // Next, set up the downstream array. - if (!federate.sendsTo.keySet.isEmpty) { - // Federate sends non-physical messages to other federates. - // Initialize the downstream array. - val numDownstream = federate.sendsTo.keySet.size - // Allocate memory for the array. - pr(rtiCode, ''' - federates[«federate.id»].downstream = (int*)malloc(sizeof(federate_t*) * «numDownstream»); - federates[«federate.id»].num_downstream = «numDownstream»; - ''') - // Next, populate the array. - // Find the minimum delay in the process. - // FIXME: Zero delay is not really the same as a microstep delay. - var count = 0; - for (downstreamFederate : federate.sendsTo.keySet) { - pr(rtiCode, ''' - federates[«federate.id»].downstream[«count»] = «downstreamFederate.id»; - ''') - count++; - } - } - } - - // Start the RTI server before launching the federates because if it - // fails, e.g. because the port is not available, then we don't want to - // launch the federates. - // Also generate code that blocks until the federates resign. - pr(rtiCode, ''' - int socket_descriptor = start_rti_server(«federationRTIProperties.get('port')»); - wait_for_federates(socket_descriptor); - ''') - - unindent(rtiCode) - pr(rtiCode, "}") - - var fOut = new FileOutputStream(fileConfig.RTISrcPath.resolve(cFilename).toFile); - fOut.write(rtiCode.toString().getBytes()) - fOut.close() - } - - /** - * Invoke the C compiler on the generated RTI - * The C RTI is used across targets. Thus we need to be able to compile - * it from GeneratorBase. - */ - def compileRTI() { - var fileToCompile = fileConfig.name + '_RTI' - runCCompiler(fileToCompile, false) - } - /** * Run the C compiler. * @@ -923,7 +780,11 @@ abstract class GeneratorBase extends AbstractLFValidator { var compileArgs = newArrayList compileArgs.add(relSrcPathString) - compileArgs.addAll(targetConfig.compileAdditionalSources) + for (file: targetConfig.compileAdditionalSources) { + var relativePath = fileConfig.outPath.relativize( + fileConfig.getSrcGenPath.resolve(Paths.get(file))) + compileArgs.add(FileConfig.toUnixString(relativePath)) + } compileArgs.addAll(targetConfig.compileLibraries) // Only set the output file name if it hasn't already been set diff --git a/org.lflang/src/org/lflang/generator/TypeScriptGenerator.xtend b/org.lflang/src/org/lflang/generator/TypeScriptGenerator.xtend index 0d57ad3d48..9979d18761 100644 --- a/org.lflang/src/org/lflang/generator/TypeScriptGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/TypeScriptGenerator.xtend @@ -335,7 +335,6 @@ class TypeScriptGenerator extends GeneratorBase { // If this is a federated execution, generate C code for the RTI. if (isFederated) { - createFederateRTI() // Copy the required library files into the target file system. // This will overwrite previous versions. @@ -347,7 +346,6 @@ class TypeScriptGenerator extends GeneratorBase { fileConfig.getSrcGenPath.toString + File.separator + file ) } - compileRTI() } } diff --git a/test/C/src/federated/ClockSync.lf b/test/C/src/federated/ClockSync.lf index 1382288728..e11336fb77 100644 --- a/test/C/src/federated/ClockSync.lf +++ b/test/C/src/federated/ClockSync.lf @@ -22,7 +22,7 @@ target C { period: 5 msec, // Period with which runtime clock sync is performed. trials: 10, // Number of messages exchanged to perform clock sync. attenuation: 10 // Attenuation applied to runtime clock sync adjustments. - }, + } }; /** diff --git a/test/C/src/federated/DistributedCount.lf b/test/C/src/federated/DistributedCount.lf index 8e58b64ea3..00ccabeb6b 100644 --- a/test/C/src/federated/DistributedCount.lf +++ b/test/C/src/federated/DistributedCount.lf @@ -33,7 +33,7 @@ reactor Print { =} } -federated reactor DistributedCount(offset:time(200 msec)) { +federated reactor DistributedCount(offset:time(200 msec)) { c = new Count(); p = new Print(); c.out -> p.in after offset; diff --git a/test/C/src/federated/DistributedCountDecentralized.lf b/test/C/src/federated/DistributedCountDecentralized.lf index 23ca6cae42..ee651dae13 100644 --- a/test/C/src/federated/DistributedCountDecentralized.lf +++ b/test/C/src/federated/DistributedCountDecentralized.lf @@ -18,10 +18,10 @@ reactor Print { interval_t elapsed_time = get_elapsed_logical_time(); printf("At tag (%lld, %u), received %d. " "The original intended tag of the message was (%lld, %u).\n", - elapsed_time, + (long long int)elapsed_time, get_microstep(), in->value, - in->intended_tag.time - get_start_time(), + (long long int)(in->intended_tag.time - get_start_time()), in->intended_tag.microstep); if (in->value != self->c) { printf("Expected to receive %d.\n", self->c);