reactor-c
C Runtime for Lingua Franca
Loading...
Searching...
No Matches
reactor_threaded.c File Reference

Runtime infrastructure for the threaded version of the C target of Lingua Franca. More...

#include <assert.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include "lf_types.h"
#include "low_level_platform.h"
#include "reactor_threaded.h"
#include "reactor.h"
#include "scheduler.h"
#include "tag.h"
#include "environment.h"
#include "rti_local.h"
#include "reactor_common.h"
#include "watchdog.h"

Macros

#define NUMBER_OF_WORKERS   1
 
#define MAX_STALL_INTERVAL   MSEC(1)
 

Functions

void _lf_increment_tag_barrier_locked (environment_t *env, tag_t future_tag)
 Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env.
 
void _lf_increment_tag_barrier (environment_t *env, tag_t future_tag)
 
void _lf_decrement_tag_barrier_locked (environment_t *env)
 
int _lf_wait_on_tag_barrier (environment_t *env, tag_t proposed_tag)
 
void lf_set_present (lf_port_base_t *port)
 Mark the given port's is_present field as true.
 
bool wait_until (instant_t logical_time, lf_cond_t *condition)
 
tag_t get_next_event_tag (environment_t *env)
 
tag_t send_next_event_tag (environment_t *env, tag_t tag, bool wait_for_reply)
 
void _lf_next_locked (environment_t *env)
 
void lf_request_stop (void)
 Request a stop to execution as soon as possible.
 
void _lf_trigger_reaction (environment_t *env, reaction_t *reaction, int worker_number)
 Trigger the specified reaction on the specified worker in the specified environment.
 
void _lf_initialize_start_tag (environment_t *env)
 
bool _lf_worker_handle_deadline_violation_for_reaction (environment_t *env, int worker_number, reaction_t *reaction)
 
bool _lf_worker_handle_STP_violation_for_reaction (environment_t *env, int worker_number, reaction_t *reaction)
 
bool _lf_worker_handle_violations (environment_t *env, int worker_number, reaction_t *reaction)
 
void _lf_worker_invoke_reaction (environment_t *env, int worker_number, reaction_t *reaction)
 
void try_advance_level (environment_t *env, volatile size_t *next_reaction_level)
 Advance to the next level. For federated runtimes, this function should stall the advance until we know that we can safely execute the next level given knowledge about upstream network port statuses.
 
void _lf_worker_do_work (environment_t *env, int worker_number)
 
voidworker (void *arg)
 
void lf_print_snapshot (environment_t *env)
 Print a snapshot of the priority queues used during execution (for debugging).
 
void start_threads (environment_t *env)
 
void determine_number_of_workers (void)
 Determine the number of workers.
 
int lf_reactor_c_main (int argc, const char *argv[])
 
int lf_notify_of_event (environment_t *env)
 Notify of new event by broadcasting on a condition variable.
 
int lf_critical_section_enter (environment_t *env)
 Enter critical section by locking the global mutex.
 
int lf_critical_section_exit (environment_t *env)
 Leave a critical section by unlocking the global mutex.
 

Variables

instant_t start_time
 
lf_mutex_t global_mutex
 
bool lf_stop_requested = false
 True if stop has been requested so it doesn't get re-requested.
 
int worker_thread_count = 0
 

Detailed Description

Runtime infrastructure for the threaded version of the C target of Lingua Franca.

Author
Edward A. Lee (eal@b.nosp@m.erke.nosp@m.ley.e.nosp@m.du)
{Marten Lohstroh marte.nosp@m.n@be.nosp@m.rkele.nosp@m.y.ed.nosp@m.u}
{Soroush Bateni sorou.nosp@m.sh@u.nosp@m.tdall.nosp@m.as.e.nosp@m.du}

Macro Definition Documentation

◆ MAX_STALL_INTERVAL

#define MAX_STALL_INTERVAL   MSEC(1)

The maximum amount of time a worker thread should stall before checking the reaction queue again. This is not currently used.

◆ NUMBER_OF_WORKERS

#define NUMBER_OF_WORKERS   1

