sunrpc: only call test_bit once in svc_xprt_received

diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index c179ca2a5aa45b8f37c44a660f50fc4b04c08273..c69358b3cf7f3bb4f88b61f522e0aa5a5866059b 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -15,6 +15,7 @@
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/xprt.h>
 #include <linux/module.h>
+#include <trace/events/sunrpc.h>
 
 #define RPCDBG_FACILITY        RPCDBG_SVCXPRT
 
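
The new include pulls in the tracepoints used throughout this diff (trace_svc_xprt_do_enqueue, trace_svc_xprt_dequeue, trace_svc_wake_up, trace_svc_handle_xprt, trace_svc_recv, trace_svc_send). A rough sketch of what one such definition can look like; the fields and format below are illustrative guesses, not the actual definitions in include/trace/events/sunrpc.h:

    TRACE_EVENT(svc_xprt_dequeue,
            TP_PROTO(struct svc_xprt *xprt),
            TP_ARGS(xprt),
            TP_STRUCT__entry(
                    __field(struct svc_xprt *, xprt)
                    __field(unsigned long, flags)
            ),
            TP_fast_assign(
                    /* xprt may be NULL when nothing was dequeued */
                    __entry->xprt = xprt;
                    __entry->flags = xprt ? xprt->xpt_flags : 0;
            ),
            TP_printk("xprt=0x%p flags=%#lx", __entry->xprt, __entry->flags)
    );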
@@ -219,9 +220,11 @@ static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
  */
 static void svc_xprt_received(struct svc_xprt *xprt)
 {
-       WARN_ON_ONCE(!test_bit(XPT_BUSY, &xprt->xpt_flags));
-       if (!test_bit(XPT_BUSY, &xprt->xpt_flags))
+       if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) {
+               WARN_ONCE(1, "xprt=0x%p already busy!", xprt);
                return;
+       }
+
        /* As soon as we clear busy, the xprt could be closed and
         * 'put', so we need a reference to call svc_xprt_do_enqueue with:
         */
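
With the rewrite above, XPT_BUSY is read exactly once: if the bit is clear the function warns and bails out, otherwise it goes on to take a reference and clear the bit. The convention this guards is that a provider owns the xprt while XPT_BUSY is set and hands it back through svc_xprt_received(). A hedged sketch of that usage, with do_handle() standing in for a provider's real work:

    /* Sketch only; do_handle() is a hypothetical placeholder. */
    if (!test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
            do_handle(xprt);            /* owner while XPT_BUSY is set */
            svc_xprt_received(xprt);    /* clears XPT_BUSY, may re-enqueue */
    }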
@@ -309,25 +312,6 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
-/*
- * Queue up an idle server thread.  Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-       list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread.  Must have pool->sp_lock held.
- */
-static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-       list_del(&rqstp->rq_list);
-}
-
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
        if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
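
The two deleted helpers were one-line wrappers around list_add() and list_del(); the comment above them is the part worth keeping in mind. Pushing idle threads on the head of sp_threads made the list a LIFO stack, so the most recently idled, cache-warm thread was always reused first. The distinction comes down to which list primitive is used; a minimal sketch:

    #include <linux/list.h>

    /* LIFO: push at the head; the most recently idled, cache-warm
     * thread is picked first.
     */
    list_add(&rqstp->rq_list, &pool->sp_threads);

    /* FIFO: add at the tail; threads are woken round-robin, touching
     * more distinct stacks and polluting more cache.
     */
    list_add_tail(&rqstp->rq_list, &pool->sp_threads);

The rest of this diff removes the idle-thread list entirely; enqueuers now search pool->sp_all_threads under RCU instead.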
@@ -340,11 +324,12 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 {
        struct svc_pool *pool;
-       struct svc_rqst *rqstp;
+       struct svc_rqst *rqstp = NULL;
        int cpu;
+       bool queued = false;
 
        if (!svc_xprt_has_something_to_do(xprt))
-               return;
+               goto out;
 
        /* Mark transport as busy. It will remain in this state until
         * the provider calls svc_xprt_received. We update XPT_BUSY
@@ -354,43 +339,69 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
        if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
                /* Don't enqueue transport while already enqueued */
                dprintk("svc: transport %p busy, not enqueued\n", xprt);
-               return;
+               goto out;
        }
 
        cpu = get_cpu();
        pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-       spin_lock_bh(&pool->sp_lock);
 
-       pool->sp_stats.packets++;
-
-       if (!list_empty(&pool->sp_threads)) {
-               rqstp = list_entry(pool->sp_threads.next,
-                                  struct svc_rqst,
-                                  rq_list);
-               dprintk("svc: transport %p served by daemon %p\n",
-                       xprt, rqstp);
-               svc_thread_dequeue(pool, rqstp);
-               if (rqstp->rq_xprt)
-                       printk(KERN_ERR
-                               "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
-                               rqstp, rqstp->rq_xprt);
-               /* Note the order of the following 3 lines:
-                * We want to assign xprt to rqstp->rq_xprt only _after_
-                * we've woken up the process, so that we don't race with
-                * the lockless check in svc_get_next_xprt().
+       atomic_long_inc(&pool->sp_stats.packets);
+
+redo_search:
+       /* find a thread for this xprt */
+       rcu_read_lock();
+       list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+               /* Do a lockless check first */
+               if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+                       continue;
+
+               /*
+                * Once the xprt has been queued, it can only be dequeued by
+                * the task that intends to service it. All we can do at that
+                * point is to try to wake this thread back up so that it can
+                * do so.
                 */
-               svc_xprt_get(xprt);
+               if (!queued) {
+                       spin_lock_bh(&rqstp->rq_lock);
+                       if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
+                               /* already busy, move on... */
+                               spin_unlock_bh(&rqstp->rq_lock);
+                               continue;
+                       }
+
+                       /* this one will do */
+                       rqstp->rq_xprt = xprt;
+                       svc_xprt_get(xprt);
+                       spin_unlock_bh(&rqstp->rq_lock);
+               }
+               rcu_read_unlock();
+
+               atomic_long_inc(&pool->sp_stats.threads_woken);
                wake_up_process(rqstp->rq_task);
-               rqstp->rq_xprt = xprt;
-               pool->sp_stats.threads_woken++;
-       } else {
+               put_cpu();
+               goto out;
+       }
+       rcu_read_unlock();
+
+       /*
+        * We didn't find an idle thread to use, so we need to queue the xprt.
+        * Do so and then search again. If we find one, we can't hook this one
+        * up to it directly but we can wake the thread up in the hopes that it
+        * will pick it up once it searches for an xprt to service.
+        */
+       if (!queued) {
+               queued = true;
                dprintk("svc: transport %p put into queue\n", xprt);
+               spin_lock_bh(&pool->sp_lock);
                list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
                pool->sp_stats.sockets_queued++;
+               spin_unlock_bh(&pool->sp_lock);
+               goto redo_search;
        }
-
-       spin_unlock_bh(&pool->sp_lock);
+       rqstp = NULL;
        put_cpu();
+out:
+       trace_svc_xprt_do_enqueue(xprt, rqstp);
 }
 
 /*
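
The rewritten enqueue path drops the pool-wide sp_lock in favor of an RCU walk over pool->sp_all_threads: a lockless test_bit(RQ_BUSY) filters out busy threads cheaply, and the per-thread rq_lock plus test_and_set_bit() closes the race when claiming a candidate. Stripped of the queue-and-retry logic, the claim pattern looks roughly like this (names as in the diff):

    rcu_read_lock();
    list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
            if (test_bit(RQ_BUSY, &rqstp->rq_flags))
                    continue;               /* lockless pre-filter */

            spin_lock_bh(&rqstp->rq_lock);
            if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
                    /* lost the race to another enqueuer */
                    spin_unlock_bh(&rqstp->rq_lock);
                    continue;
            }
            rqstp->rq_xprt = xprt;          /* hand off under rq_lock */
            svc_xprt_get(xprt);
            spin_unlock_bh(&rqstp->rq_lock);
            rcu_read_unlock();

            wake_up_process(rqstp->rq_task);
            return;
    }
    rcu_read_unlock();

If no idle thread is found, the xprt is queued on sp_sockets under sp_lock and the search runs once more (queued = true), this time only to wake a thread that may have gone idle in the meantime.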
@@ -407,22 +418,28 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
 
 /*
- * Dequeue the first transport.  Must be called with the pool->sp_lock held.
+ * Dequeue the first transport, if there is one.
  */
 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
-       struct svc_xprt *xprt;
+       struct svc_xprt *xprt = NULL;
 
        if (list_empty(&pool->sp_sockets))
-               return NULL;
-
-       xprt = list_entry(pool->sp_sockets.next,
-                         struct svc_xprt, xpt_ready);
-       list_del_init(&xprt->xpt_ready);
+               goto out;
 
-       dprintk("svc: transport %p dequeued, inuse=%d\n",
-               xprt, atomic_read(&xprt->xpt_ref.refcount));
+       spin_lock_bh(&pool->sp_lock);
+       if (likely(!list_empty(&pool->sp_sockets))) {
+               xprt = list_first_entry(&pool->sp_sockets,
+                                       struct svc_xprt, xpt_ready);
+               list_del_init(&xprt->xpt_ready);
+               svc_xprt_get(xprt);
 
+               dprintk("svc: transport %p dequeued, inuse=%d\n",
+                       xprt, atomic_read(&xprt->xpt_ref.refcount));
+       }
+       spin_unlock_bh(&pool->sp_lock);
+out:
+       trace_svc_xprt_dequeue(xprt);
        return xprt;
 }
 
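
svc_xprt_dequeue() now uses an optimistic double check: peek at the list with no lock held, and only take sp_lock (re-testing under it) when the list looked non-empty. The unlocked list_empty() can race, which is exactly why the locked likely() re-check is mandatory; the payoff is that an idle pool never touches the lock at all. Note also that svc_xprt_get() moved inside the lock: the reference must be pinned before sp_lock is dropped, since nothing else keeps the xprt alive once it is off the list. In outline:

    if (list_empty(&pool->sp_sockets))
            return NULL;                    /* fast path, no lock */

    spin_lock_bh(&pool->sp_lock);
    if (likely(!list_empty(&pool->sp_sockets))) {
            xprt = list_first_entry(&pool->sp_sockets,
                                    struct svc_xprt, xpt_ready);
            list_del_init(&xprt->xpt_ready);
            svc_xprt_get(xprt);             /* pin before unlocking */
    }
    spin_unlock_bh(&pool->sp_lock);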
@@ -483,34 +500,36 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
 }
 
 /*
- * External function to wake up a server waiting for data
- * This really only makes sense for services like lockd
- * which have exactly one thread anyway.
+ * Some svc_serv's will have occasional work to do, even when an xprt is not
+ * waiting to be serviced. This function is there to "kick" a task in one of
+ * those services so that it can wake up and do that work. Note that we only
+ * bother with pool 0 as we don't need to wake up more than one thread for
+ * this purpose.
  */
 void svc_wake_up(struct svc_serv *serv)
 {
        struct svc_rqst *rqstp;
-       unsigned int i;
        struct svc_pool *pool;
 
-       for (i = 0; i < serv->sv_nrpools; i++) {
-               pool = &serv->sv_pools[i];
+       pool = &serv->sv_pools[0];
 
-               spin_lock_bh(&pool->sp_lock);
-               if (!list_empty(&pool->sp_threads)) {
-                       rqstp = list_entry(pool->sp_threads.next,
-                                          struct svc_rqst,
-                                          rq_list);
-                       dprintk("svc: daemon %p woken up.\n", rqstp);
-                       /*
-                       svc_thread_dequeue(pool, rqstp);
-                       rqstp->rq_xprt = NULL;
-                        */
-                       wake_up_process(rqstp->rq_task);
-               } else
-                       pool->sp_task_pending = 1;
-               spin_unlock_bh(&pool->sp_lock);
+       rcu_read_lock();
+       list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+               /* skip any that aren't queued */
+               if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+                       continue;
+               rcu_read_unlock();
+               dprintk("svc: daemon %p woken up.\n", rqstp);
+               wake_up_process(rqstp->rq_task);
+               trace_svc_wake_up(rqstp->rq_task->pid);
+               return;
        }
+       rcu_read_unlock();
+
+       /* No free entries available */
+       set_bit(SP_TASK_PENDING, &pool->sp_flags);
+       smp_wmb();
+       trace_svc_wake_up(0);
 }
 EXPORT_SYMBOL_GPL(svc_wake_up);
 
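
When svc_wake_up() finds no idle thread, it records the pending work in SP_TASK_PENDING and issues smp_wmb() so the flag store is ordered before anything a waking thread might observe; the consuming side is the test_and_clear_bit() at the top of rqst_should_sleep() further down. Reduced to the flag handshake alone (a sketch, not the full wakeup protocol):

    /* producer: note the work, then publish the store */
    set_bit(SP_TASK_PENDING, &pool->sp_flags);
    smp_wmb();

    /* consumer, in rqst_should_sleep(): consume the note atomically;
     * returning false keeps the thread awake to go look for the work
     */
    if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
            return false;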
@@ -621,75 +640,86 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
        return 0;
 }
 
+static bool
+rqst_should_sleep(struct svc_rqst *rqstp)
+{
+       struct svc_pool         *pool = rqstp->rq_pool;
+
+       /* did someone call svc_wake_up? */
+       if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+               return false;
+
+       /* was a socket queued? */
+       if (!list_empty(&pool->sp_sockets))
+               return false;
+
+       /* are we shutting down? */
+       if (signalled() || kthread_should_stop())
+               return false;
+
+       /* are we freezing? */
+       if (freezing(current))
+               return false;
+
+       return true;
+}
+
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
        struct svc_xprt *xprt;
        struct svc_pool         *pool = rqstp->rq_pool;
        long                    time_left = 0;
 
+       /* rq_xprt should be clear on entry */
+       WARN_ON_ONCE(rqstp->rq_xprt);
+
        /* Normally we will wait up to 5 seconds for any required
         * cache information to be provided.
         */
        rqstp->rq_chandle.thread_wait = 5*HZ;
 
-       spin_lock_bh(&pool->sp_lock);
        xprt = svc_xprt_dequeue(pool);
        if (xprt) {
                rqstp->rq_xprt = xprt;
-               svc_xprt_get(xprt);
 
                /* As there is a shortage of threads and this request
                 * had to be queued, don't allow the thread to wait so
                 * long for cache updates.
                 */
                rqstp->rq_chandle.thread_wait = 1*HZ;
-               pool->sp_task_pending = 0;
-       } else {
-               if (pool->sp_task_pending) {
-                       pool->sp_task_pending = 0;
-                       xprt = ERR_PTR(-EAGAIN);
-                       goto out;
-               }
-               /*
-                * We have to be able to interrupt this wait
-                * to bring down the daemons ...
-                */
-               set_current_state(TASK_INTERRUPTIBLE);
+               clear_bit(SP_TASK_PENDING, &pool->sp_flags);
+               return xprt;
+       }
 
-               /* No data pending. Go to sleep */
-               svc_thread_enqueue(pool, rqstp);
-               spin_unlock_bh(&pool->sp_lock);
+       /*
+        * We have to be able to interrupt this wait
+        * to bring down the daemons ...
+        */
+       set_current_state(TASK_INTERRUPTIBLE);
+       clear_bit(RQ_BUSY, &rqstp->rq_flags);
+       smp_mb();
 
-               if (!(signalled() || kthread_should_stop())) {
-                       time_left = schedule_timeout(timeout);
-                       __set_current_state(TASK_RUNNING);
+       if (likely(rqst_should_sleep(rqstp)))
+               time_left = schedule_timeout(timeout);
+       else
+               __set_current_state(TASK_RUNNING);
 
-                       try_to_freeze();
+       try_to_freeze();
 
-                       xprt = rqstp->rq_xprt;
-                       if (xprt != NULL)
-                               return xprt;
-               } else
-                       __set_current_state(TASK_RUNNING);
+       spin_lock_bh(&rqstp->rq_lock);
+       set_bit(RQ_BUSY, &rqstp->rq_flags);
+       spin_unlock_bh(&rqstp->rq_lock);
 
-               spin_lock_bh(&pool->sp_lock);
-               if (!time_left)
-                       pool->sp_stats.threads_timedout++;
+       xprt = rqstp->rq_xprt;
+       if (xprt != NULL)
+               return xprt;
 
-               xprt = rqstp->rq_xprt;
-               if (!xprt) {
-                       svc_thread_dequeue(pool, rqstp);
-                       spin_unlock_bh(&pool->sp_lock);
-                       dprintk("svc: server %p, no data yet\n", rqstp);
-                       if (signalled() || kthread_should_stop())
-                               return ERR_PTR(-EINTR);
-                       else
-                               return ERR_PTR(-EAGAIN);
-               }
-       }
-out:
-       spin_unlock_bh(&pool->sp_lock);
-       return xprt;
+       if (!time_left)
+               atomic_long_inc(&pool->sp_stats.threads_timedout);
+
+       if (signalled() || kthread_should_stop())
+               return ERR_PTR(-EINTR);
+       return ERR_PTR(-EAGAIN);
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
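
The sleep side of svc_get_next_xprt() follows the standard prepare-to-sleep idiom: set TASK_INTERRUPTIBLE and clear RQ_BUSY first, then re-check every wakeup condition through rqst_should_sleep(), and only then call schedule_timeout(). The smp_mb() orders the RQ_BUSY clear against those re-checks, so a concurrent enqueuer either sees this thread as idle (and hands it an xprt directly) or this thread sees the newly queued work and skips the sleep. The idiom in isolation:

    set_current_state(TASK_INTERRUPTIBLE);
    clear_bit(RQ_BUSY, &rqstp->rq_flags);
    smp_mb();                       /* pairs with the enqueue side */

    if (rqst_should_sleep(rqstp))
            time_left = schedule_timeout(timeout);
    else
            __set_current_state(TASK_RUNNING);

    /* on wakeup: become busy again before touching rq_xprt */
    spin_lock_bh(&rqstp->rq_lock);
    set_bit(RQ_BUSY, &rqstp->rq_flags);
    spin_unlock_bh(&rqstp->rq_lock);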
@@ -718,7 +748,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
                dprintk("svc_recv: found XPT_CLOSE\n");
                svc_delete_xprt(xprt);
                /* Leave XPT_BUSY set on the dead xprt: */
-               return 0;
+               goto out;
        }
        if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
                struct svc_xprt *newxpt;
@@ -749,6 +779,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
        }
        /* clear XPT_BUSY: */
        svc_xprt_received(xprt);
+out:
+       trace_svc_handle_xprt(xprt, len);
        return len;
 }
 
@@ -773,35 +805,46 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
 
        err = svc_alloc_arg(rqstp);
        if (err)
-               return err;
+               goto out;
 
        try_to_freeze();
        cond_resched();
+       err = -EINTR;
        if (signalled() || kthread_should_stop())
-               return -EINTR;
+               goto out;
 
        xprt = svc_get_next_xprt(rqstp, timeout);
-       if (IS_ERR(xprt))
-               return PTR_ERR(xprt);
+       if (IS_ERR(xprt)) {
+               err = PTR_ERR(xprt);
+               goto out;
+       }
 
        len = svc_handle_xprt(rqstp, xprt);
 
        /* No data, incomplete (TCP) read, or accept() */
+       err = -EAGAIN;
        if (len <= 0)
-               goto out;
+               goto out_release;
 
        clear_bit(XPT_OLD, &xprt->xpt_flags);
 
-       rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp);
+       if (xprt->xpt_ops->xpo_secure_port(rqstp))
+               set_bit(RQ_SECURE, &rqstp->rq_flags);
+       else
+               clear_bit(RQ_SECURE, &rqstp->rq_flags);
        rqstp->rq_chandle.defer = svc_defer;
+       rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]);
 
        if (serv->sv_stats)
                serv->sv_stats->netcnt++;
+       trace_svc_recv(rqstp, len);
        return len;
-out:
+out_release:
        rqstp->rq_res.len = 0;
        svc_xprt_release(rqstp);
-       return -EAGAIN;
+out:
+       trace_svc_recv(rqstp, err);
+       return err;
 }
 EXPORT_SYMBOL_GPL(svc_recv);
 
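
svc_recv() is reshaped around a single exit so that every path, success or failure, reports through one trace_svc_recv() call; err is primed with the failure code before each test. The skeleton of the pattern, with the sunrpc specifics replaced by hypothetical helpers:

    /* Sketch; do_receive() and release_resources() are placeholders. */
    int recv_shape(struct svc_rqst *rqstp)
    {
            int err = -EINTR;

            if (kthread_should_stop())
                    goto out;

            err = do_receive(rqstp);
            if (err <= 0) {
                    err = -EAGAIN;          /* no data or partial read */
                    goto out_release;
            }

            trace_svc_recv(rqstp, err);     /* success */
            return err;
    out_release:
            release_resources(rqstp);
    out:
            trace_svc_recv(rqstp, err);     /* every failure, too */
            return err;
    }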
@@ -821,12 +864,12 @@ EXPORT_SYMBOL_GPL(svc_drop);
 int svc_send(struct svc_rqst *rqstp)
 {
        struct svc_xprt *xprt;
-       int             len;
+       int             len = -EFAULT;
        struct xdr_buf  *xb;
 
        xprt = rqstp->rq_xprt;
        if (!xprt)
-               return -EFAULT;
+               goto out;
 
        /* release the receive skb before sending the reply */
        rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
@@ -849,7 +892,9 @@ int svc_send(struct svc_rqst *rqstp)
        svc_xprt_release(rqstp);
 
        if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
-               return 0;
+               len = 0;
+out:
+       trace_svc_send(rqstp, len);
        return len;
 }
 
@@ -884,7 +929,6 @@ static void svc_age_temp_xprts(unsigned long closure)
                        continue;
                list_del_init(le);
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
-               set_bit(XPT_DETACHED, &xprt->xpt_flags);
                dprintk("queuing xprt %p for closing\n", xprt);
 
                /* a thread will dequeue and close it soon */
@@ -924,8 +968,7 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
        xprt->xpt_ops->xpo_detach(xprt);
 
        spin_lock_bh(&serv->sv_lock);
-       if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
-               list_del_init(&xprt->xpt_list);
+       list_del_init(&xprt->xpt_list);
        WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
        if (test_bit(XPT_TEMP, &xprt->xpt_flags))
                serv->sv_tmpcnt--;
@@ -1069,7 +1112,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
        struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
        struct svc_deferred_req *dr;
 
-       if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
+       if (rqstp->rq_arg.page_len || !test_bit(RQ_USEDEFERRAL, &rqstp->rq_flags))
                return NULL; /* if more than a page, give up FIXME */
        if (rqstp->rq_deferred) {
                dr = rqstp->rq_deferred;
@@ -1098,7 +1141,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
        }
        svc_xprt_get(rqstp->rq_xprt);
        dr->xprt = rqstp->rq_xprt;
-       rqstp->rq_dropme = true;
+       set_bit(RQ_DROPME, &rqstp->rq_flags);
 
        dr->handle.revisit = svc_revisit;
        return &dr->handle;
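
These hunks belong to the same conversion as RQ_SECURE above: per-request bools (rq_usedeferral, rq_dropme, rq_secure) become atomic bit flags in a single rqstp->rq_flags word, which is what allows svc_xprt_do_enqueue() to test RQ_BUSY without taking any lock. The translation is mechanical; for example:

    /* before: plain bool fields, no atomicity guarantees */
    rqstp->rq_dropme = true;
    if (!rqstp->rq_usedeferral)
            return NULL;

    /* after: atomic bitops on one flags word, safe to read and
     * modify concurrently without holding rq_lock
     */
    set_bit(RQ_DROPME, &rqstp->rq_flags);
    if (!test_bit(RQ_USEDEFERRAL, &rqstp->rq_flags))
            return NULL;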
@@ -1300,10 +1343,10 @@ static int svc_pool_stats_show(struct seq_file *m, void *p)
 
        seq_printf(m, "%u %lu %lu %lu %lu\n",
                pool->sp_id,
-               pool->sp_stats.packets,
+               (unsigned long)atomic_long_read(&pool->sp_stats.packets),
                pool->sp_stats.sockets_queued,
-               pool->sp_stats.threads_woken,
-               pool->sp_stats.threads_timedout);
+               (unsigned long)atomic_long_read(&pool->sp_stats.threads_woken),
+               (unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout));
 
        return 0;
 }
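
With sp_lock gone from the hot paths, the counters it used to protect become atomic_long_t: the enqueue and timeout paths bump them with atomic_long_inc() (see the packets, threads_woken and threads_timedout increments earlier in this diff), and the stats file snapshots them with atomic_long_read(). Only sockets_queued stays a plain counter, since it is still updated under sp_lock. Usage in miniature:

    /* hot path: lockless increment */
    atomic_long_inc(&pool->sp_stats.packets);

    /* reporting path: read without a lock; the cast matches the %lu
     * format used by svc_pool_stats_show()
     */
    seq_printf(m, "%lu\n",
               (unsigned long)atomic_long_read(&pool->sp_stats.packets));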