25 #ifdef GETFEM_HAS_OPENMP
30 using bgeot::scalar_type;
34 #ifdef GETFEM_HAS_OPENMP
36 std::recursive_mutex omp_guard::mutex;
38 omp_guard::omp_guard()
40 std::make_unique<std::lock_guard<std::recursive_mutex>>(mutex)
44 local_guard::local_guard(std::recursive_mutex& m) :
47 std::make_shared<std::lock_guard<std::recursive_mutex>>(m)
// Returns a local_guard built from `mutex`. The local_guard constructor shown
// just above in this listing creates a std::lock_guard on the recursive mutex
// it is given.
51 local_guard lock_factory::get_lock()
const{
52 return local_guard{mutex};
// Partition-aware thread index: forwards to the partition_master singleton and
// returns its current partition. Presumably this is the GETFEM_HAS_OPENMP
// variant; a second definition returning a constant appears later in the file.
55 size_type global_thread_policy::this_thread() {
56 return partition_master::get().get_current_partition();
// Partition-aware thread count: forwards to the partition_master singleton and
// returns its get_nb_partitions() value. Presumably the GETFEM_HAS_OPENMP
// variant; a second definition returning a constant appears later in the file.
59 size_type global_thread_policy::num_threads(){
60 return partition_master::get().get_nb_partitions();
// Index of the calling thread as reported by OpenMP (omp_get_thread_num()),
// without any partition mapping.
63 size_type true_thread_policy::this_thread() {
64 return omp_get_thread_num();
// Thread count as reported by omp_get_max_threads(), i.e. OpenMP's upper bound
// for a parallel region rather than the size of the team currently running.
67 size_type true_thread_policy::num_threads(){
68 return omp_get_max_threads();
72 omp_set_num_threads(n);
73 partition_master::get().check_threads();
78 if(omp_get_num_threads() == 1 && omp_get_level() == 0)
return false;
80 if(omp_get_num_threads() == 1 && omp_get_level() == 1)
return true;
82 if(omp_in_parallel() == 1)
return true;
88 return omp_get_max_threads() == 1;
92 return std::thread::hardware_concurrency();
97 size_type global_thread_policy::this_thread() {
return 0;}
99 size_type global_thread_policy::num_threads(){
return 1;}
101 size_type true_thread_policy::this_thread() {
return 0;}
103 size_type true_thread_policy::num_threads(){
return 1;}
120 std::vector<std::exception_ptr> exceptions;
// Stores the exception currently being handled (std::current_exception()) in
// the `exceptions` slot indexed by true_thread_policy::this_thread(), so each
// thread writes to its own element.
122 void captureException(){
123 exceptions[true_thread_policy::this_thread()] = std::current_exception();
128 : exceptions(true_thread_policy::num_threads(),
nullptr)
// Calls f(params...). Anything it throws is caught here and recorded through
// captureException() instead of propagating to the caller.
// Both the callable and its arguments are taken by value.
131 template <
typename function,
typename... parameters>
132 void run(
function f, parameters... params){
133 try {f(params...);}
catch (...) {captureException();}
// Returns a new vector holding every non-null entry of `exceptions`, in the
// order the slots are stored; empty slots are skipped.
136 std::vector<std::exception_ptr> caughtExceptions()
const{
137 std::vector<std::exception_ptr> non_empty_exceptions;
138 for (
auto &&pException : exceptions){
139 if (pException !=
nullptr) non_empty_exceptions.push_back(pException);
141 return non_empty_exceptions;
145 for (
auto &&pException : exceptions){
146 if (pException !=
nullptr) std::rethrow_exception(pException);
151 partition_iterator::partition_iterator(
153 : master{m}, it{it_from_set}
156 partition_iterator partition_iterator::operator++(){
158 if (*
this != master.
end()) master.set_current_partition(*it);
162 bool partition_iterator::operator==(
const partition_iterator &it1)
const {
166 bool partition_iterator::operator!=(
const partition_iterator &it1)
const {
167 return !(*
this == it1);
170 size_type partition_iterator::operator*()
const{
174 partition_master partition_master::instance;
176 partition_master& partition_master::get(){
180 void partition_master::check_threads(){
182 auto must_update =
false;
183 if (nb_user_threads != true_thread_policy::num_threads()){
184 nb_user_threads = true_thread_policy::num_threads();
187 if (nb_partitions < nb_user_threads && !partitions_set_by_user){
188 nb_partitions = nb_user_threads;
193 dal::singletons_manager::on_partitions_change();
198 GMM_ASSERT1 (!partitions_set_by_user,
199 "Number of partitions can be set only once.");
200 if (n > nb_partitions){
202 nb_user_threads = true_thread_policy::num_threads();
204 dal::singletons_manager::on_partitions_change();
206 else if (n < nb_partitions){
207 GMM_WARNING1(
"Not reducing number of partitions from "
208 << nb_partitions <<
" to " << n <<
209 " as it might invalidate global storage.");
211 partitions_set_by_user =
true;
215 GMM_ASSERT1(nb_user_threads == true_thread_policy::num_threads(),
216 "The number of omp threads was changed outside partition_master."
217 "Please use getfem::set_num_threads for this.");
218 current_partition = *(std::begin(partitions.thrd_cast()));
229 "Cannot change thread policy in parallel section.");
235 partition_master::partition_master()
236 : nb_user_threads{1}, nb_partitions{1} {
237 partitions_updated =
false;
243 GMM_ASSERT2(behaviour == thread_behaviour::partition_threads ?
244 true_thread_policy::this_thread() < nb_partitions :
true,
245 "Requesting current partition for thread " <<
246 true_thread_policy::this_thread() <<
247 " while number of partitions is " << nb_partitions
249 return behaviour == thread_behaviour::partition_threads ?
250 current_partition : true_thread_policy::this_thread();
254 return behaviour == thread_behaviour::partition_threads ?
255 nb_partitions : true_thread_policy::num_threads();
// Makes `p` the active partition. In the visible code both the validity check
// and the assignment sit inside the thread_behaviour::partition_threads
// branch: GMM_ASSERT2 first verifies that `p` is contained in
// partitions.thrd_cast() (presumably the calling thread's own partition set;
// confirm against the container type), then current_partition is set to `p`.
258 void partition_master::set_current_partition(
size_type p){
259 if (behaviour == thread_behaviour::partition_threads){
260 GMM_ASSERT2(partitions.thrd_cast().count(p) != 0,
"Internal error: "
261 << p <<
" is not a valid partitions for thread "
262 << true_thread_policy::this_thread()
264 current_partition = p;
268 void partition_master::rewind_partitions(){
270 current_partition = *(std::begin(partitions.thrd_cast()));
273 for (
size_type t = 0; t != partitions.num_threads(); ++t){
274 current_partition(t) = *(std::begin(partitions(t)));
// Rebuilds the per-thread partition sets and the per-thread current partition
// from scratch (both containers are first reset to default-constructed
// values), then marks the result as up to date.
//
// - A warning is emitted when the thread count exceeds nb_partitions.
// - thread_behaviour::partition_threads: the indices [0, nb_partitions) are
//   cut into contiguous chunks of ceil(nb_partitions / n_threads) entries.
//   Thread t receives chunk t and starts on its first index; the loop stops at
//   the first thread whose chunk would begin at or beyond nb_partitions, so
//   later threads receive nothing.
// - Second loop (presumably the else branch, whose braces are not visible
//   here): thread t gets the single partition t.
//
// NOTE(review): partitions_updated is cleared on entry and tested again right
// below; presumably a lock is acquired between the two so that only the first
// thread through does the work. Confirm against the complete file.
279 void partition_master::update_partitions(){
280 partitions_updated =
false;
284 if (partitions_updated)
return;
286 partitions = decltype(partitions){};
287 current_partition = decltype(current_partition){};
289 auto n_threads = true_thread_policy::num_threads();
290 if(n_threads > nb_partitions){
291 GMM_WARNING0(
"Using " << n_threads <<
292 " threads which is above the maximum number of partitions :" <<
296 if (behaviour == thread_behaviour::partition_threads){
297 for (
size_type t = 0; t != n_threads; ++t){
298 auto partition_size =
static_cast<size_type>
299 (std::ceil(
static_cast<scalar_type
>(nb_partitions) /
300 static_cast<scalar_type
>(n_threads)));
301 auto partition_begin = partition_size * t;
302 if (partition_begin >= nb_partitions)
break;
303 auto partition_end = std::min(partition_size * (t + 1), nb_partitions);
304 auto hint_it = std::begin(partitions(t));
305 for (
size_type i = partition_begin; i != partition_end; ++i){
306 hint_it = partitions(t).insert(hint_it, i);
308 current_partition(t) = partition_begin;
312 for (
size_type t = 0; t != n_threads; ++t){
313 partitions(t).insert(t);
314 current_partition(t) = t;
318 partitions_updated =
true;
321 #if defined _WIN32 && !defined (__GNUC__)
322 #define GETFEM_ON_WIN
325 parallel_boilerplate::
326 parallel_boilerplate()
327 : plocale{std::make_unique<standard_locale>()},
328 pexception{std::make_unique<thread_exception>()} {
330 _configthreadlocale(_ENABLE_PER_THREAD_LOCALE);
// Executes `lambda` through pexception->run(). pexception is the
// thread_exception created in the constructor; presumably its run() is the
// try/catch wrapper shown earlier in this listing, so an exception thrown by
// the lambda is recorded rather than propagated.
334 void parallel_boilerplate::run_lambda(std::function<
void(
void)> lambda){
335 pexception->run(lambda);
// Destructor: switches per-thread locale handling back off (the
// _configthreadlocale call is presumably guarded by GETFEM_ON_WIN; the guard
// is not visible here) and then calls pexception->rethrow().
// NOTE(review): rethrow() presumably ends in std::rethrow_exception (see the
// loop earlier in this listing), so this destructor can throw. Destructors are
// implicitly noexcept unless declared noexcept(false); confirm the declaration
// in the header, otherwise a captured exception leads to std::terminate.
338 parallel_boilerplate::~parallel_boilerplate(){
340 _configthreadlocale(_DISABLE_PER_THREAD_LOCALE);
342 pexception->rethrow();
// Runs `lambda` inside an OpenMP parallel region, with every call routed
// through a parallel_boilerplate object (boilerplate.run_lambda).
//
// - Before the region starts, the number of partitions is raised to the
//   thread count when it is smaller.
// - iterate_over_partitions == true: each thread calls the lambda once per
//   element of the partition_master range, and rewind_partitions() is called
//   afterwards.
// - The second run_lambda call is presumably the else branch (its braces are
//   not visible here): the lambda then runs once per thread.
//
// NOTE(review): some lines of this function are absent from this listing;
// the description above covers only the visible statements.
345 void parallel_execution(std::function<
void(
void)> lambda,
346 bool iterate_over_partitions){
351 parallel_boilerplate boilerplate;
352 auto &pm = partition_master::get();
353 if (pm.get_nb_partitions() < true_thread_policy::num_threads()){
354 pm.set_nb_partitions(true_thread_policy::num_threads());
356 #pragma omp parallel default(shared)
358 if (iterate_over_partitions) {
359 for (
auto &&partitions : partition_master::get()) {
361 boilerplate.run_lambda(lambda);
365 boilerplate.run_lambda(lambda);
368 if (iterate_over_partitions) partition_master::get().rewind_partitions();
372 #ifdef GETFEM_FORCE_SINGLE_THREAD_BLAS
/* Declares openblas_get_num_threads_res (initialised to 1), overwrites it
   with the result of openblas_get_num_threads() when that symbol can be
   resolved, and then calls openblas_set_num_threads(1) when that symbol can be
   resolved. Both functions are looked up at run time with dlsym and are
   skipped when the lookup returns null. */
374 # define BLAS_FORCE_SINGLE_THREAD \
375 int openblas_get_num_threads_res = 1; \
377 typedef int (* ptrfunc1)(); \
378 ptrfunc1 func1 = ptrfunc1(dlsym(NULL, "openblas_get_num_threads")); \
379 if (func1) openblas_get_num_threads_res = (*func1)(); \
380 typedef void (* ptrfunc2)(int); \
381 ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
382 if (func2) (*func2)(1); \
/* Restores the OpenBLAS thread count saved in openblas_get_num_threads_res
   (the variable declared by BLAS_FORCE_SINGLE_THREAD, which must be in scope
   where this macro is expanded). openblas_set_num_threads is looked up at run
   time with dlsym; nothing is called when the lookup returns null.
   Fixes two identifiers that made every expansion ill-formed: the variable
   was declared with the undeclared type `ptrfunc` and the call went through
   the undeclared name `func`. The body is wrapped in braces so that its
   typedef and local variable stay private to the expansion. */
# define BLAS_RESTORE_NUM_THREAD                                        \
  {                                                                     \
    typedef void (* ptrfunc2)(int);                                     \
    ptrfunc2 func2 = ptrfunc2(dlsym(NULL, "openblas_set_num_threads")); \
    if (func2) (*func2)(openblas_get_num_threads_res);                  \
  }
391 # define BLAS_FORCE_SINGLE_THREAD
392 # define BLAS_RESTORE_NUM_THREAD
// Helper type whose constructor does nothing but expand
// BLAS_FORCE_SINGLE_THREAD. The file-local static instance `dcfbnti` runs that
// constructor during static initialisation of this translation unit. When the
// macro is defined empty (see the fallback definitions above) the constructor
// body is empty.
396 struct dummy_class_for_blas_nbthread_init {
397 dummy_class_for_blas_nbthread_init(
void)
398 { BLAS_FORCE_SINGLE_THREAD; }
401 static dummy_class_for_blas_nbthread_init dcfbnti;
Iterator that runs over partitions on the current thread and sets the global (but thread-specific) pa...
A singleton that manages partitions on individual threads.
partition_iterator begin()
beginning of the partitions for the current thread
size_type get_current_partition() const
active partition on the thread.
size_type get_nb_partitions() const
number of partitions or threads, depending on thread policy
partition_iterator end()
end of the partitions for the current thread
void set_nb_partitions(size_type)
for thread_behaviour::partition_threads set the total number of partitions.
void set_behaviour(thread_behaviour)
Sets the behaviour for the full program: either partitioning parallel loops according to the number o...
Allows re-throwing exceptions generated in an OpenMP parallel section.
A simple singleton implementation.
thread safe standard locale with RAII semantics
Tools for multithreaded, OpenMP and Boost based parallelization.
size_t size_type
used as the common size type in the library
GEneric Tool for Finite Element Methods.
bool not_multithreaded()
true if the program is running on a single thread
size_type max_concurrency()
Maximum number of threads that can run concurrently.
bool me_is_multithreaded_now()
true if the program is currently running in a parallel section
void set_num_threads(int n)
set maximum number of OpenMP threads