reactor_rt/scheduler/
context.rs

1use std::borrow::Borrow;
2use std::hash::{Hash, Hasher};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread::JoinHandle;
6
7use crossbeam_channel::reconnectable::{Receiver, SendError, Sender};
8use smallvec::SmallVec;
9
10use super::*;
11use crate::assembly::*;
12use crate::scheduler::dependencies::{DataflowInfo, ExecutableReactions, LevelIx};
13use crate::*;
14
15/// The context in which a reaction executes. Its API
16/// allows mutating the event queue of the scheduler.
17/// Only the interactions declared at assembly time
18/// are allowed.
19///
20// Implementation details:
21// ReactionCtx is an API built around a ReactionWave. A single
22// ReactionCtx may be used for multiple ReactionWaves, but
23// obviously at disjoint times (&mut).
24pub struct ReactionCtx<'a, 'x> {
25    pub(super) insides: RContextForwardableStuff<'x>,
26
27    /// Logical time of the execution of this wave, constant
28    /// during the existence of the object
29    tag: EventTag,
30
31    /// Level of the reaction being executed.
32    pub(super) cur_level: LevelIx,
33
34    /// ID of the reaction being executed.
35    current_reaction: Option<GlobalReactionId>,
36
37    /// Sender to schedule events that should be executed later than this wave.
38    rx: &'a Receiver<PhysicalEvent>,
39
40    /// Start time of the program.
41    initial_time: Instant,
42
43    // globals, also they might be copied and passed to AsyncCtx
44    dataflow: &'x DataflowInfo,
45    debug_info: DebugInfoProvider<'a>,
46    /// Whether the scheduler has been shut down.
47    was_terminated_atomic: &'a Arc<AtomicBool>,
48    /// In ReactionCtx, this will only be true if this is the shutdown tag.
49    /// It duplicates [Self::was_terminated_atomic], to avoid an atomic
50    /// operation within [Self::is_shutdown].
51    was_terminated: bool,
52}
53
54impl<'a, 'x> ReactionCtx<'a, 'x> {
55    /// Returns the start time of the execution of this program.
56    ///
57    /// This is a logical instant with microstep zero.
58    #[inline]
59    pub fn get_start_time(&self) -> Instant {
60        self.initial_time
61    }
62
63    /// Returns the current physical time.
64    ///
65    /// Repeated invocation of this method may produce different
66    /// values, although [Instant] is monotonic. The
67    /// physical time is necessarily greater than the logical time.
68    #[inline]
69    pub fn get_physical_time(&self) -> Instant {
70        Instant::now()
71    }
72
73    /// Returns the current logical time.
74    ///
75    /// Logical time is frozen during the execution of a reaction.
76    /// Repeated invocation of this method will always produce
77    /// the same value.
78    #[inline]
79    pub fn get_logical_time(&self) -> Instant {
80        self.tag.to_logical_time(self.get_start_time())
81    }
82
83    /// Returns the tag at which the reaction executes.
84    ///
85    /// Repeated invocation of this method will always produce
86    /// the same value.
87    #[inline]
88    pub fn get_tag(&self) -> EventTag {
89        self.tag
90    }
91
92    /// Returns whether this tag is the shutdown tag of the
93    /// application. If so, it's necessarily the very last
94    /// invocation of the current reaction (on a given reactor
95    /// instance).
96    ///
97    /// Repeated invocation of this method will always produce
98    /// the same value.
99    #[inline]
100    pub fn is_shutdown(&self) -> bool {
101        self.was_terminated
102    }
103
104    /// Returns the amount of logical time elapsed since the
105    /// start of the program. This does not take microsteps
106    /// into account.
107    #[inline]
108    pub fn get_elapsed_logical_time(&self) -> Duration {
109        self.get_logical_time() - self.get_start_time()
110    }
111
112    /// Returns the amount of physical time elapsed since the
113    /// start of the program.
114    ///
115    /// Since this uses [Self::get_physical_time], be aware that
116    /// this function's result may change over time.
117    #[inline]
118    pub fn get_elapsed_physical_time(&self) -> Duration {
119        self.get_physical_time() - self.get_start_time()
120    }
121
122    /// Returns the number of active workers in the execution of
123    /// a reactor program.
124    ///
125    /// Return values:
126    /// * `1` if threading is not enabled.
127    /// * If threading is enabled and a number of workers was specified,
128    ///   it returns that number.
129    /// * And if the number of workers was left unspecified,
130    ///   the return value might vary.
131    pub fn num_workers(&self) -> usize {
132        cfg_if::cfg_if! {
133            if #[cfg(feature = "parallel-runtime")] {
134                rayon::current_num_threads()
135            } else {
136                1
137            }
138        }
139    }
140
141    /// Returns the current value of a port or action at this
142    /// logical time. If the value is absent, [Option::None] is
143    /// returned.  This is the case if the action or port is
144    /// not present ([Self::is_present]), or if no value was
145    /// scheduled (action values are optional, see [Self::schedule_with_v]).
146    ///
147    /// The value is copied out. See also [Self::use_ref] if this
148    /// is to be avoided.
149    ///
150    /// ### Examples
151    ///
152    /// ```no_run
153    /// # use reactor_rt::{ReactionCtx, Port};
154    /// # let ctx: &mut ReactionCtx = panic!();
155    /// # let port: &Port<u32> = panic!();
156    /// if let Some(value) = ctx.get(port) {
157    ///     // branch is taken if the port is set
158    /// }
159    /// ```
160    #[inline]
161    pub fn get<T: Copy>(&self, container: &impl ReactionTrigger<T>) -> Option<T> {
162        container.borrow().get_value(&self.get_tag(), &self.get_start_time())
163    }
164
165    /// Returns a reference to the current value of a port or action at this
166    /// logical time. If the value is absent, [Option::None] is
167    /// returned.  This is the case if the action or port is
168    /// not present ([Self::is_present]), or if no value was
169    /// scheduled (action values are optional, see [Self::schedule_with_v]).
170    ///
171    /// This does not require the value to be Copy, however, the implementation
172    /// of this method currently may require unsafe code. The method is therefore
173    /// not offered when compiling with the `no-unsafe` feature.
174    ///
175    /// ### Examples
176    ///
177    /// ```no_run
178    /// # use reactor_rt::{Port, ReactionCtx};
179    /// # let ctx: &mut ReactionCtx = panic!();
180    /// # let port: &Port<u32> = panic!();
181    /// if let Some(value) = ctx.get_ref(port) {
182    ///     // value is a ref to the internal value
183    /// }
184    /// ```
185    #[inline]
186    #[cfg(not(feature = "no-unsafe"))]
187    pub fn get_ref<'q, T>(&self, container: &'q impl crate::triggers::ReactionTriggerWithRefAccess<T>) -> Option<&'q T> {
188        container.get_value_ref(&self.get_tag(), &self.get_start_time())
189    }
190
191    /// Executes the provided closure on the value of the port
192    /// or action. The value is fetched by reference and not
193    /// copied.
194    ///
195    /// ### Examples
196    ///
197    /// ```no_run
198    /// # use reactor_rt::{ReactionCtx, Port};
199    /// # let ctx: &mut ReactionCtx = panic!();
200    /// # let port: &Port<String> = panic!();
201    /// let len = ctx.use_ref(port, |str| str.map(String::len).unwrap_or(0));
202    /// // equivalent to
203    /// let len = ctx.use_ref_opt(port, String::len).unwrap_or(0);
204    /// ```
205    /// ```no_run
206    /// # use reactor_rt::{ReactionCtx, Port};
207    /// # let ctx: &mut ReactionCtx = unimplemented!();
208    /// # let port: &Port<String> = unimplemented!();
209    ///
210    /// if let Some(str) = ctx.use_ref_opt(port, Clone::clone) {
211    ///     // only entered if the port value is present, so no need to check is_present
212    /// }
213    /// ```
214    ///
215    /// See also the similar [Self::use_ref_opt].
216    #[inline]
217    pub fn use_ref<T, O>(&self, container: &impl ReactionTrigger<T>, action: impl FnOnce(Option<&T>) -> O) -> O {
218        container
219            .borrow()
220            .use_value_ref(&self.get_tag(), &self.get_start_time(), action)
221    }
222
223    /// Executes the provided closure on the value of the port,
224    /// only if it is present. The value is fetched by reference
225    /// and not copied.
226    ///
227    /// See also the similar [Self::use_ref].
228    pub fn use_ref_opt<T, O>(&self, container: &impl ReactionTrigger<T>, action: impl FnOnce(&T) -> O) -> Option<O> {
229        self.use_ref(container, |c| c.map(action))
230    }
231
232    /// Sets the value of the given port.
233    ///
234    /// The change is visible at the same logical time, i.e.
235    /// the value propagates immediately. This may hence
236    /// schedule more reactions that should execute at the
237    /// same logical time.
238    #[inline]
239    pub fn set<T>(&mut self, port: &mut Port<T>, value: T)
240    where
241        T: Sync,
242    {
243        if cfg!(debug_assertions) {
244            self.check_set_port_is_legal(port)
245        }
246        port.set_impl(Some(value));
247        self.enqueue_now(Cow::Borrowed(self.reactions_triggered_by(port.get_id())));
248    }
249
250    fn check_set_port_is_legal<T: Sync>(&self, port: &mut Port<T>) {
251        let port_id = port.get_id();
252        let port_container = self.debug_info.id_registry.get_trigger_container(port_id).unwrap();
253        let reaction_container = self.current_reaction.unwrap().0.container();
254        match port.get_kind() {
255            PortKind::Input => {
256                let port_grandpa = self.debug_info.id_registry.get_container(port_container);
257                assert_eq!(
258                    Some(reaction_container),
259                    port_grandpa,
260                    "Input port {} can only be set by reactions of its grandparent, got reaction {}",
261                    self.debug_info.id_registry.fmt_component(port_id),
262                    self.debug_info.display_reaction(self.current_reaction.unwrap()),
263                );
264            }
265            PortKind::Output => {
266                assert_eq!(
267                    reaction_container,
268                    port_container,
269                    "Input port {} can only be set by reactions of its parent, got reaction {}",
270                    self.debug_info.id_registry.fmt_component(port_id),
271                    self.debug_info.display_reaction(self.current_reaction.unwrap()),
272                );
273            }
274            // todo
275            PortKind::ChildInputReference => {}
276            PortKind::ChildOutputReference => {}
277        }
278    }
279
280    /// Sets the value of the given port, if the given value is `Some`.
281    /// Otherwise the port is not set and no reactions are triggered.
282    ///
283    /// The change is visible at the same logical time, i.e.
284    /// the value propagates immediately. This may hence
285    /// schedule more reactions that should execute at the
286    /// same logical time.
287    ///
288    /// ```no_run
289    /// # use reactor_rt::{ReactionCtx, Port};
290    /// # let ctx: &mut ReactionCtx = unimplemented!();
291    /// # let source: &Port<u32> = unimplemented!();
292    /// # let sink: &mut Port<u32> = unimplemented!();
293    ///
294    /// ctx.set_opt(sink, ctx.get(source));
295    /// // equivalent to
296    /// if let Some(value) = ctx.get(source) {
297    ///     ctx.set(sink, value);
298    /// }
299    /// ```
300    ///
301    #[inline]
302    pub fn set_opt<T>(&mut self, port: &mut Port<T>, value: Option<T>)
303    where
304        T: Sync,
305    {
306        if let Some(v) = value {
307            self.set(port, v)
308        }
309    }
310
311    /// Returns true if the given action was triggered at the
312    /// current logical time.
313    ///
314    /// If so, then it may, but must not, present a value ([Self::get]).
315    #[inline]
316    pub fn is_present<T>(&self, action: &impl ReactionTrigger<T>) -> bool {
317        action.is_present(&self.get_tag(), &self.get_start_time())
318    }
319
320    /// Schedule an action to trigger at some point in the future.
321    /// The action will trigger after its own implicit time delay,
322    /// plus an optional additional time delay (see [Offset]). This
323    /// delay is added to the current logical (resp. physical) time
324    /// for logical (resp. physical) actions.
325    ///
326    /// This is like [Self::schedule_with_v], where the value is [None].
327    ///
328    /// ### Examples
329    ///
330    /// ```no_run
331    /// # use reactor_rt::prelude::*;
332    /// # let ctx: &mut ReactionCtx = panic!();
333    /// # let action: &mut LogicalAction<String> = panic!();
334    /// ctx.schedule(action, Asap);         // will be executed one microstep from now (+ own delay)
335    /// ctx.schedule(action, after!(2 ms)); // will be executed 2 milliseconds from now (+ own delay)
336    /// ctx.schedule(action, After(delay!(2 ms)));             // equivalent to the previous
337    /// ctx.schedule(action, After(Duration::from_millis(2))); // equivalent to the previous
338    /// ```
339    #[inline]
340    pub fn schedule<T: Sync>(&mut self, action: &mut impl SchedulableAsAction<T>, offset: Offset) {
341        self.schedule_with_v(action, None, offset)
342    }
343
344    /// Schedule an action to trigger at some point in the future,
345    ///
346    /// The action will carry the given value at the time it
347    /// is triggered, unless it is overwritten by another call
348    /// to this method. The value can be cleared by using `None`
349    /// as a value. Note that even if the value is absent, the
350    /// *action* will still be present at the time it is triggered
351    /// (see [Self::is_present]).
352    ///
353    /// The action will trigger after its own implicit time delay,
354    /// plus an optional additional time delay (see [Offset]). This
355    /// delay is added to the current logical (resp. physical) time
356    /// for logical (resp. physical) actions.
357    ///
358    /// ### Examples
359    ///
360    /// ```no_run
361    /// # use reactor_rt::prelude::*;
362    /// # let ctx: &mut ReactionCtx = panic!();
363    /// # let action: &mut LogicalAction<&'static str> = panic!();
364    /// // will be executed 2 milliseconds (+ own delay) from now with that value.
365    /// ctx.schedule_with_v(action, Some("value"), after!(2 msec));
366    /// // will be executed one microstep from now, with no value
367    /// ctx.schedule_with_v(action, None, Asap);
368    /// // that's equivalent to
369    /// ctx.schedule(action, Asap);
370    /// ```
371    #[inline]
372    pub fn schedule_with_v<T: Sync>(&mut self, action: &mut impl SchedulableAsAction<T>, value: Option<T>, offset: Offset) {
373        action.schedule_with_v(self, value, offset)
374    }
375
376    /// Add new reactions to execute later (at least 1 microstep later).
377    ///
378    /// This is used for actions.
379    #[inline]
380    pub(crate) fn enqueue_later(&mut self, downstream: &'x ExecutableReactions, tag: EventTag) {
381        debug_assert!(tag > self.get_tag());
382
383        let evt = Event::execute(tag, Cow::Borrowed(downstream));
384        self.insides.future_events.push(evt);
385    }
386
387    #[inline]
388    pub(crate) fn enqueue_now(&mut self, downstream: Cow<'x, ExecutableReactions<'x>>) {
389        match &mut self.insides.todo_now {
390            Some(ref mut do_next) => do_next.to_mut().absorb_after(downstream.as_ref(), self.cur_level.next()),
391            None => self.insides.todo_now = Some(downstream),
392        }
393    }
394
395    fn reactions_triggered_by(&self, trigger: TriggerId) -> &'x ExecutableReactions<'x> {
396        self.dataflow.reactions_triggered_by(&trigger)
397    }
398
399    fn make_successor_tag(&self, offset_from_now: Duration) -> EventTag {
400        self.get_tag().successor(offset_from_now)
401    }
402
403    /// Spawn a new thread that can use a [AsyncCtx]
404    /// to push asynchronous events to the reaction queue. This is
405    /// only useful with [physical actions](crate::PhysicalAction).
406    ///
407    /// Since the thread is allowed to keep references into the
408    /// internals of the scheduler, it is joined when the scheduler
409    /// shuts down. This means the scheduler will wait for the
410    /// thread to finish its task. For that reason, the thread's
411    /// closure should not execute an infinite loop, it should at
412    /// least check that the scheduler has not been terminated by
413    /// polling [AsyncCtx::was_terminated].
414    ///
415    /// ### Example
416    ///
417    /// ```no_run
418    /// # use reactor_rt::prelude::*;
419    /// fn some_reaction(ctx: &mut ReactionCtx, phys_action: &PhysicalActionRef<u32>) {
420    ///     let phys_action = phys_action.clone(); // clone to move it into other thread
421    ///     ctx.spawn_physical_thread(move |link| {
422    ///         std::thread::sleep(Duration::from_millis(200));
423    ///         // This will push an event whose tag is the
424    ///         // current physical time at the point of this
425    ///         // statement.
426    ///         link.schedule_physical_with_v(&phys_action, Some(123), Asap).unwrap();
427    ///     });
428    /// }
429    /// ```
430    ///
431    pub fn spawn_physical_thread<F, R>(&mut self, f: F) -> JoinHandle<R>
432    where
433        // is there a practical reason to encapsulate this?
434        F: FnOnce(&mut AsyncCtx) -> R,
435        F: Send + 'static,
436        R: Send + 'static,
437    {
438        let tx = self.rx.new_sender();
439        let initial_time = self.initial_time;
440        let was_terminated = self.was_terminated_atomic.clone();
441
442        std::thread::spawn(move || {
443            let mut link = AsyncCtx { tx, initial_time, was_terminated };
444            f(&mut link)
445        })
446    }
447
448    /// Request that the application shutdown, possibly with
449    /// a particular offset. Just like for actions, even a zero
450    /// offset will only trigger the special `shutdown` trigger
451    /// at the earliest one microstep after the current tag.
452    ///
453    /// ```no_run
454    /// # use reactor_rt::prelude::*;
455    /// # let ctx: &mut ReactionCtx = panic!();
456    /// # let action: &mut LogicalAction<&'static str> = panic!();
457    /// // trigger shutdown on the next microstep
458    /// ctx.request_stop(Asap);
459    ///
460    /// // trigger shutdown in *at most* 1 msec (in logical time).
461    /// // If in the meantime, another `request_stop` call schedules
462    /// // shutdown for an earlier tag, that one will be honored instead.
463    /// ctx.request_stop(after!(1 msec));
464    /// ```
465    #[inline]
466    pub fn request_stop(&mut self, offset: Offset) {
467        let tag = self.make_successor_tag(offset.to_duration());
468
469        let evt = Event::terminate_at(tag);
470        self.insides.future_events.push(evt);
471    }
472
473    /// Reschedule a periodic timer if need be.
474    /// This is called by a reaction synthesized for each timer.
475    // note: reactions can't call this as they're only passed a shared reference to a timer.
476    #[doc(hidden)]
477    #[inline]
478    pub fn reschedule_timer(&mut self, timer: &mut Timer) {
479        if timer.is_periodic() {
480            let downstream = self.reactions_triggered_by(timer.get_id());
481            self.enqueue_later(downstream, self.make_successor_tag(timer.period));
482        }
483    }
484
485    /// Schedule the first triggering of the given timer.
486    /// This is called by a reaction synthesized for each timer.
487    // note: reactions can't call this as they're only passed a shared references to timers.
488    #[doc(hidden)]
489    #[inline]
490    pub fn bootstrap_timer(&mut self, timer: &mut Timer) {
491        // we're in startup
492        let downstream = self.reactions_triggered_by(timer.get_id());
493        if timer.offset.is_zero() {
494            // no offset
495            self.enqueue_now(Cow::Borrowed(downstream))
496        } else {
497            self.enqueue_later(downstream, self.make_successor_tag(timer.offset))
498        }
499    }
500
501    /// Execute the given reaction with the given reactor.
502    #[inline]
503    pub(super) fn execute(&mut self, reactor: &mut ReactorBox, reaction_id: GlobalReactionId) {
504        trace!(
505            "  - Executing {} (level {})",
506            self.debug_info.display_reaction(reaction_id),
507            self.cur_level
508        );
509        debug_assert_eq!(reactor.id(), reaction_id.0.container(), "Wrong reactor");
510        self.current_reaction.replace(reaction_id);
511        reactor.react(self, reaction_id.0.local());
512        self.current_reaction.take();
513    }
514
515    pub(super) fn new(
516        rx: &'a Receiver<PhysicalEvent>,
517        tag: EventTag,
518        initial_time: Instant,
519        todo: ReactionPlan<'x>,
520        dataflow: &'x DataflowInfo,
521        debug_info: DebugInfoProvider<'a>,
522        was_terminated_atomic: &'a Arc<AtomicBool>,
523        was_terminated: bool,
524    ) -> Self {
525        Self {
526            insides: RContextForwardableStuff { todo_now: todo, future_events: Default::default() },
527            cur_level: Default::default(),
528            tag,
529            current_reaction: None,
530            rx,
531            initial_time,
532            dataflow,
533            was_terminated_atomic,
534            debug_info,
535            was_terminated,
536        }
537    }
538
539    /// Fork a context. Some things are shared, but not the
540    /// mutable stuff.
541    #[cfg(feature = "parallel-runtime")]
542    pub(super) fn fork(&self) -> Self {
543        Self {
544            insides: Default::default(),
545
546            // all of that is common to all contexts
547            tag: self.tag,
548            rx: self.rx,
549            cur_level: self.cur_level,
550            initial_time: self.initial_time,
551            dataflow: self.dataflow,
552            was_terminated: self.was_terminated,
553            was_terminated_atomic: self.was_terminated_atomic,
554            debug_info: self.debug_info.clone(),
555            current_reaction: self.current_reaction,
556        }
557    }
558}
559
560/// Info that executing reactions need to make known to the scheduler.
561#[derive(Default)]
562pub(super) struct RContextForwardableStuff<'x> {
563    /// Remaining reactions to execute before the wave dies.
564    /// Using [Option] and [Cow] optimises for the case where
565    /// zero or exactly one port/action is set, and minimises
566    /// copies.
567    ///
568    /// This is mutable: if a reaction sets a port, then the
569    /// downstream of that port is inserted in into this
570    /// data structure.
571    pub(super) todo_now: ReactionPlan<'x>,
572
573    /// Events that were produced for a strictly greater
574    /// logical time than a current one.
575    pub(super) future_events: SmallVec<[Event<'x>; 4]>,
576}
577
578#[cfg(feature = "parallel-runtime")]
579impl RContextForwardableStuff<'_> {
580    pub(super) fn merge(mut self, other: Self) -> Self {
581        self.absorb(other);
582        self
583    }
584
585    pub(super) fn absorb(&mut self, mut other: Self) {
586        self.todo_now = ExecutableReactions::merge_cows(self.todo_now.take(), other.todo_now);
587        self.future_events.append(&mut other.future_events);
588    }
589}
590
591/// A type that can affect the logical event queue to implement
592/// asynchronous physical actions. This is a "link" to the event
593/// system, from the outside world.
594///
595/// See [ReactionCtx::spawn_physical_thread].
596///
597#[derive(Clone)]
598pub struct AsyncCtx {
599    tx: Sender<PhysicalEvent>,
600    initial_time: Instant,
601    /// Whether the scheduler has been terminated.
602    was_terminated: Arc<AtomicBool>,
603}
604
605impl AsyncCtx {
606    /// Returns true if the scheduler has been shutdown. When
607    /// that's true, calls to other methods of this type will
608    /// fail with [SendError].
609    pub fn was_terminated(&self) -> bool {
610        self.was_terminated.load(Ordering::SeqCst)
611    }
612
613    /// Request that the application shutdown, possibly with
614    /// a particular offset from the current physical time.
615    ///
616    /// This may fail if this is called while the scheduler
617    /// has already been shutdown. An Ok result is also not
618    /// a guarantee that the event will be processed: the
619    /// scheduler may be in the process of shutting down,
620    /// or its shutdown might be programmed for a logical
621    /// time which precedes the current physical time.
622    pub fn request_stop(&mut self, offset: Offset) -> Result<(), SendError<()>> {
623        // physical time must be ahead of logical time so
624        // this event is scheduled for the future
625        let tag = EventTag::absolute(self.initial_time, Instant::now() + offset.to_duration());
626
627        let evt = PhysicalEvent::terminate_at(tag);
628        self.tx.send(evt).map_err(|e| {
629            warn!("Event could not be sent! {:?}", e);
630            SendError(())
631        })
632    }
633
634    /// Schedule an action to run after its own implicit time delay
635    /// plus an optional additional time delay. These delays are in
636    /// logical time.
637    ///
638    /// Note that this locks the action.
639    ///
640    /// This may fail if this is called while the scheduler
641    /// has already been shutdown. An Ok result is also not
642    /// a guarantee that the event will be processed: the
643    /// scheduler may be in the process of shutting down,
644    /// or its shutdown might be programmed for a logical
645    /// time which precedes the current physical time.
646    ///
647    pub fn schedule_physical<T: Sync>(
648        &mut self,
649        action: &PhysicalActionRef<T>,
650        offset: Offset,
651    ) -> Result<(), SendError<Option<T>>> {
652        self.schedule_physical_with_v(action, None, offset)
653    }
654
655    /// Schedule an action to run after its own implicit time delay
656    /// plus an optional additional time delay. These delays are in
657    /// logical time.
658    ///
659    /// Note that this locks the action.
660    ///
661    /// This may fail if this is called while the scheduler
662    /// has already been shutdown. An Ok result is also not
663    /// a guarantee that the event will be processed: the
664    /// scheduler may be in the process of shutting down,
665    /// or its shutdown might be programmed for a logical
666    /// time which precedes the current physical time.
667    ///
668    pub fn schedule_physical_with_v<T: Sync>(
669        &mut self,
670        action: &PhysicalActionRef<T>,
671        value: Option<T>,
672        offset: Offset,
673    ) -> Result<(), SendError<Option<T>>> {
674        // physical time must be ahead of logical time so
675        // this event is scheduled for the future
676        action
677            .use_mut_p(value, |action, value| {
678                let tag = EventTag::absolute(self.initial_time, Instant::now() + offset.to_duration());
679                action.0.schedule_future_value(tag, value);
680
681                let evt = PhysicalEvent::trigger(tag, action.get_id());
682                self.tx.send(evt).map_err(|e| {
683                    warn!("Event could not be sent! {:?}", e);
684                    SendError(action.0.forget_value(&tag))
685                })
686            })
687            .unwrap_or_else(|value| Err(SendError(value)))
688    }
689}
690
691/// Implemented by LogicalAction and PhysicalAction references
692/// to give access to [ReactionCtx::schedule] and variants.
693pub trait SchedulableAsAction<T: Sync> {
694    #[doc(hidden)]
695    fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset);
696}
697
698impl<T: Sync> SchedulableAsAction<T> for LogicalAction<T> {
699    fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset) {
700        let eta = ctx.make_successor_tag(self.0.min_delay + offset.to_duration());
701        self.0.schedule_future_value(eta, value);
702        let downstream = ctx.dataflow.reactions_triggered_by(&self.get_id());
703        ctx.enqueue_later(downstream, eta);
704    }
705}
706
707impl<T: Sync> SchedulableAsAction<T> for PhysicalActionRef<T> {
708    fn schedule_with_v(&mut self, ctx: &mut ReactionCtx, value: Option<T>, offset: Offset) {
709        self.use_mut_p(value, |action, value| {
710            let tag = EventTag::absolute(ctx.initial_time, Instant::now() + offset.to_duration());
711            action.0.schedule_future_value(tag, value);
712            let downstream = ctx.dataflow.reactions_triggered_by(&action.get_id());
713            ctx.enqueue_later(downstream, tag);
714        })
715        .ok();
716    }
717}
718
719/// An offset from the current event.
720///
721/// This is to be used with [ReactionCtx::schedule].
722#[derive(Copy, Clone, Debug)]
723pub enum Offset {
724    /// Specify that the trigger will fire at least after
725    /// the provided duration.
726    ///
727    /// If the duration is zero (eg [Asap](Self::Asap)), it does not
728    /// mean that the trigger will fire right away. For actions, the
729    /// action's inherent minimum delay must be taken into account,
730    /// and even with a zero minimal delay, a delay of one microstep
731    /// is applied.
732    ///
733    /// You can use the [after!()](crate::after) macro, instead
734    /// of using this directly. For instance:
735    /// ```
736    /// # use reactor_rt::prelude::*;
737    /// assert_eq!(after!(15 ms), After(Duration::from_millis(15)));
738    /// ```
739    After(Duration),
740
741    /// Specify that the trigger will fire as soon as possible.
742    /// This does not mean that the action will trigger right away. The
743    /// action's inherent minimum delay must be taken into account,
744    /// and even with a zero minimal delay, a delay of one microstep
745    /// is applied. This is equivalent to
746    /// ```
747    /// # use reactor_rt::prelude::*;
748    /// assert_eq!(Asap, After(Duration::ZERO));
749    /// ```
750    Asap,
751}
752
753impl Offset {
754    #[inline]
755    pub(crate) fn to_duration(self) -> Duration {
756        match self {
757            Offset::After(d) => d,
758            Offset::Asap => Duration::ZERO,
759        }
760    }
761}
762
763impl PartialEq<Self> for Offset {
764    fn eq(&self, other: &Self) -> bool {
765        self.to_duration() == other.to_duration()
766    }
767}
768
769impl Eq for Offset {}
770
771impl Hash for Offset {
772    fn hash<H: Hasher>(&self, state: &mut H) {
773        self.to_duration().hash(state);
774    }
775}
776
777/// Cleans up a tag
778/// TODO get rid of this!
779///  At least for multiports it's really bad
780///  Maybe we can keep a set of the ports that are present in ReactionCtx
781#[doc(hidden)]
782pub struct CleanupCtx {
783    /// Tag we're cleaning up
784    pub tag: EventTag,
785}
786
787impl CleanupCtx {
788    pub fn cleanup_multiport<T: Sync>(&self, port: &mut Multiport<T>) {
789        // todo bound ports don't need to be cleared
790        for channel in port {
791            channel.clear_value()
792        }
793    }
794
795    pub fn cleanup_port<T: Sync>(&self, port: &mut Port<T>) {
796        port.clear_value()
797    }
798
799    pub fn cleanup_logical_action<T: Sync>(&self, action: &mut LogicalAction<T>) {
800        action.0.forget_value(&self.tag);
801    }
802
803    pub fn cleanup_physical_action<T: Sync>(&self, action: &mut PhysicalActionRef<T>) {
804        action.use_mut(|a| a.0.forget_value(&self.tag)).ok();
805    }
806}