reactor_rt/scheduler/context.rs
1use std::borrow::Borrow;
2use std::hash::{Hash, Hasher};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread::JoinHandle;
6
7use crossbeam_channel::reconnectable::{Receiver, SendError, Sender};
8use smallvec::SmallVec;
9
10use super::*;
11use crate::assembly::*;
12use crate::scheduler::dependencies::{DataflowInfo, ExecutableReactions, LevelIx};
13use crate::*;
14
/// The context in which a reaction executes. Its API
/// allows mutating the event queue of the scheduler.
/// Only the interactions declared at assembly time
/// are allowed.
///
// Implementation details:
// ReactionCtx is an API built around a ReactionWave. A single
// ReactionCtx may be used for multiple ReactionWaves, but
// obviously at disjoint times (&mut).
pub struct ReactionCtx<'a, 'x> {
    /// Mutable bookkeeping filled in while reactions run: the
    /// reactions that remain to be executed at this tag, and the
    /// events produced for later tags.
    pub(super) insides: RContextForwardableStuff<'x>,

    /// Logical time of the execution of this wave, constant
    /// during the existence of the object
    tag: EventTag,

    /// Level of the reaction being executed.
    pub(super) cur_level: LevelIx,

    /// ID of the reaction being executed. This is `None` when
    /// the context is created, and is only set for the duration
    /// of a call to `execute`.
    current_reaction: Option<GlobalReactionId>,

    /// Receiving end of the channel for physical events. In this
    /// type it is only used to create new senders, which are handed
    /// to the threads started by [Self::spawn_physical_thread].
    rx: &'a Receiver<PhysicalEvent>,

    /// Start time of the program.
    initial_time: Instant,

