]> git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
rbd: wait for safe callback for write requests
author Alex Elder <elder@inktank.com>
Thu, 23 May 2013 01:54:25 +0000 (20:54 -0500)
committer Sage Weil <sage@inktank.com>
Thu, 13 Jun 2013 15:46:19 +0000 (08:46 -0700)
When a request is sent to the osd, the sender can indicate what
"level" of completion is desired before the request is acknowledged.
There will always be at least an acknowledgement sent to indicate
the osd has received the message.  But for a write request the
sender may request that the acknowledgement indicate when the write
operation is durable on the osd.  The osd marks a response with the
ONDISK flag to signal this in its acknowledgement.

When a request is acknowledged a callback function is run and an
event is completed that a caller can wait for.  When the ONDISK flag
is set in an acknowledgement, an additional callback is used to
allow the caller to record when a request has been sent to an osd
(making it "unsafe"), and when an acknowledgement arrives indicating
that the request has been made durable on the osd (so it is no longer
unsafe).  A "safe completion" is signaled to unblock any waiters.

With that as background...

Currently the rbd client waits only for the acknowledgement response
for all requests, which isn't safe for writes.  Fix that by defining
and using a different callback function that marks write requests
done only when the ONDISK notification arrives.

This resolves:
    http://tracker.ceph.com/issues/5146

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
drivers/block/rbd.c

index 3f162098457b550432be0d739cc3253e2741afa3..c9486c8913d5eefa87ee96b45866a7a22c82b3ab 100644 (file)
@@ -1685,14 +1685,17 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
+               rbd_assert(!msg);
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
+       case CEPH_OSD_OP_WATCH:
+               rbd_assert(!msg);
+               /* fall through */
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
@@ -1705,6 +1708,24 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_obj_request_complete(obj_request);
 }
 
+/*
+ * This is called twice:  once (with unsafe == true) when the
+ * request message is first handed to the messenger for delivery;
+ * and the second time (with unsafe == false) after we get
+ * confirmation the change is durable on the osd.  We ignore the
+ * first, and let the "normal" callback routine handle the second.
+ */
+static void rbd_osd_req_unsafe_callback(struct ceph_osd_request *osd_req,
+                               bool unsafe)
+{
+       dout("%s: osd_req %p unsafe %s op 0x%hx\n", __func__, osd_req,
+               unsafe ? "true" : "false", osd_req->r_ops[0].op);
+
+       rbd_assert(osd_req->r_flags & CEPH_OSD_FLAG_WRITE);
+       if (!unsafe)
+               rbd_osd_req_callback(osd_req, NULL);
+}
+
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request = obj_request->img_request;
@@ -1757,12 +1778,13 @@ static struct ceph_osd_request *rbd_osd_req_create(
        if (!osd_req)
                return NULL;    /* ENOMEM */
 
-       if (write_request)
+       if (write_request) {
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-       else
+               osd_req->r_unsafe_callback = rbd_osd_req_unsafe_callback;
+       } else {
                osd_req->r_flags = CEPH_OSD_FLAG_READ;
-
-       osd_req->r_callback = rbd_osd_req_callback;
+               osd_req->r_callback = rbd_osd_req_callback;
+       }
        osd_req->r_priv = obj_request;
 
        osd_req->r_oid_len = strlen(obj_request->object_name);