ModErn Text Analysis
META Enumerates Textual Applications
thread_pool.h
Go to the documentation of this file.
1 
13 #ifndef META_THREAD_POOL_H_
14 #define META_THREAD_POOL_H_
15 
16 #include <condition_variable>
17 #include <functional>
18 #include <future>
19 #include <mutex>
20 #include <queue>
21 #include <thread>
22 #include <vector>
23 
24 #include "meta/config.h"
25 
26 namespace meta
27 {
28 namespace parallel
29 {
30 
/**
 * Represents a collection of a fixed number of threads, which tasks can
 * be added to.
 *
 * Tasks are stored in a FIFO queue guarded by a mutex; worker threads
 * sleep on a condition variable until a task is available. The result
 * (or exception) of every task is delivered through the std::future
 * returned by submit_task().
 */
class thread_pool
{
  public:
    /**
     * Starts the worker threads.
     *
     * @param num_threads The number of worker threads to start. A value
     * of 0 is treated as 1: std::thread::hardware_concurrency() is
     * allowed to return 0 when the value is not computable, and a pool
     * without workers would never run any submitted task.
     */
    thread_pool(size_t num_threads = std::thread::hardware_concurrency())
        : running_(true)
    {
        if (num_threads == 0)
            num_threads = 1;

        threads_.reserve(num_threads);
        try
        {
            for (size_t i = 0; i < num_threads; ++i)
                threads_.emplace_back(&thread_pool::worker, this);
        }
        catch (...)
        {
            // Destroying a joinable std::thread calls std::terminate, so
            // stop and join the workers that did start before letting
            // the exception escape the constructor.
            shutdown();
            throw;
        }
    }

    /**
     * The pool owns a mutex and running threads bound to `this`, so it
     * can be neither copied nor assigned.
     */
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    /**
     * Destructor; joins all threads. Workers only exit once the queue is
     * empty, so tasks that were already submitted are run first.
     */
    ~thread_pool()
    {
        shutdown();
    }

    /**
     * Adds a task to the thread_pool.
     *
     * @param func The callable (invoked with no arguments) to run on one
     * of the worker threads
     * @return a std::future that receives the return value of func, or
     * the exception it threw
     */
    template <class Function>
    std::future<typename std::result_of<Function()>::type>
    submit_task(Function func)
    {
        using result_type = typename std::result_of<Function()>::type;

        std::unique_ptr<concrete_task<result_type>> job(
            new concrete_task<result_type>(func));

        // The future must be obtained before ownership of the job is
        // handed over to the queue.
        auto future = job->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(job));
        }
        cond_.notify_one();
        return future;
    }

    /**
     * @return the ids of the worker threads in the pool
     */
    std::vector<std::thread::id> thread_ids() const
    {
        std::vector<std::thread::id> ids;
        ids.reserve(threads_.size());
        for (const auto& t : threads_)
            ids.push_back(t.get_id());
        return ids;
    }

    /**
     * @return the number of tasks waiting in the queue, i.e. submitted
     * but not yet picked up by a worker
     */
    size_t tasks() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return tasks_.size();
    }

    /**
     * @return the number of threads in the pool
     */
    size_t size() const
    {
        return threads_.size();
    }

  private:
    /**
     * A generic task object.
     */
    struct task
    {
        /**
         * Runs the given task.
         */
        virtual void run() = 0;

        /**
         * Virtual destructor to support deletion from base pointers.
         */
        virtual ~task() = default;
    };

    /**
     * A concrete task is templated with a result type.
     */
    template <class R>
    struct concrete_task : task
    {
        /**
         * Constructs a new concrete task.
         *
         * @param f The callable to wrap; it is copied into the internal
         * std::packaged_task
         */
        template <class Function>
        concrete_task(const Function& f) : task_(f)
        {
        }

        /**
         * Virtual destructor to support deletion from base pointers.
         */
        virtual ~concrete_task() = default;

        /**
         * Runs the given task.
         */
        virtual void run() override
        {
            task_();
        }

        /**
         * @return the future associated with the wrapped task
         */
        std::future<R> get_future()
        {
            return task_.get_future();
        }

        /// the internal task representation
        std::packaged_task<R()> task_;
    };

    /**
     * Function invoked by the worker threads to process tasks off the
     * internal queue. Returns once the pool has been told to stop and
     * the queue is empty.
     */
    void worker()
    {
        while (true)
        {
            std::unique_ptr<task> next;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this]()
                           {
                               return !running_ || !tasks_.empty();
                           });
                // Woken with nothing queued: only possible on shutdown.
                if (tasks_.empty())
                    return;
                next = std::move(tasks_.front());
                tasks_.pop();
            }
            // Run outside the lock so other workers can dequeue.
            next->run();
        }
    }

    /**
     * Tells the workers to stop once the queue is drained and joins
     * every thread that was started.
     */
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        cond_.notify_all();
        for (auto& thread : threads_)
            thread.join();
    }

    /// the threads in the pool
    std::vector<std::thread> threads_;
    /// the queue containing the tasks to be run
    std::queue<std::unique_ptr<task>> tasks_;

    /// whether or not the pool is currently running
    bool running_;

    /// the mutex to wrap queue operations
    mutable std::mutex mutex_;
    /// the condition variable that workers sleep on when waiting for work
    std::condition_variable cond_;
};
206 }
207 }
208 
209 #endif
size_t tasks() const
Definition: thread_pool.h:102
std::future< typename std::result_of< Function()>::type > submit_task(Function func)
Adds a task to the thread_pool.
Definition: thread_pool.h:72
thread_pool(size_t num_threads=std::thread::hardware_concurrency())
Definition: thread_pool.h:42
std::condition_variable cond_
the condition variable that workers sleep on when waiting for work
Definition: thread_pool.h:204
A generic task object.
Definition: thread_pool.h:120
size_t size() const
Definition: thread_pool.h:111
~thread_pool()
Destructor; joins all threads.
Definition: thread_pool.h:53
virtual void run()=0
Runs the given task.
virtual ~task()=default
Virtual destructor to support deletion from base pointers.
A concrete task is templated with a result type.
Definition: thread_pool.h:137
std::packaged_task< R()> task_
the internal task representation
Definition: thread_pool.h:168
The ModErn Text Analysis toolkit is a suite of natural language processing, classification, information retrieval, data mining, and other applications of text processing.
Definition: analyzer.h:25
std::queue< std::unique_ptr< task > > tasks_
the queue containing the tasks to be run
Definition: thread_pool.h:196
Represents a collection of a fixed number of threads, which tasks can be added to.
Definition: thread_pool.h:35
void worker()
Function invoked by the worker threads to process tasks off the internal queue.
Definition: thread_pool.h:175
virtual void run() override
Runs the given task.
Definition: thread_pool.h:154
std::mutex mutex_
the mutex to wrap queue operations
Definition: thread_pool.h:202
std::future< R > get_future()
Definition: thread_pool.h:162
std::vector< std::thread > threads_
the threads in the pool
Definition: thread_pool.h:194
std::vector< std::thread::id > thread_ids() const
Definition: thread_pool.h:91
bool running_
whether or not the pool is currently running
Definition: thread_pool.h:199
concrete_task(const Function &f)
Constructs a new concrete task.
Definition: thread_pool.h:145