1use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29
30use crossbeam_channel::reconnectable::*;
31
32use super::assembly_impl::RootAssembler;
33use super::*;
34use crate::assembly::*;
35use crate::scheduler::dependencies::DataflowInfo;
36use crate::*;
37
38#[derive(Default)]
44pub struct SchedulerOptions {
45 pub keep_alive: bool,
50
51 pub timeout: Option<Duration>,
55
56 pub threads: usize,
60
61 pub dump_graph: bool,
64}
65
66macro_rules! debug_info {
72 ($e:expr) => {
73 DebugInfoProvider { id_registry: &$e.id_registry }
74 };
75}
76
77macro_rules! push_event {
78 ($scheduler:expr, $evt:expr) => {{
79 trace!("Pushing {}", debug_info!($scheduler).display_event(&$evt));
80 $scheduler.event_queue.push($evt);
81 }};
82}
83
84pub struct SyncScheduler<'x> {
91 latest_processed_tag: Option<EventTag>,
93
94 dataflow: &'x DataflowInfo,
97
98 reactors: ReactorVec<'x>,
100
101 event_queue: EventQueue<'x>,
103
104 rx: Receiver<PhysicalEvent>,
108
109 #[allow(unused)] initial_time: Instant,
112
113 shutdown_time: Option<EventTag>,
120
121 was_terminated: Arc<AtomicBool>,
125
126 id_registry: DebugInfoRegistry,
128}
129
130impl<'x> SyncScheduler<'x> {
131 pub fn run_main<R: ReactorInitializer + 'static>(options: SchedulerOptions, args: R::Params) {
132 let start = Instant::now();
133 info!("Starting assembly...");
134 let (reactors, graph, id_registry) = RootAssembler::assemble_tree::<R>(args);
135 let time = Instant::now() - start;
136 info!("Assembly done in {} µs...", time.as_micros());
137
138 if options.dump_graph {
139 use std::fs::File;
140 use std::io::Write;
141
142 let path = std::env::temp_dir().join("reactors.dot");
143
144 File::create(path.clone())
145 .and_then(|mut dot_file| writeln!(dot_file, "{}", graph.format_dot(&id_registry)))
146 .expect("Error while writing DOT file");
147 eprintln!("Wrote dot file to {}", path.to_string_lossy());
148 }
149
150 let dataflow_info = DataflowInfo::new(graph).map_err(|e| e.lift(&id_registry)).unwrap();
152
153 let initial_time = Instant::now();
160 #[cfg(feature = "parallel-runtime")]
161 let rayon_thread_pool = rayon::ThreadPoolBuilder::new().num_threads(options.threads).build().unwrap();
162
163 let scheduler = SyncScheduler::new(options, id_registry, &dataflow_info, reactors, initial_time);
164
165 cfg_if::cfg_if! {
166 if #[cfg(feature = "parallel-runtime")] {
167 #[allow(non_local_definitions)]
173 unsafe impl Send for SyncScheduler<'_> {}
174
175 rayon_thread_pool.install(|| scheduler.launch_event_loop());
177 } else {
178 scheduler.launch_event_loop();
179 }
180 }
181 }
182
183 fn launch_event_loop(mut self) {
185 self.startup();
190
191 loop {
192 for evt in self.rx.try_iter() {
194 let evt = evt.make_executable(self.dataflow);
195 push_event!(self, evt);
196 }
197
198 if let Some(evt) = self.event_queue.take_earliest() {
199 if self.is_after_shutdown(evt.tag) {
200 trace!("Event is late, shutting down - event tag: {}", evt.tag);
201 break;
202 }
203 trace!("Processing event {}", self.debug().display_event(&evt));
204 match self.catch_up_physical_time(evt.tag.to_logical_time(self.initial_time)) {
205 Ok(_) => {}
206 Err(async_event) => {
207 let async_event = async_event.make_executable(self.dataflow);
208 if async_event.tag < evt.tag {
210 push_event!(self, evt);
212 push_event!(self, async_event);
213 continue;
214 } else {
215 push_event!(self, async_event);
217 }
218 }
219 };
220 if evt.terminate || self.shutdown_time == Some(evt.tag) {
223 return self.shutdown(evt.tag, evt.reactions);
224 }
225
226 self.process_tag(false, evt.tag, evt.reactions);
227 } else if let Some(evt) = self.receive_event() {
228 let evt = evt.make_executable(self.dataflow);
229 push_event!(self, evt);
231 continue;
232 } else {
233 info!("Event queue is empty forever, shutting down.");
235 break;
236 }
237 } let shutdown_tag = self.shutdown_time.unwrap_or_else(|| EventTag::now(self.initial_time));
240 self.shutdown(shutdown_tag, None);
241
242 }
244
245 fn new(
249 options: SchedulerOptions,
250 id_registry: DebugInfoRegistry,
251 dependency_info: &'x DataflowInfo,
252 reactors: ReactorVec<'x>,
253 initial_time: Instant,
254 ) -> Self {
255 if !cfg!(feature = "parallel-runtime") && options.threads != 0 {
256 warn!("'workers' runtime parameter has no effect unless feature 'parallel-runtime' is enabled")
257 }
258
259 if options.keep_alive {
260 warn!("'keepalive' runtime parameter has no effect in the Rust target")
261 }
262
263 let (_, rx) = unbounded::<PhysicalEvent>();
264 Self {
265 rx,
266
267 event_queue: Default::default(),
268 reactors,
269
270 initial_time,
271 latest_processed_tag: None,
272 shutdown_time: options.timeout.map(|timeout| {
273 let shutdown_tag = EventTag::ORIGIN.successor(timeout);
274 trace!("Timeout specified, will shut down at most at tag {}", shutdown_tag);
275 shutdown_tag
276 }),
277 dataflow: dependency_info,
278 id_registry,
279 was_terminated: Default::default(),
280 }
281 }
282
283 fn startup(&mut self) {
287 info!("Triggering startup...");
288 debug_assert!(!self.reactors.is_empty(), "No registered reactors");
289
290 let startup_reactions = self.dataflow.reactions_triggered_by(&TriggerId::STARTUP);
291 self.process_tag(false, EventTag::ORIGIN, Some(Cow::Borrowed(startup_reactions)))
292 }
293
294 fn shutdown(&mut self, shutdown_tag: EventTag, reactions: ReactionPlan<'x>) {
295 info!("Scheduler is shutting down, at {}", shutdown_tag);
296 self.shutdown_time = Some(shutdown_tag);
297 let default_plan: ReactionPlan<'x> = Some(Cow::Borrowed(self.dataflow.reactions_triggered_by(&TriggerId::SHUTDOWN)));
298 let reactions = ExecutableReactions::merge_cows(reactions, default_plan);
299
300 self.process_tag(true, shutdown_tag, reactions);
301
302 self.was_terminated.store(true, Ordering::SeqCst);
304 info!("Scheduler has been shut down")
305 }
306
307 fn is_after_shutdown(&self, t: EventTag) -> bool {
313 self.shutdown_time.map(|shutdown_t| shutdown_t < t).unwrap_or(false)
314 }
315
316 fn receive_event(&mut self) -> Option<PhysicalEvent> {
319 if let Some(shutdown_t) = self.shutdown_time {
320 let absolute = shutdown_t.to_logical_time(self.initial_time);
321 if let Some(timeout) = absolute.checked_duration_since(Instant::now()) {
322 trace!("Will wait for asynchronous event {} ns", timeout.as_nanos());
323 self.rx.recv_timeout(timeout).ok()
324 } else {
325 trace!("Cannot wait, already past programmed shutdown time...");
326 None
327 }
328 } else {
329 trace!("Will wait for asynchronous event without timeout");
330 self.rx.recv().ok()
331 }
332 }
333
334 fn catch_up_physical_time(&mut self, target: Instant) -> Result<(), PhysicalEvent> {
337 let now = Instant::now();
338
339 if now < target {
340 let t = target - now;
341 trace!(" - Need to sleep {} ns", t.as_nanos());
342 match self.rx.recv_timeout(t) {
346 Ok(async_evt) => {
347 trace!(
348 " - Sleep interrupted by async event for tag {}, going back to queue",
349 async_evt.tag
350 );
351 return Err(async_evt);
352 }
353 Err(RecvTimeoutError::Timeout) => { }
354 Err(RecvTimeoutError::Disconnected) => {
355 if let Some(remaining) = target.checked_duration_since(Instant::now()) {
358 std::thread::sleep(remaining);
359 }
360 }
361 }
362 }
363
364 if now > target {
365 let delay = now - target;
366 trace!(
367 " - Running late by {} ns = {} µs = {} ms",
368 delay.as_nanos(),
369 delay.as_micros(),
370 delay.as_millis()
371 )
372 }
373 Ok(())
374 }
375
376 fn new_reaction_ctx<'a>(
379 &self,
380 tag: EventTag,
381 todo: ReactionPlan<'x>,
382 rx: &'a Receiver<PhysicalEvent>,
383 debug_info: DebugInfoProvider<'a>,
384 was_terminated_atomic: &'a Arc<AtomicBool>,
385 was_terminated: bool,
386 ) -> ReactionCtx<'a, 'x> {
387 ReactionCtx::new(
388 rx,
389 tag,
390 self.initial_time,
391 todo,
392 self.dataflow,
393 debug_info,
394 was_terminated_atomic,
395 was_terminated,
396 )
397 }
398
399 #[inline]
400 pub(super) fn debug(&self) -> DebugInfoProvider {
401 debug_info!(self)
402 }
403
404 fn process_tag(&mut self, is_shutdown: bool, tag: EventTag, mut reactions: ReactionPlan<'x>) {
407 if cfg!(debug_assertions) {
408 if let Some(latest) = self.latest_processed_tag {
409 debug_assert!(tag > latest, "Tag ordering mismatch")
410 }
411 }
412 self.latest_processed_tag = Some(tag);
413
414 let mut next_level = reactions.as_ref().and_then(|todo| todo.first_batch());
415 if next_level.is_none() {
416 return;
417 }
418
419 let mut ctx = self.new_reaction_ctx(tag, None, &self.rx, debug_info!(self), &self.was_terminated, is_shutdown);
420
421 while let Some((level_no, batch)) = next_level {
422 let level_no = level_no.cloned();
423 trace!(" - Level {}", level_no);
424 ctx.cur_level = level_no.key;
425
426 const PARALLEL_THRESHOLD: usize = 3;
430
431 if cfg!(feature = "parallel-runtime") && batch.len() >= PARALLEL_THRESHOLD {
432 #[cfg(feature = "parallel-runtime")]
433 parallel_rt_impl::process_batch(&mut ctx, &mut self.reactors, batch);
434 } else {
435 for reaction_id in batch {
437 let reactor = &mut self.reactors[reaction_id.0.container()];
438 ctx.execute(reactor, *reaction_id);
439 }
440 }
441
442 reactions = ExecutableReactions::merge_plans_after(reactions, ctx.insides.todo_now.take(), level_no.key.next());
443 next_level = reactions.as_ref().and_then(|todo| todo.next_batch(level_no.as_ref()));
444 }
445
446 for evt in ctx.insides.future_events.drain(..) {
447 push_event!(self, evt)
448 }
449
450 let ctx = CleanupCtx { tag };
452 for reactor in &mut self.reactors {
455 reactor.cleanup_tag(&ctx)
456 }
457 }
458}
459
460#[cfg(feature = "parallel-runtime")]
461mod parallel_rt_impl {
462 use rayon::prelude::*;
463
464 use super::*;
465 use crate::scheduler::dependencies::Level;
466
467 pub(super) fn process_batch(ctx: &mut ReactionCtx<'_, '_>, reactors: &mut ReactorVec<'_>, batch: &Level) {
468 let reactors_mut = UnsafeSharedPointer(reactors.raw.as_mut_ptr());
469
470 ctx.insides.absorb(
471 batch
472 .iter()
473 .par_bridge()
474 .fold_with(CloneableCtx(ctx.fork()), |CloneableCtx(mut ctx), reaction_id| {
475 let reactors_mut = &reactors_mut;
477 let reactor = unsafe {
478 &mut *reactors_mut.0.add(reaction_id.0.container().index())
482 };
483
484 ctx.execute(reactor, reaction_id);
485
486 CloneableCtx(ctx)
487 })
488 .fold(RContextForwardableStuff::default, |cx1, cx2| cx1.merge(cx2.0.insides))
489 .reduce(Default::default, RContextForwardableStuff::merge),
490 );
491 }
492
493 #[derive(Copy, Clone)]
494 struct UnsafeSharedPointer<T>(*mut T);
495
496 unsafe impl<T> Send for UnsafeSharedPointer<T> {}
497
498 unsafe impl<T> Sync for UnsafeSharedPointer<T> {}
499
500 struct CloneableCtx<'a, 'x>(ReactionCtx<'a, 'x>);
503
504 impl Clone for CloneableCtx<'_, '_> {
505 fn clone(&self) -> Self {
506 Self(self.0.fork())
507 }
508 }
509}