Program Listing for File scheduler.cc

Return to documentation for file (lib/scheduler.cc)

/*
 * Copyright (C) 2019 TU Dresden
 * All rights reserved.
 *
 * Authors:
 *   Christian Menard
 */

#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

#include "reactor-cpp/scheduler.hh"

#include "reactor-cpp/action.hh"
#include "reactor-cpp/assert.hh"
#include "reactor-cpp/environment.hh"
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/port.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/trace.hh"

namespace reactor {

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local const Worker* Worker::current_worker = nullptr;

Worker::Worker(Worker&& work) // NOLINT(performance-noexcept-move-constructor)
    : scheduler_{work.scheduler_}
    , identity_{work.identity_} {
  // Need to provide the move constructor in order to organize workers in a
  // std::vector. However, moving is not save if the thread is already running,
  // thus we throw an exception here if the worker is moved but the
  // internal thread is already running.

  if (work.thread_.joinable()) {
    throw std::runtime_error{"Running workers cannot be moved!"};
  }
}

void Worker::work() const {
  // initialize the current worker thread local variable
  current_worker = this;

  log::Debug() << "(Worker " << this->identity_ << ") Starting";

  if (identity_ == 0) {
    log::Debug() << "(Worker 0) do the initial scheduling";
    scheduler_.schedule();
  }

  while (true) {
    // wait for a ready reaction
    auto* reaction = scheduler_.ready_queue_.pop();

    // receiving a nullptr indicates that the worker should terminate
    if (reaction == nullptr) {
      break;
    }

    // execute the reaction
    execute_reaction(reaction);

    // was this the very last reaction?
    if (scheduler_.reactions_to_process_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      // Yes, then schedule. The atomic decrement above ensures that only one
      // thread enters this block.
      scheduler_.schedule();
    }
    // continue otherwise
  }

  log::Debug() << "(Worker " << identity_ << ") terminates";
}

void Worker::execute_reaction(Reaction* reaction) const {
  log::Debug() << "(Worker " << identity_ << ") "
               << "execute reaction " << reaction->fqn();

  tracepoint(reactor_cpp, reaction_execution_starts, identity_, reaction->fqn(), scheduler_.logical_time());
  reaction->trigger();
  tracepoint(reactor_cpp, reaction_execution_finishes, identity_, reaction->fqn(), scheduler_.logical_time());
}

void Scheduler::schedule() noexcept {
  bool found_ready_reactions = schedule_ready_reactions();

  while (!found_ready_reactions) {
    log::Debug() << "(Scheduler) call next()";
    next();
    reaction_queue_pos_ = 0;

    found_ready_reactions = schedule_ready_reactions();

    if (!continue_execution_ && !found_ready_reactions) {
      // let all workers know that they should terminate
      terminate_all_workers();
      break;
    }
  }
}

auto ReadyQueue::pop() -> Reaction* {
  auto old_size = size_.fetch_sub(1, std::memory_order_acq_rel);

  // If there is no ready reaction available, wait until there is one.
  while (old_size <= 0) {
    log::Debug() << "(Worker " << Worker::current_worker_id() << ") Wait for work";
    sem_.acquire();
    log::Debug() << "(Worker " << Worker::current_worker_id() << ") Waking up";
    old_size = size_.fetch_sub(1, std::memory_order_acq_rel);
    // FIXME: Protect against underflow?
  }

  auto pos = old_size - 1;
  return queue_[pos];
}

void ReadyQueue::fill_up(std::vector<Reaction*>& ready_reactions) {
  // clear the internal queue and swap contents
  queue_.clear();
  queue_.swap(ready_reactions);

  // update the atomic size counter and release the semaphore to wake up
  // waiting worker threads
  auto new_size = static_cast<std::ptrdiff_t>(queue_.size());
  auto old_size = size_.exchange(new_size, std::memory_order_acq_rel);

  // calculate how many workers to wake up. -old_size indicates the number of
  // workers who started waiting since the last update.
  // We want to wake up at most all the waiting workers. If we would release
  // more, other workers that are out of work would not block when acquiring
  // the semaphore.
  // Also, we do not want to wake up more workers than there is work. new_size
  // indicates the number of ready reactions. Since there is always at least
  // one worker running running, new_size - running_workers indicates the
  // number of additional workers needed to process all reactions.
  waiting_workers_ += -old_size;
  auto running_workers = num_workers_ - waiting_workers_;
  auto workers_to_wakeup = std::min(waiting_workers_, new_size - running_workers);

  // wakeup other workers_
  if (workers_to_wakeup > 0) {
    waiting_workers_ -= workers_to_wakeup;
    log::Debug() << "Wakeup " << workers_to_wakeup << " workers";
    sem_.release(static_cast<int>(workers_to_wakeup));
  }
}

void Scheduler::terminate_all_workers() {
  log::Debug() << "(Scheduler) Send termination signal to all workers";
  auto num_workers = environment_->num_workers();
  std::vector<Reaction*> null_reactions{num_workers, nullptr};
  log::Debug() << null_reactions.size();
  ready_queue_.fill_up(null_reactions);
}

auto Scheduler::schedule_ready_reactions() -> bool {
  // insert any triggered reactions_ into the reaction queue
  for (auto& vec_reaction : triggered_reactions_) {
    for (auto* reaction : vec_reaction) {
      reaction_queue_[reaction->index()].push_back(reaction);
    }
    vec_reaction.clear();
  }

  log::Debug() << "(Scheduler) Scanning the reaction queue for ready reactions";

  // continue iterating over the reaction queue
  for (; reaction_queue_pos_ < reaction_queue_.size(); reaction_queue_pos_++) {
    auto& reactions = reaction_queue_[reaction_queue_pos_];

    // any ready reactions of current priority?
    if (!reactions.empty()) {
      log::Debug() << "(Scheduler) Process reactions of priority " << reaction_queue_pos_;

      // Make sure that any reaction is only executed once even if it
      // was triggered multiple times.
      std::sort(reactions.begin(), reactions.end());
      reactions.erase(std::unique(reactions.begin(), reactions.end()), reactions.end());

      if constexpr (log::debug_enabled || tracing_enabled) { // NOLINT
        for (auto* reaction : reactions) {
          log::Debug() << "(Scheduler) Reaction " << reaction->fqn() << " is ready for execution";
          tracepoint(reactor_cpp, trigger_reaction, reaction->container()->fqn(), reaction->name(), logical_time_);
        }
      }

      reactions_to_process_.store(static_cast<std::ptrdiff_t>(reactions.size()), std::memory_order_release);
      ready_queue_.fill_up(reactions);

      // break out of the loop and return
      return true;
    }
  }

  log::Debug() << "(Scheduler) Reached end of reaction queue";
  return false;
}

void Scheduler::start() {
  log::Debug() << "Starting the scheduler...";

  auto num_workers = environment_->num_workers();
  // initialize the reaction queue, set ports vector, and triggered reactions
  // vector
  reaction_queue_.resize(environment_->max_reaction_index() + 1);
  set_ports_.resize(num_workers);
  triggered_reactions_.resize(num_workers);

  // Initialize and start the workers. By resizing the workers vector first,
  // we make sure that there is sufficient space for all the workers and non of
  // them needs to be moved. This is important because a running worker may not
  // be moved.
  workers_.reserve(num_workers);
  for (unsigned i = 0; i < num_workers; i++) {
    workers_.emplace_back(*this, i);
    workers_.back().start_thread();
  }

  // join all worker threads
  for (auto& worker : workers_) {
    worker.join_thread();
  }
}

void Scheduler::next() { // NOLINT
  static ActionListPtr actions{};

  // clean up before scheduling any new events
  if (actions != nullptr) {
    // cleanup all triggered actions
    for (auto* action : *actions) {
      action->cleanup();
    }

    actions->clear();
    action_list_pool_.emplace_back(std::move(actions));
  }

  // cleanup all set ports
  for (auto& vec_ports : set_ports_) {
    for (auto& port : vec_ports) {
      port->cleanup();
    }
    vec_ports.clear();
  }

  {
    std::unique_lock<std::mutex> lock{scheduling_mutex_};

    // shutdown if there are no more events in the queue
    if (event_queue_.empty() && !stop_) {
      if (environment_->run_forever()) {
        // wait for a new asynchronous event
        cv_schedule_.wait(lock, [this]() { return !event_queue_.empty() || stop_; });
      } else {
        log::Debug() << "No more events in queue_. -> Terminate!";
        environment_->sync_shutdown();
      }
    }

    while (actions == nullptr || actions->empty()) {
      if (stop_) {
        continue_execution_ = false;
        log::Debug() << "Shutting down the scheduler";
        Tag t_next = Tag::from_logical_time(logical_time_).delay();
        if (!event_queue_.empty() && t_next == event_queue_.begin()->first) {
          log::Debug() << "Schedule the last round of reactions including all "
                          "termination reactions";
          actions = std::move(event_queue_.begin()->second);
          event_queue_.erase(event_queue_.begin());
          log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]";
          logical_time_.advance_to(t_next);
        } else {
          return;
        }
      } else {
        // collect events of the next tag
        auto t_next = event_queue_.begin()->first;

        // synchronize with physical time if not in fast forward mode
        if (!environment_->fast_fwd_execution()) {
          // keep track of the current physical time in a static variable
          static auto physical_time = TimePoint::min();

          // If physical time is smaller than the next logical time point,
          // then update the physical time. This step is small optimization to
          // avoid calling get_physical_time() in every iteration as this
          // would add a significant overhead.
          if (physical_time < t_next.time_point()) {
            physical_time = get_physical_time();
          }

          // If physical time is still smaller than the next logical time
          // point, then wait until the next tag or until a new event is
          // inserted asynchronously into the queue
          if (physical_time < t_next.time_point()) {
            auto status = cv_schedule_.wait_until(lock, t_next.time_point());
            // Start over if an event was inserted into the event queue by a physical action
            if (status == std::cv_status::no_timeout || t_next != event_queue_.begin()->first) {
              continue;
            }
            // update physical time and continue otherwise
            physical_time = t_next.time_point();
            reactor_assert(t_next == event_queue_.begin()->first);
          }
        }

        // retrieve all events with tag equal to current logical time from the
        // queue
        actions = std::move(event_queue_.extract(event_queue_.begin()).mapped());

        // advance logical time
        log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]";
        logical_time_.advance_to(t_next);
      }
    }
  } // mutex schedule_

  // execute all setup functions; this sets the values of the corresponding
  // actions
  for (auto* action : *actions) {
    action->setup();
  }

  log::Debug() << "events: " << actions->size();
  for (const auto* action : *actions) {
    log::Debug() << "Action " << action->fqn();
    for (auto* reaction : action->triggers()) {
      // There is no need to acquire the mutex. At this point the scheduler
      // should be the only thread accessing the reaction queue as none of the
      // workers_ are running
      log::Debug() << "insert reaction " << reaction->fqn() << " with index " << reaction->index();
      reaction_queue_[reaction->index()].push_back(reaction);
    }
  }
}

