neoGFX
Cross-platform C++ app/game engine
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1// thread_pool.hpp
2/*
3 * Copyright (c) 2007 Leigh Johnston.
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are
9 * met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 *
14 * * Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 *
18 * * Neither the name of Leigh Johnston nor the names of any
19 * other contributors to this software may be used to endorse or
20 * promote products derived from this software without specific prior
21 * written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
25 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
27 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34*/
35
36#pragma once
37
38#include <neolib/neolib.hpp>
39#include <atomic>
40#include <memory>
41#include <vector>
42#include <deque>
43#include <future>
44#include <mutex>
46#include <neolib/task/task.hpp>
47
48namespace neolib
49{
50 class thread_pool_thread;
51
// Declaration of the thread pool interface. Only declarations are visible in
// this listing; the member function bodies (other than the run<T> template
// defined after the class) live elsewhere.
// NOTE(review): the listing's embedded line numbers skip 63-64, 81 and 87, so
// some declarations (e.g. the default_thread_pool() accessor named in the page
// index) appear to be missing from this view - confirm against the real header.
52 class NEOLIB_EXPORT thread_pool
53 {
54 friend class thread_pool_thread;
55 public:
    // Shared-ownership handle to a task.
56 typedef std::shared_ptr<i_task> task_pointer;
57 public:
    // Logic errors reported by the pool; what() yields the qualified name given below.
58 struct no_threads : std::logic_error { no_threads() : std::logic_error("neolib::thread_pool::no_threads") {} };
59 struct task_not_found : std::logic_error { task_not_found() : std::logic_error("neolib::thread_pool::task_not_found") {} };
60 private:
    // Container type for the worker threads, each uniquely owned.
61 typedef std::vector<std::unique_ptr<i_thread>> thread_list;
62 public:
65 public:
    // Capacity control and thread counts.
    // NOTE(review): presumably reserve() sets the value reported by max_threads() - confirm in the implementation.
66 void reserve(std::size_t aMaxThreads);
67 std::size_t active_threads() const;
68 std::size_t available_threads() const;
69 std::size_t total_threads() const;
70 std::size_t max_threads() const;
71 public:
    // Task submission. A task is passed either by reference or by shared pointer,
    // with an optional priority that defaults to 0.
    // NOTE(review): the meaning of try_start()'s bool result is not visible here - presumably
    // false when the task could not be started; confirm.
72 void start(i_task& aTask, int32_t aPriority = 0);
73 void start(task_pointer aTask, int32_t aPriority = 0);
74 bool try_start(i_task& aTask, int32_t aPriority = 0);
75 bool try_start(task_pointer aTask, int32_t aPriority = 0);
    // run() accepts a callable and returns a future for its result paired with the task handle.
    // The templated overload is defined after the class; it returns an empty pair when stopped() is true.
76 std::pair<std::future<void>, task_pointer> run(std::function<void()> aFunction, int32_t aPriority = 0);
77 template <typename T>
78 std::pair<std::future<T>, task_pointer> run(std::function<T()> aFunction, int32_t aPriority = 0);
79 public:
    // Pool state queries and control.
    // NOTE(review): parallel_apply() calls wait() after queuing its tasks, so wait() presumably
    // blocks until the pool has no outstanding work - confirm in the implementation.
80 bool idle() const;
82 bool busy() const;
83 void wait() const;
84 bool stopped() const;
85 void stop();
86 public:
    // Access to a recursive mutex (presumably iMutex - confirm); callable on a const pool.
88 std::recursive_mutex& mutex() const;
89 private:
    // Hooks available to thread_pool_thread (a friend of this class).
90 void steal_work(thread_pool_thread& aIdleThread);
91 void thread_gone_idle();
92 void thread_gone_busy();
93 private:
    // State. The mutexes and condition variable are mutable so const members can use them.
94 mutable std::recursive_mutex iMutex;
95 std::atomic<bool> iIdle;
96 std::atomic<bool> iStopped;
97 std::size_t iMaxThreads;
98 thread_list iThreads;
    // NOTE(review): presumably the pair used by wait() to block the caller - confirm.
99 mutable std::mutex iWaitMutex;
100 mutable std::condition_variable iWaitConditionVariable;
101 };
102
103 template <typename T>
104 inline std::pair<std::future<T>, thread_pool::task_pointer> thread_pool::run(std::function<T()> aFunction, int32_t aPriority)
105 {
106 if (stopped())
107 return {};
108 auto newTask = std::make_shared<function_task<T>>(aFunction);
109 start(newTask, aPriority);
110 return std::make_pair(newTask->get_future(), newTask);
111 }
112
113 template <typename Container>
114 inline void parallel_apply(thread_pool& aThreadPool, Container& aContainer, std::function<void(typename Container::value_type& aElement)> aFunction, std::size_t aMinimumParallelismCount = 0)
115 {
116 if (aThreadPool.stopped())
117 return;
118 if (aContainer.size() < aMinimumParallelismCount)
119 {
120 for (auto& e : aContainer)
121 aFunction(e);
122 return;
123 }
124 auto subrange = aContainer.size() / aThreadPool.max_threads();
125 if (subrange < 1)
126 subrange = 1;
127 auto next = aContainer.begin();
128 for (auto left = aContainer.size(); left >= subrange; left -= subrange)
129 {
130 auto end = std::next(next, subrange);
131 aThreadPool.run([next, end, &aFunction]()
132 {
133 for (auto i = next; i != end; ++i)
134 aFunction(*i);
135 });
136 next = end;
137 }
138 if (next != aContainer.end())
139 aThreadPool.run([next, &aContainer, &aFunction]()
140 {
141 for (auto i = next; i != aContainer.end(); ++i)
142 aFunction(*i);
143 });
144 aThreadPool.wait();
145 }
146}
std::recursive_mutex & mutex() const
std::size_t active_threads() const
bool idle() const
std::pair< std::future< void >, task_pointer > run(std::function< void()> aFunction, int32_t aPriority=0)
std::size_t max_threads() const
std::size_t total_threads() const
std::size_t available_threads() const
std::shared_ptr< i_task > task_pointer
void start(i_task &aTask, int32_t aPriority=0)
void wait() const
bool try_start(i_task &aTask, int32_t aPriority=0)
bool try_start(task_pointer aTask, int32_t aPriority=0)
static thread_pool & default_thread_pool()
bool busy() const
void start(task_pointer aTask, int32_t aPriority=0)
void reserve(std::size_t aMaxThreads)
bool stopped() const
void parallel_apply(thread_pool &aThreadPool, Container &aContainer, std::function< void(typename Container::value_type &aElement)> aFunction, std::size_t aMinimumParallelismCount=0)
it_type next(it_type it, const typename iterator_traits< it_type >::difference_type distance=1)
Definition plf_hive.h:89