]> git.karo-electronics.de Git - linux-beck.git/commitdiff
padata: simplify serialization mechanism
author: Steffen Klassert <steffen.klassert@secunet.com>
Wed, 7 Jul 2010 13:32:39 +0000 (15:32 +0200)
committer: Herbert Xu <herbert@gondor.apana.org.au>
Wed, 14 Jul 2010 12:29:30 +0000 (20:29 +0800)
We count the number of processed objects on a percpu basis,
so we need to go through all the percpu reorder queues to calculate
the sequence number of the next object that needs serialization.
This patch changes this to count the number of processed objects
globally, so we can calculate the sequence number and the percpu
reorder queue of the next object that needs serialization without
searching through the percpu reorder queues. This avoids some
accesses to the memory of foreign cpus.

Signed-off-by: Steffen Klassert <steffen.klassert@secunet.com>
Signed-off-by: Herbert Xu <herbert@gondor.apana.org.au>
include/linux/padata.h
kernel/padata.c

index e4c17f9b7c9e2b03e76e8712c5d4566974ea2e3f..8844b851191e618a95343a94129effb9ded2ef8f 100644 (file)
@@ -67,7 +67,6 @@ struct padata_list {
  * @pwork: work struct for parallelization.
  * @swork: work struct for serialization.
  * @pd: Backpointer to the internal control structure.
- * @num_obj: Number of objects that are processed by this cpu.
  * @cpu_index: Index of the cpu.
  */
 struct padata_queue {
@@ -77,7 +76,6 @@ struct padata_queue {
        struct work_struct      pwork;
        struct work_struct      swork;
        struct parallel_data    *pd;
-       atomic_t                num_obj;
        int                     cpu_index;
 };
 
@@ -93,6 +91,7 @@ struct padata_queue {
  * @max_seq_nr:  Maximal used sequence number.
  * @cpumask: cpumask in use.
  * @lock: Reorder lock.
+ * @processed: Number of already processed objects.
  * @timer: Reorder timer.
  */
 struct parallel_data {
@@ -103,7 +102,8 @@ struct parallel_data {
        atomic_t                refcnt;
        unsigned int            max_seq_nr;
        cpumask_var_t           cpumask;
-       spinlock_t              lock;
+       spinlock_t              lock ____cacheline_aligned;
+       unsigned int            processed;
        struct timer_list       timer;
 };
 
index ae8defcf062263d197d40c863a0f3d535a8418f1..450d67d394b0fca3f99c3a9af233c46cde74f82e 100644 (file)
@@ -170,79 +170,47 @@ EXPORT_SYMBOL(padata_do_parallel);
  */
 static struct padata_priv *padata_get_next(struct parallel_data *pd)
 {
-       int cpu, num_cpus, empty, calc_seq_nr;
-       int seq_nr, next_nr, overrun, next_overrun;
+       int cpu, num_cpus;
+       int next_nr, next_index;
        struct padata_queue *queue, *next_queue;
        struct padata_priv *padata;
        struct padata_list *reorder;
 
-       empty = 0;
-       next_nr = -1;
-       next_overrun = 0;
-       next_queue = NULL;
-
        num_cpus = cpumask_weight(pd->cpumask);
 
-       for_each_cpu(cpu, pd->cpumask) {
-               queue = per_cpu_ptr(pd->queue, cpu);
-               reorder = &queue->reorder;
-
-               /*
-                * Calculate the seq_nr of the object that should be
-                * next in this reorder queue.
-                */
-               overrun = 0;
-               calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
-                              + queue->cpu_index;
-
-               if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
-                       calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
-                       overrun = 1;
-               }
-
-               if (!list_empty(&reorder->list)) {
-                       padata = list_entry(reorder->list.next,
-                                           struct padata_priv, list);
-
-                       seq_nr  = padata->seq_nr;
-                       BUG_ON(calc_seq_nr != seq_nr);
-               } else {
-                       seq_nr = calc_seq_nr;
-                       empty++;
-               }
-
-               if (next_nr < 0 || seq_nr < next_nr
-                   || (next_overrun && !overrun)) {
-                       next_nr = seq_nr;
-                       next_overrun = overrun;
-                       next_queue = queue;
-               }
+       /*
+        * Calculate the percpu reorder queue and the sequence
+        * number of the next object.
+        */
+       next_nr = pd->processed;
+       next_index = next_nr % num_cpus;
+       cpu = padata_index_to_cpu(pd, next_index);
+       next_queue = per_cpu_ptr(pd->queue, cpu);
+
+       if (unlikely(next_nr > pd->max_seq_nr)) {
+               next_nr = next_nr - pd->max_seq_nr - 1;
+               next_index = next_nr % num_cpus;
+               cpu = padata_index_to_cpu(pd, next_index);
+               next_queue = per_cpu_ptr(pd->queue, cpu);
+               pd->processed = 0;
        }
 
        padata = NULL;
 
-       if (empty == num_cpus)
-               goto out;
-
        reorder = &next_queue->reorder;
 
        if (!list_empty(&reorder->list)) {
                padata = list_entry(reorder->list.next,
                                    struct padata_priv, list);
 
-               if (unlikely(next_overrun)) {
-                       for_each_cpu(cpu, pd->cpumask) {
-                               queue = per_cpu_ptr(pd->queue, cpu);
-                               atomic_set(&queue->num_obj, 0);
-                       }
-               }
+               BUG_ON(next_nr != padata->seq_nr);
 
                spin_lock(&reorder->lock);
                list_del_init(&padata->list);
                atomic_dec(&pd->reorder_objects);
                spin_unlock(&reorder->lock);
 
-               atomic_inc(&next_queue->num_obj);
+               pd->processed++;
 
                goto out;
        }
@@ -430,7 +398,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
 
                INIT_WORK(&queue->pwork, padata_parallel_worker);
                INIT_WORK(&queue->swork, padata_serial_worker);
-               atomic_set(&queue->num_obj, 0);
        }
 
        num_cpus = cpumask_weight(pd->cpumask);