reactor_rt/scheduler/
assembly_impl.rs

1/*
2 * Copyright (c) 2021, TU Dresden.
3 *
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice,
8 *    this list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 *    this list of conditions and the following disclaimer in the documentation
12 *    and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
15 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
16 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
17 * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
22 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 */
24
25use std::borrow::Cow;
26use std::marker::PhantomData;
27
28use index_vec::{Idx, IndexVec};
29
30use super::{ReactorBox, ReactorVec};
31use crate::assembly::*;
32use crate::scheduler::dependencies::DepGraph;
33use crate::*;
34
35/// Globals shared by all assemblers.
36pub(super) struct RootAssembler {
37    /// All registered reactors
38    pub(super) reactors: IndexVec<ReactorId, Option<ReactorBox<'static>>>,
39    /// Dependency graph
40    pub(super) graph: DepGraph,
41    /// Debug infos
42    pub(super) debug_info: DebugInfoRegistry,
43
44    /// Next reactor ID to assign
45    reactor_id: ReactorId,
46    /// Next trigger ID to assign
47    cur_trigger: TriggerId,
48}
49
50impl RootAssembler {
51    /// Register a reactor into the global data structure that owns them during execution.
52    fn register_reactor<R: ReactorInitializer + 'static>(&mut self, child: R) {
53        if child.id().index() >= self.reactors.len() {
54            self.reactors.resize_with(child.id().index() + 1, || None)
55        }
56        let prev = self.reactors[child.id()].replace(Box::new(child));
57        // this is impossible because we control how we allocate IDs entirely
58        debug_assert!(prev.is_none(), "Overwrote a reactor during initialization")
59    }
60
61    /// Register reactors into the global data structure that owns them during execution.
62    fn register_bank<R: ReactorInitializer + 'static>(&mut self, bank: Vec<R>) {
63        for child in bank {
64            self.register_reactor(child)
65        }
66    }
67
68    /// Top level fun that assembles the main reactor
69    pub fn assemble_tree<R: ReactorInitializer + 'static>(
70        main_args: R::Params,
71    ) -> (ReactorVec<'static>, DepGraph, DebugInfoRegistry) {
72        let mut root = RootAssembler::default();
73        let assembler = AssemblyCtx::new(&mut root, ReactorDebugInfo::root::<R::Wrapped>());
74
75        let main_reactor = match R::assemble(main_args, assembler) {
76            Ok(main) => main.finish(),
77            Err(e) => std::panic::panic_any(e.lift(&root.debug_info)),
78        };
79        root.debug_info.record_main_reactor(main_reactor.id());
80        root.register_reactor(main_reactor);
81
82        let RootAssembler { graph, reactors, debug_info: id_registry, .. } = root;
83
84        let reactors = reactors.into_iter().map(|r| r.expect("Uninitialized reactor!")).collect();
85        (reactors, graph, id_registry)
86    }
87}
88
89impl Default for RootAssembler {
90    fn default() -> Self {
91        Self {
92            reactor_id: ReactorId::new(0),
93            graph: DepGraph::new(),
94            debug_info: DebugInfoRegistry::new(),
95            reactors: Default::default(),
96            cur_trigger: TriggerId::FIRST_REGULAR,
97        }
98    }
99}
100
101/// Helper struct to assemble reactors during initialization.
102/// One assembly context is used per reactor, they can't be shared.
103pub struct AssemblyCtx<'x, S>
104where
105    S: ReactorInitializer,
106{
107    globals: &'x mut RootAssembler,
108    /// Next local ID for components != reactions
109    cur_local: LocalReactionId,
110
111    /// Contains debug info for this reactor. Empty after
112    /// assemble_self has run, and the info is recorded
113    /// into the debug info registry.
114    debug: Option<ReactorDebugInfo>,
115    /// IDs of children used for debug info.
116    children_ids: Vec<ReactorId>,
117
118    _phantom: PhantomData<S>,
119}
120
121/// Final result of the assembly of a reactor.
122#[allow(unused_variables)]
123pub struct FinishedReactor<'x, S>(PhantomData<&'x mut u8>, S)
124where
125    S: ReactorInitializer;
126
127/// Intermediate result of assembly.
128pub struct AssemblyIntermediate<'x, S>(AssemblyCtx<'x, S>, S)
129where
130    S: ReactorInitializer;
131
132impl<'x, S> AssemblyCtx<'x, S>
133where
134    S: ReactorInitializer,
135{
136    fn new(globals: &'x mut RootAssembler, debug: ReactorDebugInfo) -> Self {
137        Self {
138            globals,
139            // this is not zero, so that reaction ids and component ids are disjoint
140            cur_local: S::MAX_REACTION_ID,
141            debug: Some(debug),
142            _phantom: PhantomData,
143            children_ids: Vec::default(),
144        }
145    }
146
147    /// top level function
148    pub fn assemble(
149        self,
150        build_reactor_tree: impl FnOnce(Self) -> AssemblyResult<AssemblyIntermediate<'x, S>>,
151    ) -> AssemblyResult<FinishedReactor<'x, S>> {
152        let AssemblyIntermediate(_, reactor) = build_reactor_tree(self)?;
153        Ok(FinishedReactor(PhantomData, reactor))
154    }
155
156    /// Innermost function.
157    pub fn assemble_self<const N: usize>(
158        mut self,
159        create_self: impl FnOnce(&mut ComponentCreator<S>, ReactorId) -> Result<S, AssemblyError>,
160        num_non_synthetic_reactions: usize,
161        reaction_names: [Option<&'static str>; N],
162        declare_dependencies: impl FnOnce(&mut DependencyDeclarator<S>, &mut S, [GlobalReactionId; N]) -> AssemblyResult<()>,
163    ) -> AssemblyResult<AssemblyIntermediate<'x, S>> {
164        // todo when feature(generic_const_exprs) is stabilized,
165        //  replace const parameter N with S::MAX_REACTION_ID.index().
166        debug_assert_eq!(N, S::MAX_REACTION_ID.index(), "Should initialize all reactions");
167
168        // note: the ID is not known until all descendants
169        // have been built.
170        // This is because the ID is the index in the different
171        // IndexVec we use around within the scheduler
172        // (in SyncScheduler and also in DebugInfoRegistry),
173        // and descendants need to be pushed before Self.
174        // Effectively, IDs are assigned depth first. This
175        // makes this whole debug info recording very complicated.
176        let id = self.globals.reactor_id.get_and_incr();
177        let debug = self.debug.take().expect("unreachable - can only call assemble_self once");
178        trace!("Children of {}: {:?}", debug, self.children_ids);
179        self.globals.debug_info.record_reactor(id, debug);
180        for child in self.children_ids.drain(..) {
181            self.globals.debug_info.record_reactor_container(id, child);
182        }
183
184        let first_trigger_id = self.globals.cur_trigger;
185
186        let mut ich = create_self(&mut ComponentCreator { assembler: &mut self }, id)?;
187        // after creation, globals.cur_trigger has been mutated
188        // record proper debug info.
189        self.globals
190            .debug_info
191            .set_id_range(id, first_trigger_id..self.globals.cur_trigger);
192
193        // declare dependencies
194        let reactions = self.new_reactions(id, num_non_synthetic_reactions, reaction_names);
195        declare_dependencies(&mut DependencyDeclarator { assembler: &mut self }, &mut ich, reactions)?;
196        Ok(AssemblyIntermediate(self, ich))
197    }
198
199    /// Create N reactions. The first `num_non_synthetic` get
200    /// priority edges, as they are taken to be those declared
201    /// in LF by the user.
202    /// The rest do not have priority edges, and their
203    /// implementation must hence have no observable side-effect.
204    fn new_reactions<const N: usize>(
205        &mut self,
206        my_id: ReactorId,
207        num_non_synthetic: usize,
208        names: [Option<&'static str>; N],
209    ) -> [GlobalReactionId; N] {
210        assert!(num_non_synthetic <= N);
211
212        let result = array![i => GlobalReactionId::new(my_id, LocalReactionId::from_usize(i)); N];
213
214        let mut prev: Option<GlobalReactionId> = None;
215        for (i, r) in result.iter().cloned().enumerate() {
216            if let Some(label) = names[i] {
217                self.globals.debug_info.record_reaction(r, Cow::Borrowed(label))
218            }
219            self.globals.graph.record_reaction(r);
220            if i < num_non_synthetic {
221                if let Some(prev) = prev {
222                    // Add an edge that represents that the
223                    // previous reaction takes precedence
224                    self.globals.graph.reaction_priority(prev, r);
225                }
226            }
227            prev = Some(r);
228        }
229
230        self.cur_local = self.cur_local.plus(N);
231        result
232    }
233
234    /// Assembles a child reactor and makes it available in
235    /// the scope of a function.
236    #[inline]
237    pub fn with_child<Sub: ReactorInitializer + 'static, F>(
238        mut self,
239        inst_name: &'static str,
240        args: Sub::Params,
241        action: F,
242    ) -> AssemblyResult<AssemblyIntermediate<'x, S>>
243    // we can't use impl FnOnce(...) because we want to specify explicit type parameters in the caller
244    where
245        F: FnOnce(Self, &mut Sub) -> AssemblyResult<AssemblyIntermediate<'x, S>>,
246    {
247        trace!("Assembling {}", inst_name);
248        let mut sub = self.assemble_sub(inst_name, None, args)?;
249        let AssemblyIntermediate(ich, s) = action(self, &mut sub)?;
250        trace!("Registering {}", inst_name);
251        ich.globals.register_reactor(sub);
252        Ok(AssemblyIntermediate(ich, s))
253    }
254
255    /// Assembles a bank of children reactor and makes it
256    /// available in the scope of a function.
257    #[inline]
258    pub fn with_child_bank<Sub, A, F>(
259        mut self,
260        inst_name: &'static str,
261        bank_width: usize,
262        arg_maker: A,
263        action: F,
264    ) -> AssemblyResult<AssemblyIntermediate<'x, S>>
265    where
266        Sub: ReactorInitializer + 'static,
267        // we can't use impl Fn(...) because we want to specify explicit type parameters in the calle
268        F: FnOnce(Self, &mut Vec<Sub>) -> AssemblyResult<AssemblyIntermediate<'x, S>>,
269        A: Fn(/*bank_index:*/ usize) -> Sub::Params,
270    {
271        trace!("Assembling bank {}", inst_name);
272
273        let mut sub = (0..bank_width)
274            .map(|i| self.assemble_sub(inst_name, Some(i), arg_maker(i)))
275            .collect::<Result<Vec<Sub>, _>>()?;
276
277        let AssemblyIntermediate(ich, r) = action(self, &mut sub)?;
278
279        trace!("Registering bank {}", inst_name);
280        ich.globals.register_bank(sub);
281        Ok(AssemblyIntermediate(ich, r))
282    }
283
284    /// Assemble a child reactor. The child needs to be registered
285    /// using [Self::register_reactor] later.
286    #[inline(always)]
287    fn assemble_sub<Sub: ReactorInitializer>(
288        &mut self,
289        inst_name: &'static str,
290        bank_idx: Option<usize>,
291        args: Sub::Params,
292    ) -> AssemblyResult<Sub> {
293        let my_debug = self.debug.as_ref().expect("should assemble sub-reactors before self");
294
295        let debug_info = match bank_idx {
296            None => my_debug.derive::<Sub>(inst_name),
297            Some(i) => my_debug.derive_bank_item::<Sub>(inst_name, i),
298        };
299
300        let subctx = AssemblyCtx::new(self.globals, debug_info);
301        let subinst = Sub::assemble(args, subctx)?.finish();
302        self.children_ids.push(subinst.id());
303        Ok(subinst)
304    }
305}
306
307impl<S> FinishedReactor<'_, S>
308where
309    S: ReactorInitializer,
310{
311    fn finish(self) -> S {
312        let FinishedReactor(_, reactor) = self;
313        reactor
314    }
315}
316
317/// Declares dependencies between components and reactions.
318pub struct DependencyDeclarator<'a, 'x, S: ReactorInitializer> {
319    assembler: &'a mut AssemblyCtx<'x, S>,
320}
321
322impl<S: ReactorInitializer> DependencyDeclarator<'_, '_, S> {
323    #[inline]
324    pub fn declare_triggers(&mut self, trigger: TriggerId, reaction: GlobalReactionId) -> AssemblyResult<()> {
325        self.graph().triggers_reaction(trigger, reaction);
326        Ok(())
327    }
328
329    #[inline]
330    pub fn effects_port<T: Sync>(&mut self, reaction: GlobalReactionId, port: &Port<T>) -> AssemblyResult<()> {
331        self.effects_instantaneous(reaction, port.get_id())
332    }
333
334    #[inline]
335    pub fn effects_multiport<T: Sync>(&mut self, reaction: GlobalReactionId, port: &Multiport<T>) -> AssemblyResult<()> {
336        self.effects_instantaneous(reaction, port.get_id())
337    }
338
339    #[doc(hidden)] // used by synthesized timer reactions
340    pub fn effects_timer(&mut self, reaction: GlobalReactionId, timer: &Timer) -> AssemblyResult<()> {
341        self.effects_instantaneous(reaction, timer.get_id())
342    }
343
344    #[inline]
345    fn effects_instantaneous(&mut self, reaction: GlobalReactionId, trigger: TriggerId) -> AssemblyResult<()> {
346        self.graph().reaction_effects(reaction, trigger);
347        Ok(())
348    }
349
350    #[inline]
351    pub fn declare_uses(&mut self, reaction: GlobalReactionId, trigger: TriggerId) -> AssemblyResult<()> {
352        self.graph().reaction_uses(reaction, trigger);
353        Ok(())
354    }
355
356    /// Bind two ports together.
357    #[inline]
358    pub fn bind_ports<T: Sync>(&mut self, upstream: &mut Port<T>, downstream: &mut Port<T>) -> AssemblyResult<()> {
359        upstream.forward_to(downstream)?;
360        self.graph().port_bind(upstream, downstream);
361        Ok(())
362    }
363
364    /// Bind the ports of the upstream to those of the downstream,
365    /// as if zipping both iterators.
366    /// todo this will just throw away bindings if both iterators are not of the same size
367    ///  normally this should be reported by LFC as a warning, maybe we should implement the same thing here
368    #[inline]
369    pub fn bind_ports_zip<'a, T: Sync + 'a>(
370        &mut self,
371        upstream: impl Iterator<Item = &'a mut Port<T>>,
372        downstream: impl Iterator<Item = &'a mut Port<T>>,
373    ) -> AssemblyResult<()> {
374        for (upstream, downstream) in upstream.zip(downstream) {
375            self.bind_ports(upstream, downstream)?;
376        }
377        Ok(())
378    }
379
380    #[inline]
381    pub fn bind_ports_iterated<'a, T: Sync + 'a>(
382        &mut self,
383        upstream: impl Iterator<Item = &'a mut Port<T>>,
384        mut downstream: impl Iterator<Item = &'a mut Port<T>>,
385    ) -> AssemblyResult<()> {
386        let mut upstream = upstream.collect::<Vec<_>>();
387        assert!(!upstream.is_empty(), "Empty upstream!");
388        let up_len = upstream.len();
389        // we have to implement this loop manually instead of with an iterator
390        // because we can't clone mutable references in the upstream iterator
391        for i in 0.. {
392            let up = &mut upstream[i % up_len];
393            if let Some(down) = downstream.next() {
394                self.bind_ports(up, down)?;
395            } else {
396                break;
397            }
398        }
399        Ok(())
400    }
401
402    #[inline]
403    fn graph(&mut self) -> &mut DepGraph {
404        &mut self.assembler.globals.graph
405    }
406}
407
408/// Creates the components of a reactor.
409pub struct ComponentCreator<'a, 'x, S: ReactorInitializer> {
410    assembler: &'a mut AssemblyCtx<'x, S>,
411}
412
413impl<S: ReactorInitializer> ComponentCreator<'_, '_, S> {
414    pub fn new_port<T: Sync>(&mut self, lf_name: &'static str, kind: PortKind) -> Port<T> {
415        self.new_port_impl(Cow::Borrowed(lf_name), kind)
416    }
417
418    fn new_port_impl<T: Sync>(&mut self, lf_name: Cow<'static, str>, kind: PortKind) -> Port<T> {
419        let id = self.next_comp_id(lf_name);
420        self.graph().record_port(id);
421        Port::new(id, kind)
422    }
423
424    pub fn new_multiport<T: Sync>(
425        &mut self,
426        lf_name: &'static str,
427        kind: PortKind,
428        len: usize,
429    ) -> Result<Multiport<T>, AssemblyError> {
430        let bank_id = self.next_comp_id(Cow::Borrowed(lf_name));
431        self.graph().record_port_bank(bank_id, len)?;
432        Ok(Multiport::new(
433            (0..len)
434                .map(|i| self.new_port_bank_component(lf_name, kind, bank_id, i))
435                .collect(),
436            bank_id,
437        ))
438    }
439
440    fn new_port_bank_component<T: Sync>(
441        &mut self,
442        lf_name: &'static str,
443        kind: PortKind,
444        bank_id: TriggerId,
445        index: usize,
446    ) -> Port<T> {
447        let channel_id = self.next_comp_id(Cow::Owned(format!("{}[{}]", lf_name, index)));
448        self.graph().record_port_bank_component(bank_id, channel_id);
449        Port::new(channel_id, kind)
450    }
451
452    pub fn new_logical_action<T: Sync>(&mut self, lf_name: &'static str, min_delay: Option<Duration>) -> LogicalAction<T> {
453        let id = self.next_comp_id(Cow::Borrowed(lf_name));
454        self.graph().record_laction(id);
455        LogicalAction::new(id, min_delay)
456    }
457
458    pub fn new_physical_action<T: Sync>(&mut self, lf_name: &'static str, min_delay: Option<Duration>) -> PhysicalActionRef<T> {
459        let id = self.next_comp_id(Cow::Borrowed(lf_name));
460        self.graph().record_paction(id);
461        PhysicalActionRef::new(id, min_delay)
462    }
463
464    pub fn new_timer(&mut self, lf_name: &'static str, offset: Duration, period: Duration) -> Timer {
465        let id = self.next_comp_id(Cow::Borrowed(lf_name));
466        self.graph().record_timer(id);
467        Timer::new(id, offset, period)
468    }
469
470    /// Create and return a new id for a trigger component.
471    fn next_comp_id(&mut self, debug_name: Cow<'static, str>) -> TriggerId {
472        let id = self
473            .assembler
474            .globals
475            .cur_trigger
476            .get_and_incr()
477            .expect("Overflow while allocating ID");
478        self.assembler.globals.debug_info.record_trigger(id, debug_name);
479        id
480    }
481
482    #[inline]
483    fn graph(&mut self) -> &mut DepGraph {
484        &mut self.assembler.globals.graph
485    }
486}
487
488/// Iterates a bank, produces an `Iterator<Item=&mut Port<_>>`.
489/// Does not explicitly borrow the bank, which is unsafe, but
490/// we trust the code generator to fail if a port is both on
491/// the LHS and RHS of a connection.
492///
493/// This is necessary to be iterate the same bank over distinct
494/// ports or multiports to bind them together.
495#[macro_export]
496#[doc(hidden)]
497macro_rules! unsafe_iter_bank {
498    // the field is not a multiport
499    ($bank:ident # $field_name:ident) => {{
500        let __ptr = $bank.as_mut_ptr();
501        (0..$bank.len())
502            .into_iter()
503            .map(move |i| unsafe { &mut (*__ptr.add(i)).$field_name })
504    }};
505    // the field is a multiport, we select a single index
506    ($bank:ident # $field_name:ident[$idx:expr]) => {{
507        let __ptr = $bank.as_mut_ptr();
508        (0..$bank.len())
509            .into_iter()
510            .map(move |i| unsafe { &mut (*__ptr.add(i)).$field_name[$idx] })
511    }};
512    // the field is a multiport, we select all of them
513    ($bank:ident # ($field_name:ident)+) => {{
514        let __ptr = $bank.as_mut_ptr();
515        (0..$bank.len())
516            .into_iter()
517            .map(move |i| unsafe { &mut (*__ptr.add(i)) })
518            .flat_map(|a| a.$field_name.iter_mut())
519    }};
520    // the field is a multiport, we interleave all of them
521    ($bank:ident # interleaved($field_name:ident)) => {{
522        let __ptr = $bank.as_mut_ptr();
523        let __bank_len = $bank.len();
524        // Assume that the contained multiports of the bank
525        // are all of the same length.
526        let __multiport_len = $bank[0].$field_name.len();
527
528        // Build an iterator of tuples that get mapped to their
529        // respective bank element and multiport.
530        let mut bank_idx = 0;
531        let mut multiport_idx = 0;
532        let mut iter = std::iter::from_fn(move || {
533            // The inner loop iterates over bank_idx.
534            if bank_idx >= __bank_len {
535                // When one iteration is done we reset the bank_idx
536                // and increment the outer loop over multiport_idx.
537                bank_idx = 0;
538                multiport_idx += 1;
539
540                if multiport_idx >= __multiport_len {
541                    return None;
542                }
543            }
544
545            let bank_idx_copy = bank_idx;
546            bank_idx += 1;
547            Some((bank_idx_copy, multiport_idx))
548        });
549        iter.map(move |(i, j)| unsafe { &mut (&mut *__ptr.add(i)).$field_name[j] })
550    }};
551}