git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
srcu: Use rcu_segcblist to track SRCU callbacks
author Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Mon, 13 Mar 2017 23:48:18 +0000 (16:48 -0700)
committer Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Tue, 18 Apr 2017 18:38:20 +0000 (11:38 -0700)
This commit switches SRCU from custom-built callback queues to the new
rcu_segcblist structure.  This change associates grace-period sequence
numbers with groups of callbacks, which will be needed for efficient
processing of per-CPU callbacks.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
include/linux/rcu_segcblist.h [moved from kernel/rcu/rcu_segcblist.h with 98% similarity]
include/linux/srcu.h
kernel/rcu/rcu.h
kernel/rcu/srcu.c
kernel/rcu/tree.h

similarity index 98%
rename from kernel/rcu/rcu_segcblist.h
rename to include/linux/rcu_segcblist.h
index 982e3e05b22a2054d1a200029a134525ad1da719..74b1e7243955dbdde705b969dcccdab68bab43d9 100644 (file)
@@ -92,7 +92,6 @@ static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp)
        rhp = rclp->head;
        if (!rhp)
                return NULL;
-       prefetch(rhp);
        rclp->len--;
        rclp->head = rhp->next;
        if (!rclp->head)
@@ -175,6 +174,15 @@ struct rcu_segcblist {
        long len_lazy;
 };
 
+#define RCU_SEGCBLIST_INITIALIZER(n) \
+{ \
+       .head = NULL, \
+       .tails[RCU_DONE_TAIL] = &n.head, \
+       .tails[RCU_WAIT_TAIL] = &n.head, \
+       .tails[RCU_NEXT_READY_TAIL] = &n.head, \
+       .tails[RCU_NEXT_TAIL] = &n.head, \
+}
+
 /*
  * Initialize an rcu_segcblist structure.
  */
index 047ac8c28a4e2b9207e78ede1a3d15be2c263c68..ad154a7bc114f7c216a509c0498fc483d6f64c4d 100644 (file)
@@ -22,7 +22,7 @@
  *        Lai Jiangshan <laijs@cn.fujitsu.com>
  *
  * For detailed explanation of Read-Copy Update mechanism see -
- *             Documentation/RCU/ *.txt
+ *             Documentation/RCU/ *.txt
  *
  */
 
 #include <linux/mutex.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/rcu_segcblist.h>
 
 struct srcu_array {
        unsigned long lock_count[2];
        unsigned long unlock_count[2];
 };
 
-struct rcu_batch {
-       struct rcu_head *head, **tail;
-};
-
-#define RCU_BATCH_INIT(name) { NULL, &(name.head) }
-
 struct srcu_struct {
        unsigned long completed;
        unsigned long srcu_gp_seq;
        struct srcu_array __percpu *per_cpu_ref;
-       spinlock_t queue_lock; /* protect ->batch_queue, ->running */
+       spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */
        int srcu_state;
-       /* callbacks just queued */
-       struct rcu_batch batch_queue;
-       /* callbacks try to do the first check_zero */
-       struct rcu_batch batch_check0;
-       /* callbacks done with the first check_zero and the flip */
-       struct rcu_batch batch_check1;
-       struct rcu_batch batch_done;
+       struct rcu_segcblist srcu_cblist;
        struct delayed_work work;
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map dep_map;
@@ -97,10 +86,7 @@ void process_srcu(struct work_struct *work);
                .per_cpu_ref = &name##_srcu_array,                      \
                .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock),    \
                .srcu_state = SRCU_STATE_IDLE,                          \
-               .batch_queue = RCU_BATCH_INIT(name.batch_queue),        \
-               .batch_check0 = RCU_BATCH_INIT(name.batch_check0),      \
-               .batch_check1 = RCU_BATCH_INIT(name.batch_check1),      \
-               .batch_done = RCU_BATCH_INIT(name.batch_done),          \
+               .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\
                .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
                __SRCU_DEP_MAP_INIT(name)                               \
        }
index 0bc1313c49e2d6ec975675c3f7dcee19c8ebe3c9..a943b42a9cf796f60e94ae59dbf2070ddd0210d7 100644 (file)
@@ -87,6 +87,12 @@ static inline unsigned long rcu_seq_snap(unsigned long *sp)
        return s;
 }
 
