reactor_rt/scheduler/
scheduler_impl.rs

1/*
2 * Copyright (c) 2021, TU Dresden.
3 *
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice,
8 *    this list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 *    this list of conditions and the following disclaimer in the documentation
12 *    and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
15 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
16 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
17 * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
22 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 */
24
25//! Home of the scheduler component.
26
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29
30use crossbeam_channel::reconnectable::*;
31
32use super::assembly_impl::RootAssembler;
33use super::*;
34use crate::assembly::*;
35use crate::scheduler::dependencies::DataflowInfo;
36use crate::*;
37
/// Construction parameters for the scheduler.
///
/// LFC uses target properties to set them. With the "cli"
/// feature, generated programs also feature CLI options to
/// override the defaults at runtime.
///
/// The [`Default`] value disables every option: no keep-alive,
/// no timeout, `threads == 0`, and no graph dump.
#[derive(Debug, Clone, Default)]
pub struct SchedulerOptions {
    /// If true, we won't shut down the scheduler as soon as
    /// the event queue is empty, provided there are still
    /// live threads that can send messages to the scheduler
    /// asynchronously.
    ///
    /// Note: the scheduler currently only logs a warning that
    /// this option has no effect when it is set.
    pub keep_alive: bool,

    /// Timeout of reactor execution. If provided, the reactor
    /// program will be shut down *at the latest* at `T0 + timeout`.
    /// Calls to `request_stop` may make the program terminate earlier.
    pub timeout: Option<Duration>,

    /// Max number of threads to use in the thread pool.
    /// If zero, uses one thread per core. Ignored unless
    /// building with feature `parallel-runtime` (a warning is
    /// logged if a non-zero value is given without that feature).
    pub threads: usize,

