git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
rbd: wait for safe callback for write requests
author: Alex Elder <elder@inktank.com>
Thu, 23 May 2013 01:54:25 +0000 (20:54 -0500)
committer: Alex Elder <elder@inktank.com>
Wed, 29 May 2013 14:10:51 +0000 (09:10 -0500)
When a request is sent to the osd, the sender can indicate what
"level" of completion is desired before the request is acknowledged.
There will always be at least an acknowledgement sent to indicate
the osd has received the message.  But for a write request the
sender may request that the acknowledgement indicate when the write
operation is durable on the osd.  The osd marks a response with the
ONDISK flag to signal this in its acknowledgement.

When a request is acknowledged a callback function is run and an
event is completed that a caller can wait for.  When the ONDISK flag
is set in an acknowledgement, an additional callback is used to
allow the caller to record when a request has been sent to an osd
(making it "unsafe"), and when an acknowledgement arrives indicating
that such a request has been made durable on the osd (so it is no
longer unsafe).  A "safe completion" is signaled to unblock any waiters.

With that as background...

Currently the rbd client waits only for the acknowledgement response
for all requests, which isn't safe for writes.  Fix that by defining
and using a different callback function that marks write requests
done only when the ONDISK notification arrives.

This resolves:
    http://tracker.ceph.com/issues/5146

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
drivers/block/rbd.c

index 3296db5d6ac61d7e231334c35e3c9ac78e8e24c6..6e377a0459c162b7b0825f2ecf475baacd543678 100644 (file)
@@ -1681,14 +1681,17 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
+               rbd_assert(!msg);
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
+       case CEPH_OSD_OP_WATCH:
+               rbd_assert(!msg);
+               /* fall through */
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
@@ -1701,6 +1704,24 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_obj_request_complete(obj_request);
 }
 
+/*
+ * This is called twice:  once (with unsafe == true) when the
+ * request message is first handed to the messenger for delivery;
+ * and the second time (with unsafe == false) after we get
+ * confirmation the change is durable on the osd.  We ignore the
+ * first, and let the "normal" callback routine handle the second.
+ */
+static void rbd_osd_req_unsafe_callback(struct ceph_osd_request *osd_req,
+                               bool unsafe)
+{
+       dout("%s: osd_req %p unsafe %s op 0x%hx\n", __func__, osd_req,
+               unsafe ? "true" : "false", osd_req->r_ops[0].op);
+
+       rbd_assert(osd_req->r_flags & CEPH_OSD_FLAG_WRITE);
+       if (!unsafe)
+               rbd_osd_req_callback(osd_req, NULL);
+}
+
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request = obj_request->img_request;
@@ -1753,12 +1774,13 @@ static struct ceph_osd_request *rbd_osd_req_create(
        if (!osd_req)
                return NULL;    /* ENOMEM */
 
-       if (write_request)
+       if (write_request) {
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-       else
+               osd_req->r_unsafe_callback = rbd_osd_req_unsafe_callback;
+       } else {
                osd_req->r_flags = CEPH_OSD_FLAG_READ;
-
-       osd_req->r_callback = rbd_osd_req_callback;
+               osd_req->r_callback = rbd_osd_req_callback;
+       }
        osd_req->r_priv = obj_request;
 
        osd_req->r_oid_len = strlen(obj_request->object_name);