Branch data Line data Source code
1 : : /* GLIB - Library of useful routines for C programming
2 : : * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
3 : : *
4 : : * GThreadPool: thread pool implementation.
5 : : * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6 : : *
7 : : * SPDX-License-Identifier: LGPL-2.1-or-later
8 : : *
9 : : * This library is free software; you can redistribute it and/or
10 : : * modify it under the terms of the GNU Lesser General Public
11 : : * License as published by the Free Software Foundation; either
12 : : * version 2.1 of the License, or (at your option) any later version.
13 : : *
14 : : * This library is distributed in the hope that it will be useful,
15 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 : : * Lesser General Public License for more details.
18 : : *
19 : : * You should have received a copy of the GNU Lesser General Public
20 : : * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21 : : */
22 : :
23 : : /*
24 : : * MT safe
25 : : */
26 : :
27 : : #include "config.h"
28 : :
29 : : #include "gthreadpool.h"
30 : :
31 : : #include "gasyncqueue.h"
32 : : #include "gasyncqueueprivate.h"
33 : : #include "glib-private.h"
34 : : #include "gmain.h"
35 : : #include "gtestutils.h"
36 : : #include "gthreadprivate.h"
37 : : #include "gtimer.h"
38 : : #include "gutils.h"
39 : :
40 : : #define DEBUG_MSG(x)
41 : : /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */
42 : :
43 : : typedef struct _GRealThreadPool GRealThreadPool;
44 : :
45 : : /**
46 : : * GThreadPool:
47 : : * @func: the function to execute in the threads of this pool
48 : : * @user_data: the user data for the threads of this pool
49 : : * @exclusive: are all threads exclusive to this pool
50 : : *
51 : : * The `GThreadPool` struct represents a thread pool.
52 : : *
53 : : * A thread pool is useful when you wish to asynchronously fork out the execution of work
54 : : * and continue working in your own thread. If that will happen often, the overhead of starting
55 : : * and destroying a thread each time might be too high. In such cases reusing already started
56 : : * threads seems like a good idea. And it indeed is, but implementing this can be tedious
57 : : * and error-prone.
58 : : *
59 : : * Therefore GLib provides thread pools for your convenience. An added advantage is that the
60 : : * threads can be shared between the different subsystems of your program when they are using GLib.
61 : : *
62 : : * To create a new thread pool, you use [func@GLib.ThreadPool.new].
63 : : * It is destroyed by [method@GLib.ThreadPool.free].
64 : : *
65 : : * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push].
66 : : *
67 : : * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads].
68 : : * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed].
69 : : * To control the maximum number of threads for a thread pool, you use
70 : : * [method@GLib.ThreadPool.get_max_threads] and [method@GLib.ThreadPool.set_max_threads].
71 : : *
72 : : * Finally you can control the number of unused threads, that are kept alive by GLib for future use.
73 : : * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads].
74 : : * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and
75 : : * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads
76 : : * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads].
77 : : */
/* Private representation behind the public GThreadPool handle. The public
 * struct is the first member, and this file casts between GThreadPool* and
 * GRealThreadPool*. The fields below are read and written while holding the
 * lock of @queue in the code of this file.
 */
78 : : struct _GRealThreadPool
79 : : {
80 : : GThreadPool pool; /* public part: func, user_data, exclusive */
81 : : GAsyncQueue *queue; /* pending tasks; a negative length is treated as "threads are waiting" */
82 : : GCond cond; /* broadcast by a departing thread when the pool is stopped and someone is waiting */
83 : : gint max_threads; /* upper bound on num_threads; -1 means unlimited */
84 : : guint num_threads; /* incremented in g_thread_pool_start_thread(), decremented when a thread leaves */
85 : : gboolean running; /* TRUE from creation; the public entry points require it */
86 : : gboolean immediate; /* once not running: TRUE means queued tasks are no longer executed */
87 : : gboolean waiting; /* once not running: TRUE means another thread waits for the pool to finish */
88 : : GCompareDataFunc sort_func; /* if non-NULL, tasks are pushed in sorted order */
89 : : gpointer sort_user_data; /* passed to sort_func on every sorted push */
90 : : };
91 : :
92 : : /* The following is just an address to mark the wakeup order for a
93 : : * thread; it could be any address (as long as it isn't a valid
94 : : * GThreadPool address).
95 : : */
96 : : static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
97 : : static gint wakeup_thread_serial = 0; /* idle threads compare this with the value they last saw to tell a new wakeup from a repeated one */
98 : :
99 : : /* Here all unused threads are waiting. Pools in need of a thread are pushed
 * onto this queue, as is wakeup_thread_marker. The queue is created lazily
 * in g_thread_pool_new_full().
 */
100 : : static GAsyncQueue *unused_thread_queue = NULL;
101 : : static gint unused_threads = 0; /* accessed atomically; incremented before a thread parks on the queue above, decremented afterwards */
102 : : static gint max_unused_threads = 2; /* a parking thread exits when unused_threads has reached this value */
103 : : static gint kill_unused_threads = 0; /* decremented by a thread that sees a new wakeup serial; it exits if the old value was > 0 */
104 : : static guint max_idle_time = 15 * 1000; /* milliseconds (scaled by 1000 for the microsecond timed pop); 0 means wait indefinitely */
105 : :
/* Request record handed to the helper thread running
 * g_thread_pool_spawn_thread(). The requester fills in @pool and pushes the
 * record onto spawn_thread_queue; the helper stores the outcome in @thread
 * or @error.
 */
106 : : typedef struct
107 : : {
108 : : /* Either thread or error are set in the end. Both transfer-full. */
109 : : GThreadPool *pool; /* pool the new thread is started for */
110 : : GThread *thread; /* result: the started thread, or NULL on failure */
111 : : GError *error; /* result: the error reported by g_thread_try_new(), if any */
112 : : } SpawnThreadData;
113 : :
114 : : static GCond spawn_thread_cond; /* broadcast by the helper after each request; waited on with the mutex of spawn_thread_queue */
115 : : static GAsyncQueue *spawn_thread_queue; /* requests for the helper; created together with the helper thread in g_thread_pool_new_full() */
116 : :
/* Forward declarations of file-local helpers, so they can be referenced
 * before their definitions.
 */
