root/cc/resources/task_graph_runner.cc

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. current_node_
  2. WillRun
  3. DidRun
  4. HasFinishedRunning
  5. Swap
  6. Reset
  7. shutdown_
  8. GetNamespaceToken
  9. ScheduleTasks
  10. WaitForTasksToFinishRunning
  11. CollectCompletedTasks
  12. Shutdown
  13. RunUntilIdle
  14. RunTaskWithLockAcquired

// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "cc/resources/task_graph_runner.h"

#include <algorithm>

#include "base/debug/trace_event.h"
#include "base/strings/stringprintf.h"
#include "base/threading/thread_restrictions.h"

namespace cc {
namespace internal {
namespace {

// Helper class for iterating over all dependents of a task.
class DependentIterator {
 public:
  DependentIterator(TaskGraph* graph, const Task* task)
      : graph_(graph), task_(task), current_index_(-1), current_node_(NULL) {
    ++(*this);
  }

  TaskGraph::Node& operator->() const {
    DCHECK_LT(current_index_, graph_->edges.size());
    DCHECK_EQ(graph_->edges[current_index_].task, task_);
    DCHECK(current_node_);
    return *current_node_;
  }

  TaskGraph::Node& operator*() const {
    DCHECK_LT(current_index_, graph_->edges.size());
    DCHECK_EQ(graph_->edges[current_index_].task, task_);
    DCHECK(current_node_);
    return *current_node_;
  }

  // Note: Performance can be improved by keeping edges sorted.
  DependentIterator& operator++() {
    // Find next dependency edge for |task_|.
    do {
      ++current_index_;
      if (current_index_ == graph_->edges.size())
        return *this;
    } while (graph_->edges[current_index_].task != task_);

    // Now find the node for the dependent of this edge.
    TaskGraph::Node::Vector::iterator it =
        std::find_if(graph_->nodes.begin(),
                     graph_->nodes.end(),
                     TaskGraph::Node::TaskComparator(
                         graph_->edges[current_index_].dependent));
    DCHECK(it != graph_->nodes.end());
    current_node_ = &(*it);

    return *this;
  }

  operator bool() const { return current_index_ < graph_->edges.size(); }

 private:
  TaskGraph* graph_;
  const Task* task_;
  size_t current_index_;
  TaskGraph::Node* current_node_;
};

class DependencyMismatchComparator {
 public:
  explicit DependencyMismatchComparator(const TaskGraph* graph)
      : graph_(graph) {}

  bool operator()(const TaskGraph::Node& node) const {
    return static_cast<size_t>(std::count_if(graph_->edges.begin(),
                                             graph_->edges.end(),
                                             DependentComparator(node.task))) !=
           node.dependencies;
  }

 private:
  class DependentComparator {
   public:
    explicit DependentComparator(const Task* dependent)
        : dependent_(dependent) {}

    bool operator()(const TaskGraph::Edge& edge) const {
      return edge.dependent == dependent_;
    }

   private:
    const Task* dependent_;
  };