    /// If true, dump the dependency graph to a file before
    /// starting execution.
    pub dump_graph: bool,
}
65
66// Macros are placed a bit out of order to avoid exporting them
67// (they're only visible in code placed AFTER them).
68// We use macros instead of private methods as the borrow checker
69// needs to know we're borrowing disjoint parts of self at any time.
70
// Builds a `DebugInfoProvider` that borrows only the `id_registry`
// field of the given scheduler expression, leaving the other fields
// free to be borrowed (even mutably) at the same time.
macro_rules! debug_info {
    ($scheduler:expr) => {
        DebugInfoProvider { id_registry: &$scheduler.id_registry }
    };
}
76
// Logs the given event at trace level, then pushes it onto the
// scheduler's event queue. Only the `id_registry` and `event_queue`
// fields of the scheduler expression are touched.
macro_rules! push_event {
    ($sched:expr, $event:expr) => {{
        trace!("Pushing {}", debug_info!($sched).display_event(&$event));
        $sched.event_queue.push($event);
    }};
}
83
84/// The runtime scheduler.
85///
86/// Lifetime parameters: 'x and 't are carried around everywhere,
87/// 'x allows us to take references into the dataflow graph, and
88/// 't to spawn new scoped threads for physical actions. 'a is more
89/// useless but is needed to compile.
90pub struct SyncScheduler<'x> {
91    /// The latest processed logical time (necessarily behind physical time).
92    latest_processed_tag: Option<EventTag>,
93
94    /// Reference to the data flow graph, which allows us to
95    /// order reactions properly for each tag.
96    dataflow: &'x DataflowInfo,
97
98    /// All reactors.
99    reactors: ReactorVec<'x>,
100
101    /// Pending events/ tags to process.
102    event_queue: EventQueue<'x>,
103
104    /// Receiver through which asynchronous events are
105    /// communicated to the scheduler. We only block when
106    /// no events are ready to be processed.
107    rx: Receiver<PhysicalEvent>,
108
109    /// Initial time of the logical system.
110    #[allow(unused)] // might be useful someday
111    initial_time: Instant,
112
113    /// Scheduled shutdown time. If Some, shutdown will be
114    /// initiated at that logical time.
115    ///
116    /// This is set when an event sent by a [ReactionCtx::request_stop]
117    /// is *processed* (so, at its given tag), and upon
118    /// initialization if a timeout was specified.
119    shutdown_time: Option<EventTag>,
120
121    /// Whether the app has been terminated. Only used for
122    /// communication with asynchronous threads. Set by the
123    /// scheduler only.
124    was_terminated: Arc<AtomicBool>,
125
126    /// Debug information.
127    id_registry: DebugInfoRegistry,
128}
129
130impl<'x> SyncScheduler<'x> {
131    pub fn run_main<R: ReactorInitializer + 'static>(options: SchedulerOptions, args: R::Params) {
132        let start = Instant::now();
133        info!("Starting assembly...");
134        let (reactors, graph, id_registry) = RootAssembler::assemble_tree::<R>(args);
135        let time = Instant::now() - start;
136        info!("Assembly done in {} µs...", time.as_micros());
137
138        if options.dump_graph {
139            use std::fs::File;
140            use std::io::Write;
141
142            let path = std::env::temp_dir().join("reactors.dot");
143
144            File::create(path.clone())
145                .and_then(|mut dot_file| writeln!(dot_file, "{}", graph.format_dot(&id_registry)))
146                .expect("Error while writing DOT file");
147            eprintln!("Wrote dot file to {}", path.to_string_lossy());
148        }
149
150        // collect dependency information
151        let dataflow_info = DataflowInfo::new(graph).map_err(|e| e.lift(&id_registry)).unwrap();
152
153        // Using thread::scope here introduces an unnamed lifetime for
154        // the scope, which is captured as 't by the SyncScheduler.
155        // This is useful because it captures the constraint that the
156        // dataflow_info outlives 't, so that physical contexts
157        // can be spawned in threads that capture references
158        // to 'x.
159        let initial_time = Instant::now();
160        #[cfg(feature = "parallel-runtime")]
161        let rayon_thread_pool = rayon::ThreadPoolBuilder::new().num_threads(options.threads).build().unwrap();
162
163        let scheduler = SyncScheduler::new(options, id_registry, &dataflow_info, reactors, initial_time);
164
165        cfg_if::cfg_if! {
166            if #[cfg(feature = "parallel-runtime")] {
167                /// The unsafe impl is safe if scheduler instances
168                /// are only sent between threads like this (their Rc
169                /// internals are not copied).
170                /// So long as the framework entirely controls the lifetime
171                /// of SyncScheduler instances, this is enforceable.
172                #[allow(non_local_definitions)]
173                unsafe impl Send for SyncScheduler<'_> {}
174
175                // install makes calls to parallel iterators use that thread pool
176                rayon_thread_pool.install(|| scheduler.launch_event_loop());
177            } else {
178                scheduler.launch_event_loop();
179            }
180        }
181    }
182
183    /// Launch the event loop in this thread.
184    fn launch_event_loop(mut self) {
185        /************************************************
186         * This is the main event loop of the scheduler *
187         ************************************************/
188
189        self.startup();
190
191        loop {
192            // flush pending events, this doesn't block
193            for evt in self.rx.try_iter() {
194                let evt = evt.make_executable(self.dataflow);
195                push_event!(self, evt);
196            }
197
198            if let Some(evt) = self.event_queue.take_earliest() {
199                if self.is_after_shutdown(evt.tag) {
200                    trace!("Event is late, shutting down - event tag: {}", evt.tag);
201                    break;
202                }
203                trace!("Processing event {}", self.debug().display_event(&evt));
204                match self.catch_up_physical_time(evt.tag.to_logical_time(self.initial_time)) {
205                    Ok(_) => {}
206                    Err(async_event) => {
207                        let async_event = async_event.make_executable(self.dataflow);
208                        // an asynchronous event woke our sleep
209                        if async_event.tag < evt.tag {
210                            // reinsert both events to order them and try again.
211                            push_event!(self, evt);
212                            push_event!(self, async_event);
213                            continue;
214                        } else {
215                            // we can process this event first and not care about the async event
216                            push_event!(self, async_event);
217                        }
218                    }
219                };
220                // at this point we're at the correct time
221
222                if evt.terminate || self.shutdown_time == Some(evt.tag) {
223                    return self.shutdown(evt.tag, evt.reactions);
224                }
225
226                self.process_tag(false, evt.tag, evt.reactions);
227            } else if let Some(evt) = self.receive_event() {
228                let evt = evt.make_executable(self.dataflow);
229                // this may block
230                push_event!(self, evt);
231                continue;
232            } else {
233                // all senders have hung up, or timeout
234                info!("Event queue is empty forever, shutting down.");
235                break;
236            }
237        } // end loop
238
239        let shutdown_tag = self.shutdown_time.unwrap_or_else(|| EventTag::now(self.initial_time));
240        self.shutdown(shutdown_tag, None);
241
242        // self destructor is called here
243    }
244
245    /// Creates a new scheduler. An empty scheduler doesn't
246    /// do anything unless some events are pushed to the queue.
247    /// See [Self::launch_event_loop].
248    fn new(
249        options: SchedulerOptions,
250        id_registry: DebugInfoRegistry,
251        dependency_info: &'x DataflowInfo,
252        reactors: ReactorVec<'x>,
253        initial_time: Instant,
254    ) -> Self {
255        if !cfg!(feature = "parallel-runtime") && options.threads != 0 {
256            warn!("'workers' runtime parameter has no effect unless feature 'parallel-runtime' is enabled")
257        }
258
259        if options.keep_alive {
260            warn!("'keepalive' runtime parameter has no effect in the Rust target")
261        }
262
263        let (_, rx) = unbounded::<PhysicalEvent>();
264        Self {
265            rx,
266
267            event_queue: Default::default(),
268            reactors,
269
270            initial_time,
271            latest_processed_tag: None,
272            shutdown_time: options.timeout.map(|timeout| {
273                let shutdown_tag = EventTag::ORIGIN.successor(timeout);
274                trace!("Timeout specified, will shut down at most at tag {}", shutdown_tag);
275                shutdown_tag
276            }),
277            dataflow: dependency_info,
278            id_registry,
279            was_terminated: Default::default(),
280        }
281    }
282
283    /// Fix the origin of the logical timeline to the current
284    /// physical time, and runs the startup reactions
285    /// of all reactors.
286    fn startup(&mut self) {
287        info!("Triggering startup...");
288        debug_assert!(!self.reactors.is_empty(), "No registered reactors");
289
290        let startup_reactions = self.dataflow.reactions_triggered_by(&TriggerId::STARTUP);
291        self.process_tag(false, EventTag::ORIGIN, Some(Cow::Borrowed(startup_reactions)))
292    }
293
294    fn shutdown(&mut self, shutdown_tag: EventTag, reactions: ReactionPlan<'x>) {
295        info!("Scheduler is shutting down, at {}", shutdown_tag);
296        self.shutdown_time = Some(shutdown_tag);
297        let default_plan: ReactionPlan<'x> = Some(Cow::Borrowed(self.dataflow.reactions_triggered_by(&TriggerId::SHUTDOWN)));
298        let reactions = ExecutableReactions::merge_cows(reactions, default_plan);
299
300        self.process_tag(true, shutdown_tag, reactions);
301
302        // notify concurrent threads.
303        self.was_terminated.store(true, Ordering::SeqCst);
304        info!("Scheduler has been shut down")
305    }
306
307    /// Returns whether the given event should be ignored and
308    /// the event loop be terminated. This would be the case
309    /// if the tag of the event is later than the projected
310    /// shutdown time. Such 'late' events may be emitted by
311    /// the shutdown wave.
312    fn is_after_shutdown(&self, t: EventTag) -> bool {
313        self.shutdown_time.map(|shutdown_t| shutdown_t < t).unwrap_or(false)
314    }
315
316    /// Wait for an asynchronous event for as long as we can
317    /// expect it.
318    fn receive_event(&mut self) -> Option<PhysicalEvent> {
319        if let Some(shutdown_t) = self.shutdown_time {
320            let absolute = shutdown_t.to_logical_time(self.initial_time);
321            if let Some(timeout) = absolute.checked_duration_since(Instant::now()) {
322                trace!("Will wait for asynchronous event {} ns", timeout.as_nanos());
323                self.rx.recv_timeout(timeout).ok()
324            } else {
325                trace!("Cannot wait, already past programmed shutdown time...");
326                None
327            }
328        } else {
329            trace!("Will wait for asynchronous event without timeout");
330            self.rx.recv().ok()
331        }
332    }
333
334    /// Sleep/wait until the given time OR an asynchronous
335    /// event is received first.
336    fn catch_up_physical_time(&mut self, target: Instant) -> Result<(), PhysicalEvent> {
337        let now = Instant::now();
338
339        if now < target {
340            let t = target - now;
341            trace!("  - Need to sleep {} ns", t.as_nanos());
342            // we use recv_timeout as a thread::sleep so that
343            // our sleep is interrupted properly when an async
344            // event arrives
345            match self.rx.recv_timeout(t) {
346                Ok(async_evt) => {
347                    trace!(
348                        "  - Sleep interrupted by async event for tag {}, going back to queue",
349                        async_evt.tag
350                    );
351                    return Err(async_evt);
352                }
353                Err(RecvTimeoutError::Timeout) => { /*great*/ }
354                Err(RecvTimeoutError::Disconnected) => {
355                    // ok, there are no physical actions in the program so it's useless to block on self.rx
356                    // we still need to wait though..
357                    if let Some(remaining) = target.checked_duration_since(Instant::now()) {
358                        std::thread::sleep(remaining);
359                    }
360                }
361            }
362        }
363
364        if now > target {
365            let delay = now - target;
366            trace!(
367                "  - Running late by {} ns = {} µs = {} ms",
368                delay.as_nanos(),
369                delay.as_micros(),
370                delay.as_millis()
371            )
372        }
373        Ok(())
374    }
375
376    /// Create a new reaction wave to process the given
377    /// reactions at some point in time.
378    fn new_reaction_ctx<'a>(
379        &self,
380        tag: EventTag,
381        todo: ReactionPlan<'x>,
382        rx: &'a Receiver<PhysicalEvent>,
383        debug_info: DebugInfoProvider<'a>,
384        was_terminated_atomic: &'a Arc<AtomicBool>,
385        was_terminated: bool,
386    ) -> ReactionCtx<'a, 'x> {
387        ReactionCtx::new(
388            rx,
389            tag,
390            self.initial_time,
391            todo,
392            self.dataflow,
393            debug_info,
394            was_terminated_atomic,
395            was_terminated,
396        )
397    }
398
399    #[inline]
400    pub(super) fn debug(&self) -> DebugInfoProvider {
401        debug_info!(self)
402    }
403
404    /// Actually process a tag. The provided reactions are the
405    /// root reactions that startup the "wave".
406    fn process_tag(&mut self, is_shutdown: bool, tag: EventTag, mut reactions: ReactionPlan<'x>) {
407        if cfg!(debug_assertions) {
408            if let Some(latest) = self.latest_processed_tag {
409                debug_assert!(tag > latest, "Tag ordering mismatch")
410            }
411        }
412        self.latest_processed_tag = Some(tag);
413
414        let mut next_level = reactions.as_ref().and_then(|todo| todo.first_batch());
415        if next_level.is_none() {
416            return;
417        }
418
419        let mut ctx = self.new_reaction_ctx(tag, None, &self.rx, debug_info!(self), &self.was_terminated, is_shutdown);
420
421        while let Some((level_no, batch)) = next_level {
422            let level_no = level_no.cloned();
423            trace!("  - Level {}", level_no);
424            ctx.cur_level = level_no.key;
425
426            /// Minimum number of reactions (inclusive) required
427            /// to parallelize reactions.
428            /// TODO experiment with tweaking this
429            const PARALLEL_THRESHOLD: usize = 3;
430
431            if cfg!(feature = "parallel-runtime") && batch.len() >= PARALLEL_THRESHOLD {
432                #[cfg(feature = "parallel-runtime")]
433                parallel_rt_impl::process_batch(&mut ctx, &mut self.reactors, batch);
434            } else {
435                // the impl for non-parallel runtime
436                for reaction_id in batch {
437                    let reactor = &mut self.reactors[reaction_id.0.container()];
438                    ctx.execute(reactor, *reaction_id);
439                }
440            }
441
442            reactions = ExecutableReactions::merge_plans_after(reactions, ctx.insides.todo_now.take(), level_no.key.next());
443            next_level = reactions.as_ref().and_then(|todo| todo.next_batch(level_no.as_ref()));
444        }
445
446        for evt in ctx.insides.future_events.drain(..) {
447            push_event!(self, evt)
448        }
449
450        // cleanup tag-specific resources, eg clear port values
451        let ctx = CleanupCtx { tag };
452        // TODO measure performance of cleaning up all reactors w/ virtual dispatch like this.
453        //   see also efforts in the C runtime to  avoid this
454        for reactor in &mut self.reactors {
455            reactor.cleanup_tag(&ctx)
456        }
457    }
458}
459
#[cfg(feature = "parallel-runtime")]
mod parallel_rt_impl {
    use rayon::prelude::*;