+/* Return the current value of the update side's sequence number, no ordering. */
+static inline unsigned long rcu_seq_current(unsigned long *sp)
+{
+       return READ_ONCE(*sp);
+}
+
 /*
  * Given a snapshot from rcu_seq_snap(), determine whether or not a
  * full update-side operation has occurred.
index d464986d82b6931c18ef172835d8db1ef1a90064..56fd30862122557bcea5f72d627e8399853b47c5 100644 (file)
@@ -22,7 +22,7 @@
  *        Lai Jiangshan <laijs@cn.fujitsu.com>
  *
  * For detailed explanation of Read-Copy Update mechanism see -
- *             Documentation/RCU/ *.txt
+ *             Documentation/RCU/ *.txt
  *
  */
 
 
 #include "rcu.h"
 
-/*
- * Initialize an rcu_batch structure to empty.
- */
-static inline void rcu_batch_init(struct rcu_batch *b)
-{
-       b->head = NULL;
-       b->tail = &b->head;
-}
-
-/*
- * Enqueue a callback onto the tail of the specified rcu_batch structure.
- */
-static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
-{
-       *b->tail = head;
-       b->tail = &head->next;
-}
-
-/*
- * Is the specified rcu_batch structure empty?
- */
-static inline bool rcu_batch_empty(struct rcu_batch *b)
-{
-       return b->tail == &b->head;
-}
-
-/*
- * Are all batches empty for the specified srcu_struct?
- */
-static inline bool rcu_all_batches_empty(struct srcu_struct *sp)
-{
-       return rcu_batch_empty(&sp->batch_done) &&
-              rcu_batch_empty(&sp->batch_check1) &&
-              rcu_batch_empty(&sp->batch_check0) &&
-              rcu_batch_empty(&sp->batch_queue);
-}
-
-/*
- * Remove the callback at the head of the specified rcu_batch structure
- * and return a pointer to it, or return NULL if the structure is empty.
- */
-static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
-{
-       struct rcu_head *head;
-
-       if (rcu_batch_empty(b))
-               return NULL;
-
-       head = b->head;
-       b->head = head->next;
-       if (b->tail == &head->next)
-               rcu_batch_init(b);
-
-       return head;
-}
-
-/*
- * Move all callbacks from the rcu_batch structure specified by "from" to
- * the structure specified by "to".
- */
-static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
-{
-       if (!rcu_batch_empty(from)) {
-               *to->tail = from->head;
-               to->tail = from->tail;
-               rcu_batch_init(from);
-       }
-}
-
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
        sp->completed = 0;
        sp->srcu_gp_seq = 0;
        spin_lock_init(&sp->queue_lock);
        sp->srcu_state = SRCU_STATE_IDLE;
-       rcu_batch_init(&sp->batch_queue);
-       rcu_batch_init(&sp->batch_check0);
-       rcu_batch_init(&sp->batch_check1);
-       rcu_batch_init(&sp->batch_done);
+       rcu_segcblist_init(&sp->srcu_cblist);
        INIT_DELAYED_WORK(&sp->work, process_srcu);
        sp->per_cpu_ref = alloc_percpu(struct srcu_array);
        return sp->per_cpu_ref ? 0 : -ENOMEM;
@@ -268,7 +196,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
 {
        if (WARN_ON(srcu_readers_active(sp)))
                return; /* Leakage unless caller handles error. */
-       if (WARN_ON(!rcu_all_batches_empty(sp)))
+       if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
                return; /* Leakage unless caller handles error. */
        flush_delayed_work(&sp->work);
        if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE))
@@ -324,6 +252,8 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
  */
 static void srcu_gp_start(struct srcu_struct *sp)
 {
+       rcu_segcblist_accelerate(&sp->srcu_cblist,
+                                rcu_seq_snap(&sp->srcu_gp_seq));
        WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
        rcu_seq_start(&sp->srcu_gp_seq);
 }
@@ -371,6 +301,11 @@ static void srcu_gp_end(struct srcu_struct *sp)
 {
        rcu_seq_end(&sp->srcu_gp_seq);
        WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE);
+
+       spin_lock_irq(&sp->queue_lock);
+       rcu_segcblist_advance(&sp->srcu_cblist,
+                             rcu_seq_current(&sp->srcu_gp_seq));
+       spin_unlock_irq(&sp->queue_lock);
 }
 
 /*
@@ -409,7 +344,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
        head->func = func;
        spin_lock_irqsave(&sp->queue_lock, flags);
        smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-       rcu_batch_queue(&sp->batch_queue, head);
+       rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
        if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
                srcu_gp_start(sp);
                queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
@@ -445,13 +380,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
        smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
        if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
                /* steal the processing owner */
+               rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
                srcu_gp_start(sp);
-               rcu_batch_queue(&sp->batch_check0, head);
                spin_unlock_irq(&sp->queue_lock);
                /* give the processing owner to work_struct */
                srcu_reschedule(sp, 0);
        } else {
-               rcu_batch_queue(&sp->batch_queue, head);
+               rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
                spin_unlock_irq(&sp->queue_lock);
        }
 
