Version: 0.6.0

Distributed Execution

note

Distributed execution of Lingua Franca programs is at an early stage of development with many missing capabilities and a rather brittle execution. It is ready for experimentation, but not yet for deployment of serious systems. The capability has been tested on macOS and Linux, and there are no plans currently to support Windows systems.

A distributed Lingua Franca program is called a federation. Each reactor within the main reactor is called a federate. The LF compiler generates a separate program for each federate and synthesizes the code for the federates to communicate. The federates can be distributed across networks and eventually will be able to be written in different target languages, although this is not yet supported.

In addition to the federates, there is a program called the RTI, for runtime infrastructure, that coordinates startup and shutdown and may, if the coordination is centralized, mediate communication. The RTI needs to be compiled and installed separately on the system before any federation can execute.

It is possible to encapsulate federates in Docker containers for deployment. See containerized execution.

Installation of the RTI

Federated execution requires installation of a separate stand-alone program called the Runtime Infrastructure or RTI. At the current time, the only way to install this is from source files:

git clone https://github.com/lf-lang/reactor-c.git
cd reactor-c/core/federated/RTI/
mkdir build && cd build
cmake ../
make
sudo make install

The above will create a program called RTI and install it at /usr/local/bin/RTI. Once this program is available in your path, you can compile and execute federated Lingua Franca programs using Epoch, VS Code, or the command-line tools. For more details, see the README file.

Minimal Example

A minimal federated execution is specified by using the federated keyword instead of main for the main reactor. An example is given below:

target C

reactor Count {
  output out: int
  state c: int = 0
  timer t(0, 1 sec)

  reaction(t) -> out {=
    lf_set(out, self->c++);
  =}
}

reactor Print {
  input in: int

  reaction(in) {=
    lf_print("Received: %d at (%lld, %d)",
        in->value, lf_time_logical_elapsed(), lf_tag().microstep
    );
  =}
}

federated reactor {
  c = new Count()
  p = new Print()
  c.out -> p.in
}

The federated keyword tells the code generator that the program is to be split into several distinct programs, one for each top level reactor.

When you run the code generator on src/Federated.lf containing the above code, the following three programs will appear:

bin/Federated
bin/Federated_c
bin/Federated_p

The root name, Federated, is the name of the .lf file from which these are generated (and the name of the main reactor, which is required to match if it is specified). The suffixes "_c" and "_p" come from the names of the top-level instances, c and p. There will always be one federate for each top-level reactor instance.

To run the program, you can simply run bin/Federated, which is a bash script that launches the RTI and two other programs, Federated_c and Federated_p. Alternatively, you can manually execute the RTI followed by the two federate programs by starting them on the command line. It is best to use three separate terminal windows (so that outputs from the three programs do not get jumbled together) to execute the following commands:

RTI -n 2
bin/Federated_c
bin/Federated_p

The -n argument to the RTI specifies that it should expect two federates to join the federation.

Upon running the program, you will see information printed about the starting and ending of the federation, and buried in the output will be lines like this:

Federate 1: Received: 0 at (0, 0)

The prefix Federate 1 is automatically added by the built-in lf_print function to help disambiguate the outputs from multiple concurrent federates.

Federation ID

You may have several federations running on the same machine(s) or even several instances of the same federation. In this case, it is necessary to distinguish between the federations. To accomplish this, you can pass a -i or --id parameter to the RTI and its federates with an identifier that is unique to the particular federation. For example,

RTI -n 2 -i myFederation
bin/Federated_c -i myFederation
bin/Federated_p -i myFederation

Each federate must have the same ID as the RTI in order to join the federation. The bash script that executes each of the components of the federation automatically generates a unique federation ID each time you run it.

Coordinated Start

When the above programs execute, each federate registers with the RTI. When all expected federates have registered, the RTI broadcasts to the federates the logical time at which they should start execution. Hence, all federates start at the same logical time.

The starting logical time is determined as follows. When each federate starts executing, it sends its current physical time (drawn from its real-time clock) to the RTI. When the RTI has heard from all the federates, it chooses the largest of these physical times, adds a fixed offset (currently one second), and broadcasts the resulting time to each federate.
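The rule above (largest reported physical time plus a fixed offset) can be sketched in C as follows. This is only an illustration of the arithmetic, not the RTI's actual code: the function name choose_start_time, the constant START_OFFSET_NS, and the use of nanoseconds are assumptions made for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fixed offset of one second, expressed in nanoseconds (assumed unit). */
#define START_OFFSET_NS 1000000000LL

/* Hypothetical helper: given one reported physical time per federate,
 * return the largest of them plus a fixed offset. */
int64_t choose_start_time(const int64_t *times, size_t n, int64_t offset_ns) {
    assert(times != NULL && n > 0);  /* at least one federate must report */
    int64_t latest = times[0];
    for (size_t i = 1; i < n; i++) {
        if (times[i] > latest) {
            latest = times[i];
        }
    }
    return latest + offset_ns;
}
```

Taking the maximum means that no federate is asked to start at a time that is already in the past according to its own clock, provided the offset exceeds the time it takes to deliver the start time to that federate.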

When a federate receives the starting time from the RTI, if it is running in realtime mode (the default), then it will wait until its local physical clock matches or exceeds that starting time. Thus, to the extent that the machines have synchronized clocks, the federates will all start executing at roughly the same physical time, a physical time close to the starting logical time.

Coordinated Shutdown

Coordinating the shutdown of a distributed program is discussed in Termination.

Communication Between Federates

When one federate sends data to another, by default, the timestamp at the receiver will match the timestamp at the sender. You can also specify a logical delay on the communication using the after keyword. For example, if we had instead specified

c.out -> p.in after 200 msec;

then the timestamp at the receiving end will be incremented by 200 msec compared to the timestamp at the sender.

The preservation of timestamps across federates implies some constraints (unless you use physical connections). How these constraints are managed depends on whether you choose centralized or decentralized coordination.

Centralized Coordination

In the centralized mode of coordination (the default), the RTI regulates the advancement of time in each of the federates in order to ensure that the logical time semantics of Lingua Franca is respected. If the p federate above has an event with timestamp t that it wants to react to (it is the earliest event in its event queue), then it needs to get the OK from the RTI to advance its logical time to t. The RTI grants this time advance only when it can assure that p has received all messages that it will ever receive with timestamps t or less.

First, note that, by default, logical time on each federate never advances ahead of physical time, as reported by its local physical clock. Consider the consequences for the above connection. Suppose the timestamp of the message sent by c is t. This message cannot be sent before the local clock at c reaches t and also cannot be sent before the RTI grants to c a time advance to t. Since c has no federates upstream of it, the RTI will always grant it such a time advance (in fact, c does not even wait for a response from the RTI).

Suppose that the communication latency is L. That is, it takes L time units (in physical time) for a message to traverse the network. Then the p federate will not see the message from c before physical time t + L, where this physical time is measured by the physical clock on c's host. If that clock differs from the clock on p's host by E, then p will see the message at physical time t + E + L, as measured by its own clock. Let the value of the after specification (200 msec above) be a. Then the timestamp of the received message is t + a. The relationship between logical and physical times at the receiving end (the p federate), therefore, will depend on the relationship between a and E + L. If, for example, E + L > a, then federate p will lag behind physical time by at least E + L - a.

Assume the RTI has granted a time advance to t to federate c. Hence, c is able to send a message with timestamp t. The RTI now cannot grant any time advance to p that is greater than or equal to t + a until the message has been delivered to p. In centralized coordination, all messages flow through the RTI, so the RTI will deliver a Tag Advance Grant (TAG) message to p only after it has delivered the message.

If a > E + L, then the existence of this communication does not cause p's logical time to lag behind physical time. This means that if we were to modify p to have a physical action, the RTI will be able to immediately grant a TAG to p to advance the timestamp of that physical action. However, if a < E + L, then the RTI will delay granting a time advance to p by at least E + L - a. Hence, E + L - a represents an additional latency in the processing of physical actions! This latency could present a problem for meeting deadlines. For this reason, if there are physical actions or deadlines at a federate that receives network messages, it is desirable to have after delays on the connection to that federate larger than any expected E + L. This way, there is no additional latency to processing physical actions at this federate and no additional risk of missing deadlines.
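As a worked illustration of the bound above, the following C sketch computes the minimum additional latency max(0, E + L - a) that the receiving federate incurs when processing physical actions. The function name min_added_latency_ns, the MSEC macro, and the nanosecond unit are assumptions for this example and are not part of the Lingua Franca API.

```c
#include <assert.h>
#include <stdint.h>

/* Convert milliseconds to nanoseconds (assumed unit for this sketch). */
#define MSEC(x) ((int64_t)(x) * 1000000LL)

/* Hypothetical helper: lower bound on the extra delay before the receiver
 * can be granted a time advance, given clock error E, network latency L,
 * and the after delay a on the connection. Returns 0 when a >= E + L. */
int64_t min_added_latency_ns(int64_t clock_error, int64_t net_latency,
                             int64_t after_delay) {
    assert(after_delay >= 0);
    int64_t lag = clock_error + net_latency - after_delay;
    return lag > 0 ? lag : 0;
}
```

For instance, with E = 50 msec, L = 200 msec, and a = 200 msec, the bound is 50 msec; raising the after delay to 300 msec brings it to zero, which is the situation the paragraph above recommends for federates with physical actions or deadlines.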

If, in addition, the physical clocks on the hosts are allowed to drift with respect to one another, then E can grow without bound, and hence the lag between logical time and physical time in processing events can grow without bound. This is mitigated either by hosts that themselves realize some clock synchronization algorithm, such as NTP or PTP, or by utilizing Lingua Franca's own built-in clock synchronization. If the federates lack physical actions and deadlines, however, then unsynchronized clocks present no semantic problem if you are using centralized coordination. However, because logical time chases physical time, federates will slow to match the slowest clock of federates upstream of them.

With centralized coordination, all messages (except those on physical connections) go through the RTI. This can create a bottleneck and a single point of failure. To avoid this bottleneck, you can use decentralized coordination.

Decentralized Coordination

The default coordination mechanism between federates is centralized, equivalent to specifying the target property:

  coordination: centralized

An alternative is decentralized coordination, which extends a technique realized in PTIDES and Google Spanner, a globally distributed database system:

  coordination: decentralized

With decentralized coordination, the RTI coordinates startup, shutdown, and clock synchronization, but is otherwise not involved in the execution of the distributed program.

In decentralized coordination, each federate and some reactions have a safe-to-process (STP) offset. When one federate communicates with another, it does so directly through a dedicated socket without going through the RTI. Moreover, it does not consult the RTI to advance logical time. Instead, it can advance its logical time to t when its physical clock matches or exceeds t + STP.
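The advancement rule in the last sentence can be written as a one-line predicate. The sketch below is illustrative only: safe_to_advance is a hypothetical name, not a function of the LF runtime, and all times are assumed to be in the same unit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical predicate: a federate may advance its logical time to t
 * once its physical clock has reached t + STP. */
bool safe_to_advance(int64_t physical_now, int64_t t, int64_t stp) {
    assert(stp >= 0);  /* an STP offset is a non-negative wait */
    return physical_now >= t + stp;
}
```

With an STP of zero, the check reduces to the default behavior in which logical time never runs ahead of physical time.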

By default, the STP is zero. An STP of zero is OK for any federate where either every logical connection into the federate has a sufficiently large after clause, or the federate has only one upstream federate sending it messages and it has no local timers or actions. The value of the after delay on each connection must exceed the sum of the clock synchronization error E, a bound L on the network latency, and the time lag on the sender D (the physical time at which it sends the message minus the timestamp of the message). The sender's time lag D can be enforced by using a deadline. The following example uses an after delay of 10 msec:

target C {
  timeout: 5 sec,
  coordination: decentralized
}

import Count, Print from "Federated.lf"

reactor PrintTimer extends Print {
  timer t(0, 1 sec)

  reaction(t) {=
    lf_print("Timer ticked at (%lld, %d).",
        lf_time_logical_elapsed(), lf_tag().microstep
    );
  =}
}

federated reactor {
  c = new Count()
  p = new PrintTimer()
  c.out -> p.in after 10 msec
}

This example inherits from the Federated example above. In this example, as long as the messages from federate c arrive at federate p within 10 msec, all messages will be processed in tag order, as with an unfederated program.
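The sufficiency condition on the after delay (it must exceed E + L + D) can be checked with a small helper when budgeting a deployment. This is a sketch under stated assumptions: after_delay_suffices and MSEC are hypothetical names, and the values of E, L, and D would have to come from your own measurements or bounds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Convert milliseconds to nanoseconds (assumed unit for this sketch). */
#define MSEC(x) ((int64_t)(x) * 1000000LL)

/* Hypothetical check: with a zero STP offset, an after delay is large enough
 * if it strictly exceeds clock error E + latency bound L + sender lag D. */
bool after_delay_suffices(int64_t after_delay, int64_t clock_error,
                          int64_t latency_bound, int64_t sender_lag) {
    assert(after_delay >= 0);
    return after_delay > clock_error + latency_bound + sender_lag;
}
```

For the 10 msec delay used above, bounds of E = 1 msec, L = 5 msec, and D = 2 msec would be acceptable, whereas L = 8 msec would not.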

An alternative to the after delays is to add an STP offset to downstream federates, as in the following example:

target C {
  timeout: 5 sec,
  coordination: decentralized
}

import Count, Print from "Federated.lf"

reactor PrintTimer(STP_offset: time = 10 msec) extends Print {
  timer t(0, 1 sec)

  reaction(t) {=
    lf_print("Timer ticked at (%lld, %d).",
        lf_time_logical_elapsed(), lf_tag().microstep
    );
  =}
}

federated reactor {
  c = new Count()
  p = new PrintTimer()
  c.out -> p.in
}

Here, a parameter named STP_offset (not case sensitive) gives a time value, and the federate waits this specified amount of time (physical time) beyond a logical time t before advancing its logical time to t. In the above example, reactions to the timer events will be delayed by the amount specified by the STP_offset parameter. Just as with the use of after, if the STP_offset exceeds the sum of network latency, clock synchronization error, and execution times, then all events will be processed in tag order.

Of course, the assumptions about network latency, etc., can be violated in practice. Analogous to a deadline violation, Lingua Franca provides a mechanism for handling such a violation by providing an STP violation handler. The pattern is:

reaction(in) {=
  // User code
=} STP (0) {=
  // Error handling code
=}

If the tag at which this reaction is to be invoked (the value returned by lf_tag()) exceeds the tag of an incoming message in (the current tag has already advanced beyond the intended tag of in), then the STP violation handler will be invoked instead of the normal reaction. Within the body of the STP handler, the code can access the intended tag of in using in->intended_tag, which has two fields, a timestamp in->intended_tag.time and a microstep in->intended_tag.microstep. The code can then ascertain the severity of the error and act accordingly. For example:

target C {
  timeout: 5 sec,
  coordination: decentralized
}

import Count from "Federated.lf"

reactor PrintTimer {
  timer t(0, 1 sec)
  input in: int

  reaction(in) {=
    lf_print("Received: %d at (%lld, %d)",
        in->value, lf_time_logical_elapsed(), lf_tag().microstep
    );
  =} STP(0) {=
    lf_print("****** STP violation handler invoked at (%lld, %d). "
        "Intended tag was (%lld, %d).",
        lf_time_logical_elapsed(), lf_tag().microstep,
        in->intended_tag.time - lf_time_start(), in->intended_tag.microstep
    );
  =}

  reaction(t) {=
    lf_print("Timer ticked at (%lld, %d).",
        lf_time_logical_elapsed(), lf_tag().microstep
    );
  =}
}

federated reactor {
  c = new Count()
  p = new PrintTimer()
  c.out -> p.in after 10 msec
}
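To ascertain the severity of a violation, a handler might compare the current tag with the intended tag and measure how far apart they are. The following stand-alone C sketch shows one way to do this. The type example_tag_t is a hypothetical stand-in that only mirrors the two fields described above (time and microstep), and compare_tags and violation_lag_ns are names invented for this example rather than functions of the LF API.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a tag: a timestamp plus a microstep. */
typedef struct {
    int64_t time;
    uint32_t microstep;
} example_tag_t;

/* Orders tags by time first, then by microstep.
 * Returns a negative value, zero, or a positive value. */
int compare_tags(example_tag_t a, example_tag_t b) {
    if (a.time != b.time) {
        return a.time < b.time ? -1 : 1;
    }
    if (a.microstep != b.microstep) {
        return a.microstep < b.microstep ? -1 : 1;
    }
    return 0;
}

/* Time by which the current tag has passed the intended tag. Meant to be
 * called only where a violation is known to have occurred. A result of 0
 * means the two tags differ only in their microsteps. */
int64_t violation_lag_ns(example_tag_t current, example_tag_t intended) {
    assert(compare_tags(current, intended) > 0);
    return current.time - intended.time;
}
```

A handler could, for example, log small lags and request shutdown only when the lag exceeds an application-specific threshold.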

For more advanced users, the LF API provides two functions that can be used to dynamically adjust the STP:

interval_t lf_get_stp_offset();
void lf_set_stp_offset(interval_t offset);

Using these functions, however, is a pretty advanced operation.

Physical Connections

Coordinating the execution of the federates so that timestamps are preserved is tricky. If your application does not require the deterministic execution that results from preserving the timestamps, then you can alternatively specify a physical connection as follows:

source.out ~> print.in;

The tilde specifies that the timestamp of the sender should be discarded. A new timestamp will be assigned at the receiving end based on the local physical clock, much like a physical action. To distinguish it from a physical connection, the normal connection is called a logical connection.

There are a number of subtleties with physical connections. One is that if you specify an after clause, for example like this:

count.out ~> print.in after 10 msec;

then what does this mean? At the receiving end, the timestamp assigned to the incoming event will be the current physical time plus 10 msec.

Prerequisites for Distributed Execution

In the above example, all of the generated programs expect to run on localhost. This is the default. With these defaults, every federate has to run on the same machine as the RTI because localhost is not a host that is visible from other machines on the network. In order to run federates or the RTI on remote machines, you can specify a domain name or IP address for the RTI and/or federates.

In order for a federated execution to work, there is some setup required on the machines to be used. First, each machine must be running an ssh server. On a Linux machine, this is typically done with a command like this:

  sudo systemctl <start|enable> ssh.service

Enable means to always start the service at startup, whereas start means to just start it this once. On macOS, open System Preferences from the Apple menu and click on the "Sharing" preference panel. Select the checkbox next to "Remote Login" to enable it.

It will also be much more convenient if the launcher does not have to enter passwords to gain access to the remote machine. This can be accomplished by installing your public key (typically found in ~/.ssh/id_rsa.pub) in ~/.ssh/authorized_keys on the remote host.

Second, the RTI must be installed on the remote machine. See the instructions for installing the RTI.

Specifying RTI Hosts

You can specify a domain name on which the RTI should run as follows:

federated reactor DistributedCount at www.example.com { ... }

You can alternatively specify an IP address (either IPv4 or IPv6):

federated reactor DistributedCount at 10.0.0.198 { ... }

By default, the RTI starts a socket server on port 15045, if that port is available, and increments the port number by 1 until it finds an available port. The number of increments is limited by a target-specific number. In the C target, in rti.h, STARTING_PORT defines the number 15045 and PORT_RANGE_LIMIT limits the range of ports attempted (currently 1024).
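The port search described above can be sketched as a simple loop. This is not the code from rti.h: find_port, port_probe_fn, and the two demo probes are hypothetical, the availability check is passed in as a callback so that the example stays independent of any socket API, and whether the real limit counts attempts exactly this way is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define STARTING_PORT 15045
#define PORT_RANGE_LIMIT 1024

/* Callback that reports whether a port can be bound (hypothetical). */
typedef bool (*port_probe_fn)(uint16_t port);

/* Tries start, start + 1, ... for at most `limit` ports.
 * Returns the first available port, or -1 if none is found. */
int32_t find_port(uint16_t start, uint16_t limit, port_probe_fn is_available) {
    assert(is_available != NULL);
    for (uint32_t i = 0; i < limit; i++) {
        uint32_t port = (uint32_t)start + i;
        if (port > UINT16_MAX) {
            break;  /* ran past the valid port range */
        }
        if (is_available((uint16_t)port)) {
            return (int32_t)port;
        }
    }
    return -1;
}

/* Demo probe: pretends that every port below 15047 is already taken. */
bool demo_probe(uint16_t port) {
    return port >= 15047;
}

/* Demo probe: pretends that no port is available. */
bool never_available(uint16_t port) {
    (void)port;
    return false;
}
```

With demo_probe, the search skips 15045 and 15046 and settles on 15047; with never_available, it gives up after PORT_RANGE_LIMIT attempts.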

You can also specify a port for the RTI to use as follows:

federated reactor DistributedCount at 10.0.0.198:8080 { ... }

If you specify a port, then the RTI will use that port if it is available and fail otherwise. The example above sets the port to 8080.

Note that if the machine uses DHCP to obtain its address, then the generated code may stop working later because the address of the machine may change.

Address 0.0.0.0: The default host, localhost, is used if no address is specified. Using localhost requires that the generated federates run on the local machine. This is ideal for testing. If you use 0.0.0.0, then you are also specifying that the local machine (the one performing the code generation) will be the host, but now the process(es) running on this local machine can establish connections with processes on remote machines. The code generator will determine the IP address of the local machine, and any other hosts that need to communicate with reactors on the local host will use the current IP address of that local host at the time of code generation.

Specifying Federate Hosts

A federate may be mapped to a particular remote machine using a syntax like this:

count = new Count() at user@host:port/path;

The port is ignored in centralized mode because all communication is routed through the RTI, but in decentralized mode it will specify the port on which a socket server listens for incoming connections from other federates.

If any federate has such a remote designator, then a Federation_distribute.sh shell script will be generated. This script will distribute the generated code for the RTI to the remote machine at the specified directory.

You can also specify a user name on the remote machine for cases where the username will not match whoever launches the federation:

federated reactor DistributedCount at user@10.0.0.198:8080 { ... }

The general form of the host designation is

federated reactor DistributedCount at user@host:port/path { ... }

where user@, :port, and /path are all optional. The path specifies the directory on the remote machine (relative to the home directory of the user) where the generated code will be put. The host should be an IPv4 address (e.g. 93.184.216.34), IPv6 address (e.g. 2606:2800:220:1:248:1893:25c8:1946), or a domain name (e.g. www.example.com). It can also be localhost or 0.0.0.0. The host can be remote as long as it is accessible from the machine where the programs will be started.

If user@ is not given, then it is assumed that the username on the remote host is the same as on the machine that launches the programs. If :port is not given, then it defaults to port 15045. If /path is not given, then ~user/LinguaFrancaRemote will be the root directory on the remote machine.
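To make the optional parts and their defaults concrete, here is a minimal C sketch that splits a designator of the form user@host:port/path into its components. It is not the parser used by the LF compiler: host_spec_t and parse_host_spec are hypothetical names, empty strings stand in for the defaults of user and path, and IPv6 addresses are deliberately not handled because their colons would need separate treatment.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_PORT 15045

/* Hypothetical container for the parsed designator. */
typedef struct {
    char user[64];   /* empty: same user as on the launching machine */
    char host[128];
    int port;        /* DEFAULT_PORT when :port is omitted */
    char path[128];  /* empty: default directory on the remote machine */
} host_spec_t;

/* Parses "user@host:port/path" (user@, :port, /path optional).
 * Returns 0 on success and -1 on malformed input. No IPv6 support. */
int parse_host_spec(const char *spec, host_spec_t *out) {
    assert(spec != NULL && out != NULL);
    memset(out, 0, sizeof *out);  /* also zero-terminates every field */
    out->port = DEFAULT_PORT;

    const char *at = strchr(spec, '@');
    if (at != NULL) {
        size_t n = (size_t)(at - spec);
        if (n == 0 || n >= sizeof out->user) return -1;
        memcpy(out->user, spec, n);
        spec = at + 1;
    }

    const char *slash = strchr(spec, '/');
    if (slash != NULL) {
        if (strlen(slash + 1) >= sizeof out->path) return -1;
        strcpy(out->path, slash + 1);
    }
    size_t host_port_len = slash ? (size_t)(slash - spec) : strlen(spec);

    const char *colon = memchr(spec, ':', host_port_len);
    size_t host_len = colon ? (size_t)(colon - spec) : host_port_len;
    if (host_len == 0 || host_len >= sizeof out->host) return -1;
    memcpy(out->host, spec, host_len);

    if (colon != NULL) {
        char digits[8] = {0};
        size_t port_len = host_port_len - host_len - 1;
        if (port_len == 0 || port_len >= sizeof digits) return -1;
        memcpy(digits, colon + 1, port_len);
        char *end = NULL;
        long port = strtol(digits, &end, 10);
        if (*end != '\0' || port < 1 || port > 65535) return -1;
        out->port = (int)port;
    }
    return 0;
}
```

Parsing "user@10.0.0.198:8080/work" with this sketch yields user "user", host "10.0.0.198", port 8080, and path "work", while "www.example.com" yields only the host and the default port.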

Clock Synchronization

Both centralized and decentralized coordination have some reliance on clock synchronization. First, the RTI determines the start time of all federates, and the actual physical start time will differ by the extent that their physical clocks differ. This is particularly problematic if clocks differ by hours or more, which is certainly possible. If the hosts on which you are running run a clock synchronization algorithm, such as NTP or PTP, then you may not need to be concerned about this at all. Windows, Mac, and most versions of Linux, by default, run NTP, which synchronizes their clocks to some remote host. NTP is not particularly precise, however, so clock synchronization error can be hundreds of milliseconds or larger. PTP protocols are much more precise, so if your hosts derive their physical clocks from a PTP implementation, then you probably don't need to do anything further. Unfortunately, as of this writing, even though almost all networking hardware provides support for PTP, few operating systems utilize it. We expect this to change when people have finally understood the value of precise clock synchronization.

If your host is not running any clock synchronization, or if it is running only NTP and your application needs tighter latencies, then Lingua Franca's own built-in clock synchronization may provide better precision, depending on your network conditions. Like NTP, it realizes a software-only protocol, which is much less precise than hardware-supported protocols such as PTP, but if your hosts are on the same local area network, then network conditions may be such that the performance of LF clock synchronization will be much better than NTP. If your network is equipped with PTP, you will want to disable the clock synchronization in Lingua Franca by specifying in your target properties the following:

clock-sync: off

When a federation is mapped onto multiple machines, then, by default, any federate mapped to a machine that is not the one running the RTI will attempt during startup to synchronize its clock with the one on the machine running the RTI. Whether the federate is running on the same machine as the RTI is determined by comparing the strings that follow the at keyword for the federate and for the RTI. If they differ at all, then the federate will be treated as if it were running on a different machine, even if it is actually running on the same machine. This default behavior can be obtained by either specifying nothing in the target properties or saying:

clock-sync: initial

This results in clock synchronization being done during startup only. To account for the possibility of your clocks drifting during execution of the program, you can alternatively specify:

clock-sync: on

With this specification, in addition to synchronization during startup, synchronization will be redone periodically during program execution.

Clock Synchronization Options

A number of options can be specified using the clock-sync-options target parameter. For example:

clock-sync-options: {local-federates-on: true, test-offset: 200 msec}

The supported options are:

  • local-federates-on: Should be true or false. By default, if a federate is mapped to the same host as the RTI (using the at keyword), then clock synchronization is turned off. This assumes that the federate will be using the same clock as the RTI, so there is no point in performing clock synchronization. However, sometimes it is useful to force clock synchronization to be run even in this case, for example to test the performance of clock synchronization. To force clock synchronization on in this case, set this option to true.

  • test-offset: The value should be a time value with units, e.g. 200 msec. This will establish an artificial fixed offset for each federate's clock of one plus the federate ID times the time value given. For example, with the value 200 msec, a fixed offset of 200 milliseconds will be set on the clock for federate 0, 400 msec on the clock of federate 1, etc.

  • period: A time value (with units) that specifies how often runtime clock synchronization will be performed if it is turned on. The default is 5 msec.

  • attenuation: A positive integer specifying a divisor applied to the estimated clock error during runtime clock synchronization when adjusting the clock offset. The default is 10. Making this number bigger reduces each adjustment to the clock. Making the number equal to 1 means that each round of clock synchronization fully applies its estimated clock synchronization error.

  • trials: The number of rounds of message exchange with the RTI in each clock synchronization round. This defaults to 10.
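The arithmetic behind the test-offset and attenuation options can be illustrated with two small C helpers. These are sketches of the formulas stated in the list above, not the runtime's implementation: the names test_offset_ns and attenuated_adjustment_ns, the MSEC macro, and the nanosecond unit are assumptions, and integer division is used for simplicity.

```c
#include <assert.h>
#include <stdint.h>

/* Convert milliseconds to nanoseconds (assumed unit for this sketch). */
#define MSEC(x) ((int64_t)(x) * 1000000LL)

/* test-offset: artificial clock offset of (1 + federate ID) * value. */
int64_t test_offset_ns(uint16_t federate_id, int64_t test_offset) {
    return (1 + (int64_t)federate_id) * test_offset;
}

/* attenuation: each runtime round applies only error / attenuation.
 * An attenuation of 1 applies the full estimated error. */
int64_t attenuated_adjustment_ns(int64_t estimated_error, int32_t attenuation) {
    assert(attenuation >= 1);  /* the option must be a positive integer */
    return estimated_error / attenuation;
}
```

With test-offset set to 200 msec, federate 0 gets 200 msec and federate 1 gets 400 msec, matching the example in the list. With the default attenuation of 10, an estimated error of 1 msec leads to an adjustment of 100 microseconds in that round.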