Function Documentation

◆ _lf_decrement_tag_barrier_locked()

void _lf_decrement_tag_barrier_locked ( environment_t * env)

Decrement the total number of pending barrier requests for the environment tag barrier. If the total number of requests reaches zero, this function resets the tag barrier to FOREVER_TAG and notifies all threads that are waiting on the barrier that the number of requests has reached zero.

This function assumes that the caller already holds the mutex lock on env.

Note
This function is only useful in threaded applications to facilitate certain non-blocking functionalities such as receiving timed messages over the network or handling stop in the federated execution.
Parameters
envThe environment in which we are executing.

◆ _lf_increment_tag_barrier()

void _lf_increment_tag_barrier ( environment_t * env,
tag_t future_tag )

Raise a barrier to prevent the current tag for the specified environment from advancing to or beyond the value of the future_tag argument, if possible. If the current tag is already at or beyond future_tag, then prevent any further advances. This function will increment the total number of pending barrier requests. For each call to this function, there should always be a subsequent call to _lf_decrement_tag_barrier_locked() to release the barrier.

If there is already a barrier raised at a tag later than future_tag, this function will change the barrier to future_tag or the current tag, whichever is larger. If the existing barrier is earlier than future_tag, this function will not change the barrier. If there are no existing barriers and future_tag is in the past relative to the current tag, this function will raise a barrier to the current tag plus one microstep.

This function acquires the mutex on the specified environment.

Note
This function is only useful in threaded applications to facilitate certain non-blocking functionalities such as receiving timed messages over the network or handling stop in a federated execution.
Parameters
envEnvironment within which we are executing.
future_tagA desired tag for the barrier. This function will guarantee that current logical time will not go past future_tag if it is in the future. If future_tag is in the past (or equals to current logical time), the runtime will freeze advancement of logical time.

◆ _lf_increment_tag_barrier_locked()

void _lf_increment_tag_barrier_locked ( environment_t * env,
tag_t future_tag )

Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env.

Parameters
envEnvironment within which we are executing.
future_tagA desired tag for the barrier. This function will guarantee that current logical time will not go past future_tag if it is in the future. If future_tag is in the past (or equals to current logical time), the runtime will freeze advancement of logical time.

◆ _lf_initialize_start_tag()

void _lf_initialize_start_tag ( environment_t * env)

Perform the necessary operations before tag (0,0) can be processed.

This includes injecting any reactions triggered at (0,0), initializing timers, and for the federated execution, waiting for a proper coordinated start.

This assumes the mutex lock is held by the caller.

Parameters
envEnvironment within which we are executing.

◆ _lf_next_locked()

void _lf_next_locked ( environment_t * env)

If there is at least one event in the event queue, then wait until physical time matches or exceeds the time of the least tag on the event queue; pop the next event(s) from the event queue that all have the same tag; extract from those events the reactions that are to be invoked at this logical time and insert them into the reaction queue. The event queue is sorted by time tag.

If there is no event in the queue and the keepalive command-line option was not given, and this is not a federated execution with centralized coordination, set the stop tag to the current tag. If keepalive was given, then wait for either lf_request_stop() to be called or an event appears in the event queue and then return.

Every time tag is advanced, it is checked against stop tag and if they are equal, shutdown reactions are triggered.

This does not acquire the mutex lock. It assumes the lock is already held.

Parameters
envEnvironment within which we are executing.

◆ _lf_trigger_reaction()

void _lf_trigger_reaction ( environment_t * env,
reaction_t * reaction,
int worker_number )

Trigger the specified reaction on the specified worker in the specified environment.

Parameters
envEnvironment in which we are executing.
reactionThe reaction.
worker_numberThe ID of the worker that is making this call. 0 should be used if there is only one worker (e.g., when the program is using the single-threaded C runtime). -1 is used for an anonymous call in a context where a worker number does not make sense (e.g., the caller is not a worker thread).

◆ _lf_wait_on_tag_barrier()

int _lf_wait_on_tag_barrier ( environment_t * env,
tag_t proposed_tag )