Scheduler::Scheduler(Environment* env)
    : using_workers_(env->num_workers() > 1)
    , environment_(env)
    , ready_queue_(env->num_workers()) {
  fill_action_list_pool();
}

Scheduler::~Scheduler() = default;

void Scheduler::fill_action_list_pool() {
  for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
    action_list_pool_.emplace_back(std::make_unique<ActionList>());
  }
}

void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
  log::Debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ")
               << " with tag [" << tag.time_point() << ", " << tag.micro_step() << "]";
  reactor_assert(logical_time_ < tag);
  tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);

  if (using_workers_) {
    auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_event_queue_);

    auto event_it = event_queue_.find(tag);
    if (event_it == event_queue_.end()) {
      shared_lock.unlock();
      {
        auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_event_queue_);
        if (action_list_pool_.empty()) {
          fill_action_list_pool();
        }
        const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
        if (result.second) {
          action_list_pool_.pop_back();
        }
        result.first->second->push_back(action);
      }
    } else {
      event_it->second->push_back(action);
    }
  } else {
    if (action_list_pool_.empty()) {
      fill_action_list_pool();
    }
    const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
    if (result.second) {
      action_list_pool_.pop_back();
    }
    result.first->second->push_back(action);
  }
}

auto Scheduler::schedule_async(BaseAction* action, const Duration& delay) -> Tag {
  Tag tag;
  {
    std::lock_guard<std::mutex> lock_guard(scheduling_mutex_);
    tag = Tag::from_physical_time(get_physical_time() + delay);
    schedule_sync(action, tag);
  }
  cv_schedule_.notify_one();
  return tag;
}

void Scheduler::set_port(BasePort* port) {
  log::Debug() << "Set port " << port->fqn();

  // We do not check here if port is already in the list. This means clean()
  // could be called multiple times for a single port. However, calling
  // clean() multiple time is not harmful and more efficient then checking if
  set_ports_[Worker::current_worker_id()].push_back(port);

  // recursively search for triggered reactions
  set_port_helper(port);
}

void Scheduler::set_port_helper(BasePort* port) {
  if (!(port->triggers().empty() && port->dependencies().empty())) {
    if (port->message_multiport()) {
      set_ports_[Worker::current_worker_id()].push_back(port);
    }
  }

  for (auto* reaction : port->triggers()) {
    triggered_reactions_[Worker::current_worker_id()].push_back(reaction);
  }
  for (auto* binding : port->outward_bindings()) {
    set_port_helper(binding);
  }
}

void Scheduler::stop() {
  stop_ = true;
  cv_schedule_.notify_one();
}

} // namespace reactor