]> git.karo-electronics.de Git - mv-sheeva.git/blobdiff - net/sunrpc/sched.c
Merge branch 'for-linus' of git://codeaurora.org/quic/kernel/davidb/linux-msm
[mv-sheeva.git] / net / sunrpc / sched.c
index 243fc09b164e81865a902015f18bd87313413167..ffb687671da0545866739ad4c6e69c5155976beb 100644 (file)
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-       smp_mb__before_clear_bit();
+       void *m = &task->tk_runstate;
+       wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+       struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+       unsigned long flags;
+       int ret;
+
+       spin_lock_irqsave(&wq->lock, flags);
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-       smp_mb__after_clear_bit();
-       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+       ret = atomic_dec_and_test(&task->tk_count);
+       if (waitqueue_active(wq))
+               __wake_up_locked_key(wq, TASK_NORMAL, &k);
+       spin_unlock_irqrestore(&wq->lock, flags);
+       return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
        if (action == NULL)
                action = rpc_wait_bit_killable;
-       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+       return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -285,15 +299,8 @@ static void rpc_make_runnable(struct rpc_task *task)
        if (rpc_test_and_set_running(task))
                return;
        if (RPC_IS_ASYNC(task)) {
-               int status;
-
                INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-               status = queue_work(rpciod_workqueue, &task->u.tk_work);
-               if (status < 0) {
-                       printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-                       task->tk_status = status;
-                       return;
-               }
+               queue_work(rpciod_workqueue, &task->u.tk_work);
        } else
                wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -623,14 +630,12 @@ static void __rpc_execute(struct rpc_task *task)
                        save_callback = task->tk_callback;
                        task->tk_callback = NULL;
                        save_callback(task);
-               }
-
-               /*
-                * Perform the next FSM step.
-                * tk_action may be NULL when the task has been killed
-                * by someone else.
-                */
-               if (!RPC_IS_QUEUED(task)) {
+               } else {
+                       /*
+                        * Perform the next FSM step.
+                        * tk_action may be NULL when the task has been killed
+                        * by someone else.
+                        */
                        if (task->tk_action == NULL)
                                break;
                        task->tk_action(task);
@@ -829,12 +834,6 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
        }
 
        rpc_init_task(task, setup_data);
-       if (task->tk_status < 0) {
-               int err = task->tk_status;
-               rpc_put_task(task);
-               return ERR_PTR(err);
-       }
-
        task->tk_flags |= flags;
        dprintk("RPC:       allocated task %p\n", task);
        return task;
@@ -857,34 +856,67 @@ static void rpc_async_release(struct work_struct *work)
        rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-       if (!atomic_dec_and_test(&task->tk_count))
-               return;
-       /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                put_rpccred(task->tk_msg.rpc_cred);
        rpc_task_release_client(task);
-       if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+               struct workqueue_struct *q)
+{
+       if (q != NULL) {
                INIT_WORK(&task->u.tk_work, rpc_async_release);
-               queue_work(task->tk_workqueue, &task->u.tk_work);
+               queue_work(q, &task->u.tk_work);
        } else
                rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+       if (atomic_dec_and_test(&task->tk_count)) {
+               rpc_release_resources_task(task);
+               rpc_final_put_task(task, q);
+       }
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+       rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+       rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
        dprintk("RPC: %5u release task\n", task->tk_pid);
 
        BUG_ON (RPC_IS_QUEUED(task));
 
-       /* Wake up anyone who is waiting for task completion */
-       rpc_mark_complete_task(task);
+       rpc_release_resources_task(task);
 
-       rpc_put_task(task);
+       /*
+        * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+        * so it should be safe to use task->tk_count as a test for whether
+        * or not any other processes still hold references to our rpc_task.
+        */
+       if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+               /* Wake up anyone who may be waiting for task completion */
+               if (!rpc_complete_task(task))
+                       return;
+       } else {
+               if (!atomic_dec_and_test(&task->tk_count))
+                       return;
+       }
+       rpc_final_put_task(task, task->tk_workqueue);
 }
 
 int rpciod_up(void)
@@ -908,7 +940,7 @@ static int rpciod_start(void)
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       wq = alloc_workqueue("rpciod", WQ_RESCUER, 0);
+       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
        rpciod_workqueue = wq;
        return rpciod_workqueue != NULL;
 }