    // globals, also they might be copied and passed to AsyncCtx
    /// Maps each trigger to the reactions it triggers.
    dataflow: &'x DataflowInfo,
    /// Used to format components and reactions in log and
    /// assertion messages.
    debug_info: DebugInfoProvider<'a>,
    /// Whether the scheduler has been shut down.
    was_terminated_atomic: &'a Arc<AtomicBool>,
    /// In ReactionCtx, this will only be true if this is the shutdown tag.
    /// It duplicates [Self::was_terminated_atomic], to avoid an atomic
    /// operation within [Self::is_shutdown].
    was_terminated: bool,
}
53
54impl<'a, 'x> ReactionCtx<'a, 'x> {
55 /// Returns the start time of the execution of this program.
56 ///
57 /// This is a logical instant with microstep zero.
58 #[inline]
59 pub fn get_start_time(&self) -> Instant {
60 self.initial_time
61 }
62
63 /// Returns the current physical time.
64 ///
65 /// Repeated invocation of this method may produce different
66 /// values, although [Instant] is monotonic. The
67 /// physical time is necessarily greater than the logical time.
68 #[inline]
69 pub fn get_physical_time(&self) -> Instant {
70 Instant::now()
71 }
72
73 /// Returns the current logical time.
74 ///
75 /// Logical time is frozen during the execution of a reaction.
76 /// Repeated invocation of this method will always produce
77 /// the same value.
78 #[inline]
79 pub fn get_logical_time(&self) -> Instant {
80 self.tag.to_logical_time(self.get_start_time())
81 }
82
83 /// Returns the tag at which the reaction executes.
84 ///
85 /// Repeated invocation of this method will always produce
86 /// the same value.
87 #[inline]
88 pub fn get_tag(&self) -> EventTag {
89 self.tag
90 }
91
92 /// Returns whether this tag is the shutdown tag of the
93 /// application. If so, it's necessarily the very last
94 /// invocation of the current reaction (on a given reactor
95 /// instance).
96 ///
97 /// Repeated invocation of this method will always produce
98 /// the same value.
99 #[inline]
100 pub fn is_shutdown(&self) -> bool {
101 self.was_terminated
102 }
103
104 /// Returns the amount of logical time elapsed since the
105 /// start of the program. This does not take microsteps
106 /// into account.
107 #[inline]
108 pub fn get_elapsed_logical_time(&self) -> Duration {
109 self.get_logical_time() - self.get_start_time()
110 }
111
112 /// Returns the amount of physical time elapsed since the
113 /// start of the program.
114 ///
115 /// Since this uses [Self::get_physical_time], be aware that
116 /// this function's result may change over time.
117 #[inline]
118 pub fn get_elapsed_physical_time(&self) -> Duration {
119 self.get_physical_time() - self.get_start_time()
120 }
121
122 /// Returns the number of active workers in the execution of
123 /// a reactor program.
124 ///
125 /// Return values:
126 /// * `1` if threading is not enabled.
127 /// * If threading is enabled and a number of workers was specified,
128 /// it returns that number.
129 /// * And if the number of workers was left unspecified,
130 /// the return value might vary.
131 pub fn num_workers(&self) -> usize {
132 cfg_if::cfg_if! {
133 if #[cfg(feature = "parallel-runtime")] {
134 rayon::current_num_threads()
135 } else {
136 1
137 }
138 }
139 }
140
141 /// Returns the current value of a port or action at this
142 /// logical time. If the value is absent, [Option::None] is
143 /// returned. This is the case if the action or port is
144 /// not present ([Self::is_present]), or if no value was
145 /// scheduled (action values are optional, see [Self::schedule_with_v]).
146 ///
147 /// The value is copied out. See also [Self::use_ref] if this
148 /// is to be avoided.
149 ///
150 /// ### Examples
151 ///
152 /// ```no_run
153 /// # use reactor_rt::{ReactionCtx, Port};
154 /// # let ctx: &mut ReactionCtx = panic!();
155 /// # let port: &Port<u32> = panic!();
156 /// if let Some(value) = ctx.get(port) {
157 /// // branch is taken if the port is set
158 /// }
159 /// ```
160 #[inline]
161 pub fn get<T: Copy>(&self, container: &impl ReactionTrigger<T>) -> Option<T> {
162 container.borrow().get_value(&self.get_tag(), &self.get_start_time())
163 }
164
165 /// Returns a reference to the current value of a port or action at this
166 /// logical time. If the value is absent, [Option::None] is
167 /// returned. This is the case if the action or port is
168 /// not present ([Self::is_present]), or if no value was
169 /// scheduled (action values are optional, see [Self::schedule_with_v]).
170 ///
171 /// This does not require the value to be Copy, however, the implementation
172 /// of this method currently may require unsafe code. The method is therefore
173 /// not offered when compiling with the `no-unsafe` feature.
174 ///
175 /// ### Examples
176 ///
177 /// ```no_run
178 /// # use reactor_rt::{Port, ReactionCtx};
179 /// # let ctx: &mut ReactionCtx = panic!();
180 /// # let port: &Port<u32> = panic!();
181 /// if let Some(value) = ctx.get_ref(port) {
182 /// // value is a ref to the internal value
183 /// }
184 /// ```
185 #[inline]
186 #[cfg(not(feature = "no-unsafe"))]
187 pub fn get_ref<'q, T>(&self, container: &'q impl crate::triggers::ReactionTriggerWithRefAccess<T>) -> Option<&'q T> {
188 container.get_value_ref(&self.get_tag(), &self.get_start_time())
189 }
190
191 /// Executes the provided closure on the value of the port
192 /// or action. The value is fetched by reference and not
193 /// copied.
194 ///
195 /// ### Examples
196 ///
197 /// ```no_run
198 /// # use reactor_rt::{ReactionCtx, Port};
199 /// # let ctx: &mut ReactionCtx = panic!();
200 /// # let port: &Port<String> = panic!();
201 /// let len = ctx.use_ref(port, |str| str.map(String::len).unwrap_or(0));
202 /// // equivalent to
203 /// let len = ctx.use_ref_opt(port, String::len).unwrap_or(0);
204 /// ```
205 /// ```no_run
206 /// # use reactor_rt::{ReactionCtx, Port};
207 /// # let ctx: &mut ReactionCtx = unimplemented!();
208 /// # let port: &Port<String> = unimplemented!();
209 ///
210 /// if let Some(str) = ctx.use_ref_opt(port, Clone::clone) {
211 /// // only entered if the port value is present, so no need to check is_present
212 /// }
213 /// ```
214 ///
215 /// See also the similar [Self::use_ref_opt].
216 #[inline]
217 pub fn use_ref<T, O>(&self, container: &impl ReactionTrigger<T>, action: impl FnOnce(Option<&T>) -> O) -> O {
218 container
219 .borrow()
220 .use_value_ref(&self.get_tag(), &self.get_start_time(), action)
221 }
222
223 /// Executes the provided closure on the value of the port,
224 /// only if it is present. The value is fetched by reference
225 /// and not copied.
226 ///
227 /// See also the similar [Self::use_ref].
228 pub fn use_ref_opt<T, O>(&self, container: &impl ReactionTrigger<T>, action: impl FnOnce(&T) -> O) -> Option<O> {
229 self.use_ref(container, |c| c.map(action))
230 }
231
232 /// Sets the value of the given port.
233 ///
234 /// The change is visible at the same logical time, i.e.
235 /// the value propagates immediately. This may hence
236 /// schedule more reactions that should execute at the
237 /// same logical time.
238 #[inline]
239 pub fn set<T>(&mut self, port: &mut Port<T>, value: T)
240 where
241 T: Sync,
242 {
243 if cfg!(debug_assertions) {
244 self.check_set_port_is_legal(port)
245 }
246 port.set_impl(Some(value));
247 self.enqueue_now(Cow::Borrowed(self.reactions_triggered_by(port.get_id())));
248 }
249
250 fn check_set_port_is_legal<T: Sync>(&self, port: &mut Port<T>) {
251 let port_id = port.get_id();
252 let port_container = self.debug_info.id_registry.get_trigger_container(port_id).unwrap();
253 let reaction_container = self.current_reaction.unwrap().0.container();
254 match port.get_kind() {
255 PortKind::Input => {
256 let port_grandpa = self.debug_info.id_registry.get_container(port_container);
257 assert_eq!(
258 Some(reaction_container),
259 port_grandpa,
260 "Input port {} can only be set by reactions of its grandparent, got reaction {}",
261 self.debug_info.id_registry.fmt_component(port_id),
262 self.debug_info.display_reaction(self.current_reaction.unwrap()),
263 );
264 }
265 PortKind::Output => {
266 assert_eq!(
267 reaction_container,
268 port_container,
269 "Input port {} can only be set by reactions of its parent, got reaction {}",
270 self.debug_info.id_registry.fmt_component(port_id),
271 self.debug_info.display_reaction(self.current_reaction.unwrap()),
272 );
273 }
274 // todo
275 PortKind::ChildInputReference => {}
276 PortKind::ChildOutputReference => {}
277 }
278 }
279
280 /// Sets the value of the given port, if the given value is `Some`.
281 /// Otherwise the port is not set and no reactions are triggered.
282 ///
283 /// The change is visible at the same logical time, i.e.
284 /// the value propagates immediately. This may hence
285 /// schedule more reactions that should execute at the
286 /// same logical time.
287 ///
288 /// ```no_run
289 /// # use reactor_rt::{ReactionCtx, Port};
290 /// # let ctx: &mut ReactionCtx = unimplemented!();
291 /// # let source: &Port<u32> = unimplemented!();
292 /// # let sink: &mut Port<u32> = unimplemented!();
293 ///
294 /// ctx.set_opt(sink, ctx.get(source));
295 /// // equivalent to
296 /// if let Some(value) = ctx.get(source) {
297 /// ctx.set(sink, value);
298 /// }
299 /// ```
300 ///
301 #[inline]
302 pub fn set_opt<T>(&mut self, port: &mut Port<T>, value: Option<T>)
303 where
304 T: Sync,
305 {
306 if let Some(v) = value {
307 self.set(port, v)
308 }
309 }
310
311 /// Returns true if the given action was triggered at the
312 /// current logical time.
313 ///
314 /// If so, then it may, but must not, present a value ([Self::get]).
315 #[inline]
316 pub fn is_present<T>(&self, action: &impl ReactionTrigger<T>) -> bool {
317 action.is_present(&self.get_tag(), &self.get_start_time())
318 }
319
320 /// Schedule an action to trigger at some point in the future.
321 /// The action will trigger after its own implicit time delay,
322 /// plus an optional additional time delay (see [Offset]). This
323 /// delay is added to the current logical (resp. physical) time
324 /// for logical (resp. physical) actions.
325 ///
326 /// This is like [Self::schedule_with_v], where the value is [None].
327 ///
328 /// ### Examples
329 ///
330 /// ```no_run
331 /// # use reactor_rt::prelude::*;
332 /// # let ctx: &mut ReactionCtx = panic!();
333 /// # let action: &mut LogicalAction<String> = panic!();
334 /// ctx.schedule(action, Asap); // will be executed one microstep from now (+ own delay)
335 /// ctx.schedule(action, after!(2 ms)); // will be executed 2 milliseconds from now (+ own delay)
336 /// ctx.schedule(action, After(delay!(2 ms))); // equivalent to the previous
337 /// ctx.schedule(action, After(Duration::from_millis(2))); // equivalent to the previous
338 /// ```
339 #[inline]
340 pub fn schedule<T: Sync>(&mut self, action: &mut impl SchedulableAsAction<T>, offset: Offset) {
341 self.schedule_with_v(action, None, offset)
342 }
343
344 /// Schedule an action to trigger at some point in the future,
345 ///
346 /// The action will carry the given value at the time it
347 /// is triggered, unless it is overwritten by another call
348 /// to this method. The value can be cleared by using `None`
349 /// as a value. Note that even if the value is absent, the
350 /// *action* will still be present at the time it is triggered
351 /// (see [Self::is_present]).
352 ///
353 /// The action will trigger after its own implicit time delay,
354 /// plus an optional additional time delay (see [Offset]). This
355 /// delay is added to the current logical (resp. physical) time
356 /// for logical (resp. physical) actions.
357 ///
358 /// ### Examples
359 ///
360 /// ```no_run
361 /// # use reactor_rt::prelude::*;
362 /// # let ctx: &mut ReactionCtx = panic!();
363 /// # let action: &mut LogicalAction<&'static str> = panic!();
364 /// // will be executed 2 milliseconds (+ own delay) from now with that value.
365 /// ctx.schedule_with_v(action, Some("value"), after!(2 msec));
366 /// // will be executed one microstep from now, with no value
367 /// ctx.schedule_with_v(action, None, Asap);
368 /// // that's equivalent to
369 /// ctx.schedule(action, Asap);
370 /// ```
371 #[inline]
372 pub fn schedule_with_v<T: Sync>(&mut self, action: &mut impl SchedulableAsAction<T>, value: Option<T>, offset: Offset) {
373 action.schedule_with_v(self, value, offset)
374 }
375
376 /// Add new reactions to execute later (at least 1 microstep later).
377 ///
378 /// This is used for actions.
379 #[inline]
380 pub(crate) fn enqueue_later(&mut self, downstream: &'x ExecutableReactions, tag: EventTag) {
381 debug_assert!(tag > self.get_tag());
382
383 let evt = Event::execute(tag, Cow::Borrowed(downstream));
384 self.insides.future_events.push(evt);
385 }
386
387 #[inline]
388 pub(crate) fn enqueue_now(&mut self, downstream: Cow<'x, ExecutableReactions<'x>>) {
389 match &mut self.insides.todo_now {
390 Some(ref mut do_next) => do_next.to_mut().absorb_after(downstream.as_ref(), self.cur_level.next()),
391 None => self.insides.todo_now = Some(downstream),
392 }
393 }
394
395 fn reactions_triggered_by(&self, trigger: TriggerId) -> &'x ExecutableReactions<'x> {
396 self.dataflow.reactions_triggered_by(&trigger)
397 }
398
399 fn make_successor_tag(&self, offset_from_now: Duration) -> EventTag {
400 self.get_tag().successor(offset_from_now)
401 }
402
403 /// Spawn a new thread that can use a [AsyncCtx]
404 /// to push asynchronous events to the reaction queue. This is
405 /// only useful with [physical actions](crate::PhysicalAction).
406 ///
407 /// Since the thread is allowed to keep references into the
408 /// internals of the scheduler, it is joined when the scheduler
409 /// shuts down. This means the scheduler will wait for the
410 /// thread to finish its task. For that reason, the thread's
411 /// closure should not execute an infinite loop, it should at
412 /// least check that the scheduler has not been terminated by
413 /// polling [AsyncCtx::was_terminated].
414 ///
415 /// ### Example
416 ///
417 /// ```no_run
418 /// # use reactor_rt::prelude::*;
419 /// fn some_reaction(ctx: &mut ReactionCtx, phys_action: &PhysicalActionRef<u32>) {
420 /// let phys_action = phys_action.clone(); // clone to move it into other thread
421 /// ctx.spawn_physical_thread(move |link| {
422 /// std::thread::sleep(Duration::from_millis(200));
423 /// // This will push an event whose tag is the
424 /// // current physical time at the point of this
425 /// // statement.
426 /// link.schedule_physical_with_v(&phys_action, Some(123), Asap).unwrap();
427 /// });
428 /// }
429 /// ```
430 ///
431 pub fn spawn_physical_thread<F, R>(&mut self, f: F) -> JoinHandle<R>
432 where
433 // is there a practical reason to encapsulate this?
434 F: FnOnce(&mut AsyncCtx) -> R,
435 F: Send + 'static,
436 R: Send + 'static,
437 {
438 let tx = self.rx.new_sender();
439 let initial_time = self.initial_time;
440 let was_terminated = self.was_terminated_atomic.clone();
441
442 std::thread::spawn(move || {
443 let mut link = AsyncCtx { tx, initial_time, was_terminated };
444 f(&mut link)
445 })
446 }
447
448 /// Request that the application shutdown, possibly with
449 /// a particular offset. Just like for actions, even a zero
450 /// offset will only trigger the special `shutdown` trigger
451 /// at the earliest one microstep after the current tag.
452 ///
453 /// ```no_run
454 /// # use reactor_rt::prelude::*;
455 /// # let ctx: &mut ReactionCtx = panic!();
456 /// # let action: &mut LogicalAction<&'static str> = panic!();
457 /// // trigger shutdown on the next microstep
458 /// ctx.request_stop(Asap);
459 ///
460 /// // trigger shutdown in *at most* 1 msec (in logical time).
461 /// // If in the meantime, another `request_stop` call schedules
462 /// // shutdown for an earlier tag, that one will be honored instead.
463 /// ctx.request_stop(after!(1 msec));
464 /// ```
465 #[inline]
466 pub fn request_stop(&mut self, offset: Offset) {
467 let tag = self.make_successor_tag(offset.to_duration());
468
469 let evt = Event::terminate_at(tag);
470 self.insides.future_events.push(evt);
471 }
472
473 /// Reschedule a periodic timer if need be.
474 /// This is called by a reaction synthesized for each timer.
475 // note: reactions can't call this as they're only passed a shared reference to a timer.
476 #[doc(hidden)]
477 #[inline]
478 pub fn reschedule_timer(&mut self, timer: &mut Timer) {
479 if timer.is_periodic() {
480 let downstream = self.reactions_triggered_by(timer.get_id());
481 self.enqueue_later(downstream, self.make_successor_tag(timer.period));
482 }
483 }
484
485 /// Schedule the first triggering of the given timer.
486 /// This is called by a reaction synthesized for each timer.
487 // note: reactions can't call this as they're only passed a shared references to timers.
488 #[doc(hidden)]
489 #[inline]
490 pub fn bootstrap_timer(&mut self, timer: &mut Timer) {
491 // we're in startup
492 let downstream = self.reactions_triggered_by(timer.get_id());
493 if timer.offset.is_zero() {
494 // no offset
495 self.enqueue_now(Cow::Borrowed(downstream))
496 } else {
497 self.enqueue_later(downstream, self.make_successor_tag(timer.offset))
498 }
499 }
500
501 /// Execute the given reaction with the given reactor.
502 #[inline]
503 pub(super) fn execute(&mut self, reactor: &mut ReactorBox, reaction_id: GlobalReactionId) {
504 trace!(
505 " - Executing {} (level {})",
506 self.debug_info.display_reaction(reaction_id),
507 self.cur_level
508 );
509 debug_assert_eq!(reactor.id(), reaction_id.0.container(), "Wrong reactor");
510 self.current_reaction.replace(reaction_id);
511 reactor.react(self, reaction_id.0.local());
512 self.current_reaction.take();
513 }
514
515 pub(super) fn new(
516 rx: &'a Receiver<PhysicalEvent>,
517 tag: EventTag,
518 initial_time: Instant,
519 todo: ReactionPlan<'x>,
520 dataflow: &'x DataflowInfo,
521 debug_info: DebugInfoProvider<'a>,
522 was_terminated_atomic: &'a Arc<AtomicBool>,
523 was_terminated: bool,
524 ) -> Self {
525 Self {
526 insides: RContextForwardableStuff { todo_now: todo, future_events: Default::default() },
527 cur_level: Default::default(),
528 tag,
529 current_reaction: None,
530 rx,
531 initial_time,
532 dataflow,
533 was_terminated_atomic,
534 debug_info,
535 was_terminated,
536 }
537 }
538
/// Fork a context. Everything is copied from this context,
/// except the mutable stuff (`insides`), which starts out empty
/// in the fork.
#[cfg(feature = "parallel-runtime")]
pub(super) fn fork(&self) -> Self {
    // Destructure exhaustively, so that adding a field to the
    // struct forces this function to be revisited.
    let Self {
        insides: _,
        tag,
        cur_level,
        current_reaction,
        rx,
        initial_time,
        dataflow,
        debug_info,
        was_terminated_atomic,
        was_terminated,
    } = self;

    Self {
        // not shared: each fork collects its own results
        insides: Default::default(),

        // all of that is common to all contexts
        tag: *tag,
        rx: *rx,
        cur_level: *cur_level,
        initial_time: *initial_time,
        dataflow: *dataflow,
        was_terminated: *was_terminated,
        was_terminated_atomic: *was_terminated_atomic,
        debug_info: debug_info.clone(),
        current_reaction: *current_reaction,
    }
}
558}
559
/// Info that executing reactions need to make known to the scheduler.
#[derive(Default)]
pub(super) struct RContextForwardableStuff<'x> {
    /// Remaining reactions to execute before the wave dies.
    /// Using [Option] and [Cow] optimises for the case where
    /// zero or exactly one port/action is set, and minimises
    /// copies.
    ///
    /// This is mutable: if a reaction sets a port, then the
    /// downstream of that port is inserted into this
    /// data structure.
    pub(super) todo_now: ReactionPlan<'x>,

    /// Events that were produced for a strictly greater
    /// logical time than the current one.
    pub(super) future_events: SmallVec<[Event<'x>; 4]>,
}
577
#[cfg(feature = "parallel-runtime")]
impl RContextForwardableStuff<'_> {
    /// Combines two instances into one, by absorbing `other`
    /// into `self` and returning the result.
    pub(super) fn merge(self, other: Self) -> Self {
        let mut merged = self;
        merged.absorb(other);
        merged
    }

    /// Merges the contents of `other` into `self`: the reaction
    /// plans are merged, and the future events of `other` are
    /// appended to those of `self`.
    pub(super) fn absorb(&mut self, other: Self) {
        let Self { todo_now: their_plan, future_events: mut their_events } = other;

        let own_plan = self.todo_now.take();
        self.todo_now = ExecutableReactions::merge_cows(own_plan, their_plan);
        self.future_events.append(&mut their_events);
    }
}
590
/// A type that can affect the logical event queue to implement
/// asynchronous physical actions. This is a "link" to the event
/// system, from the outside world.
///
/// See [ReactionCtx::spawn_physical_thread].
///
#[derive(Clone)]
pub struct AsyncCtx {
    /// Sending end of the channel on which physical events are
    /// pushed to the scheduler.
    tx: Sender<PhysicalEvent>,
    /// Start time of the program, used as the origin when
    /// computing the tag of the events that are sent.
    initial_time: Instant,
    /// Whether the scheduler has been terminated.
    was_terminated: Arc<AtomicBool>,
}
604
605impl AsyncCtx {
606 /// Returns true if the scheduler has been shutdown. When
607 /// that's true, calls to other methods of this type will
608 /// fail with [SendError].
609 pub fn was_terminated(&self) -> bool {
610 self.was_terminated.load(Ordering::SeqCst)
611 }
612
613 /// Request that the application shutdown, possibly with
614 /// a particular offset from the current physical time.
615 ///
616 /// This may fail if this is called while the scheduler
617 /// has already been shutdown. An Ok result is also not
618 /// a guarantee that the event will be processed: the
619 /// scheduler may be in the process of shutting down,
620 /// or its shutdown might be programmed for a logical
621 /// time which precedes the current physical time.
622 pub fn request_stop(&mut self, offset: Offset) -> Result<(), SendError<()>> {
623 // physical time must be ahead of logical time so
624 // this event is scheduled for the future
625 let tag = EventTag::absolute(self.initial_time, Instant::now() + offset.to_duration());
626
627 let evt = PhysicalEvent::terminate_at(tag);
628 self.tx.send(evt).map_err(|e| {
629 warn!("Event could not be sent! {:?}", e);
630 SendError(())
631 })
632 }
633
634 /// Schedule an action to run after its own implicit time delay
635 /// plus an optional additional time delay. These delays are in
636 /// logical time.
637 ///
638 /// Note that this locks the action.
639 ///
640 /// This may fail if this is called while the scheduler
641 /// has already been shutdown. An Ok result is also not
642 /// a guarantee that the event will be processed: the
643 /// scheduler may be in the process of shutting down,
644 /// or its shutdown might be programmed for a logical
645 /// time which precedes the current physical time.
646 ///
647 pub fn schedule_physical<T: Sync>(
648 &mut self,
649 action: &PhysicalActionRef<T>,
650 offset: Offset,
651 ) -> Result<(), SendError<Option<T>>> {
652 self.schedule_physical_with_v(action, None, offset)
653 }
654
655 /// Schedule an action to run after its own implicit time delay
656 /// plus an optional additional time delay. These delays are in
657 /// logical time.
658 ///
659 /// Note that this locks the action.
660 ///
661 /// This may fail if this is called while the scheduler
662 /// has already been shutdown. An Ok result is also not
663 /// a guarantee that the event will be processed: the
664 /// scheduler may be in the process of shutting down,
665 /// or its shutdown might be programmed for a logical
666 /// time which precedes the current physical time.
667 ///
668 pub fn schedule_physical_with_v<T: Sync>(
669 &mut self,
670 action: &PhysicalActionRef<T>,
671 value: Option<T>,
672 offset: Offset,
673 ) -> Result<(), SendError<Option<T>>> {
674 // physical time must be ahead of logical time so
675 // this event is scheduled for the future
676 action
677 .use_mut_p(value, |action, value| {
678 let tag = EventTag::absolute(self.initial_time, Instant::now() + offset.to_duration());
679 action.0.schedule_future_value(tag, value);
680
681 let evt = PhysicalEvent::trigger(tag, action.get_id());
682 self.tx.send(evt).map_err(|e| {
683 warn!("Event could not be sent! {:?}", e);
684 SendError(action.0.forget_value(&tag))
685 })
686 })
687 .unwrap_or_else(|value| Err(SendError(value)))
688 }
689}
690
/// Implemented by LogicalAction and PhysicalAction references
/// to give access to [ReactionCtx::schedule] and variants.
pub trait SchedulableAsAction<T: Sync> {
    /// Schedules this action through the given context, with an
    /// optional value and an additional offset. This is what
    /// [ReactionCtx::schedule_with_v] delegates to.
    #[doc(hidden)]
    fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset);
}
697
698impl<T: Sync> SchedulableAsAction<T> for LogicalAction<T> {
699 fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset) {
700 let eta = ctx.make_successor_tag(self.0.min_delay + offset.to_duration());
701 self.0.schedule_future_value(eta, value);
702 let downstream = ctx.dataflow.reactions_triggered_by(&self.get_id());
703 ctx.enqueue_later(downstream, eta);
704 }
705}
706
707impl<T: Sync> SchedulableAsAction<T> for PhysicalActionRef<T> {
708 fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset) {
709 self.use_mut_p(value, |action, value| {
710 let tag = EventTag::absolute(ctx.initial_time, Instant::now() + offset.to_duration());
711 action.0.schedule_future_value(tag, value);
712 let downstream = ctx.dataflow.reactions_triggered_by(&action.get_id());
713 ctx.enqueue_later(downstream, tag);
714 })
715 .ok();
716 }
717}
718
/// An offset from the current event.
///
/// This is to be used with [ReactionCtx::schedule].
///
/// Two offsets are equal (and hash identically) when they denote
/// the same duration, so `Asap` equals `After(Duration::ZERO)`.
#[derive(Copy, Clone, Debug)]
pub enum Offset {
    /// Specify that the trigger will fire at least after
    /// the provided duration.
    ///
    /// If the duration is zero (eg [Asap](Self::Asap)), it does not
    /// mean that the trigger will fire right away. For actions, the
    /// action's inherent minimum delay must be taken into account,
    /// and even with a zero minimal delay, a delay of one microstep
    /// is applied.
    ///
    /// You can use the [after!()](crate::after) macro, instead
    /// of using this directly. For instance:
    /// ```
    /// # use reactor_rt::prelude::*;
    /// assert_eq!(after!(15 ms), After(Duration::from_millis(15)));
    /// ```
    After(Duration),

