reactor-c 1.0
C Runtime for Lingua Franca
RTI

Functions for the runtime infrastructure for federated execution.

Files

file  README.md
file  rti_common.h
 Common declarations for runtime infrastructure (RTI) for scheduling enclaves and distributed Lingua Franca programs.
file  rti_local.h
 This file declares functions used to implement scheduling enclaves.
file  rti_remote.h
 Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.

Data Structures

struct  enclave_info_t
 Structure holding information about each enclave in the program.
struct  federate_info_t
 Information about a federate known to the RTI, including its runtime state, mode of execution, and connectivity with other federates.
struct  minimum_delay_t
 Struct for minimum delays from upstream nodes.
struct  rti_common_t
 Common RTI data structure for both remote standalone RTI and local RTI used in enclaved execution.
struct  rti_local_t
 Structure holding information about the local RTI.
struct  rti_remote_t
 Structure that an RTI instance uses to keep track of its own and its corresponding federates' state.
struct  scheduling_node_t
 Information about the scheduling nodes coordinated by the RTI.

Macros

#define MAX_TIME_FOR_REPLY_TO_STOP_REQUEST   SEC(30)
 Time allowed for federates to reply to stop request.

Typedefs

typedef enum clock_sync_stat clock_sync_stat
 The status of clock synchronization.
typedef struct enclave_info_t enclave_info_t
 Structure holding information about each enclave in the program.
typedef enum execution_mode_t execution_mode_t
 Mode of execution of a federate.
typedef struct federate_info_t federate_info_t
 Information about a federate known to the RTI, including its runtime state, mode of execution, and connectivity with other federates.
typedef struct minimum_delay_t minimum_delay_t
 Struct for minimum delays from upstream nodes.
typedef struct rti_common_t rti_common_t
 Common RTI data structure for both remote standalone RTI and local RTI used in enclaved execution.
typedef struct rti_remote_t rti_remote_t
 Structure that an RTI instance uses to keep track of its own and its corresponding federates' state.
typedef enum scheduling_node_state_t scheduling_node_state_t
 State of the scheduling node during execution.
typedef struct scheduling_node_t scheduling_node_t
 Information about the scheduling nodes coordinated by the RTI.

Enumerations

enum  clock_sync_stat { clock_sync_off , clock_sync_init , clock_sync_on }
 The status of clock synchronization.
enum  execution_mode_t { FAST , REALTIME }
 Mode of execution of a federate.
enum  scheduling_node_state_t { NOT_CONNECTED , GRANTED , PENDING }
 State of the scheduling node during execution.

Functions

void _logical_tag_complete (scheduling_node_t *e, tag_t completed)
 Update the completed tag for the specified node.
void * clock_synchronization_thread (void *noargs)
 A (quasi-)periodic thread that performs clock synchronization with each federate.
tag_t downstream_next_event_tag (scheduling_node_t *node, uint16_t node_sending_new_net_id)
 Determine whether the specified scheduling node needs to receive a downstream next event tag (DNET) and, if so, return the details.
tag_t earliest_future_incoming_message_tag (scheduling_node_t *e)
 Given a node (enclave or federate), find the tag of the earliest possible incoming message (EIMT) from upstream enclaves or federates, which will be the smallest upstream NET plus the least delay.
tag_t eimt_strict (scheduling_node_t *e)
 Given a node (enclave or federate), find the earliest incoming message tag (EIMT) from any immediately upstream node that is not part of a zero-delay cycle (ZDC).
void * federate_info_thread_TCP (void *fed)
 Thread handling TCP communication with a federate.
void free_local_rti ()
 Free memory associated with the local RTI and the local RTI itself.
void free_scheduling_nodes (scheduling_node_t **scheduling_nodes, uint16_t number_of_scheduling_nodes)
 Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.
tag_t get_dnet_candidate (tag_t next_event_tag, tag_t minimum_delay)
 Find the tag g that is the latest tag that satisfies lf_tag_add(g, minimum_delay) < next_event_tag.
void handle_address_ad (uint16_t federate_id)
 Handle address advertisement messages (see MSG_TYPE_ADDRESS_ADVERTISEMENT in net_common.h).
void handle_address_query (uint16_t fed_id)
 Handle address query messages.
void handle_latest_tag_confirmed (federate_info_t *fed)
 Handle a latest tag confirmed (LTC) message.
void handle_next_event_tag (federate_info_t *fed)
 Handle a next event tag (NET) message.
void handle_physical_clock_sync_message (federate_info_t *my_fed, socket_type_t socket_type)
 Handle clock synchronization T3 messages from federates.
void handle_port_absent_message (federate_info_t *sending_federate, unsigned char *buffer)
 Handle a port absent message being received from a federate via the RTI.
void handle_stop_request_message (federate_info_t *fed)
 Handle a MSG_TYPE_STOP_REQUEST message.
void handle_stop_request_reply (federate_info_t *fed)
 Handle a MSG_TYPE_STOP_REQUEST_REPLY message.
void handle_timed_message (federate_info_t *sending_federate, unsigned char *buffer)
 Handle a timed message being received from a federate by the RTI to relay to another federate.
void handle_timestamp (federate_info_t *my_fed)
 A function to handle timestamp messages.
void initialize_enclave_info (enclave_info_t *enclave, int idx, environment_t *env)
 Initialize the enclave object.
void initialize_federate (federate_info_t *fed, uint16_t id)
 Initialize the federate with the specified ID.
void initialize_local_rti (environment_t *envs, int num_envs)
 Dynamically create and initialize the local RTI.
void initialize_RTI (rti_remote_t *rti)
 Initialize the _RTI instance.
void initialize_rti_common (rti_common_t *rti_common)
 Initialize the fields of the rti_common struct.
void initialize_scheduling_node (scheduling_node_t *e, uint16_t id)
 Initialize the scheduling node with the specified ID.
void invalidate_min_delays ()
 Invalidate the min_delays, num_min_delays, and the fields that indicate cycles of all nodes.
bool is_in_cycle (scheduling_node_t *node)
 Return true if the node is in a cycle (possibly a zero-delay cycle).
bool is_in_zero_delay_cycle (scheduling_node_t *node)
 Return true if the node is in a zero-delay cycle.
void lf_connect_to_federates (int socket_descriptor)
 Wait for one incoming connection request from each federate, and, upon receiving it, create a thread to communicate with that federate.
int lf_get_downstream_of (int enclave_id, uint16_t **result)
 Get the array of ids of enclaves directly downstream of the specified enclave.
int lf_get_upstream_delay_of (int enclave_id, interval_t **result)
 Retrieve the delays on the connections to direct upstream enclaves.
int lf_get_upstream_of (int enclave_id, uint16_t **result)
 Get the array of ids of enclaves directly upstream of the specified enclave.
void notify_advance_grant_if_safe (scheduling_node_t *e)
 Either send to a federate or unblock an enclave to give it a tag.
void notify_downstream_advance_grant_if_safe (scheduling_node_t *e, bool visited[])
 For all scheduling nodes downstream of the specified node, determine whether they should be notified of a TAG or PTAG and notify them if so.
void notify_downstream_next_event_tag (scheduling_node_t *e, tag_t tag)
 Notify a downstream next event tag (DNET) signal to the specified scheduling node.
void notify_provisional_tag_advance_grant (scheduling_node_t *e, tag_t tag)
 Notify a provisional tag advance grant (PTAG) message to the specified scheduling node.
void notify_tag_advance_grant (scheduling_node_t *e, tag_t tag)
 Notify a tag advance grant (TAG) message to the specified scheduling node.
int process_args (int argc, const char *argv[])
 Process the command-line arguments.
int process_clock_sync_args (int argc, const char *argv[])
 Process command-line arguments related to clock synchronization.
void * respond_to_erroneous_connections (void *nothing)
 Thread to respond to new connections, which could be federates of other federations who are attempting to join the wrong federation.
void rti_logical_tag_complete_locked (enclave_info_t *enclave, tag_t completed)
 Inform the local RTI that enclave has completed tag completed.
tag_t rti_next_event_tag_locked (enclave_info_t *enclave, tag_t next_event_tag)
 Notify the local RTI of a next event tag (NET).
void rti_update_other_net_locked (enclave_info_t *src, enclave_info_t *target, tag_t net)
 Notify the local RTI to update the next event tag (NET) of a target enclave.
void send_physical_clock (unsigned char message_type, federate_info_t *fed, socket_type_t socket_type)
 Take a snapshot of the physical clock time and send it to federate fed_id.
void send_reject (int *socket_id, unsigned char error_code)
 Send a MSG_TYPE_REJECT message to the specified socket and close the socket.
int32_t start_rti_server (uint16_t port)
 Start the socket server for the runtime infrastructure (RTI) and return the socket descriptor.
tag_advance_grant_t tag_advance_grant_if_safe (scheduling_node_t *e)
 Determine whether the specified scheduling node is eligible for a tag advance grant (TAG) and, if so, return the details.
void tracepoint_rti_from_federate (trace_event_t event_type, int fed_id, tag_t *tag)
 Trace RTI receiving a message from a federate.
void tracepoint_rti_to_federate (trace_event_t event_type, int fed_id, tag_t *tag)
 Trace RTI sending a message to a federate.
void update_federate_next_event_tag_locked (uint16_t federate_id, tag_t next_event_tag)
 Update the next event tag of federate federate_id.
void update_min_delays ()
 If necessary, update the min_delays and the fields that indicate cycles.
void update_scheduling_node_next_event_tag_locked (scheduling_node_t *e, tag_t next_event_tag)
 Update the next event tag of a scheduling node.
void usage (int argc, const char *argv[])
 Print a usage message.
void wait_for_federates (int socket_descriptor)
 Start the runtime infrastructure (RTI) interaction with the federates and wait for the federates to exit.

Detailed Description

Functions for the runtime infrastructure for federated execution.

This group contains functions for the runtime infrastructure for federated execution. The message types and protocols are defined in net_common.h.

Macro Definition Documentation

◆ MAX_TIME_FOR_REPLY_TO_STOP_REQUEST

#define MAX_TIME_FOR_REPLY_TO_STOP_REQUEST   SEC(30)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Time allowed for federates to reply to stop request.

Typedef Documentation

◆ clock_sync_stat

◆ enclave_info_t

typedef struct enclave_info_t enclave_info_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Structure holding information about each enclave in the program.

The first field is the generic scheduling_node_t struct.
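
Placing the generic node first is what lets code written against the generic type operate on an enclave. The following is a minimal sketch of that idiom, not the actual reactor-c definitions: `node_base_t`, `enclave_demo_t`, `node_id`, and `as_node` are hypothetical stand-ins, and the real structs carry many more fields.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical stand-in for the generic scheduling node.
typedef struct node_base_t {
  uint16_t id;
} node_base_t;

// Hypothetical stand-in for an enclave: the generic node comes first.
typedef struct enclave_demo_t {
  node_base_t base; // Must remain the first member.
  int env_index;    // Enclave-specific data follows.
} enclave_demo_t;

// Generic code that only knows about the base type.
static uint16_t node_id(const node_base_t* node) {
  assert(node != NULL);
  return node->id;
}

// View an enclave as its base node. C guarantees that a pointer to a struct,
// suitably converted, points to its first member.
static node_base_t* as_node(enclave_demo_t* enclave) {
  return (node_base_t*)enclave;
}
```

With this layout, `node_id(as_node(&enclave))` reads the same `id` as `enclave.base.id`, so the generic code needs no knowledge of the enclave-specific fields.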

◆ execution_mode_t

◆ federate_info_t

typedef struct federate_info_t federate_info_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Information about a federate known to the RTI, including its runtime state, mode of execution, and connectivity with other federates.

The list of upstream and downstream federates does not include those that are connected via a "physical" connection (one denoted with ~>) because those connections do not impose any scheduling constraints.

◆ minimum_delay_t

typedef struct minimum_delay_t minimum_delay_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Struct for minimum delays from upstream nodes.

◆ rti_common_t

typedef struct rti_common_t rti_common_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Common RTI data structure for both remote standalone RTI and local RTI used in enclaved execution.

rti_remote_t and rti_local_t will "inherit" from this data structure. The first field is an array of pointers to scheduling nodes. These will be scheduling nodes for the local RTI and federates for the remote RTI.

◆ rti_remote_t

typedef struct rti_remote_t rti_remote_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Structure that an RTI instance uses to keep track of its own and its corresponding federates' state.

It is a special case of rti_common_t (declared in rti_common.h). Inheritance is mimicked by making the first attributes the same as those of rti_common_t, except that the scheduling_nodes attribute here is of type federate_info_t**, while it is of type scheduling_node_t** in rti_common_t.

Note
IMPORTANT: If you make any change to this struct, you MUST also change rti_common_t in rti_common.h! The change must exactly match.

◆ scheduling_node_state_t

◆ scheduling_node_t

typedef struct scheduling_node_t scheduling_node_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Information about the scheduling nodes coordinated by the RTI.

The abstract scheduling node could either be an enclave or a federate. The information includes its runtime state, mode of execution, and connectivity with other scheduling nodes. The list of upstream and downstream scheduling nodes does not include those that are connected via a "physical" connection (one denoted with ~>) because those connections do not impose any scheduling constraints.

Enumeration Type Documentation

◆ clock_sync_stat

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

The status of clock synchronization.

Enumerator
clock_sync_off 
clock_sync_init 
clock_sync_on 

◆ execution_mode_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Mode of execution of a federate.

Enumerator
FAST 
REALTIME 

◆ scheduling_node_state_t

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

State of the scheduling node during execution.

Enumerator
NOT_CONNECTED 

The scheduling node has not connected.

GRANTED 

Most recent MSG_TYPE_NEXT_EVENT_TAG has been granted.

PENDING 

Waiting for upstream scheduling nodes.

Function Documentation

◆ _logical_tag_complete()

void _logical_tag_complete ( scheduling_node_t * e,
tag_t completed )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Update the completed tag for the specified node.

This checks whether any downstream nodes become eligible to receive TAG or PTAG, and sends those signals if appropriate.

The function is prepended with an underscore because a function called logical_tag_complete is code-generated by the compiler.

Parameters
e  The scheduling node.
completed  The completed tag of the scheduling node.

◆ clock_synchronization_thread()

void * clock_synchronization_thread ( void * noargs)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

A (quasi-)periodic thread that performs clock synchronization with each federate.

This starts by waiting a time given by _RTI.clock_sync_period_ns and then iterates over the federates, performing a complete clock synchronization interaction with each federate before proceeding to the next federate. The interaction starts with this RTI sending a snapshot of its physical clock to the federate (message T1). It then waits for a reply and then sends another snapshot of its physical clock (message T4). It then follows that T4 message with a coded probe message that the federate can use to discard the session if the network is congested.

Parameters
noargs  Ignored (present for compatibility).
Returns
NULL.
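
To show how the timestamps exchanged in one round can be combined, here is a sketch of the standard two-way time-transfer estimate. It is an illustration under stated assumptions, not the computation reactor-c necessarily performs: the type and function names are hypothetical, `t2` and `t3` are assumed to be the federate's local receive and reply times, and the network delay is assumed to be symmetric.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t nanos_t; // Hypothetical alias for nanosecond timestamps.

typedef struct sync_estimate_t {
  nanos_t round_trip; // Network delay for both directions combined.
  nanos_t offset;     // Federate clock minus RTI clock.
} sync_estimate_t;

// t1: RTI clock when the T1 message was sent.
// t2: federate clock when T1 was received (assumed).
// t3: federate clock when the reply was sent (assumed).
// t4: RTI clock snapshot taken when the reply was received.
static sync_estimate_t estimate_offset(nanos_t t1, nanos_t t2, nanos_t t3, nanos_t t4) {
  assert(t4 >= t1 && t3 >= t2);
  sync_estimate_t est;
  // Total elapsed time at the RTI minus the federate's processing time.
  est.round_trip = (t4 - t1) - (t3 - t2);
  // Average of the two one-way differences; the delays cancel if symmetric.
  est.offset = ((t2 - t1) + (t3 - t4)) / 2;
  return est;
}
```

For example, with a federate clock 500 ns ahead and a 100 ns one-way delay, the inputs (1000, 1600, 1650, 1250) give a round trip of 200 ns and an offset of 500 ns.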

◆ downstream_next_event_tag()

tag_t downstream_next_event_tag ( scheduling_node_t * node,
uint16_t node_sending_new_net_id )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Determine whether the specified scheduling node needs to receive a downstream next event tag (DNET) and, if so, return the details.

This function is called upon receiving a NET from one of the specified node's downstream nodes.

This function calculates the minimum tag M over all downstream scheduling nodes of the most recent NET from that node minus the "after delay" (see function get_dnet_candidate). If M is earlier than the startup tag, then set the result as the NEVER_TAG.

Parameters
node  The target node that may receive a new DNET.
node_sending_new_net_id  The ID of the node that sends a new NET. If this node's new NET does not change the DNET value, we can exit this function immediately. If it does, we have to look up the target node's downstream federates to compute the exact new DNET value.
Returns
If needed, return the tag value. Otherwise, return the NEVER_TAG.

◆ earliest_future_incoming_message_tag()

tag_t earliest_future_incoming_message_tag ( scheduling_node_t * e)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Given a node (enclave or federate), find the tag of the earliest possible incoming message (EIMT) from upstream enclaves or federates, which will be the smallest upstream NET plus the least delay.

This could be NEVER_TAG if the RTI has not seen a NET from some upstream node.

Parameters
e  The target node.
Returns
The earliest possible incoming message tag.
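
The "smallest upstream NET plus the least delay" rule can be sketched as a minimum over upstream nodes. This is a simplified illustration, not the reactor-c implementation: it uses plain 64-bit times instead of `tag_t` (so microsteps are ignored), and `DEMO_NEVER`, `DEMO_FOREVER`, `upstream_demo_t`, `demo_add`, and `demo_eimt` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_NEVER INT64_MIN   // Stand-in for "no NET seen yet".
#define DEMO_FOREVER INT64_MAX // Stand-in for "no further events".

typedef struct upstream_demo_t {
  int64_t next_event; // Most recent NET reported, or DEMO_NEVER if none yet.
  int64_t min_delay;  // Minimum delay from this upstream node (>= 0).
} upstream_demo_t;

// Add a delay to a time, preserving the sentinels and saturating on overflow.
static int64_t demo_add(int64_t time, int64_t delay) {
  if (time == DEMO_NEVER) return DEMO_NEVER;
  if (time == DEMO_FOREVER || time > DEMO_FOREVER - delay) return DEMO_FOREVER;
  return time + delay;
}

// Earliest time at which a message from any upstream node could arrive.
static int64_t demo_eimt(const upstream_demo_t* upstream, size_t count) {
  assert(upstream != NULL || count == 0);
  int64_t result = DEMO_FOREVER;
  for (size_t i = 0; i < count; i++) {
    assert(upstream[i].min_delay >= 0);
    int64_t candidate = demo_add(upstream[i].next_event, upstream[i].min_delay);
    if (candidate < result) result = candidate;
  }
  return result;
}
```

In this sketch, a single upstream node without a reported NET pulls the result down to `DEMO_NEVER`, which mirrors the note above about unseen NETs.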

◆ eimt_strict()

tag_t eimt_strict ( scheduling_node_t * e)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Given a node (enclave or federate), find the earliest incoming message tag (EIMT) from any immediately upstream node that is not part of a zero-delay cycle (ZDC).

These tags are treated strictly by the RTI when deciding whether to grant a PTAG. Since the upstream node is not part of a ZDC, there is no need to block on the input from that node since we can simply wait for it to complete its tag without chance of introducing a deadlock. This will return FOREVER_TAG if there are no non-ZDC upstream nodes.

Parameters
e  The target node.
Returns
The earliest possible incoming message tag from a non-ZDC upstream node.

◆ federate_info_thread_TCP()

void * federate_info_thread_TCP ( void * fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Thread handling TCP communication with a federate.

Parameters
fed  A pointer to the federate's struct that has the socket descriptor for the federate.
Returns
NULL.

◆ free_local_rti()

void free_local_rti ( )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Free memory associated with the local RTI and the local RTI itself.

◆ free_scheduling_nodes()

void free_scheduling_nodes ( scheduling_node_t ** scheduling_nodes,
uint16_t number_of_scheduling_nodes )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.

Parameters
scheduling_nodes  The scheduling nodes.
number_of_scheduling_nodes  The number of scheduling nodes.

◆ get_dnet_candidate()

tag_t get_dnet_candidate ( tag_t next_event_tag,
tag_t minimum_delay )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Find the tag g that is the latest tag that satisfies lf_tag_add(g, minimum_delay) < next_event_tag.

This function behaves like the tag subtraction, next_event_tag - minimum_delay. minimum_delay cannot be NEVER.

This function is called in function downstream_next_event_tag.

Parameters
next_event_tag  The next event tag of a downstream node.
minimum_delay  The minimum delay between the target upstream node and the downstream node.

◆ handle_address_ad()

void handle_address_ad ( uint16_t federate_id)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle address advertisement messages (see MSG_TYPE_ADDRESS_ADVERTISEMENT in net_common.h).

The federate is expected to send its server port number as the next byte. The RTI will keep a record of this number in the .server_port field of the _RTI.federates[federate_id] array of structs.

The server_hostname and server_ip_addr fields are assigned in lf_connect_to_federates() upon accepting the socket from the remote federate.

This function assumes the caller does not hold the mutex.

Parameters
federate_id  The id of the remote federate that is sending the address advertisement.

◆ handle_address_query()

void handle_address_query ( uint16_t fed_id)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle address query messages.

This function reads the body of a MSG_TYPE_ADDRESS_QUERY message (see net_common.h), which is the requested destination federate ID, and replies with the stored port value for the socket server of that federate. The port values are initialized to -1. If no MSG_TYPE_ADDRESS_ADVERTISEMENT message has been received from the destination federate, the RTI will simply reply with -1 for the port. The sending federate is responsible for checking back with the RTI after a period of time.

Parameters
fed_id  The federate sending a MSG_TYPE_ADDRESS_QUERY message.
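
The bookkeeping described above (ports start at -1, an advertisement stores the port, a query returns whatever is stored) can be sketched with a small table. This is a hypothetical illustration only: it omits all socket I/O and locking, and `DEMO_NUM_FEDERATES`, `demo_server_ports`, `demo_init_ports`, `demo_advertise`, and `demo_query` are invented names rather than reactor-c identifiers.

```c
#include <assert.h>
#include <stdint.h>

#define DEMO_NUM_FEDERATES 4 // Hypothetical federation size.

// One stored server port per federate; -1 means "not advertised yet".
static int32_t demo_server_ports[DEMO_NUM_FEDERATES];

static void demo_init_ports(void) {
  for (int i = 0; i < DEMO_NUM_FEDERATES; i++) {
    demo_server_ports[i] = -1;
  }
}

// Record the port carried by an address advertisement.
static void demo_advertise(uint16_t fed_id, int32_t port) {
  assert(fed_id < DEMO_NUM_FEDERATES);
  demo_server_ports[fed_id] = port;
}

// Answer an address query: the stored port, or -1 if none was advertised.
static int32_t demo_query(uint16_t fed_id) {
  assert(fed_id < DEMO_NUM_FEDERATES);
  return demo_server_ports[fed_id];
}
```

A querying federate that receives -1 would wait and ask again later, as the description above requires.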

◆ handle_latest_tag_confirmed()

void handle_latest_tag_confirmed ( federate_info_t * fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a latest tag confirmed (LTC) message.

This function assumes the caller does not hold the mutex.

See also
MSG_TYPE_LATEST_TAG_CONFIRMED in net_common.h.
Parameters
fed  The federate that has completed a logical tag.

◆ handle_next_event_tag()

void handle_next_event_tag ( federate_info_t * fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a next event tag (NET) message.

This function assumes the caller does not hold the mutex.

See also
MSG_TYPE_NEXT_EVENT_TAG in net_common.h.
Parameters
fed  The federate sending a NET message.

◆ handle_physical_clock_sync_message()

void handle_physical_clock_sync_message ( federate_info_t * my_fed,
socket_type_t socket_type )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle clock synchronization T3 messages from federates.

These will come in on the TCP channel during initialization and on the UDP channel subsequently. In both cases, this function will reply with a T4 message. If the channel is the UDP channel, then it will follow the T4 message immediately with a "coded probe" message, which will be used by the federate to decide whether to discard this clock synchronization round.

Parameters
my_fed  The sending federate.
socket_type  The RTI's socket type used for the communication (TCP or UDP).

◆ handle_port_absent_message()

void handle_port_absent_message ( federate_info_t * sending_federate,
unsigned char * buffer )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a port absent message being received from a federate via the RTI.

This function assumes the caller does not hold the mutex.

Parameters
sending_federate  The sending federate.
buffer  The buffer to read into (the first byte is already there).

◆ handle_stop_request_message()

void handle_stop_request_message ( federate_info_t * fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a MSG_TYPE_STOP_REQUEST message.

This function assumes the caller does not hold the mutex.

Parameters
fed  The federate sending a MSG_TYPE_STOP_REQUEST message.

◆ handle_stop_request_reply()

void handle_stop_request_reply ( federate_info_t * fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a MSG_TYPE_STOP_REQUEST_REPLY message.

This function assumes the caller does not hold the mutex.

Parameters
fed  The federate replying to the MSG_TYPE_STOP_REQUEST message.

◆ handle_timed_message()

void handle_timed_message ( federate_info_t * sending_federate,
unsigned char * buffer )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Handle a timed message being received from a federate by the RTI to relay to another federate.

This function assumes the caller does not hold the mutex.

Parameters
sending_federate  The sending federate.
buffer  The buffer to read into (the first byte is already there).

◆ handle_timestamp()

void handle_timestamp ( federate_info_t * my_fed)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

A function to handle timestamp messages.

This function assumes the caller does not hold the mutex.

◆ initialize_enclave_info()

void initialize_enclave_info ( enclave_info_t * enclave,
int idx,
environment_t * env )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Initialize the enclave object.

Parameters
enclave  The enclave object to initialize.
idx  The index of the enclave.
env  The environment of the enclave.

◆ initialize_federate()

void initialize_federate ( federate_info_t * fed,
uint16_t id )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Initialize the federate with the specified ID.

Parameters
fed  The federate to initialize.
id  The federate ID.

◆ initialize_local_rti()

void initialize_local_rti ( environment_t * envs,
int num_envs )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Dynamically create and initialize the local RTI.

Parameters
envs  Array of environments.
num_envs  Number of environments.

◆ initialize_RTI()

void initialize_RTI ( rti_remote_t * rti)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Initialize the _RTI instance.

Parameters
rti  The RTI instance to initialize.

◆ initialize_rti_common()

void initialize_rti_common ( rti_common_t * rti_common)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Initialize the fields of the rti_common struct.

It also stores the pointer to the struct and uses it internally.

Parameters
rti_common  The rti_common_t struct to initialize.

◆ initialize_scheduling_node()

void initialize_scheduling_node ( scheduling_node_t * e,
uint16_t id )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Initialize the scheduling node with the specified ID.

Parameters
e  The scheduling node.
id  The scheduling node ID.

◆ invalidate_min_delays()

void invalidate_min_delays ( )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Invalidate the min_delays, num_min_delays, and the fields that indicate cycles of all nodes.

This should be called whenever the structure of the connections has changed.

◆ is_in_cycle()

bool is_in_cycle ( scheduling_node_t * node)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Return true if the node is in a cycle (possibly a zero-delay cycle).

Parameters
node  The node.

◆ is_in_zero_delay_cycle()

bool is_in_zero_delay_cycle ( scheduling_node_t * node)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Return true if the node is in a zero-delay cycle.

Parameters
node  The node.

◆ lf_connect_to_federates()

void lf_connect_to_federates ( int socket_descriptor)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Wait for one incoming connection request from each federate, and, upon receiving it, create a thread to communicate with that federate.

Return when all federates have connected.

Parameters
socket_descriptor  The socket on which to accept connections.

◆ lf_get_downstream_of()

int lf_get_downstream_of ( int enclave_id,
uint16_t ** result )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Get the array of ids of enclaves directly downstream of the specified enclave.

This updates the specified result pointer to point to a statically allocated array of IDs and returns the length of the array. The implementation is code-generated.

Parameters
enclave_id  The enclave for which to report downstream IDs.
result  The pointer to dereference and update to point to the resulting array.
Returns
The number of direct downstream enclaves.
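
Because the real implementation is code-generated, the calling convention is easiest to see with a stand-in. The sketch below assumes a hypothetical three-enclave topology and uses invented names (`demo_get_downstream_of`, `demo_is_downstream`); only the signature shape is taken from the declaration above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical stand-in for the code-generated function: enclave 0 feeds
// enclaves 1 and 2, and enclave 1 feeds enclave 2.
static int demo_get_downstream_of(int enclave_id, uint16_t** result) {
  static uint16_t downstream_of_0[] = {1, 2};
  static uint16_t downstream_of_1[] = {2};
  assert(result != NULL);
  switch (enclave_id) {
  case 0:
    *result = downstream_of_0;
    return 2;
  case 1:
    *result = downstream_of_1;
    return 1;
  default:
    *result = NULL; // No downstream enclaves.
    return 0;
  }
}

// Caller side: pass the address of a pointer, then index up to the returned
// count. The array is statically allocated, so the caller must not free it.
static int demo_is_downstream(int upstream_id, uint16_t candidate) {
  uint16_t* ids = NULL;
  int count = demo_get_downstream_of(upstream_id, &ids);
  for (int i = 0; i < count; i++) {
    if (ids[i] == candidate) return 1;
  }
  return 0;
}
```

Returning the length separately from the array keeps the generated tables free of sentinel values, and the same calling pattern applies to lf_get_upstream_of and lf_get_upstream_delay_of.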

◆ lf_get_upstream_delay_of()

int lf_get_upstream_delay_of ( int enclave_id,
interval_t ** result )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Retrieve the delays on the connections to direct upstream enclaves.

This updates the result pointer to point to a statically allocated array of delays. The implementation is code-generated.

Parameters
enclave_id  The enclave for which to search for upstream delays.
result  The pointer to dereference and update to point to the resulting array.
Returns
The number of direct upstream enclaves.

◆ lf_get_upstream_of()

int lf_get_upstream_of ( int enclave_id,
uint16_t ** result )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Get the array of ids of enclaves directly upstream of the specified enclave.

This updates the specified result pointer to point to a statically allocated array of IDs and returns the length of the array. The implementation is code-generated.

Parameters
enclave_id  The enclave for which to report upstream IDs.
result  The pointer to dereference and update to point to the resulting array.
Returns
The number of direct upstream enclaves.

◆ notify_advance_grant_if_safe()

void notify_advance_grant_if_safe ( scheduling_node_t * e)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Either send to a federate or unblock an enclave to give it a tag.

This function requires two different implementations, one for enclaves and one for federates.

This assumes the caller holds the RTI mutex.

Parameters
e  The scheduling node.

◆ notify_downstream_advance_grant_if_safe()

void notify_downstream_advance_grant_if_safe ( scheduling_node_t * e,
bool visited[] )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

For all scheduling nodes downstream of the specified node, determine whether they should be notified of a TAG or PTAG and notify them if so.

This assumes the caller holds the RTI mutex.

Parameters
e  The upstream node.
visited  An array of booleans used to determine whether a node has been visited (initially all false).

◆ notify_downstream_next_event_tag()

void notify_downstream_next_event_tag ( scheduling_node_t * e,
tag_t tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Notify a downstream next event tag (DNET) signal to the specified scheduling node.

Parameters
e  The target node.
tag  The downstream next event tag for e.

◆ notify_provisional_tag_advance_grant()

void notify_provisional_tag_advance_grant ( scheduling_node_t * e,
tag_t tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Notify a provisional tag advance grant (PTAG) message to the specified scheduling node.

Do not notify it if a previously sent PTAG or TAG was greater or equal.

This function will keep a record of this PTAG in the node's last_provisionally_granted field.

This function assumes that the caller holds the RTI mutex.

Parameters
e  The scheduling node.
tag  The tag to grant.

◆ notify_tag_advance_grant()

void notify_tag_advance_grant ( scheduling_node_t * e,
tag_t tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Notify a tag advance grant (TAG) message to the specified scheduling node.

Do not notify it if a previously sent PTAG was greater or if a previously sent TAG was greater or equal.

This function will keep a record of this TAG in the node's last_granted field.

This function assumes that the caller holds the RTI mutex.

Parameters
e  The scheduling node.
tag  The tag to grant.
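
The suppression rule for a TAG (skip it if an earlier PTAG was greater, or an earlier TAG was greater or equal) can be sketched as two comparisons. This is an illustrative sketch, not the reactor-c code: `demo_tag_t`, `demo_tag_compare`, `demo_node_t`, and `demo_grant_tag` are hypothetical, no message is actually sent, and only the field names last_granted and last_provisionally_granted come from the descriptions above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical simplified tag: a time plus a microstep.
typedef struct demo_tag_t {
  int64_t time;
  uint32_t microstep;
} demo_tag_t;

// Return -1, 0, or 1 if a is less than, equal to, or greater than b.
static int demo_tag_compare(demo_tag_t a, demo_tag_t b) {
  if (a.time != b.time) return a.time < b.time ? -1 : 1;
  if (a.microstep != b.microstep) return a.microstep < b.microstep ? -1 : 1;
  return 0;
}

typedef struct demo_node_t {
  demo_tag_t last_granted;               // Most recent TAG.
  demo_tag_t last_provisionally_granted; // Most recent PTAG.
} demo_node_t;

// Return true if the TAG would be sent (and record it), false if suppressed.
static bool demo_grant_tag(demo_node_t* node, demo_tag_t tag) {
  assert(node != NULL);
  // A strictly greater PTAG already covers this tag.
  if (demo_tag_compare(node->last_provisionally_granted, tag) > 0) return false;
  // A greater or equal TAG makes this one redundant.
  if (demo_tag_compare(node->last_granted, tag) >= 0) return false;
  node->last_granted = tag;
  return true;
}
```

Note the asymmetry: a TAG equal to the last PTAG is still sent, because it confirms the provisional grant, whereas a TAG equal to the last TAG is dropped.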

◆ process_args()

int process_args ( int argc,
const char * argv[] )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Process the command-line arguments.

If the command line arguments are not understood, then print a usage message and return 0. Otherwise, return 1.

Returns
1 if the arguments were processed successfully, 0 otherwise.

◆ process_clock_sync_args()

int process_clock_sync_args ( int argc,
const char * argv[] )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Process command-line arguments related to clock synchronization.

This returns the last read position in argv, either once all related arguments have been parsed or when an invalid argument is encountered.

Parameters
argc  The number of arguments in the list.
argv  The list of arguments as an array of strings.
Returns
Current position (head) of argv.
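
One plausible reading of this contract is a sub-parser that consumes the arguments it recognizes and reports where it stopped, so that the caller can resume from that position. The sketch below illustrates that pattern only. The function name and the keywords "on", "off", and "period" are placeholders chosen for the example and should not be taken as the RTI's actual options.

```c
#include <string.h>

/* Hypothetical sub-parser: consumes recognized tokens starting at argv[0]
 * and returns the index of the first token it did not consume (argc if it
 * consumed everything). */
int demo_parse_sub_args(int argc, const char* argv[]) {
    for (int i = 0; i < argc; i++) {
        if (strcmp(argv[i], "on") == 0 || strcmp(argv[i], "off") == 0) {
            continue;   /* a mode keyword that takes no value */
        }
        if (strcmp(argv[i], "period") == 0 && i + 1 < argc) {
            i++;        /* also consume the value that follows */
            continue;
        }
        return i;       /* unrecognized: let the caller handle it */
    }
    return argc;
}
```

A caller that has reached position i in its own argument list could then continue from i plus the value returned for the sub-array starting at argv[i].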

◆ respond_to_erroneous_connections()

void * respond_to_erroneous_connections ( void * nothing)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Thread that responds to new connections, which could come from federates of other federations attempting to join the wrong federation.

Parameters
nothing  Nothing needed here.
Returns
NULL.

◆ rti_logical_tag_complete_locked()

void rti_logical_tag_complete_locked ( enclave_info_t * enclave,
tag_t completed )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Inform the local RTI that the specified enclave has completed the tag given by completed.

This will update the data structures and can release other enclaves waiting on a TAG.

This assumes the caller is holding the environment mutex of the source enclave.

Parameters
enclave  The enclave.
completed  The tag just completed by the enclave.

◆ rti_next_event_tag_locked()

tag_t rti_next_event_tag_locked ( enclave_info_t * enclave,
tag_t next_event_tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Notify the local RTI of a next event tag (NET).

This function call may block. A call to this function serves two purposes. 1) It is a promise that, unless receiving events from other enclaves, this enclave will not produce any event until the next_event_tag (NET) argument. 2) It is a request for permission to advance the logical tag of the enclave until the NET.

This function call will block until the enclave has been granted a TAG, which might not be the tag requested.

This assumes the caller is holding the environment mutex of the source enclave.

Parameters
enclave  The enclave requesting to advance to the NET.
next_event_tag  The tag of the next event in the enclave.
Returns
A tag to which the enclave can safely advance its time. It might be smaller or larger than the requested tag.

◆ rti_update_other_net_locked()

void rti_update_other_net_locked ( enclave_info_t * src,
enclave_info_t * target,
tag_t net )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_local.h>

Notify the local RTI to update the next event tag (NET) of a target enclave.

This function is called after scheduling an event onto the event queue of another enclave. The source enclave must call this function to potentially update the NET of the target enclave.

This assumes the caller is holding the environment mutex of the target enclave.

Parameters
src  The enclave that has scheduled an event.
target  The enclave whose NET is to be updated.
net  The proposed next event tag.

◆ send_physical_clock()

void send_physical_clock ( unsigned char message_type,
federate_info_t * fed,
socket_type_t socket_type )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Take a snapshot of the physical clock time and send it to the federate fed.

This version assumes the caller holds the mutex lock.

Parameters
message_type  The type of the clock sync message (see net_common.h).
fed  The federate to send the physical time to.
socket_type  The socket type (TCP or UDP).

◆ send_reject()

void send_reject ( int * socket_id,
unsigned char error_code )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Send a MSG_TYPE_REJECT message to the specified socket and close the socket.

Parameters
socket_id  Pointer to the socket ID.
error_code  An error code.

◆ start_rti_server()

int32_t start_rti_server ( uint16_t port)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Start the socket server for the runtime infrastructure (RTI) and return the socket descriptor.

Parameters
port  The port on which to listen for socket connections, or 0 to use the default port range.

◆ tag_advance_grant_if_safe()

tag_advance_grant_t tag_advance_grant_if_safe ( scheduling_node_t * e)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Determine whether the specified scheduling node is eligible for a tag advance grant (TAG) and, if so, return the details.

This is called upon receiving an LTC, NET, or resign message from an upstream node.

This function calculates the minimum M over all upstream scheduling nodes of the "after" delay plus the most recently received LTC from that node. If M is greater than the most recent TAG to e or greater than or equal to the most recent PTAG, then return TAG(M).

If the above conditions do not result in returning a TAG, then find the minimum M of the earliest possible future message from upstream federates. This is calculated by transitively looking at the most recently received NET calls from upstream scheduling nodes. If M is greater than the NET of e or the most recent PTAG to e, then return a TAG with tag equal to the NET of e or the PTAG. If M is equal to the NET of the federate, then return PTAG(M).

This should be called whenever an immediately upstream federate sends to the RTI an LTC (latest tag confirmed), or when a transitive upstream federate sends a NET (Next Event Tag) message. It is also called when an upstream federate resigns from the federation.

This function assumes that the caller holds the RTI mutex.

Parameters
e  The scheduling node.
Returns
If granted, return the tag value and whether it is provisional. Otherwise, return the NEVER_TAG.
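
The first grant condition described above can be worked through with a simplified sketch. It models tags as plain integers, ignoring microsteps and the NEVER and FOREVER sentinels, and it covers only the LTC-based step, not the transitive NET-based step. The demo_upstream_t type and both function names are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative record for one upstream node. */
typedef struct {
    int64_t completed;  /* most recently received LTC from this node */
    int64_t delay;      /* "after" delay on the connection to e */
} demo_upstream_t;

/* M: minimum over all upstream nodes of (LTC + delay). Requires n > 0. */
int64_t demo_min_upstream(const demo_upstream_t* up, size_t n) {
    assert(n > 0);
    int64_t m = up[0].completed + up[0].delay;
    for (size_t i = 1; i < n; i++) {
        int64_t candidate = up[i].completed + up[i].delay;
        if (candidate < m) {
            m = candidate;
        }
    }
    return m;
}

/* First grant condition as worded above: M exceeds the most recent TAG,
 * or M is at least the most recent PTAG. */
bool demo_can_grant_tag(int64_t m, int64_t last_tag, int64_t last_ptag) {
    return m > last_tag || m >= last_ptag;
}
```

For example, with upstream nodes (LTC 10, delay 5) and (LTC 12, delay 0), M is min(15, 12) = 12. If the most recent TAG to e was 10, the condition holds and TAG(12) would be returned.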

◆ tracepoint_rti_from_federate()

void tracepoint_rti_from_federate ( trace_event_t event_type,
int fed_id,
tag_t * tag )

#include </Users/runner/work/reactor-c/reactor-c/include/core/tracepoint.h>

Trace RTI receiving a message from a federate.

Parameters
event_type  The type of event. Possible values are:
fed_id  The federate ID.
tag  Pointer to the tag that has been sent, or NULL.

◆ tracepoint_rti_to_federate()

void tracepoint_rti_to_federate ( trace_event_t event_type,
int fed_id,
tag_t * tag )

#include </Users/runner/work/reactor-c/reactor-c/include/core/tracepoint.h>

Trace RTI sending a message to a federate.

Parameters
event_type  The type of event. Possible values are:
fed_id  The federate ID.
tag  Pointer to the tag that has been sent, or NULL.

◆ update_federate_next_event_tag_locked()

void update_federate_next_event_tag_locked ( uint16_t federate_id,
tag_t next_event_tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Update the next event tag of federate federate_id.

It will update the recorded next event tag of federate federate_id to the minimum of next_event_tag and the minimum tag of in-transit messages (if any) to the federate.

It then checks whether the RTI can grant new TAG or PTAG messages to any downstream federates based on this new next event tag.

This function assumes that the caller is holding the _RTI.mutex.

Parameters
federate_id  The id of the federate that needs to be updated.
next_event_tag  The next event tag for federate_id.
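
The "minimum of next_event_tag and the minimum tag of in-transit messages" rule can be sketched as follows. The sketch_tag_t type, the array representation of in-transit messages, and the function names are assumptions made for this example; the RTI's own data structures may differ.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative tag: a time value plus a microstep. */
typedef struct {
    int64_t time;
    uint32_t microstep;
} sketch_tag_t;

/* Lexicographic "strictly earlier than" on (time, microstep). */
static bool sketch_tag_less(sketch_tag_t a, sketch_tag_t b) {
    return a.time < b.time || (a.time == b.time && a.microstep < b.microstep);
}

/* Return the tag to record: the reported NET, lowered to the earliest
 * in-transit message tag if any such message is earlier. */
sketch_tag_t sketch_effective_net(sketch_tag_t net,
                                  const sketch_tag_t* in_transit, size_t n) {
    sketch_tag_t result = net;
    for (size_t i = 0; i < n; i++) {
        if (sketch_tag_less(in_transit[i], result)) {
            result = in_transit[i];
        }
    }
    return result;
}
```

Taking the minimum matters because a message still in transit to the federate can create an event earlier than the NET the federate reported.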

◆ update_min_delays()

void update_min_delays ( )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

If necessary, update the min_delays and the fields that indicate cycles.

These fields will be updated only if they have not been previously updated or if invalidate_min_delays has been called since they were last updated.
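
The update-only-when-stale behavior described above follows a common lazy-caching pattern, sketched below. The demo_delay_cache_t type, its explicit valid flag, and the single min_delay value are simplifications assumed for the example; they do not reproduce how the RTI stores min_delays or cycle information.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative cache of a derived value. */
typedef struct {
    bool valid;          /* false until computed, and again after invalidation */
    int min_delay;       /* the cached result */
    int recomputations;  /* counts real recomputations, for demonstration */
} demo_delay_cache_t;

/* Mark the cached value stale, for example after the topology changes. */
void demo_invalidate(demo_delay_cache_t* c) {
    c->valid = false;
}

/* Recompute the cached minimum only if it is stale. Requires n > 0. */
void demo_update(demo_delay_cache_t* c, const int* delays, int n) {
    assert(n > 0);
    if (c->valid) {
        return;          /* nothing changed since the last update */
    }
    int m = delays[0];
    for (int i = 1; i < n; i++) {
        if (delays[i] < m) {
            m = delays[i];
        }
    }
    c->min_delay = m;
    c->valid = true;
    c->recomputations++;
}
```

Calling demo_update() repeatedly is cheap. The work is redone only after demo_invalidate() has been called.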

◆ update_scheduling_node_next_event_tag_locked()

void update_scheduling_node_next_event_tag_locked ( scheduling_node_t * e,
tag_t next_event_tag )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_common.h>

Update the next event tag of a scheduling node.

This will notify downstream scheduling nodes with a TAG or PTAG if appropriate.

This function assumes that the caller is holding the RTI mutex.

Parameters
e  The scheduling node.
next_event_tag  The next event tag for e.

◆ usage()

void usage ( int argc,
const char * argv[] )

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Print a usage message.

Parameters
argc  The number of arguments in the list.
argv  The list of arguments as an array of strings.

◆ wait_for_federates()

void wait_for_federates ( int socket_descriptor)

#include </Users/runner/work/reactor-c/reactor-c/core/federated/RTI/rti_remote.h>

Start the runtime infrastructure (RTI) interaction with the federates and wait for the federates to exit.

Parameters
socket_descriptor  The socket descriptor returned by start_rti_server().
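
Taken together, the documented contracts of process_args(), start_rti_server(), and wait_for_federates() suggest a startup sequence along the following lines. This is a sketch under stated assumptions: the three stub_ functions are stand-ins written for this example so that it compiles on its own, and they are not the real implementations. Only the return conventions (1 for success from process_args(), a socket descriptor from start_rti_server(), port 0 for the default port range) are taken from the text above.

```c
#include <stdint.h>

static int last_socket = -1;  /* records what the wait stub received */

/* Stand-in: pretend any argument beyond the program name is not understood. */
static int stub_process_args(int argc, const char* argv[]) {
    (void)argv;
    return argc == 1 ? 1 : 0;  /* 1 on success, 0 on failure, as documented */
}

/* Stand-in: pretend the server socket was opened with descriptor 3. */
static int32_t stub_start_rti_server(uint16_t port) {
    (void)port;
    return 3;
}

/* Stand-in: the real function would return once the federates have exited. */
static void stub_wait_for_federates(int socket_descriptor) {
    last_socket = socket_descriptor;
}

/* Accessor used to observe which descriptor was passed along. */
int demo_last_socket(void) {
    return last_socket;
}

/* Hypothetical entry point: returns 0 on a normal run, 1 on bad arguments. */
int demo_rti_main(int argc, const char* argv[]) {
    if (!stub_process_args(argc, argv)) {
        return 1;  /* the usage message has already been printed */
    }
    /* Port 0 requests the default port range. */
    int32_t socket_descriptor = stub_start_rti_server(0);
    stub_wait_for_federates(socket_descriptor);
    return 0;
}
```

Note that process_args() reports success with 1 rather than 0, so its result is tested as a boolean and not as a conventional exit status.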