From 8660b7d8a545227fd9ee80508aa82528ea9947d7 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Mon, 13 Mar 2017 16:48:18 -0700 Subject: [PATCH] srcu: Use rcu_segcblist to track SRCU callbacks This commit switches SRCU from custom-built callback queues to the new rcu_segcblist structure. This change associates grace-period sequence numbers with groups of callbacks, which will be needed for efficient processing of per-CPU callbacks. Signed-off-by: Paul E. McKenney --- {kernel/rcu => include/linux}/rcu_segcblist.h | 10 +- include/linux/srcu.h | 24 +-- kernel/rcu/rcu.h | 6 + kernel/rcu/srcu.c | 159 ++++-------------- kernel/rcu/tree.h | 2 +- 5 files changed, 52 insertions(+), 149 deletions(-) rename {kernel/rcu => include/linux}/rcu_segcblist.h (98%) diff --git a/kernel/rcu/rcu_segcblist.h b/include/linux/rcu_segcblist.h similarity index 98% rename from kernel/rcu/rcu_segcblist.h rename to include/linux/rcu_segcblist.h index 982e3e05b22a..74b1e7243955 100644 --- a/kernel/rcu/rcu_segcblist.h +++ b/include/linux/rcu_segcblist.h @@ -92,7 +92,6 @@ static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp) rhp = rclp->head; if (!rhp) return NULL; - prefetch(rhp); rclp->len--; rclp->head = rhp->next; if (!rclp->head) @@ -175,6 +174,15 @@ struct rcu_segcblist { long len_lazy; }; +#define RCU_SEGCBLIST_INITIALIZER(n) \ +{ \ + .head = NULL, \ + .tails[RCU_DONE_TAIL] = &n.head, \ + .tails[RCU_WAIT_TAIL] = &n.head, \ + .tails[RCU_NEXT_READY_TAIL] = &n.head, \ + .tails[RCU_NEXT_TAIL] = &n.head, \ +} + /* * Initialize an rcu_segcblist structure. */ diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 047ac8c28a4e..ad154a7bc114 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -22,7 +22,7 @@ * Lai Jiangshan * * For detailed explanation of Read-Copy Update mechanism see - - * Documentation/RCU/ *.txt + * Documentation/RCU/ *.txt * */ @@ -32,31 +32,20 @@ #include #include #include +#include struct srcu_array { unsigned long lock_count[2]; unsigned long unlock_count[2]; }; -struct rcu_batch { - struct rcu_head *head, **tail; -}; - -#define RCU_BATCH_INIT(name) { NULL, &(name.head) } - struct srcu_struct { unsigned long completed; unsigned long srcu_gp_seq; struct srcu_array __percpu *per_cpu_ref; - spinlock_t queue_lock; /* protect ->batch_queue, ->running */ + spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */ int srcu_state; - /* callbacks just queued */ - struct rcu_batch batch_queue; - /* callbacks try to do the first check_zero */ - struct rcu_batch batch_check0; - /* callbacks done with the first check_zero and the flip */ - struct rcu_batch batch_check1; - struct rcu_batch batch_done; + struct rcu_segcblist srcu_cblist; struct delayed_work work; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; @@ -97,10 +86,7 @@ void process_srcu(struct work_struct *work); .per_cpu_ref = &name##_srcu_array, \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ .srcu_state = SRCU_STATE_IDLE, \ - .batch_queue = RCU_BATCH_INIT(name.batch_queue), \ - .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \ - .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \ - .batch_done = RCU_BATCH_INIT(name.batch_done), \ + .srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\ .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\ __SRCU_DEP_MAP_INIT(name) \ } diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h index 0bc1313c49e2..a943b42a9cf7 100644 --- a/kernel/rcu/rcu.h +++ b/kernel/rcu/rcu.h @@ -87,6 +87,12 @@ static inline unsigned long rcu_seq_snap(unsigned long *sp) return s; } +/* Return the current value the update side's sequence number, no ordering. */ +static inline unsigned long rcu_seq_current(unsigned long *sp) +{ + return READ_ONCE(*sp); +} + /* * Given a snapshot from rcu_seq_snap(), determine whether or not a * full update-side operation has occurred. diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index d464986d82b6..56fd30862122 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -22,7 +22,7 @@ * Lai Jiangshan * * For detailed explanation of Read-Copy Update mechanism see - - * Documentation/RCU/ *.txt + * Documentation/RCU/ *.txt * */ @@ -38,85 +38,13 @@ #include "rcu.h" -/* - * Initialize an rcu_batch structure to empty. - */ -static inline void rcu_batch_init(struct rcu_batch *b) -{ - b->head = NULL; - b->tail = &b->head; -} - -/* - * Enqueue a callback onto the tail of the specified rcu_batch structure. - */ -static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) -{ - *b->tail = head; - b->tail = &head->next; -} - -/* - * Is the specified rcu_batch structure empty? - */ -static inline bool rcu_batch_empty(struct rcu_batch *b) -{ - return b->tail == &b->head; -} - -/* - * Are all batches empty for the specified srcu_struct? - */ -static inline bool rcu_all_batches_empty(struct srcu_struct *sp) -{ - return rcu_batch_empty(&sp->batch_done) && - rcu_batch_empty(&sp->batch_check1) && - rcu_batch_empty(&sp->batch_check0) && - rcu_batch_empty(&sp->batch_queue); -} - -/* - * Remove the callback at the head of the specified rcu_batch structure - * and return a pointer to it, or return NULL if the structure is empty. - */ -static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) -{ - struct rcu_head *head; - - if (rcu_batch_empty(b)) - return NULL; - - head = b->head; - b->head = head->next; - if (b->tail == &head->next) - rcu_batch_init(b); - - return head; -} - -/* - * Move all callbacks from the rcu_batch structure specified by "from" to - * the structure specified by "to". - */ -static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) -{ - if (!rcu_batch_empty(from)) { - *to->tail = from->head; - to->tail = from->tail; - rcu_batch_init(from); - } -} - static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; sp->srcu_gp_seq = 0; spin_lock_init(&sp->queue_lock); sp->srcu_state = SRCU_STATE_IDLE; - rcu_batch_init(&sp->batch_queue); - rcu_batch_init(&sp->batch_check0); - rcu_batch_init(&sp->batch_check1); - rcu_batch_init(&sp->batch_done); + rcu_segcblist_init(&sp->srcu_cblist); INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_array); return sp->per_cpu_ref ? 0 : -ENOMEM; @@ -268,7 +196,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp) { if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ - if (WARN_ON(!rcu_all_batches_empty(sp))) + if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) return; /* Leakage unless caller handles error. */ flush_delayed_work(&sp->work); if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE)) @@ -324,6 +252,8 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ static void srcu_gp_start(struct srcu_struct *sp) { + rcu_segcblist_accelerate(&sp->srcu_cblist, + rcu_seq_snap(&sp->srcu_gp_seq)); WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1); rcu_seq_start(&sp->srcu_gp_seq); } @@ -371,6 +301,11 @@ static void srcu_gp_end(struct srcu_struct *sp) { rcu_seq_end(&sp->srcu_gp_seq); WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE); + + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_advance(&sp->srcu_cblist, + rcu_seq_current(&sp->srcu_gp_seq)); + spin_unlock_irq(&sp->queue_lock); } /* @@ -409,7 +344,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, head->func = func; spin_lock_irqsave(&sp->queue_lock, flags); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - rcu_batch_queue(&sp->batch_queue, head); + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { srcu_gp_start(sp); queue_delayed_work(system_power_efficient_wq, &sp->work, 0); @@ -445,13 +380,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) { /* steal the processing owner */ + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); srcu_gp_start(sp); - rcu_batch_queue(&sp->batch_check0, head); spin_unlock_irq(&sp->queue_lock); /* give the processing owner to work_struct */ srcu_reschedule(sp, 0); } else { - rcu_batch_queue(&sp->batch_queue, head); + rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); spin_unlock_irq(&sp->queue_lock); } @@ -548,19 +483,6 @@ EXPORT_SYMBOL_GPL(srcu_batches_completed); #define SRCU_CALLBACK_BATCH 10 #define SRCU_INTERVAL 1 -/* - * Move any new SRCU callbacks to the first stage of the SRCU grace - * period pipeline. - */ -static void srcu_collect_new(struct srcu_struct *sp) -{ - if (!rcu_batch_empty(&sp->batch_queue)) { - spin_lock_irq(&sp->queue_lock); - rcu_batch_move(&sp->batch_check0, &sp->batch_queue); - spin_unlock_irq(&sp->queue_lock); - } -} - /* * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. @@ -586,26 +508,7 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) idx = 1 ^ (sp->completed & 1); if (!try_check_zero(sp, idx, trycount)) return; /* readers present, retry after SRCU_INTERVAL */ - - /* - * The callbacks in ->batch_check1 have already done - * with their first zero check and flip back when they were - * enqueued on ->batch_check0 in a previous invocation of - * srcu_advance_batches(). (Presumably try_check_zero() - * returned false during that invocation, leaving the - * callbacks stranded on ->batch_check1.) They are therefore - * ready to invoke, so move them to ->batch_done. - */ - rcu_batch_move(&sp->batch_done, &sp->batch_check1); srcu_flip(sp); - - /* - * The callbacks in ->batch_check0 just finished their - * first check zero and flip, so move them to ->batch_check1 - * for future checking on the other idx. - */ - rcu_batch_move(&sp->batch_check1, &sp->batch_check0); - WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2); } @@ -619,14 +522,6 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) trycount = trycount < 2 ? 2 : trycount; if (!try_check_zero(sp, idx, trycount)) return; /* readers present, retry after SRCU_INTERVAL */ - - /* - * The callbacks in ->batch_check1 have now waited for - * all pre-existing readers using both idx values. They are - * therefore ready to invoke, so move them to ->batch_done. - */ - rcu_batch_move(&sp->batch_done, &sp->batch_check1); - srcu_gp_end(sp); } } @@ -639,17 +534,26 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) */ static void srcu_invoke_callbacks(struct srcu_struct *sp) { - int i; - struct rcu_head *head; + struct rcu_cblist ready_cbs; + struct rcu_head *rhp; - for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { - head = rcu_batch_dequeue(&sp->batch_done); - if (!head) - break; + spin_lock_irq(&sp->queue_lock); + if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { + spin_unlock_irq(&sp->queue_lock); + return; + } + rcu_cblist_init(&ready_cbs); + rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); + rhp = rcu_cblist_dequeue(&ready_cbs); + for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { local_bh_disable(); - head->func(head); + rhp->func(rhp); local_bh_enable(); } + spin_lock_irq(&sp->queue_lock); + rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); + spin_unlock_irq(&sp->queue_lock); } /* @@ -660,9 +564,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) { bool pending = true; - if (rcu_all_batches_empty(sp)) { + if (rcu_segcblist_empty(&sp->srcu_cblist)) { spin_lock_irq(&sp->queue_lock); - if (rcu_all_batches_empty(sp) && + if (rcu_segcblist_empty(&sp->srcu_cblist) && READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) { WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE); pending = false; @@ -683,7 +587,6 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); - srcu_collect_new(sp); srcu_advance_batches(sp, 1); srcu_invoke_callbacks(sp); srcu_reschedule(sp, SRCU_INTERVAL); diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h index 93889ff21dbb..4f62651588ea 100644 --- a/kernel/rcu/tree.h +++ b/kernel/rcu/tree.h @@ -30,7 +30,7 @@ #include #include #include -#include "rcu_segcblist.h" +#include /* * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and -- 2.39.2