reactor-c
C Runtime for Lingua Franca
reactor_threaded.h File Reference

Runtime infrastructure for the threaded version of the C target of Lingua Franca.

#include "lf_types.h"


Functions

void try_advance_level (environment_t *env, volatile size_t *next_reaction_level)
 Advance to the next level. For federated runtimes, this function should stall the advance until we know that we can safely execute the next level given knowledge about upstream network port statuses.
 
void lf_enqueue_port_absent_reactions (environment_t *env)
 
void _lf_increment_tag_barrier (environment_t *env, tag_t future_tag)
 
void _lf_increment_tag_barrier_locked (environment_t *env, tag_t future_tag)
 Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env.
 
void _lf_decrement_tag_barrier_locked (environment_t *env)
 
int _lf_wait_on_tag_barrier (environment_t *env, tag_t proposed_tag)
 
void lf_synchronize_with_other_federates (void)
 
bool wait_until (instant_t logical_time_ns, lf_cond_t *condition)
 
tag_t get_next_event_tag (environment_t *env)
 
tag_t send_next_event_tag (environment_t *env, tag_t tag, bool wait_for_reply)
 
void _lf_next_locked (environment_t *env)
 

Detailed Description

Runtime infrastructure for the threaded version of the C target of Lingua Franca.

Author
Edward A. Lee (eal@berkeley.edu)
Marten Lohstroh (marten@berkeley.edu)
Soroush Bateni (soroush@utdallas.edu)

Function Documentation

◆ _lf_decrement_tag_barrier_locked()

void _lf_decrement_tag_barrier_locked ( environment_t * env)

Decrement the total number of pending barrier requests for the environment tag barrier. If the total number of requests reaches zero, this function resets the tag barrier to FOREVER_TAG and notifies all threads that are waiting on the barrier that the number of requests has reached zero.

This function assumes that the caller already holds the mutex lock on env.

Note
This function is only useful in threaded applications to facilitate certain non-blocking functionalities such as receiving timed messages over the network or handling stop in a federated execution.
Parameters
env: The environment in which we are executing.

◆ _lf_increment_tag_barrier()

void _lf_increment_tag_barrier ( environment_t * env,
tag_t future_tag )

Raise a barrier to prevent the current tag for the specified environment from advancing to or beyond the value of the future_tag argument, if possible. If the current tag is already at or beyond future_tag, then prevent any further advances. This function will increment the total number of pending barrier requests. For each call to this function, there should always be a subsequent call to _lf_decrement_tag_barrier_locked() to release the barrier.

If there is already a barrier raised at a tag later than future_tag, this function will change the barrier to future_tag or the current tag, whichever is larger. If the existing barrier is earlier than future_tag, this function will not change the barrier. If there are no existing barriers and future_tag is in the past relative to the current tag, this function will raise a barrier to the current tag plus one microstep.

This function acquires the mutex on the specified environment.

Note
This function is only useful in threaded applications to facilitate certain non-blocking functionalities such as receiving timed messages over the network or handling stop in a federated execution.
Parameters
env: Environment within which we are executing.
future_tag: The desired tag for the barrier. If future_tag is in the future, this function guarantees that the current logical time will not go past it. If future_tag is in the past or equal to the current logical time, the runtime freezes the advancement of logical time.
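
The barrier bookkeeping described above can be modeled in a few lines. This is a minimal sketch, not the runtime's implementation: the types and names (sk_tag_t, sk_barrier_t, sk_env_t, sk_increment_barrier, sk_decrement_barrier) are hypothetical stand-ins, locking and thread notification are omitted, and the sketch assumes that whenever future_tag is not after the current tag the barrier is placed at the current tag plus one microstep.

```c
#include <stdint.h>

/* Hypothetical stand-ins for the runtime's tag, barrier, and environment types. */
typedef struct { int64_t time; uint32_t microstep; } sk_tag_t;
typedef struct { sk_tag_t horizon; int requestors; } sk_barrier_t;
typedef struct { sk_tag_t current_tag; sk_barrier_t barrier; } sk_env_t;

/* Assumed representation of "no barrier": the largest possible tag. */
#define SK_FOREVER_TAG ((sk_tag_t){INT64_MAX, UINT32_MAX})

/* Return -1, 0, or 1 if a is earlier than, equal to, or later than b. */
int sk_tag_compare(sk_tag_t a, sk_tag_t b) {
  if (a.time != b.time) return a.time < b.time ? -1 : 1;
  if (a.microstep != b.microstep) return a.microstep < b.microstep ? -1 : 1;
  return 0;
}

/* Register one more barrier request (no locking in this sketch). */
void sk_increment_barrier(sk_env_t* env, sk_tag_t future_tag) {
  if (sk_tag_compare(future_tag, env->current_tag) > 0) {
    /* Future request: only ever move the barrier earlier, never later. */
    if (sk_tag_compare(future_tag, env->barrier.horizon) < 0) {
      env->barrier.horizon = future_tag;
    }
  } else {
    /* Request at or before the current tag: freeze further advancement. */
    env->barrier.horizon = env->current_tag;
    env->barrier.horizon.microstep++;
  }
  env->barrier.requestors++;
}

/* Release one request; the last release removes the barrier. */
void sk_decrement_barrier(sk_env_t* env) {
  if (env->barrier.requestors > 0) env->barrier.requestors--;
  if (env->barrier.requestors == 0) env->barrier.horizon = SK_FOREVER_TAG;
}
```

In this model every call to sk_increment_barrier must be paired with one call to sk_decrement_barrier, mirroring the pairing requirement stated above for the real functions.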

◆ _lf_increment_tag_barrier_locked()

void _lf_increment_tag_barrier_locked ( environment_t * env,
tag_t future_tag )

Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env.

Parameters
env: Environment within which we are executing.
future_tag: The desired tag for the barrier. If future_tag is in the future, this function guarantees that the current logical time will not go past it. If future_tag is in the past or equal to the current logical time, the runtime freezes the advancement of logical time.

◆ _lf_next_locked()

void _lf_next_locked ( environment_t * env)

If there is at least one event in the event queue, then wait until physical time matches or exceeds the time of the least tag on the event queue; pop the next event(s) from the event queue that all have the same tag; extract from those events the reactions that are to be invoked at this logical time and insert them into the reaction queue. The event queue is sorted by time tag.

If there is no event in the queue, the keepalive command-line option was not given, and this is not a federated execution with centralized coordination, set the stop tag to the current tag. If keepalive was given, then wait for either lf_request_stop() to be called or an event to appear in the event queue, and then return.

Every time the tag is advanced, it is checked against the stop tag. If they are equal, shutdown reactions are triggered.

This does not acquire the mutex lock. It assumes the lock is already held.

Parameters
env: Environment within which we are executing.
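
The step "pop the next event(s) that all have the same tag" can be illustrated on a plain sorted array. This is a sketch under stated assumptions: the event queue is modeled as an array sorted by tag, each event carries a single reaction identifier, and all names (sk_event_t, sk_pop_least_tag) are hypothetical. The real runtime uses its own queue structures and also waits for physical time, which is omitted here.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the runtime's tag and event types. */
typedef struct { int64_t time; uint32_t microstep; } sk_tag_t;
typedef struct { sk_tag_t tag; int reaction_id; } sk_event_t;

int sk_tag_equal(sk_tag_t a, sk_tag_t b) {
  return a.time == b.time && a.microstep == b.microstep;
}

/*
 * Pop every event at the front of the sorted queue that shares the least tag.
 * queue/len describe the array, *head is the index of the next unpopped event.
 * The reaction ids of the popped events are written to reactions_out.
 * Returns the number of events popped (0 if the queue is empty).
 */
size_t sk_pop_least_tag(const sk_event_t* queue, size_t len, size_t* head,
                        int* reactions_out, size_t out_capacity) {
  size_t n = 0;
  if (*head >= len) return 0;
  sk_tag_t least = queue[*head].tag;
  while (*head < len && n < out_capacity &&
         sk_tag_equal(queue[*head].tag, least)) {
    reactions_out[n++] = queue[*head].reaction_id;
    (*head)++;
  }
  return n;
}
```

Because the array is sorted by tag, events with equal tags are adjacent, so a single forward scan from the head collects exactly the events that belong to the next tag.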

◆ _lf_wait_on_tag_barrier()

int _lf_wait_on_tag_barrier ( environment_t * env,
tag_t proposed_tag )

If the proposed_tag is greater than or equal to a barrier tag that has been set by a call to _lf_increment_tag_barrier or _lf_increment_tag_barrier_locked, and if there are requestors still pending on that barrier, then wait until all requestors have been satisfied. This is used in federated execution when an incoming timed message has been partially read so that we know its tag, but the rest of the message has not yet been read and hence the event has not yet appeared on the event queue. To prevent tardiness, this function blocks the advancement of time to the proposed tag until the message has been put onto the event queue.

If the proposed_tag is greater than the stop tag, then use the stop tag instead.

This function assumes the mutex is already locked. Thus, it unlocks the mutex while it's waiting to allow the tag barrier to change.

Parameters
env: Environment within which we are executing.
proposed_tag: The tag that the runtime wants to advance to.
Returns
0 if no wait was needed and 1 if a wait actually occurred.
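
The decision of whether a wait is needed can be written as a pure predicate. This is a minimal sketch of the rule described above, not the runtime's code: the names (sk_tag_t, sk_tag_compare, sk_must_wait) are hypothetical, the barrier and stop tag are passed in explicitly, and the actual blocking on a condition variable is left out.

```c
#include <stdint.h>

/* Hypothetical stand-in for the runtime's tag type. */
typedef struct { int64_t time; uint32_t microstep; } sk_tag_t;

/* Return -1, 0, or 1 if a is earlier than, equal to, or later than b. */
int sk_tag_compare(sk_tag_t a, sk_tag_t b) {
  if (a.time != b.time) return a.time < b.time ? -1 : 1;
  if (a.microstep != b.microstep) return a.microstep < b.microstep ? -1 : 1;
  return 0;
}

/*
 * Return 1 if advancing to proposed would have to wait on the barrier,
 * 0 otherwise. horizon is the barrier tag and requestors is the number
 * of pending barrier requests.
 */
int sk_must_wait(sk_tag_t proposed, sk_tag_t stop_tag,
                 sk_tag_t horizon, int requestors) {
  /* Never consider advancing past the stop tag. */
  if (sk_tag_compare(proposed, stop_tag) > 0) proposed = stop_tag;
  /* Wait only while requests are pending and the barrier is in the way. */
  return requestors > 0 && sk_tag_compare(proposed, horizon) >= 0;
}
```

In a threaded setting this predicate would be re-evaluated each time the waiting thread is woken, since the barrier can move or be released while the mutex is unlocked.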

◆ get_next_event_tag()

tag_t get_next_event_tag ( environment_t * env)

Return the tag of the next event on the event queue. If the event queue is empty, then return either FOREVER_TAG or, if a stop_time (timeout time) has been set, the stop time.

Parameters
env: Environment within which we are executing.
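
The selection rule above is small enough to state directly in code. The following sketch uses hypothetical names (sk_tag_t, SK_FOREVER_TAG, sk_next_event_tag) and assumes the head of the event queue is passed as a pointer that is NULL when the queue is empty; the real function reads this information from the environment.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the runtime's tag type. */
typedef struct { int64_t time; uint32_t microstep; } sk_tag_t;

/* Assumed representation of "never": the largest possible tag. */
#define SK_FOREVER_TAG ((sk_tag_t){INT64_MAX, UINT32_MAX})

/*
 * head_tag points at the tag of the earliest queued event, or is NULL if
 * the queue is empty. stop_tag is consulted only when stop_tag_set is nonzero.
 */
sk_tag_t sk_next_event_tag(const sk_tag_t* head_tag, int stop_tag_set,
                           sk_tag_t stop_tag) {
  if (head_tag != NULL) return *head_tag;
  return stop_tag_set ? stop_tag : SK_FOREVER_TAG;
}
```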

◆ lf_enqueue_port_absent_reactions()

void lf_enqueue_port_absent_reactions ( environment_t * env)

Enqueue port absent reactions that will send a PORT_ABSENT message to downstream federates if a given network output port is not present.

Parameters
env: The environment in which we are executing.

◆ lf_synchronize_with_other_federates()

void lf_synchronize_with_other_federates ( void )

◆ send_next_event_tag()

tag_t send_next_event_tag ( environment_t * env,
tag_t tag,
bool wait_for_reply )

In a federated execution with centralized coordination, this function returns a tag that is less than or equal to the specified tag when, as far as the federation is concerned, it is safe to commit to advancing to the returned tag. That is, all incoming network messages with tags less than the returned tag have been received. In unfederated execution or in federated execution with decentralized coordination, this function returns the specified tag immediately.

Parameters
env: Environment within which we are executing.
tag: The tag to which to advance.
wait_for_reply: If true, wait for the RTI to respond.
Returns
The tag to which it is safe to advance.

◆ try_advance_level()

void try_advance_level ( environment_t * env,
volatile size_t * next_reaction_level )

Advance to the next level. For federated runtimes, this function should stall the advance until we know that we can safely execute the next level given knowledge about upstream network port statuses.

Parameters
env: The environment.
next_reaction_level: The place to store the next reaction level.

◆ wait_until()

bool wait_until ( instant_t logical_time,
lf_cond_t * condition )

Wait until physical time matches or exceeds the specified logical time, unless -fast is given. For decentralized coordination, this function will add the STA offset to the wait time.

If an event is put on the event queue during the wait, then the wait is interrupted and this function returns false. It also returns false if the timeout time is reached before the wait has completed. Note that this function could return true even if a new event was placed on the queue, provided that event's time matches or exceeds the specified time.

The mutex lock associated with the condition argument is assumed to be held by the calling thread. This mutex is released while waiting. If the wait time is too small to actually wait (less than MIN_SLEEP_DURATION), then this function immediately returns true and the mutex is not released.

Parameters
env: Environment within which we are executing.
logical_time: Logical time to wait until physical time matches it.
condition: A condition variable that can interrupt the wait. The mutex associated with this condition variable will be released during the wait.
Returns
false if the wait was interrupted, either by an event queue signal or by reaching the stop time (if one was specified) before the wait completed; true if the full wait time elapsed.
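
The conditions under which wait_until returns true without blocking can be sketched as a small computation. This is an illustrative model only: sk_sleep_duration and its parameters are hypothetical, the fast flag, STA offset, and minimum sleep duration are passed explicitly rather than read from runtime state, and stop-time handling and the condition variable are omitted. The sketch also assumes that logical_time plus sta_offset does not overflow.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Return the number of nanoseconds the caller would block, or 0 when the
 * wait would return true immediately (fast mode, a target already in the
 * past, or a wait shorter than min_sleep). All times are in nanoseconds.
 */
int64_t sk_sleep_duration(int64_t logical_time, int64_t sta_offset,
                          int64_t physical_now, int64_t min_sleep, bool fast) {
  if (fast) return 0;                          /* fast mode: never wait */
  int64_t target = logical_time + sta_offset;  /* assumes no overflow */
  int64_t ns_to_wait = target - physical_now;
  if (ns_to_wait < min_sleep) return 0;        /* too short to sleep */
  return ns_to_wait;
}
```

A nonzero result corresponds to the case where the real function releases the mutex and performs a timed wait on the condition variable; a zero result corresponds to the immediate return in which the mutex is never released.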