If the proposed_tag is greater than or equal to a barrier tag that has been set by a call to _lf_increment_tag_barrier or _lf_increment_tag_barrier_locked, and if there are requestors still pending on that barrier, then wait until all requestors have been satisfied. This is used in federated execution when an incoming timed message has been partially read so that we know its tag, but the rest of message has not yet been read and hence the event has not yet appeared on the event queue. To prevent tardiness, this function blocks the advancement of time until to the proposed tag until the message has been put onto the event queue.

If the proposed_tag is greater than the stop tag, then use the stop tag instead.

This function assumes the mutex is already locked. Thus, it unlocks the mutex while it's waiting to allow the tag barrier to change.

Parameters
envEnvironment within which we are executing.
proposed_tagThe tag that the runtime wants to advance to.
Returns
0 if no wait was needed and 1 if a wait actually occurred.

◆ _lf_worker_do_work()

void _lf_worker_do_work ( environment_t * env,
int worker_number )

The main looping logic of each LF worker thread. This function assumes the caller holds the mutex lock.

Parameters
envEnvironment within which we are executing.
worker_numberThe number assigned to this worker thread

◆ _lf_worker_handle_deadline_violation_for_reaction()

bool _lf_worker_handle_deadline_violation_for_reaction ( environment_t * env,
int worker_number,
reaction_t * reaction )

Handle deadline violation for 'reaction'. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the deadline violation handler on the 'reaction', if it exists.

Parameters
envEnvironment within which we are executing.
worker_numberThe ID of the worker.
reactionThe reaction whose deadline has been violated.
Returns
true if a deadline violation occurred. false otherwise.

◆ _lf_worker_handle_STP_violation_for_reaction()

bool _lf_worker_handle_STP_violation_for_reaction ( environment_t * env,
int worker_number,
reaction_t * reaction )

Handle STP violation for 'reaction'. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the STP violation handler on the 'reaction', if it exists.

Parameters
envEnvironment within which we are executing.
worker_numberThe ID of the worker.
reactionThe reaction whose STP offset has been violated.
Returns
true if an STP violation occurred and was handled. false otherwise.

◆ _lf_worker_handle_violations()

bool _lf_worker_handle_violations ( environment_t * env,
int worker_number,
reaction_t * reaction )

Handle violations for 'reaction'. Currently limited to deadline violations and STP violations. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the deadline or STP violation handler(s) on the 'reaction', if they exist.

Parameters
envEnvironment within which we are executing.
worker_numberThe ID of the worker.
reactionThe reaction.
Returns
true if a violation occurred and was handled. false otherwise.

◆ _lf_worker_invoke_reaction()

void _lf_worker_invoke_reaction ( environment_t * env,
int worker_number,
reaction_t * reaction )

Invoke 'reaction' and schedule any resulting triggered reaction(s) on the reaction queue. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing 'reaction'.

Parameters
envEnvironment within which we are executing.
worker_numberThe ID of the worker.
reactionThe reaction to invoke.

◆ determine_number_of_workers()

void determine_number_of_workers ( void )

Determine the number of workers.

◆ get_next_event_tag()

tag_t get_next_event_tag ( environment_t * env)

Return the tag of the next event on the event queue. If the event queue is empty then return either FOREVER_TAG or, is a stop_time (timeout time) has been set, the stop time.

Parameters
envEnvironment within which we are executing.

◆ lf_critical_section_enter()

int lf_critical_section_enter ( environment_t * env)

Enter critical section by locking the global mutex.

Enter critical section within an environment.

Parameters
envEnvironment within which we are executing or GLOBAL_ENVIRONMENT.

◆ lf_critical_section_exit()

int lf_critical_section_exit ( environment_t * env)

Leave a critical section by unlocking the global mutex.

Leave a critical section within an environment.

Parameters
envEnvironment within which we are executing or GLOBAL_ENVIRONMENT.

◆ lf_notify_of_event()

int lf_notify_of_event ( environment_t * env)

Notify of new event by broadcasting on a condition variable.

Notify of new event.

Parameters
envEnvironment within which we are executing.