117 : : static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
118 : : gpointer data);
119 : : static void g_thread_pool_free_internal (GRealThreadPool *pool);
120 : : static gpointer g_thread_pool_thread_proxy (gpointer data);
121 : : static gboolean g_thread_pool_start_thread (GRealThreadPool *pool,
122 : : GError **error);
123 : : static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool);
124 : : static GRealThreadPool* g_thread_pool_wait_for_new_pool (void);
125 : : static gpointer g_thread_pool_wait_for_new_task (GRealThreadPool *pool);
126 : :
127 : : static void
128 : 105050 : g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
129 : : gpointer data)
130 : : {
131 [ + + ]: 105050 : if (pool->sort_func)
132 : 3563 : g_async_queue_push_sorted_unlocked (pool->queue,
133 : : data,
134 : : pool->sort_func,
135 : : pool->sort_user_data);
136 : : else
137 : 101487 : g_async_queue_push_unlocked (pool->queue, data);
138 : 105050 : }
139 : :
140 : : static GRealThreadPool*
141 : 331 : g_thread_pool_wait_for_new_pool (void)
142 : : {
143 : : GRealThreadPool *pool;
144 : : gint local_wakeup_thread_serial;
145 : : guint local_max_unused_threads;
146 : : gint local_max_idle_time;
147 : : gint last_wakeup_thread_serial;
148 : 331 : gboolean have_relayed_thread_marker = FALSE;
149 : :
150 : 331 : local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
151 : 331 : local_max_idle_time = g_atomic_int_get (&max_idle_time);
152 : 331 : last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
153 : :
154 : : do
155 : : {
156 [ + + ]: 331 : if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
157 : : {
158 : : /* If this is a superfluous thread, stop it. */
159 : 62 : pool = NULL;
160 : : }
161 [ + + ]: 269 : else if (local_max_idle_time > 0)
162 : : {
163 : : /* If a maximal idle time is given, wait for the given time. */
164 : : DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
165 : : g_thread_self (), local_max_idle_time / 1000.0));
166 : :
167 : 48 : pool = g_async_queue_timeout_pop (unused_thread_queue,
168 : 48 : local_max_idle_time * 1000);
169 : : }
170 : : else
171 : : {
172 : : /* If no maximal idle time is given, wait indefinitely. */
173 : : DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
174 : 221 : pool = g_async_queue_pop (unused_thread_queue);
175 : : }
176 : :
177 [ + + ]: 320 : if (pool == wakeup_thread_marker)
178 : : {
179 : 199 : local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
180 [ - + ]: 199 : if (last_wakeup_thread_serial == local_wakeup_thread_serial)
181 : : {
182 [ # # ]: 0 : if (!have_relayed_thread_marker)
183 : : {
184 : : /* If this wakeup marker has been received for
185 : : * the second time, relay it.
186 : : */
187 : : DEBUG_MSG (("thread %p relaying wakeup message to "
188 : : "waiting thread with lower serial.",
189 : : g_thread_self ()));
190 : :
191 : 0 : g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
192 : 0 : have_relayed_thread_marker = TRUE;
193 : :
194 : : /* If a wakeup marker has been relayed, this thread
195 : : * will get out of the way for 100 microseconds to
196 : : * avoid receiving this marker again.
197 : : */
198 : 0 : g_usleep (100);
199 : : }
200 : : }
201 : : else
202 : : {
203 [ + - ]: 199 : if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
204 : : {
205 : 199 : pool = NULL;
206 : 199 : break;
207 : : }
208 : :
209 : : DEBUG_MSG (("thread %p updating to new limits.",
210 : : g_thread_self ()));
211 : :
212 : 0 : local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
213 : 0 : local_max_idle_time = g_atomic_int_get (&max_idle_time);
214 : 0 : last_wakeup_thread_serial = local_wakeup_thread_serial;
215 : :
216 : 0 : have_relayed_thread_marker = FALSE;
217 : : }
218 : : }
219 : : }
220 [ - + ]: 121 : while (pool == wakeup_thread_marker);
221 : :
222 : 320 : return pool;
223 : : }
224 : :
225 : : static gpointer
226 : 105024 : g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
227 : : {
228 : 105024 : gpointer task = NULL;
229 : :
230 [ + + + + : 131040 : if (pool->running || (!pool->immediate &&
+ + ]
231 : 26016 : g_async_queue_length_unlocked (pool->queue) > 0))
232 : : {
233 : : /* This thread pool is still active. */
234 [ + + + + ]: 104928 : if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
235 : : {
236 : : /* This is a superfluous thread, so it goes to the global pool. */
237 : : DEBUG_MSG (("superfluous thread %p in pool %p.",
238 : : g_thread_self (), pool));
239 : : }
240 [ + + ]: 104922 : else if (pool->pool.exclusive)
241 : : {
242 : : /* Exclusive threads stay attached to the pool. */
243 : 100610 : task = g_async_queue_pop_unlocked (pool->queue);
244 : :
245 : : DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
246 : : "(%d running, %d unprocessed).",
247 : : g_thread_self (), pool, pool->num_threads,
248 : : g_async_queue_length_unlocked (pool->queue)));
249 : : }
250 : : else
251 : : {
252 : : /* A thread will wait for new tasks for at most 1/2
253 : : * second before going to the global pool.
254 : : */
255 : : DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
256 : : "(%d running, %d unprocessed).",
257 : : g_thread_self (), pool, pool->num_threads,
258 : : g_async_queue_length_unlocked (pool->queue)));
259 : :
260 : 4312 : task = g_async_queue_timeout_pop_unlocked (pool->queue,
261 : : G_USEC_PER_SEC / 2);
262 : : }
263 : : }
264 : : else
265 : : {
266 : : /* This thread pool is inactive, it will no longer process tasks. */
267 : : DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
268 : : "(running: %s, immediate: %s, len: %d).",
269 : : pool, g_thread_self (),
270 : : pool->running ? "true" : "false",
271 : : pool->immediate ? "true" : "false",
272 : : g_async_queue_length_unlocked (pool->queue)));
273 : : }
274 : :
275 : 104908 : return task;
276 : : }
277 : :
278 : : static gpointer
279 : 166 : g_thread_pool_spawn_thread (gpointer data)
280 : : {
281 : : while (TRUE)
282 : 340 : {
283 : : SpawnThreadData *spawn_thread_data;
284 : 506 : GThread *thread = NULL;
285 : 506 : GError *error = NULL;
286 : 506 : const gchar *prgname = g_get_prgname ();
287 : 506 : gchar name[16] = "pool";
288 : :
289 [ + + ]: 506 : if (prgname)
290 : 387 : g_snprintf (name, sizeof (name), "pool-%s", prgname);
291 : :
292 : 506 : g_async_queue_lock (spawn_thread_queue);
293 : : /* Spawn a new thread for the given pool and wake the requesting thread
294 : : * up again with the result. This new thread will have the scheduler
295 : : * settings inherited from this thread and in extension of the thread
296 : : * that created the first non-exclusive thread-pool. */
297 : 506 : spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
298 : 340 : thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
299 : :
300 : 340 : spawn_thread_data->thread = g_steal_pointer (&thread);
301 : 340 : spawn_thread_data->error = g_steal_pointer (&error);
302 : :
303 : 340 : g_cond_broadcast (&spawn_thread_cond);
304 : 340 : g_async_queue_unlock (spawn_thread_queue);
305 : : }
306 : :
307 : : return NULL;
308 : : }
309 : :
310 : : static gpointer
311 : 398 : g_thread_pool_thread_proxy (gpointer data)
312 : : {
313 : : GRealThreadPool *pool;
314 : :
315 : 398 : pool = data;
316 : :
317 : : DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
318 : :
319 : 398 : g_async_queue_lock (pool->queue);
320 : :
321 : : while (TRUE)
322 : 104626 : {
323 : : gpointer task;
324 : :
325 : 105024 : task = g_thread_pool_wait_for_new_task (pool);
326 [ + + ]: 104908 : if (task)
327 : : {
328 [ + + + + ]: 104577 : if (pool->running || !pool->immediate)
329 : : {
330 : : /* A task was received and the thread pool is active,
331 : : * so execute the function.
332 : : */
333 : 104551 : g_async_queue_unlock (pool->queue);
334 : : DEBUG_MSG (("thread %p in pool %p calling func.",
335 : : g_thread_self (), pool));
336 : 104551 : pool->pool.func (task, pool->pool.user_data);
337 : 104551 : g_async_queue_lock (pool->queue);
338 : : }
339 : : }
340 : : else
341 : : {
342 : : /* No task was received, so this thread goes to the global pool. */
343 : 331 : gboolean free_pool = FALSE;
344 : :
345 : : DEBUG_MSG (("thread %p leaving pool %p for global pool.",
346 : : g_thread_self (), pool));
347 : 331 : pool->num_threads--;
348 : :
349 [ + + ]: 331 : if (!pool->running)
350 : : {
351 [ + + ]: 96 : if (!pool->waiting)
352 : : {
353 [ + + ]: 29 : if (pool->num_threads == 0)
354 : : {
355 : : /* If the pool is not running and no other
356 : : * thread is waiting for this thread pool to
357 : : * finish and this is the last thread of this
358 : : * pool, free the pool.
359 : : */
360 : 23 : free_pool = TRUE;
361 : : }
362 : : else
363 : : {
364 : : /* If the pool is not running and no other
365 : : * thread is waiting for this thread pool to
366 : : * finish and this is not the last thread of
367 : : * this pool and there are no tasks left in the
368 : : * queue, wakeup the remaining threads.
369 : : */
370 : 6 : if (g_async_queue_length_unlocked (pool->queue) ==
371 [ - + ]: 6 : (gint) -pool->num_threads)
372 : 0 : g_thread_pool_wakeup_and_stop_all (pool);
373 : : }
374 : : }
375 [ + + + - ]: 112 : else if (pool->immediate ||
376 : 45 : g_async_queue_length_unlocked (pool->queue) <= 0)
377 : : {
378 : : /* If the pool is not running and another thread is
379 : : * waiting for this thread pool to finish and there
380 : : * are either no tasks left or the pool shall stop
381 : : * immediately, inform the waiting thread of a change
382 : : * of the thread pool state.
383 : : */
384 : 67 : g_cond_broadcast (&pool->cond);
385 : : }
386 : : }
387 : :
388 : 331 : g_atomic_int_inc (&unused_threads);
389 : 331 : g_async_queue_unlock (pool->queue);
390 : :
391 [ + + ]: 331 : if (free_pool)
392 : 23 : g_thread_pool_free_internal (pool);
393 : :
394 : 331 : pool = g_thread_pool_wait_for_new_pool ();
395 : 320 : g_atomic_int_add (&unused_threads, -1);
396 : :
397 [ + + ]: 320 : if (pool == NULL)
398 : 271 : break;
399 : :
400 : 49 : g_async_queue_lock (pool->queue);
401 : :
402 : : DEBUG_MSG (("thread %p entering pool %p from global pool.",
403 : : g_thread_self (), pool));
404 : :
405 : : /* pool->num_threads++ is not done here, but in
406 : : * g_thread_pool_start_thread to make the new started
407 : : * thread known to the pool before itself can do it.
408 : : */
409 : : }
410 : : }
411 : :
412 : 271 : return NULL;
413 : : }
414 : :
415 : : static gboolean
416 : 95546 : g_thread_pool_start_thread (GRealThreadPool *pool,
417 : : GError **error)
418 : : {
419 : 95546 : gboolean success = FALSE;
420 : :
421 [ + + + + ]: 95546 : if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
422 : : /* Enough threads are already running */
423 : 95099 : return TRUE;
424 : :
425 : 447 : g_async_queue_lock (unused_thread_queue);
426 : :
427 [ + + ]: 447 : if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
428 : : {
429 : 49 : g_async_queue_push_unlocked (unused_thread_queue, pool);
430 : 49 : success = TRUE;
431 : : }
432 : :
433 : 447 : g_async_queue_unlock (unused_thread_queue);
434 : :
435 [ + + ]: 447 : if (!success)
436 : : {
437 : 398 : const gchar *prgname = g_get_prgname ();
438 : 398 : gchar name[16] = "pool";
439 : : GThread *thread;
440 : :
441 [ + + ]: 398 : if (prgname)
442 : 339 : g_snprintf (name, sizeof (name), "pool-%s", prgname);
443 : :
444 : : /* No thread was found, we have to start a new one */
445 [ + + ]: 398 : if (pool->pool.exclusive)
446 : : {
447 : : /* For exclusive thread-pools this is directly called from new() and
448 : : * we simply start new threads that inherit the scheduler settings
449 : : * from the current thread.
450 : : */
451 : 58 : thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
452 : : }
453 : : else
454 : : {
455 : : /* For non-exclusive thread-pools this can be called at any time
456 : : * when a new thread is needed. We make sure to create a new thread
457 : : * here with the correct scheduler settings by going via our helper
458 : : * thread.
459 : : */
460 : 340 : SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
461 : :
462 : 340 : g_async_queue_lock (spawn_thread_queue);
463 : :
464 : 340 : g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
465 : :
466 [ + + + - ]: 680 : while (!spawn_thread_data.thread && !spawn_thread_data.error)
467 : 340 : g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
468 : :
469 : 340 : thread = spawn_thread_data.thread;
470 [ - + ]: 340 : if (!thread)
471 : 0 : g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
472 : 340 : g_async_queue_unlock (spawn_thread_queue);
473 : : }
474 : :
475 [ - + ]: 398 : if (thread == NULL)
476 : 0 : return FALSE;
477 : :
478 : 398 : g_thread_unref (thread);
479 : : }
480 : :
481 : : /* See comment in g_thread_pool_thread_proxy as to why this is done
482 : : * here and not there
483 : : */
484 : 447 : pool->num_threads++;
485 : :
486 : 447 : return TRUE;
487 : : }
488 : :
489 : : /**
490 : : * g_thread_pool_new:
491 : : * @func: a function to execute in the threads of the new thread pool
492 : : * @user_data: user data that is handed over to @func every time it
493 : : * is called
494 : : * @max_threads: the maximal number of threads to execute concurrently
495 : : * in the new thread pool, -1 means no limit
496 : : * @exclusive: should this thread pool be exclusive?
497 : : * @error: return location for error, or %NULL
498 : : *
499 : : * This function creates a new thread pool.
500 : : *
501 : : * Whenever you call g_thread_pool_push(), either a new thread is
502 : : * created or an unused one is reused. At most @max_threads threads
503 : : * are running concurrently for this thread pool. @max_threads = -1
504 : : * allows unlimited threads to be created for this thread pool. The
505 : : * newly created or reused thread now executes the function @func
506 : : * with the two arguments. The first one is the parameter to
507 : : * g_thread_pool_push() and the second one is @user_data.
508 : : *
509 : : * Pass g_get_num_processors() to @max_threads to create as many threads as
510 : : * there are logical processors on the system. This will not pin each thread to
511 : : * a specific processor.
512 : : *
513 : : * The parameter @exclusive determines whether the thread pool owns
514 : : * all threads exclusive or shares them with other thread pools.
515 : : * If @exclusive is %TRUE, @max_threads threads are started
516 : : * immediately and they will run exclusively for this thread pool
517 : : * until it is destroyed by g_thread_pool_free(). If @exclusive is
518 : : * %FALSE, threads are created when needed and shared between all
519 : : * non-exclusive thread pools. This implies that @max_threads may
520 : : * not be -1 for exclusive thread pools. Besides, exclusive thread
521 : : * pools are not affected by g_thread_pool_set_max_idle_time()
522 : : * since their threads are never considered idle and returned to the
523 : : * global pool.
524 : : *
525 : : * Note that the threads used by exclusive thread pools will all inherit the
526 : : * scheduler settings of the current thread while the threads used by
527 : : * non-exclusive thread pools will inherit the scheduler settings from the
528 : : * first thread that created such a thread pool.
529 : : *
530 : : * At least one thread will be spawned when this function is called, either to
531 : : * create the @max_threads exclusive threads, or to preserve the scheduler
532 : : * settings of the current thread for future spawns.
533 : : *
534 : : * @error can be %NULL to ignore errors, or non-%NULL to report
535 : : * errors. An error can only occur when @exclusive is set to %TRUE
536 : : * and not all @max_threads threads could be created.
537 : : * See #GThreadError for possible errors that may occur.
538 : : * Note, even in case of error a valid #GThreadPool is returned.
539 : : *
540 : : * Returns: the new #GThreadPool
541 : : */
542 : : GThreadPool *
543 : 214 : g_thread_pool_new (GFunc func,
544 : : gpointer user_data,
545 : : gint max_threads,
546 : : gboolean exclusive,
547 : : GError **error)
548 : : {
549 : 214 : return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
550 : : }
551 : :
552 : : /**
553 : : * g_thread_pool_new_full:
554 : : * @func: a function to execute in the threads of the new thread pool
555 : : * @user_data: user data that is handed over to @func every time it
556 : : * is called
557 : : * @item_free_func: (nullable): used to pass as a free function to
558 : : * g_async_queue_new_full()
559 : : * @max_threads: the maximal number of threads to execute concurrently
560 : : * in the new thread pool, `-1` means no limit
561 : : * @exclusive: should this thread pool be exclusive?
562 : : * @error: return location for error, or %NULL
563 : : *
564 : : * This function creates a new thread pool similar to g_thread_pool_new()
565 : : * but allowing @item_free_func to be specified to free the data passed
566 : : * to g_thread_pool_push() in the case that the #GThreadPool is stopped
567 : : * and freed before all tasks have been executed.
568 : : *
569 : : * @item_free_func will *not* be called on items successfully passed to @func.
570 : : * @func is responsible for freeing the items passed to it.
571 : : *
572 : : * Returns: (transfer full): the new #GThreadPool
573 : : *
574 : : * Since: 2.70
575 : : */
576 : : GThreadPool *
577 : 220 : g_thread_pool_new_full (GFunc func,
578 : : gpointer user_data,
579 : : GDestroyNotify item_free_func,
580 : : gint max_threads,
581 : : gboolean exclusive,
582 : : GError **error)
583 : : {
584 : : GRealThreadPool *retval;
585 : : G_LOCK_DEFINE_STATIC (init);
586 : :
587 : 220 : g_return_val_if_fail (func, NULL);
588 : 220 : g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
589 : 220 : g_return_val_if_fail (max_threads >= -1, NULL);
590 : :
591 : 220 : retval = g_new (GRealThreadPool, 1);
592 : :
593 : 220 : retval->pool.func = func;
594 : 220 : retval->pool.user_data = user_data;
595 : 220 : retval->pool.exclusive = exclusive;
596 : 220 : retval->queue = g_async_queue_new_full (item_free_func);
597 : 220 : g_cond_init (&retval->cond);
598 : 220 : retval->max_threads = max_threads;
599 : 220 : retval->num_threads = 0;
600 : 220 : retval->running = TRUE;
601 : 220 : retval->immediate = FALSE;
602 : 220 : retval->waiting = FALSE;
603 : 220 : retval->sort_func = NULL;
604 : 220 : retval->sort_user_data = NULL;
605 : :
606 : 220 : G_LOCK (init);
607 [ + + ]: 220 : if (!unused_thread_queue)
608 : 167 : unused_thread_queue = g_async_queue_new ();
609 : :
610 : : /*
611 : : * Spawn a helper thread that is only responsible for spawning new threads
612 : : * with the scheduler settings of the current thread.
613 : : *
614 : : * This is then used for making sure that all threads created on the
615 : : * non-exclusive thread-pool have the same scheduler settings, and more
616 : : * importantly don't just inherit them from the thread that just happened to
617 : : * push a new task and caused a new thread to be created.
618 : : *
619 : : * Not doing so could cause real-time priority threads or otherwise
620 : : * threads with problematic scheduler settings to be part of the
621 : : * non-exclusive thread-pools.
622 : : *
623 : : * For exclusive thread-pools this is not required as all threads are
624 : : * created immediately below and are running forever, so they will
625 : : * automatically inherit the scheduler settings from this very thread.
626 : : */
627 [ + + + + ]: 220 : if (!exclusive && !spawn_thread_queue)
628 : : {
629 : 166 : GThread *pool_spawner = NULL;
630 : :
631 : 166 : spawn_thread_queue = g_async_queue_new ();
632 : 166 : g_cond_init (&spawn_thread_cond);
633 : 166 : pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
634 : 166 : g_ignore_leak (pool_spawner);
635 : : }
636 : 220 : G_UNLOCK (init);
637 : :
638 [ + + ]: 220 : if (retval->pool.exclusive)
639 : : {
640 : 8 : g_async_queue_lock (retval->queue);
641 : :
642 [ + + ]: 67 : while (retval->num_threads < (guint) retval->max_threads)
643 : : {
644 : 59 : GError *local_error = NULL;
645 : :
646 [ - + ]: 59 : if (!g_thread_pool_start_thread (retval, &local_error))
647 : : {
648 : 0 : g_propagate_error (error, local_error);
649 : 0 : break;
650 : : }
651 : : }
652 : :
653 : 8 : g_async_queue_unlock (retval->queue);
654 : : }
655 : :
656 : 220 : return (GThreadPool*) retval;
657 : : }
658 : :
659 : : /**
660 : : * g_thread_pool_push:
661 : : * @pool: a #GThreadPool
662 : : * @data: a new task for @pool
663 : : * @error: return location for error, or %NULL
664 : : *
665 : : * Inserts @data into the list of tasks to be executed by @pool.
666 : : *
667 : : * When the number of currently running threads is lower than the
668 : : * maximal allowed number of threads, a new thread is started (or
669 : : * reused) with the properties given to g_thread_pool_new().
670 : : * Otherwise, @data stays in the queue until a thread in this pool
671 : : * finishes its previous task and processes @data.
672 : : *
673 : : * @error can be %NULL to ignore errors, or non-%NULL to report
674 : : * errors. An error can only occur when a new thread couldn't be
675 : : * created. In that case @data is simply appended to the queue of
676 : : * work to do.
677 : : *
678 : : * Before version 2.32, this function did not return a success status.
679 : : *
680 : : * Returns: %TRUE on success, %FALSE if an error occurred
681 : : */
682 : : gboolean
683 : 105050 : g_thread_pool_push (GThreadPool *pool,
684 : : gpointer data,
685 : : GError **error)
686 : : {
687 : : GRealThreadPool *real;
688 : : gboolean result;
689 : :
690 : 105050 : real = (GRealThreadPool*) pool;
691 : :
692 : 105050 : g_return_val_if_fail (real, FALSE);
693 : 105050 : g_return_val_if_fail (real->running, FALSE);
694 : :
695 : 105050 : result = TRUE;
696 : :
697 : 105050 : g_async_queue_lock (real->queue);
698 : :
699 [ + + ]: 105050 : if (g_async_queue_length_unlocked (real->queue) >= 0)
700 : : {
701 : : /* No thread is waiting in the queue */
702 : 87295 : GError *local_error = NULL;
703 : :
704 [ - + ]: 87295 : if (!g_thread_pool_start_thread (real, &local_error))
705 : : {
706 : 0 : g_propagate_error (error, local_error);
707 : 0 : result = FALSE;
708 : : }
709 : : }
710 : :
711 : 105050 : g_thread_pool_queue_push_unlocked (real, data);
712 : 105050 : g_async_queue_unlock (real->queue);
713 : :
714 : 105050 : return result;
715 : : }
716 : :
717 : : /**
718 : : * g_thread_pool_set_max_threads:
719 : : * @pool: a #GThreadPool
720 : : * @max_threads: a new maximal number of threads for @pool,
721 : : * or -1 for unlimited
722 : : * @error: return location for error, or %NULL
723 : : *
724 : : * Sets the maximal allowed number of threads for @pool.
725 : : * A value of -1 means that the maximal number of threads
726 : : * is unlimited. If @pool is an exclusive thread pool, setting
727 : : * the maximal number of threads to -1 is not allowed.
728 : : *
729 : : * Setting @max_threads to 0 means stopping all work for @pool.
730 : : * It is effectively frozen until @max_threads is set to a non-zero
731 : : * value again.
732 : : *
733 : : * A thread is never terminated while calling @func, as supplied by
734 : : * g_thread_pool_new(). Instead the maximal number of threads only
735 : : * has effect for the allocation of new threads in g_thread_pool_push().
736 : : * A new thread is allocated, whenever the number of currently
737 : : * running threads in @pool is smaller than the maximal number.
738 : : *
739 : : * @error can be %NULL to ignore errors, or non-%NULL to report
740 : : * errors. An error can only occur when a new thread couldn't be
741 : : * created.
742 : : *
743 : : * Before version 2.32, this function did not return a success status.
744 : : *
745 : : * Returns: %TRUE on success, %FALSE if an error occurred
746 : : */
747 : : gboolean
748 : 14 : g_thread_pool_set_max_threads (GThreadPool *pool,
749 : : gint max_threads,
750 : : GError **error)
751 : : {
752 : : GRealThreadPool *real;
753 : : gint to_start;
754 : : gboolean result;
755 : :
756 : 14 : real = (GRealThreadPool*) pool;
757 : :
758 : 14 : g_return_val_if_fail (real, FALSE);
759 : 14 : g_return_val_if_fail (real->running, FALSE);
760 : 14 : g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
761 : 14 : g_return_val_if_fail (max_threads >= -1, FALSE);
762 : :
763 : 14 : result = TRUE;
764 : :
765 : 14 : g_async_queue_lock (real->queue);
766 : :
767 : 14 : real->max_threads = max_threads;
768 : :
769 [ - + ]: 14 : if (pool->exclusive)
770 : 0 : to_start = real->max_threads - real->num_threads;
771 : : else
772 : 14 : to_start = g_async_queue_length_unlocked (real->queue);
773 : :
774 [ + + ]: 8206 : for ( ; to_start > 0; to_start--)
775 : : {
776 : 8192 : GError *local_error = NULL;
777 : :
778 [ - + ]: 8192 : if (!g_thread_pool_start_thread (real, &local_error))
779 : : {
780 : 0 : g_propagate_error (error, local_error);
781 : 0 : result = FALSE;
782 : 0 : break;
783 : : }
784 : : }
785 : :
786 : 14 : g_async_queue_unlock (real->queue);
787 : :
788 : 14 : return result;
789 : : }
790 : :
791 : : /**
792 : : * g_thread_pool_get_max_threads:
793 : : * @pool: a #GThreadPool
794 : : *
795 : : * Returns the maximal number of threads for @pool.
796 : : *
797 : : * Returns: the maximal number of threads
798 : : */
799 : : gint
800 : 5 : g_thread_pool_get_max_threads (GThreadPool *pool)
801 : : {
802 : : GRealThreadPool *real;
803 : : gint retval;
804 : :
805 : 5 : real = (GRealThreadPool*) pool;
806 : :
807 : 5 : g_return_val_if_fail (real, 0);
808 : 5 : g_return_val_if_fail (real->running, 0);
809 : :
810 : 5 : g_async_queue_lock (real->queue);
811 : 5 : retval = real->max_threads;
812 : 5 : g_async_queue_unlock (real->queue);
813 : :
814 : 5 : return retval;
815 : : }
816 : :
817 : : /**
818 : : * g_thread_pool_get_num_threads:
819 : : * @pool: a #GThreadPool
820 : : *
821 : : * Returns the number of threads currently running in @pool.
822 : : *
823 : : * Returns: the number of threads currently running
824 : : */
825 : : guint
826 : 4146034 : g_thread_pool_get_num_threads (GThreadPool *pool)
827 : : {
828 : : GRealThreadPool *real;
829 : : guint retval;
830 : :
831 : 4146034 : real = (GRealThreadPool*) pool;
832 : :
833 : 4146034 : g_return_val_if_fail (real, 0);
834 : 4146034 : g_return_val_if_fail (real->running, 0);
835 : :
836 : 4146034 : g_async_queue_lock (real->queue);
837 : 4146034 : retval = real->num_threads;
838 : 4146034 : g_async_queue_unlock (real->queue);
839 : :
840 : 4146034 : return retval;
841 : : }
842 : :
843 : : /**
844 : : * g_thread_pool_unprocessed:
845 : : * @pool: a #GThreadPool
846 : : *
847 : : * Returns the number of tasks still unprocessed in @pool.
848 : : *
849 : : * Returns: the number of unprocessed tasks
850 : : */
851 : : guint
852 : 3743 : g_thread_pool_unprocessed (GThreadPool *pool)
853 : : {
854 : : GRealThreadPool *real;
855 : : gint unprocessed;
856 : :
857 : 3743 : real = (GRealThreadPool*) pool;
858 : :
859 : 3743 : g_return_val_if_fail (real, 0);
860 : 3743 : g_return_val_if_fail (real->running, 0);
861 : :
862 : 3743 : unprocessed = g_async_queue_length (real->queue);
863 : :
864 : 3743 : return MAX (unprocessed, 0);
865 : : }
866 : :
867 : : /**
868 : : * g_thread_pool_free:
869 : : * @pool: a #GThreadPool
870 : : * @immediate: should @pool shut down immediately?
871 : : * @wait_: should the function wait for all tasks to be finished?
872 : : *
873 : : * Frees all resources allocated for @pool.
874 : : *
875 : : * If @immediate is %TRUE, no new task is processed for @pool.
876 : : * Otherwise @pool is not freed before the last task is processed.
877 : : * Note however, that no thread of this pool is interrupted while
878 : : * processing a task. Instead at least all still running threads
879 : : * can finish their tasks before the @pool is freed.
880 : : *
881 : : * If @wait_ is %TRUE, this function does not return before all
882 : : * tasks to be processed (dependent on @immediate, whether all
883 : : * or only the currently running) are ready.
884 : : * Otherwise this function returns immediately.
885 : : *
886 : : * After calling this function @pool must not be used anymore.
887 : : */
888 : : void
889 : 47 : g_thread_pool_free (GThreadPool *pool,
890 : : gboolean immediate,
891 : : gboolean wait_)
892 : : {
893 : : GRealThreadPool *real;
894 : :
895 : 47 : real = (GRealThreadPool*) pool;
896 : :
897 : 47 : g_return_if_fail (real);
898 : 47 : g_return_if_fail (real->running);
899 : :
900 : : /* If there's no thread allowed here, there is not much sense in
901 : : * not stopping this pool immediately, when it's not empty
902 : : */
903 : 47 : g_return_if_fail (immediate ||
904 : : real->max_threads != 0 ||
905 : : g_async_queue_length (real->queue) == 0);
906 : :
907 : 47 : g_async_queue_lock (real->queue);
908 : :
909 : 47 : real->running = FALSE;
910 : 47 : real->immediate = immediate;
911 : 47 : real->waiting = wait_;
912 : :
913 [ + + ]: 47 : if (wait_)
914 : : {
915 [ + + + + ]: 54 : while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
916 [ + + ]: 22 : !(immediate && real->num_threads == 0))
917 : 29 : g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
918 : : }
919 : :
920 [ + + + + ]: 47 : if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
921 : : {
922 : : /* No thread is currently doing something (and nothing is left
923 : : * to process in the queue)
924 : : */
925 [ + + ]: 46 : if (real->num_threads == 0)
926 : : {
927 : : /* No threads left, we clean up */
928 : 24 : g_async_queue_unlock (real->queue);
929 : 24 : g_thread_pool_free_internal (real);
930 : 24 : return;
931 : : }
932 : :
933 : 22 : g_thread_pool_wakeup_and_stop_all (real);
934 : : }
935 : :
936 : : /* The last thread should cleanup the pool */
937 : 23 : real->waiting = FALSE;
938 : 23 : g_async_queue_unlock (real->queue);
939 : : }
940 : :
941 : : static void
942 : 47 : g_thread_pool_free_internal (GRealThreadPool* pool)
943 : : {
944 : 47 : g_return_if_fail (pool);
945 : 47 : g_return_if_fail (pool->running == FALSE);
946 : 47 : g_return_if_fail (pool->num_threads == 0);
947 : :
948 : : /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
949 : : * removed, before it’s potentially passed to the user-provided
950 : : * @item_free_func. */
951 : 47 : g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
952 : :
953 : 47 : g_async_queue_unref (pool->queue);
954 : 47 : g_cond_clear (&pool->cond);
955 : :
956 : 47 : g_free (pool);
957 : : }
958 : :
959 : : static void
960 : 22 : g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
961 : : {
962 : : guint i;
963 : :
964 : 22 : g_return_if_fail (pool);
965 : 22 : g_return_if_fail (pool->running == FALSE);
966 : 22 : g_return_if_fail (pool->num_threads != 0);
967 : :
968 : 22 : pool->immediate = TRUE;
969 : :
970 : : /*
971 : : * So here we're sending bogus data to the pool threads, which
972 : : * should cause them each to wake up, and check the above
973 : : * pool->immediate condition. However we don't want that
974 : : * data to be sorted (since it'll crash the sorter).
975 : : */
976 [ + + ]: 50 : for (i = 0; i < pool->num_threads; i++)
977 : 28 : g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
978 : : }
979 : :
980 : : /**
981 : : * g_thread_pool_set_max_unused_threads:
982 : : * @max_threads: maximal number of unused threads
983 : : *
984 : : * Sets the maximal number of unused threads to @max_threads.
985 : : * If @max_threads is -1, no limit is imposed on the number
986 : : * of unused threads.
987 : : *
988 : : * The default value is 2.
989 : : */
990 : : void
991 : 20 : g_thread_pool_set_max_unused_threads (gint max_threads)
992 : : {
993 : 20 : g_return_if_fail (max_threads >= -1);
994 : :
995 : 20 : g_atomic_int_set (&max_unused_threads, max_threads);
996 : :
997 [ + + ]: 20 : if (max_threads != -1)
998 : : {
999 : 13 : max_threads -= g_atomic_int_get (&unused_threads);
1000 [ + + ]: 13 : if (max_threads < 0)
1001 : : {
1002 : 3 : g_atomic_int_set (&kill_unused_threads, -max_threads);
1003 : 3 : g_atomic_int_inc (&wakeup_thread_serial);
1004 : :
1005 : 3 : g_async_queue_lock (unused_thread_queue);
1006 : :
1007 : : do
1008 : : {
1009 : 199 : g_async_queue_push_unlocked (unused_thread_queue,
1010 : : wakeup_thread_marker);
1011 : : }
1012 [ + + ]: 199 : while (++max_threads);
1013 : :
1014 : 3 : g_async_queue_unlock (unused_thread_queue);
1015 : : }
1016 : : }
1017 : : }
1018 : :
1019 : : /**
1020 : : * g_thread_pool_get_max_unused_threads:
1021 : : *
1022 : : * Returns the maximal allowed number of unused threads.
1023 : : *
1024 : : * Returns: the maximal number of unused threads
1025 : : */
1026 : : gint
1027 : 7 : g_thread_pool_get_max_unused_threads (void)
1028 : : {
1029 : 7 : return g_atomic_int_get (&max_unused_threads);
1030 : : }
1031 : :
1032 : : /**
1033 : : * g_thread_pool_get_num_unused_threads:
1034 : : *
1035 : : * Returns the number of currently unused threads.
1036 : : *
1037 : : * Returns: the number of currently unused threads
1038 : : */
1039 : : guint
1040 : 135 : g_thread_pool_get_num_unused_threads (void)
1041 : : {
1042 : 135 : return (guint) g_atomic_int_get (&unused_threads);
1043 : : }
1044 : :
1045 : : /**
1046 : : * g_thread_pool_stop_unused_threads:
1047 : : *
1048 : : * Stops all currently unused threads. This does not change the
1049 : : * maximal number of unused threads. This function can be used to
1050 : : * regularly stop all unused threads e.g. from g_timeout_add().
1051 : : */
1052 : : void
1053 : 5 : g_thread_pool_stop_unused_threads (void)
1054 : : {
1055 : : guint oldval;
1056 : :
1057 : 5 : oldval = g_thread_pool_get_max_unused_threads ();
1058 : :
1059 : 5 : g_thread_pool_set_max_unused_threads (0);
1060 : 5 : g_thread_pool_set_max_unused_threads (oldval);
1061 : 5 : }
1062 : :
1063 : : /**
1064 : : * g_thread_pool_set_sort_function:
1065 : : * @pool: a #GThreadPool
1066 : : * @func: the #GCompareDataFunc used to sort the list of tasks.
1067 : : * This function is passed two tasks. It should return
1068 : : * 0 if the order in which they are handled does not matter,
1069 : : * a negative value if the first task should be processed before
1070 : : * the second or a positive value if the second task should be
1071 : : * processed first.
1072 : : * @user_data: user data passed to @func
1073 : : *
1074 : : * Sets the function used to sort the list of tasks. This allows the
1075 : : * tasks to be processed by a priority determined by @func, and not
1076 : : * just in the order in which they were added to the pool.
1077 : : *
1078 : : * Note, if the maximum number of threads is more than 1, the order
1079 : : * that threads are executed cannot be guaranteed 100%. Threads are
1080 : : * scheduled by the operating system and are executed at random. It
1081 : : * cannot be assumed that threads are executed in the order they are
1082 : : * created.
1083 : : *
1084 : : * Since: 2.10
1085 : : */
1086 : : void
1087 : 163 : g_thread_pool_set_sort_function (GThreadPool *pool,
1088 : : GCompareDataFunc func,
1089 : : gpointer user_data)
1090 : : {
1091 : : GRealThreadPool *real;
1092 : :
1093 : 163 : real = (GRealThreadPool*) pool;
1094 : :
1095 : 163 : g_return_if_fail (real);
1096 : 163 : g_return_if_fail (real->running);
1097 : :
1098 : 163 : g_async_queue_lock (real->queue);
1099 : :
1100 : 163 : real->sort_func = func;
1101 : 163 : real->sort_user_data = user_data;
1102 : :
1103 [ + - ]: 163 : if (func)
1104 : 163 : g_async_queue_sort_unlocked (real->queue,
1105 : : real->sort_func,
1106 : : real->sort_user_data);
1107 : :
1108 : 163 : g_async_queue_unlock (real->queue);
1109 : : }
1110 : :
1111 : : /**
1112 : : * g_thread_pool_move_to_front:
1113 : : * @pool: a #GThreadPool
1114 : : * @data: an unprocessed item in the pool
1115 : : *
1116 : : * Moves the item to the front of the queue of unprocessed
1117 : : * items, so that it will be processed next.
1118 : : *
1119 : : * Returns: %TRUE if the item was found and moved
1120 : : *
1121 : : * Since: 2.46
1122 : : */
1123 : : gboolean
1124 : 1037 : g_thread_pool_move_to_front (GThreadPool *pool,
1125 : : gpointer data)
1126 : : {
1127 : 1037 : GRealThreadPool *real = (GRealThreadPool*) pool;
1128 : : gboolean found;
1129 : :
1130 : 1037 : g_async_queue_lock (real->queue);
1131 : :
1132 : 1037 : found = g_async_queue_remove_unlocked (real->queue, data);
1133 [ + + ]: 1037 : if (found)
1134 : 1014 : g_async_queue_push_front_unlocked (real->queue, data);
1135 : :
1136 : 1037 : g_async_queue_unlock (real->queue);
1137 : :
1138 : 1037 : return found;
1139 : : }
1140 : :
1141 : : /**
1142 : : * g_thread_pool_set_max_idle_time:
1143 : : * @interval: the maximum @interval (in milliseconds)
1144 : : * a thread can be idle
1145 : : *
1146 : : * This function will set the maximum @interval that a thread
1147 : : * waiting in the pool for new tasks can be idle for before
1148 : : * being stopped. This function is similar to calling
1149 : : * g_thread_pool_stop_unused_threads() on a regular timeout,
1150 : : * except this is done on a per thread basis.
1151 : : *
1152 : : * By setting @interval to 0, idle threads will not be stopped.
1153 : : *
1154 : : * The default value is 15000 (15 seconds).
1155 : : *
1156 : : * Since: 2.10
1157 : : */
1158 : : void
1159 : 3 : g_thread_pool_set_max_idle_time (guint interval)
1160 : : {
1161 : : guint i;
1162 : :
1163 : 3 : g_atomic_int_set (&max_idle_time, interval);
1164 : :
1165 : 3 : i = (guint) g_atomic_int_get (&unused_threads);
1166 [ - + ]: 3 : if (i > 0)
1167 : : {
1168 : 0 : g_atomic_int_inc (&wakeup_thread_serial);
1169 : 0 : g_async_queue_lock (unused_thread_queue);
1170 : :
1171 : : do
1172 : : {
1173 : 0 : g_async_queue_push_unlocked (unused_thread_queue,
1174 : : wakeup_thread_marker);
1175 : : }
1176 [ # # ]: 0 : while (--i);
1177 : :
1178 : 0 : g_async_queue_unlock (unused_thread_queue);
1179 : : }
1180 : 3 : }
1181 : :
1182 : : /**
1183 : : * g_thread_pool_get_max_idle_time:
1184 : : *
1185 : : * This function will return the maximum @interval that a
1186 : : * thread will wait in the thread pool for new tasks before
1187 : : * being stopped.
1188 : : *
1189 : : * If this function returns 0, threads waiting in the thread
1190 : : * pool for new work are not stopped.
1191 : : *
1192 : : * Returns: the maximum @interval (milliseconds) to wait
1193 : : * for new tasks in the thread pool before stopping the
1194 : : * thread
1195 : : *
1196 : : * Since: 2.10
1197 : : */
1198 : : guint
1199 : 3 : g_thread_pool_get_max_idle_time (void)
1200 : : {
1201 : 3 : return (guint) g_atomic_int_get (&max_idle_time);
1202 : : }
|