/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 *
 * See Documentation/slow-work.txt
 */
#include <linux/module.h>
#include <linux/slow-work.h>
#include <linux/kthread.h>
#include <linux/freezer.h>
#include <linux/wait.h>
#include <linux/debugfs.h>
#include "slow-work.h"
/* Timer callbacks; declared early because DEFINE_TIMER() refers to them */
static void slow_work_cull_timeout(unsigned long data);
static void slow_work_oom_timeout(unsigned long data);

#ifdef CONFIG_SYSCTL
/* sysctl handlers referenced from the slow_work_sysctls[] table */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos);

static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos);
#endif
/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
static unsigned vslow_work_proportion = 50; /* % of threads that may process
					     * very slow work */
#ifdef CONFIG_SYSCTL
/*
 * Limits handed to the sysctl handlers through .extra1/.extra2.  The upper
 * bound on max-threads is not const because init_slow_work() may raise it.
 */
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;

/*
 * Tunables for the pool.  min-threads and max-threads bound each other
 * (each uses the other's current value as its limit) and go through handlers
 * that also adjust the running pool; vslow-percentage is a plain bounded
 * integer.
 */
ctl_table slow_work_sysctls[] = {
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "min-threads",
		.data		= &slow_work_min_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_min_threads_sysctl,
		.extra1		= (void *) &slow_work_min_min_threads,
		.extra2		= &slow_work_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "max-threads",
		.data		= &slow_work_max_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_max_threads_sysctl,
		.extra1		= &slow_work_min_threads,
		.extra2		= (void *) &slow_work_max_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "vslow-percentage",
		.data		= &vslow_work_proportion,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.extra1		= (void *) &slow_work_min_vslow,
		.extra2		= (void *) &slow_work_max_vslow,
	},
	{ .ctl_name = 0 }	/* table terminator */
};
#endif
86 * The active state of the thread pool
88 static atomic_t slow_work_thread_count;
89 static atomic_t vslow_work_executing_count;
91 static bool slow_work_may_not_start_new_thread;
92 static bool slow_work_cull; /* cull a thread due to lack of activity */
93 static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
94 static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
95 static struct slow_work slow_work_new_thread; /* new thread starter */
98 * slow work ID allocation (use slow_work_queue_lock)
100 static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);

/*
 * Note which module owns the item that thread @id is about to process.
 * @work may be NULL (the caller found nothing to run), in which case the
 * slot is left untouched.
 */
static void slow_work_set_thread_processing(int id, struct slow_work *work)
{
	if (work)
		slow_work_thread_processing[id] = work->owner;
}

/*
 * Thread @id has finished with @work: clear its slot and wake anyone in
 * slow_work_wait_for_items() who is waiting on this item or on its module.
 */
static void slow_work_done_thread_processing(int id, struct slow_work *work)
{
	struct module *module = slow_work_thread_processing[id];

	slow_work_thread_processing[id] = NULL;
	/* order the slot clear before reading the unregistration markers */
	smp_mb();
	if (slow_work_unreg_work_item == work ||
	    slow_work_unreg_module == module)
		wake_up_all(&slow_work_unreg_wq);
}

/*
 * Clear thread @id's slot without waking anyone (used when the item has been
 * put back on a queue rather than completed).
 */
static void slow_work_clear_thread_processing(int id)
{
	slow_work_thread_processing[id] = NULL;
}
#else
/* No module unloading: nothing to track */
static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
static void slow_work_clear_thread_processing(int id) {}
#endif
/*
 * Data for tracking currently executing items for indication through /proc
 */