@@ -548,19 +483,6 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed);
 #define SRCU_CALLBACK_BATCH    10
 #define SRCU_INTERVAL          1
 
-/*
- * Move any new SRCU callbacks to the first stage of the SRCU grace
- * period pipeline.
- */
-static void srcu_collect_new(struct srcu_struct *sp)
-{
-       if (!rcu_batch_empty(&sp->batch_queue)) {
-               spin_lock_irq(&sp->queue_lock);
-               rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
-               spin_unlock_irq(&sp->queue_lock);
-       }
-}
-
 /*
  * Core SRCU state machine.  Advance callbacks from ->batch_check0 to
  * ->batch_check1 and then to ->batch_done as readers drain.
@@ -586,26 +508,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
                idx = 1 ^ (sp->completed & 1);
                if (!try_check_zero(sp, idx, trycount))
                        return; /* readers present, retry after SRCU_INTERVAL */
-
-               /*
-                * The callbacks in ->batch_check1 have already done
-                * with their first zero check and flip back when they were
-                * enqueued on ->batch_check0 in a previous invocation of
-                * srcu_advance_batches().  (Presumably try_check_zero()
-                * returned false during that invocation, leaving the
-                * callbacks stranded on ->batch_check1.) They are therefore
-                * ready to invoke, so move them to ->batch_done.
-                */
-               rcu_batch_move(&sp->batch_done, &sp->batch_check1);
                srcu_flip(sp);
-
-               /*
-                * The callbacks in ->batch_check0 just finished their
-                * first check zero and flip, so move them to ->batch_check1
-                * for future checking on the other idx.
-                */
-               rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
-
                WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2);
        }
 
@@ -619,14 +522,6 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
                trycount = trycount < 2 ? 2 : trycount;
                if (!try_check_zero(sp, idx, trycount))
                        return; /* readers present, retry after SRCU_INTERVAL */
-
-               /*
-                * The callbacks in ->batch_check1 have now waited for
-                * all pre-existing readers using both idx values.  They are
-                * therefore ready to invoke, so move them to ->batch_done.
-                */
-               rcu_batch_move(&sp->batch_done, &sp->batch_check1);
-
                srcu_gp_end(sp);
        }
 }
@@ -639,17 +534,26 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
  */
 static void srcu_invoke_callbacks(struct srcu_struct *sp)
 {
-       int i;
-       struct rcu_head *head;
+       struct rcu_cblist ready_cbs;
+       struct rcu_head *rhp;
 
-       for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
-               head = rcu_batch_dequeue(&sp->batch_done);
-               if (!head)
-                       break;
+       spin_lock_irq(&sp->queue_lock);
+       if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
+               spin_unlock_irq(&sp->queue_lock);
+               return;
+       }
+       rcu_cblist_init(&ready_cbs);
+       rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
+       spin_unlock_irq(&sp->queue_lock);
+       rhp = rcu_cblist_dequeue(&ready_cbs);
+       for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
                local_bh_disable();
-               head->func(head);
+               rhp->func(rhp);
                local_bh_enable();
        }
+       spin_lock_irq(&sp->queue_lock);
+       rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
+       spin_unlock_irq(&sp->queue_lock);
 }
 
 /*
@@ -660,9 +564,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
 {
        bool pending = true;
 
-       if (rcu_all_batches_empty(sp)) {
+       if (rcu_segcblist_empty(&sp->srcu_cblist)) {
                spin_lock_irq(&sp->queue_lock);
-               if (rcu_all_batches_empty(sp) &&
+               if (rcu_segcblist_empty(&sp->srcu_cblist) &&
                    READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) {
                        WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE);
                        pending = false;
@@ -683,7 +587,6 @@ void process_srcu(struct work_struct *work)
 
        sp = container_of(work, struct srcu_struct, work.work);
 
-       srcu_collect_new(sp);
        srcu_advance_batches(sp, 1);
        srcu_invoke_callbacks(sp);
        srcu_reschedule(sp, SRCU_INTERVAL);
index 93889ff21dbb9d216e17d5fb10bfa1b5a9efd835..4f62651588ea64ca7f0c67db25414c00352c2820 100644 (file)
@@ -30,7 +30,7 @@
 #include <linux/seqlock.h>
 #include <linux/swait.h>
 #include <linux/stop_machine.h>
-#include "rcu_segcblist.h"
+#include <linux/rcu_segcblist.h>
 
 /*
  * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and