  const TaskGraph* graph_;
};

}  // namespace

Task::Task() : did_run_(false) {}

Task::~Task() {}

void Task::WillRun() {
  DCHECK(!did_run_);
}

void Task::DidRun() { did_run_ = true; }

bool Task::HasFinishedRunning() const { return did_run_; }

TaskGraph::TaskGraph() {}

TaskGraph::~TaskGraph() {}

void TaskGraph::Swap(TaskGraph* other) {
  nodes.swap(other->nodes);
  edges.swap(other->edges);
}

void TaskGraph::Reset() {
  nodes.clear();
  edges.clear();
}

TaskGraphRunner::TaskNamespace::TaskNamespace() {}

TaskGraphRunner::TaskNamespace::~TaskNamespace() {}

TaskGraphRunner::TaskGraphRunner()
    : lock_(),
      has_ready_to_run_tasks_cv_(&lock_),
      has_namespaces_with_finished_running_tasks_cv_(&lock_),
      next_namespace_id_(1),
      shutdown_(false) {}

TaskGraphRunner::~TaskGraphRunner() {
  {
    base::AutoLock lock(lock_);

    DCHECK_EQ(0u, ready_to_run_namespaces_.size());
    DCHECK_EQ(0u, namespaces_.size());
  }
}

NamespaceToken TaskGraphRunner::GetNamespaceToken() {
  base::AutoLock lock(lock_);

  NamespaceToken token(next_namespace_id_++);
  DCHECK(namespaces_.find(token.id_) == namespaces_.end());
  return token;
}

void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) {
  TRACE_EVENT2("cc",
               "TaskGraphRunner::ScheduleTasks",
               "num_nodes",
               graph->nodes.size(),
               "num_edges",
               graph->edges.size());

  DCHECK(token.IsValid());
  DCHECK(std::find_if(graph->nodes.begin(),
                      graph->nodes.end(),
                      DependencyMismatchComparator(graph)) ==
         graph->nodes.end());

  {
    base::AutoLock lock(lock_);

    DCHECK(!shutdown_);

    TaskNamespace& task_namespace = namespaces_[token.id_];

    // First adjust number of dependencies to reflect completed tasks.
    for (Task::Vector::iterator it = task_namespace.completed_tasks.begin();
         it != task_namespace.completed_tasks.end();
         ++it) {
      for (DependentIterator node_it(graph, it->get()); node_it; ++node_it) {
        TaskGraph::Node& node = *node_it;
        DCHECK_LT(0u, node.dependencies);
        node.dependencies--;
      }
    }

    // Build new "ready to run" queue and remove nodes from old graph.
    task_namespace.ready_to_run_tasks.clear();
    for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
         it != graph->nodes.end();
         ++it) {
      TaskGraph::Node& node = *it;

      // Remove any old nodes that are associated with this task. The result is
      // that the old graph is left with all nodes not present in this graph,
      // which we use below to determine what tasks need to be canceled.
      TaskGraph::Node::Vector::iterator old_it =
          std::find_if(task_namespace.graph.nodes.begin(),
                       task_namespace.graph.nodes.end(),
                       TaskGraph::Node::TaskComparator(node.task));
      if (old_it != task_namespace.graph.nodes.end()) {
        std::swap(*old_it, task_namespace.graph.nodes.back());
        task_namespace.graph.nodes.pop_back();
      }

      // Task is not ready to run if dependencies are not yet satisfied.
      if (node.dependencies)
        continue;

      // Skip if already finished running task.
      if (node.task->HasFinishedRunning())
        continue;

      // Skip if already running.
      if (std::find(task_namespace.running_tasks.begin(),
                    task_namespace.running_tasks.end(),
                    node.task) != task_namespace.running_tasks.end())
        continue;

      task_namespace.ready_to_run_tasks.push_back(
          PrioritizedTask(node.task, node.priority));
    }

    // Rearrange the elements in |ready_to_run_tasks| in such a way that they
    // form a heap.
    std::make_heap(task_namespace.ready_to_run_tasks.begin(),
                   task_namespace.ready_to_run_tasks.end(),
                   CompareTaskPriority);

    // Swap task graph.
    task_namespace.graph.Swap(graph);

    // Determine what tasks in old graph need to be canceled.
    for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
         it != graph->nodes.end();
         ++it) {
      TaskGraph::Node& node = *it;

      // Skip if already finished running task.
      if (node.task->HasFinishedRunning())
        continue;

      // Skip if already running.
      if (std::find(task_namespace.running_tasks.begin(),
                    task_namespace.running_tasks.end(),
                    node.task) != task_namespace.running_tasks.end())
        continue;

      DCHECK(std::find(task_namespace.completed_tasks.begin(),
                       task_namespace.completed_tasks.end(),
                       node.task) == task_namespace.completed_tasks.end());
      task_namespace.completed_tasks.push_back(node.task);
    }

    // Build new "ready to run" task namespaces queue.
    ready_to_run_namespaces_.clear();
    for (TaskNamespaceMap::iterator it = namespaces_.begin();
         it != namespaces_.end();
         ++it) {
      if (!it->second.ready_to_run_tasks.empty())
        ready_to_run_namespaces_.push_back(&it->second);
    }

    // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
    // that they form a heap.
    std::make_heap(ready_to_run_namespaces_.begin(),
                   ready_to_run_namespaces_.end(),
                   CompareTaskNamespacePriority);

    // If there is more work available, wake up worker thread.
    if (!ready_to_run_namespaces_.empty())
      has_ready_to_run_tasks_cv_.Signal();
  }
}

void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
  TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning");

  DCHECK(token.IsValid());

  {
    base::AutoLock lock(lock_);

    TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_);
    if (it == namespaces_.end())
      return;

    const TaskNamespace& task_namespace = it->second;

    while (!HasFinishedRunningTasksInNamespace(&task_namespace))
      has_namespaces_with_finished_running_tasks_cv_.Wait();

    // There may be other namespaces that have finished running tasks, so wake
    // up another origin thread.
    has_namespaces_with_finished_running_tasks_cv_.Signal();
  }
}

void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token,
                                            Task::Vector* completed_tasks) {
  TRACE_EVENT0("cc", "TaskGraphRunner::CollectCompletedTasks");

  DCHECK(token.IsValid());

  {
    base::AutoLock lock(lock_);

    TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
    if (it == namespaces_.end())
      return;

    TaskNamespace& task_namespace = it->second;

    DCHECK_EQ(0u, completed_tasks->size());
    completed_tasks->swap(task_namespace.completed_tasks);
    if (!HasFinishedRunningTasksInNamespace(&task_namespace))
      return;

    // Remove namespace if finished running tasks.
    DCHECK_EQ(0u, task_namespace.completed_tasks.size());
    DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size());
    DCHECK_EQ(0u, task_namespace.running_tasks.size());
    namespaces_.erase(it);
  }
}

