diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25c832772..871dc1b9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,7 @@ jobs: lf-gedf-np: needs: fetch-lf - uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@c-new-scheduler + uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master with: runtime-ref: ${{ github.ref }} compiler-ref: ${{ needs.fetch-lf.outputs.ref }} @@ -41,7 +41,7 @@ jobs: lf-gedf-np-ci: needs: fetch-lf - uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@c-new-scheduler + uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master with: runtime-ref: ${{ github.ref }} compiler-ref: ${{ needs.fetch-lf.outputs.ref }} diff --git a/core/reactor.c b/core/reactor.c index 938ee53b4..813413097 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -158,6 +158,13 @@ void print_snapshot() { * worker number does not make sense (e.g., the caller is not a worker thread). */ void _lf_trigger_reaction(reaction_t* reaction, int worker_number) { +#ifdef MODAL_REACTORS + // Check if reaction is disabled by mode inactivity + if (!_lf_mode_is_active(reaction->mode)) { + DEBUG_PRINT("Suppressing downstream reaction %s due inactivity of mode %s.", reaction->name, reaction->mode->name); + return; // Suppress reaction by preventing entering reaction queue + } +#endif // Do not enqueue this reaction twice. if (reaction->status == inactive) { DEBUG_PRINT("Enqueing downstream reaction %s, which has level %lld.", @@ -233,6 +240,11 @@ int _lf_do_step() { reaction->status = inactive; } +#ifdef MODAL_REACTORS + // At the end of the step, perform mode transitions + _lf_handle_mode_changes(); +#endif + // No more reactions should be blocked at this point. //assert(pqueue_size(blocked_q) == 0); diff --git a/core/reactor.h b/core/reactor.h index f39528353..0bc0f05a3 100644 --- a/core/reactor.h +++ b/core/reactor.h @@ -247,6 +247,21 @@ do { \ #endif +/** + * Sets the next mode of a modal reactor. Same as SET for outputs, only + * the last value will have effect if invoked multiple times. + * Works only in reactions with the target mode declared as effect. + * + * @param mode The target mode to set for activation. + */ +#ifdef MODAL_REACTORS +#define _LF_SET_MODE(mode) \ +do { \ + self->_lf__mode_state.next_mode = mode; \ + self->_lf__mode_state.mode_change = _lf_##mode##_change_type; \ +} while(0) +#endif + /** * Macro for extracting the deadline from the index of a reaction. * The reaction queue is sorted according to this index, and the @@ -442,6 +457,45 @@ typedef struct token_present_t { bool reset_is_present; // True to set is_present to false after calling done_using(). } token_present_t; + +#ifdef MODAL_REACTORS +/** Typedef for reactor_mode_t struct, used for representing a mode. */ +typedef struct reactor_mode_t reactor_mode_t; +/** Typedef for reactor_mode_state_t struct, used for storing modal state of reactor and/or its relation to enclosing modes. */ +typedef struct reactor_mode_state_t reactor_mode_state_t; +/** Typedef for mode_state_variable_reset_data_t struct, used for storing data for resetting state variables nested in modes. */ +typedef struct mode_state_variable_reset_data_t mode_state_variable_reset_data_t; + +/** A struct to represent a single mode instace in a reactor instance. */ +struct reactor_mode_t { + reactor_mode_state_t* state; // Pointer to a struct with the reactor's mode state. INSTANCE. + string name; // Name of this mode. + instant_t deactivation_time; // Time when the mode was left. +}; +/** A struct to store state of the modes in a reactor instance and/or its relation to enclosing modes. */ +struct reactor_mode_state_t { + reactor_mode_t* parent_mode; // Pointer to the next enclosing mode (if exsits). + reactor_mode_t* initial_mode; // Pointer to the initial mode. + reactor_mode_t* active_mode; // Pointer to the currently active mode. + reactor_mode_t* next_mode; // Pointer to the next mode to activate at the end of this step (if set). + char mode_change; // A mode change type flag (0: no change, 1: reset, 2: history). +}; +/** A struct to store data for resetting state variables nested in modes. */ +struct mode_state_variable_reset_data_t { + reactor_mode_t* mode; // Pointer to the enclosing mode. + void* target; // Pointer to the target variable. + void* source; // Pointer to the data source. + size_t size; // The size of the variable. +}; +#else +/* + * Reactions and triggers must have a mode pointer to set up connection to enclosing modes, + * also when they are precompiled without modal reactors in order to later work in modal reactors. + * Hence define mode type as void in the absence of modes to treat mode pointer as void pointers for that time being. + */ +typedef void reactor_mode_t; +#endif + /** * Reaction activation record to push onto the reaction queue. * Some of the information in this struct is common among all instances @@ -487,6 +541,8 @@ struct reaction_t { char* name; // If logging is set to LOG or higher, then this will // point to the full name of the reactor followed by // the reaction number. + reactor_mode_t* mode; // The enclosing mode of this reaction (if exists). + // If enclosed in multiple, this will point to the innermost mode. }; /** Typedef for event_t struct, used for storing activation records. */ @@ -537,6 +593,8 @@ struct trigger_t { // coordination. // - Finally, if status is 'present', then this is an error since multiple // downstream messages have been produced for the same port for the same logical time. + reactor_mode_t* mode; // The enclosing mode of this reaction (if exists). + // If enclosed in multiple, this will point to the innermost mode. #ifdef FEDERATED tag_t last_known_status_tag; // Last known status of the port, either via a timed message, a port absent, or a // TAG from the RTI. @@ -717,6 +775,13 @@ void terminate_execution(void); */ bool _lf_trigger_shutdown_reactions(void); +/** + * Function (to be code generated) to handle mode changes. + */ +#ifdef MODAL_REACTORS +void _lf_handle_mode_changes(); +#endif + /** * Create a new token and initialize it. * The value pointer will be NULL and the length will be 0. diff --git a/core/reactor_common.c b/core/reactor_common.c index f3fef046a..32fa3006e 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -167,6 +167,13 @@ parse_rti_code_t parse_rti_addr(char* rti_addr); void set_federation_id(char* fid); #endif +// Forward declaration for mode related functions +#ifdef MODAL_REACTORS +bool _lf_mode_is_active(reactor_mode_t* mode); +void _lf_add_suspended_event(event_t* event); +void _lf_terminate_modal_reactors(); +#endif + /** * Allocate memory using calloc (so the allocated memory is zeroed out) * and record the allocated memory on the specified self struct so that @@ -587,6 +594,14 @@ void _lf_pop_events() { continue; } +#ifdef MODAL_REACTORS + // If this event is associated with an incative it should haven been suspended and no longer on the event queue. + // FIXME This should not be possible + if (!_lf_mode_is_active(event->trigger->mode)) { + warning_print("Assumption violated. There is an event on the event queue that is associated to an inactive mode."); + } +#endif + lf_token_t *token = event->token; // Put the corresponding reactions onto the reaction queue. @@ -616,6 +631,14 @@ void _lf_pop_events() { } } #endif + +#ifdef MODAL_REACTORS + // Check if reaction is disabled by mode inactivity + if (!_lf_mode_is_active(reaction->mode)) { + DEBUG_PRINT("Suppressing reaction %s due inactive mode.", reaction->name); + continue; // Suppress reaction by preventing entering reaction queue + } +#endif DEBUG_PRINT("Triggering reaction %s.", reaction->name); _lf_trigger_reaction(reaction, -1); } else { @@ -689,6 +712,17 @@ void _lf_pop_events() { */ void _lf_initialize_timer(trigger_t* timer) { interval_t delay = 0; + +#ifdef MODAL_REACTORS + // Suspend all timer events that start in inactive mode + if (!_lf_mode_is_active(timer->mode) && (timer->offset != 0 || timer->period != 0)) { + event_t* e = _lf_get_new_event(); + e->trigger = timer; + e->time = get_logical_time() + timer->offset; + _lf_add_suspended_event(e); + return; + } +#endif if (timer->offset == 0) { for (int i = 0; i < timer->number_of_reactions; i++) { _lf_trigger_reaction(timer->reactions[i], -1); @@ -1283,6 +1317,14 @@ trigger_handle_t _lf_insert_reactions_for_trigger(trigger_t* trigger, lf_token_t return 0; } +#ifdef MODAL_REACTORS + // If this trigger is associated with an inactive mode, it should not trigger any reaction. + if (!_lf_mode_is_active(trigger->mode)) { + DEBUG_PRINT("Suppressing reactions of trigger due inactivity of mode %s.", trigger->mode->name); + return 1; + } +#endif + // Increment the reference count of the token. if (token != NULL) { token->ref_count++; @@ -1329,6 +1371,15 @@ trigger_handle_t _lf_insert_reactions_for_trigger(trigger_t* trigger, lf_token_t // onto the reaction queue. for (int i = 0; i < trigger->number_of_reactions; i++) { reaction_t* reaction = trigger->reactions[i]; + +#ifdef MODAL_REACTORS + // Check if reaction is disabled by mode inactivity + if (!_lf_mode_is_active(reaction->mode)) { + DEBUG_PRINT("Suppressing reaction %s due inactivity of mode %s.", reaction->name, reaction->mode->name); + continue; // Suppress reaction by preventing entering reaction queue + } +#endif + // Do not enqueue this reaction twice. if (reaction->status == inactive) { reaction->is_STP_violated = is_STP_violated; @@ -1885,6 +1936,11 @@ void termination() { // In order to free tokens, we perform the same actions we would have for a new time step. _lf_start_time_step(); +#ifdef MODAL_REACTORS + // Free events and tokens suspended by modal reactors. + _lf_terminate_modal_reactors(); +#endif + // If the event queue still has events on it, report that. if (event_q != NULL && pqueue_size(event_q) > 0) { warning_print("---- There are %zu unprocessed future events on the event queue.", pqueue_size(event_q)); @@ -1921,3 +1977,260 @@ void termination() { free(_lf_is_present_fields); free(_lf_is_present_fields_abbreviated); } + +// Functions for handling modal reactors. +#ifdef MODAL_REACTORS + +/** + * Checks whether the given mode is currently considered active. + * This includes checking all enclosing modes. + * If any of those is inactive, then so is this one. + * + * @param mode The mode instance to check. + */ +bool _lf_mode_is_active(reactor_mode_t* mode) { + /* + * This code could be optimized by introducing a cached activity indicator + * in all mode states. But for now: no premature optimization. + */ + if (mode != NULL) { + //DEBUG_PRINT("Checking mode state of %s", mode->name); + reactor_mode_state_t* state = mode->state; + while (state != NULL) { + // If this or any parent mode is inactive, return inactive + if (state->active_mode != mode) { + //DEBUG_PRINT(" => Mode is inactive"); + return false; + } + mode = state->parent_mode; + if (mode != NULL) { + state = mode->state; + } else { + state = NULL; + } + } + //DEBUG_PRINT(" => Mode is active"); + } + return true; +} + +// Linked list element for suspended events in inactive modes +typedef struct _lf_suspended_event { + struct _lf_suspended_event* next; + event_t* event; +} _lf_suspended_event_t; +_lf_suspended_event_t* _lf_suspended_events_head = NULL; // Start of linked collection of suspended events (managed automatically!) +int _lf_suspended_events_num = 0; // Number of suspended events (managed automatically!) +_lf_suspended_event_t* _lf_unsused_suspended_events_head = NULL; // Internal collection of reusable list elements (managed automatically!) + +/** + * Save the given event as suspended. + */ +void _lf_add_suspended_event(event_t* event) { + _lf_suspended_event_t* new_suspended_event; + if (_lf_unsused_suspended_events_head != NULL) { + new_suspended_event = _lf_unsused_suspended_events_head; + _lf_unsused_suspended_events_head = _lf_unsused_suspended_events_head->next; + } else { + new_suspended_event = (_lf_suspended_event_t*) malloc(sizeof(_lf_suspended_event_t)); + } + + new_suspended_event->event = event; + new_suspended_event->next = _lf_suspended_events_head; // prepend + _lf_suspended_events_num++; + + _lf_suspended_events_head = new_suspended_event; +} + +/** + * Removes the given node from the list of suspended events. + * Returns the next element in the list. + */ +_lf_suspended_event_t* _lf_remove_suspended_event(_lf_suspended_event_t* event) { + _lf_suspended_event_t* next = event->next; + + // Clear content + event->event = NULL; + event->next = NULL; + _lf_suspended_events_num--; + + // Store for recycling + if (_lf_unsused_suspended_events_head == NULL) { + _lf_unsused_suspended_events_head = event; + } else { + event->next = _lf_unsused_suspended_events_head; + _lf_unsused_suspended_events_head = event; + } + + if (_lf_suspended_events_head == event) { + _lf_suspended_events_head = next; // Adjust head + } else { + _lf_suspended_event_t* predecessor = _lf_suspended_events_head; + while(predecessor->next != event && predecessor != NULL) { + predecessor = predecessor->next; + } + if (predecessor != NULL) { + predecessor->next = next; // Remove from linked list + } + } + + return next; +} + +/** + * Performs transitions in all modal reactors. + * @param state An array of mode state of modal reactor instance Must be ordered hierarchically. + * Enclosing mode must come before inner. + * @param num_states The number of mode state. + */ +void _lf_process_mode_changes(reactor_mode_state_t* states[], int num_states, mode_state_variable_reset_data_t reset_data[], int reset_data_size) { + bool transition = false; // any mode change in this step + + // Detect mode changes (top down for hierarchical reset) + for (int i = 0; i < num_states; i++) { + reactor_mode_state_t* state = states[i]; + if (state != NULL) { + // Hierarchical reset: if this mode has parent that is entered in this step with a reset this reactor has to enter its initial mode + if (state->parent_mode != NULL && + state->parent_mode->state != NULL && + state->parent_mode->state->next_mode == state->parent_mode && + state->parent_mode->state->mode_change == 1) { + // Reset to initial state + state->next_mode = state->initial_mode; + state->mode_change = 1; // Enter with reset, to cascade it further down + DEBUG_PRINT("Modes: Hierarchical mode reset to %s when entering %s.", state->initial_mode->name, state->parent_mode->name); + } + + // Handle effect of entering next mode + if (state->next_mode != NULL) { + DEBUG_PRINT("Modes: Transition to %s.", state->next_mode->name); + transition = true; + + if (state->mode_change == 1) { + // Reset state variables + for (int i = 0; i < reset_data_size; i++) { + mode_state_variable_reset_data_t data = reset_data[i]; + if (data.mode == state->next_mode) { + DEBUG_PRINT("Modes: Reseting state variable."); + memcpy(data.target, data.source, data.size); + } + } + } + + // Reset/Reactivate previously suspended events of next state + _lf_suspended_event_t* suspended_event = _lf_suspended_events_head; + while(suspended_event != NULL) { + event_t* event = suspended_event->event; + if (event != NULL && event->trigger != NULL && event->trigger->mode == state->next_mode) { + if (state->mode_change == 1) { // Reset transition + if (event->trigger->is_timer) { // Only reset timers + trigger_t* timer = event->trigger; + + DEBUG_PRINT("Modes: Re-enqueuing reset timer."); + // Reschedule the timer with no additional delay. + // This will take care of super dense time when offset is 0. + _lf_schedule(timer, event->trigger->offset, NULL); + } + // No further processing; drops all events upon reset (timer event was recreated by schedule and original can be removed here) + } else if (state->next_mode != state->active_mode && event->trigger != NULL) { // History transition to a different mode + // Remaining time that the event would have been waiting before mode was left + instant_t local_remaining_delay = event->time - (state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : get_start_time()); + tag_t current_logical_tag = get_current_tag(); + + // Reschedule event with original local delay + DEBUG_PRINT("Modes: Re-enqueuing event with a suspended delay of %d (previous TTH: %u, Mode suspended at: %u).", local_remaining_delay, event->time, state->next_mode->deactivation_time); + tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay, .microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)}; + _lf_schedule_at_tag(event->trigger, schedule_tag, event->token); + + if (event->next != NULL) { + // The event has more events stacked up in super dense time, attach them to the newly created event. + if (event->trigger->last->next == NULL) { + event->trigger->last->next = event->next; + } else { + error_print("Modes: Cannot attach events stacked up in super dense to the just unsuspended root event."); + } + } + } + // A fresh event was created by schedule, hence, recycle old one + _lf_recycle_event(event); + + // Remove suspended event and continue + suspended_event = _lf_remove_suspended_event(suspended_event); + } else { + suspended_event = suspended_event->next; + } + } + } + } + } + + // Handle leaving active mode in all states + if (transition) { + // Set new active mode and clear mode change flags + for (int i = 0; i < num_states; i++) { + reactor_mode_state_t* state = states[i]; + if (state != NULL && state->next_mode != NULL) { + // Save time when mode was left to handle suspended events in the future + state->active_mode->deactivation_time = get_logical_time(); + + // Apply transition + state->active_mode = state->next_mode; + state->next_mode = NULL; + state->mode_change = 0; + } + } + + // Retract all events from the event queue that are associated with now inactive modes + if (event_q != NULL) { + size_t q_size = pqueue_size(event_q); + if (q_size > 0) { + event_t** delayed_removal = (event_t**) calloc(q_size, sizeof(event_t*)); + size_t delayed_removal_count = 0; + + // Find events + for (int i = 0; i < q_size; i++) { + event_t* event = (event_t*)event_q->d[i + 1]; // internal queue data structure omits index 0 + if (event != NULL && event->trigger != NULL && !_lf_mode_is_active(event->trigger->mode)) { + delayed_removal[delayed_removal_count++] = event; + // This will store the event including possibly those chained up in super dense time + _lf_add_suspended_event(event); + } + } + + // Events are removed delayed in order to allow linear iteration over the queue + DEBUG_PRINT("Modes: Pulling %d events from the event queue to suspend them. %d events are now suspended.", delayed_removal_count, _lf_suspended_events_num); + for (int i = 0; i < delayed_removal_count; i++) { + pqueue_remove(event_q, delayed_removal[i]); + } + + free(delayed_removal); + } + } + } +} + +/** + * Releases internal data structures for modes. + * - Frees all suspended events. + */ +void _lf_terminate_modal_reactors() { + _lf_suspended_event_t* suspended_event = _lf_suspended_events_head; + while(suspended_event != NULL) { + _lf_recycle_event(suspended_event->event); + _lf_suspended_event_t* next = suspended_event->next; + free(suspended_event); + suspended_event = next; + } + _lf_suspended_events_head = NULL; + _lf_suspended_events_num = 0; + + // Also free suspended_event elements stored for recycling + suspended_event = _lf_unsused_suspended_events_head; + while(suspended_event != NULL) { + _lf_suspended_event_t* next = suspended_event->next; + free(suspended_event); + suspended_event = next; + } + _lf_unsused_suspended_events_head = NULL; +} +#endif diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 8c8001512..66227a92e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -574,6 +574,11 @@ tag_t send_next_event_tag(tag_t tag, bool wait_for_reply) { * This does not acquire the mutex lock. It assumes the lock is already held. */ void _lf_next_locked() { +#ifdef MODAL_REACTORS + // Perform mode transitions + _lf_handle_mode_changes(); +#endif + // Previous logical time is complete. tag_t next_tag = get_next_event_tag(); @@ -737,7 +742,17 @@ void request_stop() { * worker number does not make sense (e.g., the caller is not a worker thread). */ void _lf_trigger_reaction(reaction_t* reaction, int worker_number) { +#ifdef MODAL_REACTORS + // Check if reaction is disabled by mode inactivity + if (_lf_mode_is_active(reaction->mode)) { +#endif lf_sched_trigger_reaction(reaction, worker_number); +#ifdef MODAL_REACTORS + } else { // Suppress reaction by preventing entering reaction queue + DEBUG_PRINT("Suppressing downstream reaction %s due inactivity of mode %s.", + reaction->name, reaction->mode->name); + } +#endif } /** diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 1ff183ed6..1794d5ad3 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -350,12 +350,10 @@ void lf_sched_done_with_reaction(size_t worker_number, * worker number does not make sense (e.g., the caller is not a worker thread). */ void lf_sched_trigger_reaction(reaction_t* reaction, int worker_number) { - // Protect against putting a reaction twice on the reaction queue by - // checking its status. - if (reaction != NULL && - lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", - reaction->name, LEVEL(reaction->index)); - _lf_sched_insert_reaction(reaction); + if (reaction == NULL || !lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { + return; } + DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", + reaction->name, LEVEL(reaction->index)); + _lf_sched_insert_reaction(reaction); } diff --git a/core/threaded/scheduler_GEDF_NP_CI.c b/core/threaded/scheduler_GEDF_NP_CI.c index e08abbf87..645d2fe63 100644 --- a/core/threaded/scheduler_GEDF_NP_CI.c +++ b/core/threaded/scheduler_GEDF_NP_CI.c @@ -512,32 +512,19 @@ void lf_sched_done_with_reaction(size_t worker_number, * worker number does not make sense (e.g., the caller is not a worker thread). */ void lf_sched_trigger_reaction(reaction_t* reaction, int worker_number) { - if (worker_number == -1) { - // The scheduler should handle this immediately - // Protect against putting a reaction twice on the reaction queue by - // checking its status. - if (reaction != NULL && - lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - lf_mutex_lock(&mutex); - DEBUG_PRINT( - "Scheduler: Enqueing reaction %s, which has level %lld.", - reaction->name, LEVEL(reaction->index)); - // Immediately put 'reaction' on the reaction queue. - pqueue_insert( - (pqueue_t*)_lf_sched_instance->_lf_sched_triggered_reactions, - (void*)reaction); - lf_mutex_unlock(&mutex); - } + if (reaction == NULL || !lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { return; } - // Protect against putting a reaction twice on the reaction queue by - // checking its status. - if (reaction != NULL && - lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - DEBUG_PRINT( - "Scheduler: Worker %d: Enqueuing reaction %s, which has level " - "%lld.", - worker_number, reaction->name, LEVEL(reaction->index)); + DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", + reaction->name, LEVEL(reaction->index)); + if (worker_number == -1) { + lf_mutex_lock(&mutex); + // Immediately put 'reaction' on the reaction queue. + pqueue_insert( + (pqueue_t*)_lf_sched_instance->_lf_sched_triggered_reactions, + (void*)reaction); + lf_mutex_unlock(&mutex); + } else { reaction->worker_affinity = worker_number; // Note: The scheduler has already checked that we are not enqueueing // this reaction twice. diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 848fb15ae..57d8a250c 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -416,6 +416,8 @@ void lf_sched_done_with_reaction(size_t worker_number, * * If a worker number is not available (e.g., this function is not called by a * worker thread), -1 should be passed as the 'worker_number'. + * + * This scheduler ignores the worker number. * * The scheduler will ensure that the same reaction is not triggered twice in * the same tag. @@ -428,12 +430,10 @@ void lf_sched_done_with_reaction(size_t worker_number, * */ void lf_sched_trigger_reaction(reaction_t* reaction, int worker_number) { - // Protect against putting a reaction twice in the reaction vectors by - // checking its status. - if (reaction != NULL && - lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", - reaction->name, LEVEL(reaction->index)); - _lf_sched_insert_reaction(reaction); + if (reaction == NULL || !lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { + return; } + DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", + reaction->name, LEVEL(reaction->index)); + _lf_sched_insert_reaction(reaction); } diff --git a/core/threaded/scheduler_PEDF_NP.c b/core/threaded/scheduler_PEDF_NP.c index f812c7c51..ffc52e217 100644 --- a/core/threaded/scheduler_PEDF_NP.c +++ b/core/threaded/scheduler_PEDF_NP.c @@ -656,22 +656,17 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction * */ void lf_sched_trigger_reaction(reaction_t* reaction, int worker_number) { + if (reaction == NULL || !lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { + return; + } + DEBUG_PRINT("Scheduler: Enqueing reaction %s, which has level %lld.", + reaction->name, LEVEL(reaction->index)); if (worker_number == -1) { - // The scheduler should handle this immediately lf_mutex_lock(&mutex); - // Do not enqueue this reaction twice. - if (reaction != NULL && lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - DEBUG_PRINT("Enqueing downstream reaction %s, which has level %lld.", - reaction->name, reaction->index & 0xffffLL); - // Immediately put 'reaction' on the reaction queue. - pqueue_insert((pqueue_t*)_lf_sched_instance->_lf_sched_triggered_reactions, reaction); - } + // Immediately put 'reaction' on the reaction queue. + pqueue_insert((pqueue_t*)_lf_sched_instance->_lf_sched_triggered_reactions, reaction); lf_mutex_unlock(&mutex); - return; - } - if (reaction != NULL && lf_bool_compare_and_swap(&reaction->status, inactive, queued)) { - DEBUG_PRINT("Worker %d: Enqueuing downstream reaction %s, which has level %lld.", - worker_number, reaction->name, reaction->index & 0xffffLL); + } else { reaction->worker_affinity = worker_number; // Note: The scheduler will check that we don't enqueue this reaction // twice when it is actually pushing it to the global reaction queue. diff --git a/include/ctarget.h b/include/ctarget.h index addd683b4..3aea626be 100644 --- a/include/ctarget.h +++ b/include/ctarget.h @@ -136,6 +136,20 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define SET_TOKEN(out, newtoken) _LF_SET_TOKEN(out, newtoken) +////////////////////////////////////////////////////////////// +///////////// SET_MODE Function (to switch a mode) + +/** + * Sets the next mode of a modal reactor. Same as SET for outputs, only + * the last value will have effect if invoked multiple times. + * Works only in reactions with the target mode declared as effect. + * + * @param mode The target mode to set for activation. + */ +#ifdef MODAL_REACTORS +#define SET_MODE(mode) _LF_SET_MODE(mode) +#endif + ////////////////////////////////////////////////////////////// ///////////// Schedule Functions