    use super::*;
    use crate::scheduler::dependencies::Level;

    /// Executes all reactions of `batch` in parallel on the current
    /// rayon thread pool.
    ///
    /// Each rayon job works on a fork of `ctx`; the forwardable
    /// state of all forks is merged and absorbed back into `ctx`
    /// once the whole batch has run.
    pub(super) fn process_batch(ctx: &mut ReactionCtx<'_, '_>, reactors: &mut ReactorVec<'_>, batch: &Level) {
        let reactors_mut = UnsafeSharedPointer(reactors.raw.as_mut_ptr());

        ctx.insides.absorb(
            batch
                .iter()
                .par_bridge()
                .fold_with(CloneableCtx(ctx.fork()), |CloneableCtx(mut ctx), reaction_id| {
                    // capture the newtype instead of capturing its field, which is not Send
                    let reactors_mut = &reactors_mut;
                    // SAFETY: this relies on the following invariants:
                    // - no two reactions in the batch belong to the same reactor,
                    //   so no two jobs create a `&mut` to the same element
                    // - the vec does not change size so there is no reallocation
                    let reactor = unsafe { &mut *reactors_mut.0.add(reaction_id.0.container().index()) };

                    // `iter()` hands out references; pass the id by value,
                    // like the sequential code path does.
                    ctx.execute(reactor, *reaction_id);

                    CloneableCtx(ctx)
                })
                .fold(RContextForwardableStuff::default, |cx1, cx2| cx1.merge(cx2.0.insides))
                .reduce(Default::default, RContextForwardableStuff::merge),
        );
    }

    /// Raw pointer wrapper that can be shared between rayon jobs.
    /// Soundness is entirely up to the code that dereferences it.
    #[derive(Copy, Clone)]
    struct UnsafeSharedPointer<T>(*mut T);

    // SAFETY: the pointer is only dereferenced in `process_batch`, under
    // the invariants stated there.
    unsafe impl<T> Send for UnsafeSharedPointer<T> {}

    // SAFETY: same as for `Send` above.
    unsafe impl<T> Sync for UnsafeSharedPointer<T> {}

    /// We need a Clone bound to use fold_with, but this clone
    /// implementation is not general purpose so I hide it:
    /// "cloning" forks the wrapped context.
    struct CloneableCtx<'a, 'x>(ReactionCtx<'a, 'x>);

    impl Clone for CloneableCtx<'_, '_> {
        fn clone(&self) -> Self {
            Self(self.0.fork())
        }
    }
}
509}