void TaskGraphRunner::Shutdown() {
  base::AutoLock lock(lock_);

  DCHECK_EQ(0u, ready_to_run_namespaces_.size());
  DCHECK_EQ(0u, namespaces_.size());

  DCHECK(!shutdown_);
  shutdown_ = true;

  // Wake up a worker so it knows it should exit. This will cause all workers
  // to exit as each will wake up another worker before exiting.
  has_ready_to_run_tasks_cv_.Signal();
}

void TaskGraphRunner::Run() {
  base::AutoLock lock(lock_);

  while (true) {
    if (ready_to_run_namespaces_.empty()) {
      // Exit when shutdown is set and no more tasks are pending.
      if (shutdown_)
        break;

      // Wait for more tasks.
      has_ready_to_run_tasks_cv_.Wait();
      continue;
    }

    RunTaskWithLockAcquired();
  }

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  has_ready_to_run_tasks_cv_.Signal();
}

void TaskGraphRunner::RunUntilIdle() {
  base::AutoLock lock(lock_);

  while (!ready_to_run_namespaces_.empty())
    RunTaskWithLockAcquired();
}

void TaskGraphRunner::RunTaskWithLockAcquired() {
  TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask");

  lock_.AssertAcquired();
  DCHECK(!ready_to_run_namespaces_.empty());

  // Take top priority TaskNamespace from |ready_to_run_namespaces_|.
  std::pop_heap(ready_to_run_namespaces_.begin(),
                ready_to_run_namespaces_.end(),
                CompareTaskNamespacePriority);
  TaskNamespace* task_namespace = ready_to_run_namespaces_.back();
  ready_to_run_namespaces_.pop_back();
  DCHECK(!task_namespace->ready_to_run_tasks.empty());

  // Take top priority task from |ready_to_run_tasks|.
  std::pop_heap(task_namespace->ready_to_run_tasks.begin(),
                task_namespace->ready_to_run_tasks.end(),
                CompareTaskPriority);
  scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task);
  task_namespace->ready_to_run_tasks.pop_back();

  // Add task namespace back to |ready_to_run_namespaces_| if not empty after
  // taking top priority task.
  if (!task_namespace->ready_to_run_tasks.empty()) {
    ready_to_run_namespaces_.push_back(task_namespace);
    std::push_heap(ready_to_run_namespaces_.begin(),
                   ready_to_run_namespaces_.end(),
                   CompareTaskNamespacePriority);
  }

  // Add task to |running_tasks|.
  task_namespace->running_tasks.push_back(task.get());

  // There may be more work available, so wake up another worker thread.
  has_ready_to_run_tasks_cv_.Signal();

  // Call WillRun() before releasing |lock_| and running task.
  task->WillRun();

  {
    base::AutoUnlock unlock(lock_);

    task->RunOnWorkerThread();
  }

  // This will mark task as finished running.
  task->DidRun();

  // Remove task from |running_tasks|.
  TaskVector::iterator it = std::find(task_namespace->running_tasks.begin(),
                                      task_namespace->running_tasks.end(),
                                      task.get());
  DCHECK(it != task_namespace->running_tasks.end());
  std::swap(*it, task_namespace->running_tasks.back());
  task_namespace->running_tasks.pop_back();

  // Now iterate over all dependents to decrement dependencies and check if they
  // are ready to run.
  bool ready_to_run_namespaces_has_heap_properties = true;
  for (DependentIterator it(&task_namespace->graph, task.get()); it; ++it) {
    TaskGraph::Node& dependent_node = *it;

    DCHECK_LT(0u, dependent_node.dependencies);
    dependent_node.dependencies--;
    // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks_|.
    if (!dependent_node.dependencies) {
      bool was_empty = task_namespace->ready_to_run_tasks.empty();
      task_namespace->ready_to_run_tasks.push_back(
          PrioritizedTask(dependent_node.task, dependent_node.priority));
      std::push_heap(task_namespace->ready_to_run_tasks.begin(),
                     task_namespace->ready_to_run_tasks.end(),
                     CompareTaskPriority);
      // Task namespace is ready if it has at least one ready to run task. Add
      // it to |ready_to_run_namespaces_| if it just become ready.
      if (was_empty) {
        DCHECK(std::find(ready_to_run_namespaces_.begin(),
                         ready_to_run_namespaces_.end(),
                         task_namespace) == ready_to_run_namespaces_.end());
        ready_to_run_namespaces_.push_back(task_namespace);
      }
      ready_to_run_namespaces_has_heap_properties = false;
    }
  }

  // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
  // that they yet again form a heap.
  if (!ready_to_run_namespaces_has_heap_properties) {
    std::make_heap(ready_to_run_namespaces_.begin(),
                   ready_to_run_namespaces_.end(),
                   CompareTaskNamespacePriority);
  }

  // Finally add task to |completed_tasks_|.
  task_namespace->completed_tasks.push_back(task);

  // If namespace has finished running all tasks, wake up origin thread.
  if (HasFinishedRunningTasksInNamespace(task_namespace))
    has_namespaces_with_finished_running_tasks_cv_.Signal();
}

}  // namespace internal
}  // namespace cc

/* [<][>][^][v][top][bottom][index][help] */