#ifdef CONFIG_SLOW_WORK_DEBUG
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif
148 * The queues of work items and the lock governing access to them. These are
149 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
150 * as the number of threads bears no relation to the number of CPUs.
152 * There are two queues of work items: one for slow work items, and one for
153 * very slow work items.
155 LIST_HEAD(slow_work_queue);
156 LIST_HEAD(vslow_work_queue);
157 DEFINE_SPINLOCK(slow_work_queue_lock);
160 * The following are two wait queues that get pinged when a work item is placed
161 * on an empty queue. These allow work items that are hogging a thread by
162 * sleeping in a way that could be deferred to yield their thread and enqueue
165 static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
166 static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
169 * The thread controls. A variable used to signal to the threads that they
170 * should exit when the queue is empty, a waitqueue used by the threads to wait
171 * for signals, and a completion set by the last thread to exit.
173 static bool slow_work_threads_should_exit;
174 static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
175 static DECLARE_COMPLETION(slow_work_last_thread_exited);
178 * The number of users of the thread pool and its lock. Whilst this is zero we
179 * have no threads hanging around, and when this reaches zero, we wait for all
180 * active or queued work items to complete and kill all the threads we do have.
182 static int slow_work_user_count;
183 static DEFINE_MUTEX(slow_work_user_lock);
185 static inline int slow_work_get_ref(struct slow_work *work)
187 if (work->ops->get_ref)
188 return work->ops->get_ref(work);
193 static inline void slow_work_put_ref(struct slow_work *work)
195 if (work->ops->put_ref)
196 work->ops->put_ref(work);
200 * Calculate the maximum number of active threads in the pool that are
201 * permitted to process very slow work items.
203 * The answer is rounded up to at least 1, but may not equal or exceed the
204 * maximum number of the threads in the pool. This means we always have at
205 * least one thread that can process slow work items, and we always have at
206 * least one thread that won't get tied up doing so.
208 static unsigned slow_work_calc_vsmax(void)
212 vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
214 vsmax = max(vsmax, 1U);
215 return min(vsmax, slow_work_max_threads - 1);
219 * Attempt to execute stuff queued on a slow thread. Return true if we managed
220 * it, false if there was nothing to do.
222 static noinline bool slow_work_execute(int id)
224 struct slow_work *work = NULL;
228 vsmax = slow_work_calc_vsmax();
230 /* see if we can schedule a new thread to be started if we're not
231 * keeping up with the work */
232 if (!waitqueue_active(&slow_work_thread_wq) &&
233 (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
234 atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
235 !slow_work_may_not_start_new_thread)
236 slow_work_enqueue(&slow_work_new_thread);
238 /* find something to execute */
239 spin_lock_irq(&slow_work_queue_lock);
240 if (!list_empty(&vslow_work_queue) &&
241 atomic_read(&vslow_work_executing_count) < vsmax) {
242 work = list_entry(vslow_work_queue.next,
243 struct slow_work, link);
244 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
246 list_del_init(&work->link);
247 atomic_inc(&vslow_work_executing_count);
249 } else if (!list_empty(&slow_work_queue)) {
250 work = list_entry(slow_work_queue.next,
251 struct slow_work, link);
252 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
254 list_del_init(&work->link);
257 very_slow = false; /* avoid the compiler warning */
260 slow_work_set_thread_processing(id, work);
262 slow_work_mark_time(work);
263 slow_work_begin_exec(id, work);
266 spin_unlock_irq(&slow_work_queue_lock);
271 if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
274 /* don't execute if the work is in the process of being cancelled */
275 if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
276 work->ops->execute(work);
279 atomic_dec(&vslow_work_executing_count);
280 clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
282 /* wake up anyone waiting for this work to be complete */
283 wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
285 slow_work_end_exec(id, work);
287 /* if someone tried to enqueue the item whilst we were executing it,
288 * then it'll be left unenqueued to avoid multiple threads trying to
289 * execute it simultaneously
291 * there is, however, a race between us testing the pending flag and
292 * getting the spinlock, and between the enqueuer setting the pending
293 * flag and getting the spinlock, so we use a deferral bit to tell us
294 * if the enqueuer got there first
296 if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
297 spin_lock_irq(&slow_work_queue_lock);
299 if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
300 test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
303 spin_unlock_irq(&slow_work_queue_lock);
306 /* sort out the race between module unloading and put_ref() */
307 slow_work_put_ref(work);
308 slow_work_done_thread_processing(id, work);
313 /* we must complete the enqueue operation
314 * - we transfer our ref on the item back to the appropriate queue
315 * - don't wake another thread up as we're awake already
317 slow_work_mark_time(work);
318 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
319 list_add_tail(&work->link, &vslow_work_queue);
321 list_add_tail(&work->link, &slow_work_queue);
322 spin_unlock_irq(&slow_work_queue_lock);
323 slow_work_clear_thread_processing(id);
328 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
329 * work: The work item under execution that wants to sleep
330 * _timeout: Scheduler sleep timeout
332 * Allow a requeueable work item to sleep on a slow-work processor thread until
333 * that thread is needed to do some other work or the sleep is interrupted by
336 * The caller must set up a wake up event before calling this and must have set
337 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
338 * condition before calling this function as no test is made here.
340 * False is returned if there is nothing on the queue; true is returned if the
341 * work item should be requeued
343 bool slow_work_sleep_till_thread_needed(struct slow_work *work,
344 signed long *_timeout)
346 wait_queue_head_t *wfo_wq;
347 struct list_head *queue;
351 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
352 wfo_wq = &vslow_work_queue_waits_for_occupation;
353 queue = &vslow_work_queue;
355 wfo_wq = &slow_work_queue_waits_for_occupation;
356 queue = &slow_work_queue;
359 if (!list_empty(queue))
362 add_wait_queue_exclusive(wfo_wq, &wait);
363 if (list_empty(queue))
364 *_timeout = schedule_timeout(*_timeout);
365 finish_wait(wfo_wq, &wait);
367 return !list_empty(queue);
369 EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
372 * slow_work_enqueue - Schedule a slow work item for processing
373 * @work: The work item to queue
375 * Schedule a slow work item for processing. If the item is already undergoing
376 * execution, this guarantees not to re-enter the execution routine until the
377 * first execution finishes.
379 * The item is pinned by this function as it retains a reference to it, managed
380 * through the item operations. The item is unpinned once it has been
383 * An item may hog the thread that is running it for a relatively large amount
384 * of time, sufficient, for example, to perform several lookup, mkdir, create
385 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
387 * Conversely, if a number of items are awaiting processing, it may take some
388 * time before any given item is given attention. The number of threads in the
389 * pool may be increased to deal with demand, but only up to a limit.
391 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
392 * the very slow queue, from which only a portion of the threads will be
393 * allowed to pick items to execute. This ensures that very slow items won't
394 * overly block ones that are just ordinarily slow.
396 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
399 int slow_work_enqueue(struct slow_work *work)
401 wait_queue_head_t *wfo_wq;
402 struct list_head *queue;
406 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
409 BUG_ON(slow_work_user_count <= 0);
413 /* when honouring an enqueue request, we only promise that we will run
414 * the work function in the future; we do not promise to run it once
415 * per enqueue request
417 * we use the PENDING bit to merge together repeat requests without
418 * having to disable IRQs and take the spinlock, whilst still
419 * maintaining our promise
421 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
422 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
423 wfo_wq = &vslow_work_queue_waits_for_occupation;
424 queue = &vslow_work_queue;
426 wfo_wq = &slow_work_queue_waits_for_occupation;
427 queue = &slow_work_queue;
430 spin_lock_irqsave(&slow_work_queue_lock, flags);
432 if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
435 /* we promise that we will not attempt to execute the work
436 * function in more than one thread simultaneously
438 * this, however, leaves us with a problem if we're asked to
439 * enqueue the work whilst someone is executing the work
440 * function as simply queueing the work immediately means that
441 * another thread may try executing it whilst it is already
444 * to deal with this, we set the ENQ_DEFERRED bit instead of
445 * enqueueing, and the thread currently executing the work
446 * function will enqueue the work item when the work function
447 * returns and it has cleared the EXECUTING bit
449 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
450 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
452 ret = slow_work_get_ref(work);
455 slow_work_mark_time(work);
456 list_add_tail(&work->link, queue);
457 wake_up(&slow_work_thread_wq);
459 /* if someone who could be requeued is sleeping on a
460 * thread, then ask them to yield their thread */
461 if (work->link.prev == queue)
465 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
472 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
475 EXPORT_SYMBOL(slow_work_enqueue);
/*
 * Action routine passed to wait_on_bit() by slow_work_cancel(): give up the
 * CPU with schedule() and return 0.
 */
static int slow_work_wait(void *word)
{
	schedule();
	return 0;
}
484 * slow_work_cancel - Cancel a slow work item
485 * @work: The work item to cancel
487 * This function will cancel a previously enqueued work item. If we cannot
488 * cancel the work item, it is guarenteed to have run when this function
491 void slow_work_cancel(struct slow_work *work)
493 bool wait = true, put = false;
495 set_bit(SLOW_WORK_CANCELLING, &work->flags);
498 /* if the work item is a delayed work item with an active timer, we
499 * need to wait for the timer to finish _before_ getting the spinlock,
500 * lest we deadlock against the timer routine
502 * the timer routine will leave DELAYED set if it notices the
503 * CANCELLING flag in time
505 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
506 struct delayed_slow_work *dwork =
507 container_of(work, struct delayed_slow_work, work);
508 del_timer_sync(&dwork->timer);
511 spin_lock_irq(&slow_work_queue_lock);
513 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
514 /* the timer routine aborted or never happened, so we are left
515 * holding the timer's reference on the item and should just
516 * drop the pending flag and wait for any ongoing execution to
518 struct delayed_slow_work *dwork =
519 container_of(work, struct delayed_slow_work, work);
521 BUG_ON(timer_pending(&dwork->timer));
522 BUG_ON(!list_empty(&work->link));
524 clear_bit(SLOW_WORK_DELAYED, &work->flags);
526 clear_bit(SLOW_WORK_PENDING, &work->flags);
528 } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
529 !list_empty(&work->link)) {
530 /* the link in the pending queue holds a reference on the item
531 * that we will need to release */
532 list_del_init(&work->link);
535 clear_bit(SLOW_WORK_PENDING, &work->flags);
537 } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
538 /* the executor is holding our only reference on the item, so
539 * we merely need to wait for it to finish executing */
540 clear_bit(SLOW_WORK_PENDING, &work->flags);
543 spin_unlock_irq(&slow_work_queue_lock);
545 /* the EXECUTING flag is set by the executor whilst the spinlock is set
546 * and before the item is dequeued - so assuming the above doesn't
547 * actually dequeue it, simply waiting for the EXECUTING flag to be
548 * released here should be sufficient */
550 wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
551 TASK_UNINTERRUPTIBLE);
553 clear_bit(SLOW_WORK_CANCELLING, &work->flags);
555 slow_work_put_ref(work);
557 EXPORT_SYMBOL(slow_work_cancel);
560 * Handle expiry of the delay timer, indicating that a delayed slow work item
561 * should now be queued if not cancelled
563 static void delayed_slow_work_timer(unsigned long data)
565 wait_queue_head_t *wfo_wq;
566 struct list_head *queue;
567 struct slow_work *work = (struct slow_work *) data;
569 bool queued = false, put = false, first = false;
571 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
572 wfo_wq = &vslow_work_queue_waits_for_occupation;
573 queue = &vslow_work_queue;
575 wfo_wq = &slow_work_queue_waits_for_occupation;
576 queue = &slow_work_queue;
579 spin_lock_irqsave(&slow_work_queue_lock, flags);
580 if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
581 clear_bit(SLOW_WORK_DELAYED, &work->flags);
583 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
584 /* we discard the reference the timer was holding in
585 * favour of the one the executor holds */
586 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
589 slow_work_mark_time(work);
590 list_add_tail(&work->link, queue);
592 if (work->link.prev == queue)
597 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
599 slow_work_put_ref(work);
603 wake_up(&slow_work_thread_wq);
607 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
608 * @dwork: The delayed work item to queue
609 * @delay: When to start executing the work, in jiffies from now
611 * This is similar to slow_work_enqueue(), but it adds a delay before the work
612 * is actually queued for processing.
614 * The item can have delayed processing requested on it whilst it is being
615 * executed. The delay will begin immediately, and if it expires before the
616 * item finishes executing, the item will be placed back on the queue when it
617 * has done executing.
619 int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
622 struct slow_work *work = &dwork->work;
627 return slow_work_enqueue(&dwork->work);
629 BUG_ON(slow_work_user_count <= 0);
633 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
636 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
637 spin_lock_irqsave(&slow_work_queue_lock, flags);
639 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
642 /* the timer holds a reference whilst it is pending */
643 ret = work->ops->get_ref(work);
647 if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
649 dwork->timer.expires = jiffies + delay;
650 dwork->timer.data = (unsigned long) work;
651 dwork->timer.function = delayed_slow_work_timer;
652 add_timer(&dwork->timer);
654 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
662 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
665 EXPORT_SYMBOL(delayed_slow_work_enqueue);
668 * Schedule a cull of the thread pool at some time in the near future
670 static void slow_work_schedule_cull(void)
672 mod_timer(&slow_work_cull_timer,
673 round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
677 * Worker thread culling algorithm
679 static bool slow_work_cull_thread(void)
682 bool do_cull = false;
684 spin_lock_irqsave(&slow_work_queue_lock, flags);
686 if (slow_work_cull) {
687 slow_work_cull = false;
689 if (list_empty(&slow_work_queue) &&
690 list_empty(&vslow_work_queue) &&
691 atomic_read(&slow_work_thread_count) >
692 slow_work_min_threads) {
693 slow_work_schedule_cull();
698 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
703 * Determine if there is slow work available for dispatch
705 static inline bool slow_work_available(int vsmax)
707 return !list_empty(&slow_work_queue) ||
708 (!list_empty(&vslow_work_queue) &&
709 atomic_read(&vslow_work_executing_count) < vsmax);
713 * Worker thread dispatcher
715 static int slow_work_thread(void *_data)
722 set_user_nice(current, -5);
724 /* allocate ourselves an ID */
725 spin_lock_irq(&slow_work_queue_lock);
726 id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
727 BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
728 __set_bit(id, slow_work_ids);
729 slow_work_set_thread_pid(id, current->pid);
730 spin_unlock_irq(&slow_work_queue_lock);
732 sprintf(current->comm, "kslowd%03u", id);
735 vsmax = vslow_work_proportion;
736 vsmax *= atomic_read(&slow_work_thread_count);
739 prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
741 if (!freezing(current) &&
742 !slow_work_threads_should_exit &&
743 !slow_work_available(vsmax) &&
746 finish_wait(&slow_work_thread_wq, &wait);
750 vsmax = vslow_work_proportion;
751 vsmax *= atomic_read(&slow_work_thread_count);
754 if (slow_work_available(vsmax) && slow_work_execute(id)) {
756 if (list_empty(&slow_work_queue) &&
757 list_empty(&vslow_work_queue) &&
758 atomic_read(&slow_work_thread_count) >
759 slow_work_min_threads)
760 slow_work_schedule_cull();
764 if (slow_work_threads_should_exit)
767 if (slow_work_cull && slow_work_cull_thread())
771 spin_lock_irq(&slow_work_queue_lock);
772 slow_work_set_thread_pid(id, 0);
773 __clear_bit(id, slow_work_ids);
774 spin_unlock_irq(&slow_work_queue_lock);
776 if (atomic_dec_and_test(&slow_work_thread_count))
777 complete_and_exit(&slow_work_last_thread_exited, 0);
782 * Handle thread cull timer expiration
784 static void slow_work_cull_timeout(unsigned long data)
786 slow_work_cull = true;
787 wake_up(&slow_work_thread_wq);
791 * Start a new slow work thread
793 static void slow_work_new_thread_execute(struct slow_work *work)
795 struct task_struct *p;
797 if (slow_work_threads_should_exit)
800 if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
803 if (!mutex_trylock(&slow_work_user_lock))
806 slow_work_may_not_start_new_thread = true;
807 atomic_inc(&slow_work_thread_count);
808 p = kthread_run(slow_work_thread, NULL, "kslowd");
810 printk(KERN_DEBUG "Slow work thread pool: OOM\n");
811 if (atomic_dec_and_test(&slow_work_thread_count))
812 BUG(); /* we're running on a slow work thread... */
813 mod_timer(&slow_work_oom_timer,
814 round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
816 /* ratelimit the starting of new threads */
817 mod_timer(&slow_work_oom_timer, jiffies + 1);
820 mutex_unlock(&slow_work_user_lock);
823 static const struct slow_work_ops slow_work_new_thread_ops = {
824 .owner = THIS_MODULE,
825 .execute = slow_work_new_thread_execute,
826 #ifdef CONFIG_SLOW_WORK_DEBUG
827 .desc = slow_work_new_thread_desc,
832 * post-OOM new thread start suppression expiration
834 static void slow_work_oom_timeout(unsigned long data)
836 slow_work_may_not_start_new_thread = false;
#ifdef CONFIG_SYSCTL
/*
 * Handle adjustment of the minimum number of threads
 *
 * The value itself is parsed and range-checked by proc_dointvec_minmax().
 * On success, and if the pool is in use, request a new thread when there are
 * fewer threads than the minimum, or a cull when there are more.  Returns the
 * result of proc_dointvec_minmax().
 */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to start or stop threads */
			n = atomic_read(&slow_work_thread_count) -
				slow_work_min_threads;

			if (n < 0 && !slow_work_may_not_start_new_thread)
				slow_work_enqueue(&slow_work_new_thread);
			else if (n > 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}

/*
 * Handle adjustment of the maximum number of threads
 *
 * The value itself is parsed and range-checked by proc_dointvec_minmax().
 * On success, and if the pool is in use, schedule a cull when there are now
 * more threads than the maximum.  Returns the result of
 * proc_dointvec_minmax().
 */
static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to stop threads */
			n = slow_work_max_threads -
				atomic_read(&slow_work_thread_count);

			if (n < 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}
#endif /* CONFIG_SYSCTL */
896 * slow_work_register_user - Register a user of the facility
897 * @module: The module about to make use of the facility
899 * Register a user of the facility, starting up the initial threads if there
900 * aren't any other users at this point. This will return 0 if successful, or
903 int slow_work_register_user(struct module *module)
905 struct task_struct *p;
908 mutex_lock(&slow_work_user_lock);
910 if (slow_work_user_count == 0) {
911 printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
912 init_completion(&slow_work_last_thread_exited);
914 slow_work_threads_should_exit = false;
915 slow_work_init(&slow_work_new_thread,
916 &slow_work_new_thread_ops);
917 slow_work_may_not_start_new_thread = false;
918 slow_work_cull = false;
920 /* start the minimum number of threads */
921 for (loop = 0; loop < slow_work_min_threads; loop++) {
922 atomic_inc(&slow_work_thread_count);
923 p = kthread_run(slow_work_thread, NULL, "kslowd");
927 printk(KERN_NOTICE "Slow work thread pool: Ready\n");
930 slow_work_user_count++;
931 mutex_unlock(&slow_work_user_lock);
935 if (atomic_dec_and_test(&slow_work_thread_count))
936 complete(&slow_work_last_thread_exited);
938 printk(KERN_ERR "Slow work thread pool:"
939 " Aborting startup on ENOMEM\n");
940 slow_work_threads_should_exit = true;
941 wake_up_all(&slow_work_thread_wq);
942 wait_for_completion(&slow_work_last_thread_exited);
943 printk(KERN_ERR "Slow work thread pool: Aborted\n");
945 mutex_unlock(&slow_work_user_lock);
948 EXPORT_SYMBOL(slow_work_register_user);
/*
 * wait for all outstanding items from the calling module to complete
 * - note that more items may be queued whilst we're waiting
 *
 * Does nothing unless CONFIG_MODULES is set.  Waiters are serialised by
 * slow_work_unreg_sync_lock, so only one item/module is tracked at a time.
 */
static void slow_work_wait_for_items(struct module *module)
{
#ifdef CONFIG_MODULES
	DECLARE_WAITQUEUE(myself, current);
	struct slow_work *work;
	int loop;

	mutex_lock(&slow_work_unreg_sync_lock);
	add_wait_queue(&slow_work_unreg_wq, &myself);

	for (;;) {
		spin_lock_irq(&slow_work_queue_lock);

		/* first of all, we wait for the last queued item in each list
		 * to be processed */
		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}
		list_for_each_entry_reverse(work, &slow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}

		/* then we wait for the items being processed to finish
		 *
		 * NOTE(review): unlike the two queue scans above, this path
		 * does not set TASK_UNINTERRUPTIBLE before going to do_wait,
		 * so schedule() runs in whatever state the task already has -
		 * confirm whether that is intended
		 */
		slow_work_unreg_module = module;
		smp_mb();
		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
			if (slow_work_thread_processing[loop] == module)
				goto do_wait;
		}
		spin_unlock_irq(&slow_work_queue_lock);
		break; /* okay, we're done */

	do_wait:
		spin_unlock_irq(&slow_work_queue_lock);
		schedule();
		slow_work_unreg_work_item = NULL;
		slow_work_unreg_module = NULL;
	}

	remove_wait_queue(&slow_work_unreg_wq, &myself);
	mutex_unlock(&slow_work_unreg_sync_lock);
#endif /* CONFIG_MODULES */
}
1007 * slow_work_unregister_user - Unregister a user of the facility
1008 * @module: The module whose items should be cleared
1010 * Unregister a user of the facility, killing all the threads if this was the
1013 * This waits for all the work items belonging to the nominated module to go
1014 * away before proceeding.
1016 void slow_work_unregister_user(struct module *module)
1018 /* first of all, wait for all outstanding items from the calling module
1021 slow_work_wait_for_items(module);
1023 /* then we can actually go about shutting down the facility if need
1025 mutex_lock(&slow_work_user_lock);
1027 BUG_ON(slow_work_user_count <= 0);
1029 slow_work_user_count--;
1030 if (slow_work_user_count == 0) {
1031 printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
1032 slow_work_threads_should_exit = true;
1033 del_timer_sync(&slow_work_cull_timer);
1034 del_timer_sync(&slow_work_oom_timer);
1035 wake_up_all(&slow_work_thread_wq);
1036 wait_for_completion(&slow_work_last_thread_exited);
1037 printk(KERN_NOTICE "Slow work thread pool:"
1038 " Shut down complete\n");
1041 mutex_unlock(&slow_work_user_lock);
1043 EXPORT_SYMBOL(slow_work_unregister_user);
1046 * Initialise the slow work facility
1048 static int __init init_slow_work(void)
1050 unsigned nr_cpus = num_possible_cpus();
1052 if (slow_work_max_threads < nr_cpus)
1053 slow_work_max_threads = nr_cpus;
1054 #ifdef CONFIG_SYSCTL
1055 if (slow_work_max_max_threads < nr_cpus * 2)
1056 slow_work_max_max_threads = nr_cpus * 2;
1058 #ifdef CONFIG_SLOW_WORK_DEBUG
1060 struct dentry *dbdir;
1062 dbdir = debugfs_create_dir("slow_work", NULL);
1063 if (dbdir && !IS_ERR(dbdir))
1064 debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
1065 NULL, &slow_work_runqueue_fops);
1071 subsys_initcall(init_slow_work);