    /// Specify that the trigger will fire as soon as possible.
    /// This does not mean that the action will trigger right away. The
    /// action's inherent minimum delay must be taken into account,
    /// and even with a zero minimal delay, a delay of one microstep
    /// is applied. This is equivalent to
    /// ```
    /// # use reactor_rt::prelude::*;
    /// assert_eq!(Asap, After(Duration::ZERO));
    /// ```
    Asap,
}

impl Offset {
    /// Converts this offset to the duration it denotes;
    /// `Asap` is the zero duration.
    #[inline]
    pub(crate) fn to_duration(self) -> Duration {
        if let Offset::After(delay) = self {
            delay
        } else {
            Duration::ZERO
        }
    }
}

// Equality and hashing both go through `to_duration`, which keeps
// them consistent with each other.
impl PartialEq for Offset {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.to_duration(), other.to_duration());
        lhs == rhs
    }
}

impl Eq for Offset {}

impl Hash for Offset {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let denoted = self.to_duration();
        Hash::hash(&denoted, state);
    }
}
776
/// Context used to clean up ports and actions once a tag has
/// been processed.
/// TODO get rid of this!
/// At least for multiports it's really bad
/// Maybe we can keep a set of the ports that are present in ReactionCtx
#[doc(hidden)]
pub struct CleanupCtx {
    /// Tag we're cleaning up
    pub tag: EventTag,
}
786
787impl CleanupCtx {
788 pub fn cleanup_multiport<T: Sync>(&self, port: &mut Multiport<T>) {
789 // todo bound ports don't need to be cleared
790 for channel in port {
791 channel.clear_value()
792 }
793 }
794
795 pub fn cleanup_port<T: Sync>(&self, port: &mut Port<T>) {
796 port.clear_value()
797 }
798
799 pub fn cleanup_logical_action<T: Sync>(&self, action: &mut LogicalAction<T>) {
800 action.0.forget_value(&self.tag);
801 }
802
803 pub fn cleanup_physical_action<T: Sync>(&self, action: &mut PhysicalActionRef<T>) {
804 action.use_mut(|a| a.0.forget_value(&self.tag)).ok();
805 }
806}