◆ lf_print_snapshot()

void lf_print_snapshot ( environment_t * env)

Print a snapshot of the priority queues used during execution (for debugging).

This function implementation will be empty if the NDEBUG macro is defined; that macro is normally defined for release builds.

Parameters
envThe environment in which we are executing.

◆ lf_reactor_c_main()

int lf_reactor_c_main ( int argc,
const char * argv[] )

The main loop of the LF program.

An unambiguous function name that can be called by external libraries.

Note: In target languages that use the C core library, there should be an unambiguous way to execute the LF program's main function that will not conflict with other main functions that might get resolved and linked at compile time.

◆ lf_request_stop()

void lf_request_stop ( void )

Request a stop to execution as soon as possible.

In a non-federated execution with only a single enclave, this will occur one microstep later than the current tag. In a federated execution or when there is more than one enclave, it will likely occur at a later tag determined by the RTI so that all federates and enclaves stop at the same tag.

◆ lf_set_present()

void lf_set_present ( lf_port_base_t * port)

Mark the given port's is_present field as true.

Parameters
portA pointer to the port struct as an lf_port_base_t*.

◆ send_next_event_tag()

tag_t send_next_event_tag ( environment_t * env,
tag_t tag,
bool wait_for_reply )

In a federated execution with centralized coordination, this function returns a tag that is less than or equal to the specified tag when, as far as the federation is concerned, it is safe to commit to advancing to the returned tag. That is, all incoming network messages with tags less than the returned tag have been received. In unfederated execution or in federated execution with decentralized control, this function returns the specified tag immediately.

Parameters
envEnvironment within which we are executing.
tagThe tag to which to advance.
wait_for_replyIf true, wait for the RTI to respond.
Returns
The tag to which it is safe to advance.

◆ start_threads()

void start_threads ( environment_t * env)

◆ try_advance_level()

void try_advance_level ( environment_t * env,
volatile size_t * next_reaction_level )

Advance to the next level. For federated runtimes, this function should stall the advance until we know that we can safely execute the next level given knowledge about upstream network port statuses.

Parameters
envThe environment.
next_reaction_levelThe place to store the next reaction level.

◆ wait_until()

bool wait_until ( instant_t logical_time,
lf_cond_t * condition )

Wait until physical time matches or exceeds the specified logical time, unless -fast is given. For decentralized coordination, this function will add the STA offset to the wait time.

If an event is put on the event queue during the wait, then the wait is interrupted and this function returns false. It also returns false if the timeout time is reached before the wait has completed. Note this this could return true even if the a new event was placed on the queue if that event time matches or exceeds the specified time.

The mutex lock associated with the condition argument is assumed to be held by the calling thread. This mutex is released while waiting. If the wait time is too small to actually wait (less than MIN_SLEEP_DURATION), then this function immediately returns true and the mutex is not released.

Parameters
envEnvironment within which we are executing.
logical_timeLogical time to wait until physical time matches it.
conditionA condition variable that can interrupt the wait. The mutex associated with this condition variable will be released during the wait.
Returns
Return false if the wait is interrupted either because of an event queue signal or if the wait time was interrupted early by reaching the stop time, if one was specified. Return true if the full wait time was reached.

◆ worker()

void * worker ( void * arg)

Worker thread for the thread pool. Its argument is the environment within which is working The very first worker per environment/enclave is in charge of synchronizing with the other enclaves by getting a TAG to (0,0) this might block until upstream enclaves have finished tag (0,0). This is unlike federated scheduling where each federate will get a PTAG to (0,0) and use network control reactions to handle upstream dependencies

Parameters
argEnvironment within which the worker should execute.

Variable Documentation

◆ global_mutex

lf_mutex_t global_mutex

Global mutex, used for synchronizing across environments. Mainly used for token-management and tracing

◆ lf_stop_requested

bool lf_stop_requested = false

True if stop has been requested so it doesn't get re-requested.

◆ start_time

instant_t start_time
extern

The start time read from the trace file.

◆ worker_thread_count

int worker_thread_count = 0

For logging and debugging, each worker thread is numbered.