-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_pool_computational.hpp
163 lines (146 loc) · 4.13 KB
/
thread_pool_computational.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#pragma once
#include "base/assert.hpp"
#include "base/thread_utils.hpp"
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <thread>
namespace base
{
using namespace threads;
namespace thread_pool
{
namespace computational
{
// ThreadPool is needed for easy parallelization of tasks.
// ThreadPool can accept tasks that return result as std::future.
// When the destructor is called, all threads will join.
// Warning: ThreadPool works with std::thread instead of SimpleThread and therefore
// should not be used when the JVM is needed.
class ThreadPool
{
public:
using FunctionType = FunctionWrapper;
using Threads = std::vector<std::thread>;
// Constructs a ThreadPool.
// threadCount - number of threads used by the thread pool.
// Warning: The constructor may throw exceptions.
ThreadPool(size_t threadCount) : m_done(false), m_joiner(m_threads)
{
CHECK_GREATER(threadCount, 0, ());
m_threads.reserve(threadCount);
try
{
for (size_t i = 0; i < threadCount; i++)
m_threads.emplace_back(&ThreadPool::Worker, this);
}
catch (...) // std::system_error etc.
{
Stop();
throw;
}
}
// Destroys the ThreadPool.
// This function will block until all runnables have been completed.
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(m_mutex);
m_done = true;
}
m_condition.notify_all();
}
// Submit task for execution.
// func - task to be performed.
// args - arguments for func.
// The function will return the object future.
// Warning: If the thread pool is stopped then the call will be ignored.
template <typename F, typename... Args>
auto Submit(F && func, Args &&... args) -> std::future<decltype(func(args...))>
{
using ResultType = decltype(func(args...));
std::packaged_task<ResultType()> task(std::bind(std::forward<F>(func),
std::forward<Args>(args)...));
std::future<ResultType> result(task.get_future());
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_done)
return {};
m_queue.emplace(std::move(task));
}
m_condition.notify_one();
return result;
}
// Submit work for execution.
// func - task to be performed.
// args - arguments for func
// Warning: If the thread pool is stopped then the call will be ignored.
template <typename F, typename... Args>
void SubmitWork(F && func, Args &&... args)
{
auto f = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_done)
return;
m_queue.emplace(std::move(f));
}
m_condition.notify_one();
}
// Stop a ThreadPool.
// Removes the tasks that are not yet started from the queue.
// Unlike the destructor, this function does not wait for all runnables to complete:
// the tasks will stop as soon as possible.
void Stop()
{
{
std::unique_lock<std::mutex> lock(m_mutex);
auto empty = std::queue<FunctionType>();
m_queue.swap(empty);
m_done = true;
}
m_condition.notify_all();
}
void WaitingStop()
{
{
std::unique_lock<std::mutex> lock(m_mutex);
m_done = true;
}
m_condition.notify_all();
m_joiner.Join();
}
private:
void Worker()
{
while (true)
{
FunctionType task;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock, [&] {
return m_done || !m_queue.empty();
});
if (m_done && m_queue.empty())
return;
// It may seem that at this point the queue may be empty, provided that m_done == false and
// m_queue.empty() == true. But it is not possible that the queue is not empty guarantees
// check in m_condition.wait.
task = std::move(m_queue.front());
m_queue.pop();
}
task();
}
}
bool m_done;
std::mutex m_mutex;
std::condition_variable m_condition;
std::queue<FunctionType> m_queue;
Threads m_threads;
ThreadsJoiner<> m_joiner;
};
} // namespace computational
} // namespace thread_pool
} // namespace base