From: Alex Elder Date: Thu, 23 May 2013 01:54:25 +0000 (-0500) Subject: rbd: wait for safe callback for write requests X-Git-Tag: next-20130607~95^2~5 X-Git-Url: https://git.karo-electronics.de/?a=commitdiff_plain;h=70c725fdbe98c965bf9b3eca2618ddb121793f2a;p=karo-tx-linux.git rbd: wait for safe callback for write requests When a request is sent to the osd, the sender can indicate what "level" of completion is desired before the request is acknowledged. There will always be at least an acknowledgement sent to indicate the osd has received the message. But for a write request the sender may request that the acknowledgement indicate when the write operation is durable on the osd. The osd marks a response with the ONDISK flag to signal this in its acknowledgement. When a request is acknowledged a callback function is run and an event is completed that a caller can wait for. When the ONDISK flag is set in an acknowledgement, an additional callback is used to allow the caller to record when a request has been sent to an osd (making it "unsafe"), and when an acknowledgement arrives indicating such a request has been made durable on the osd (so it is no longer unsafe). A "safe completion" is signaled to unblock any waiters. With that as background... Currently the rbd client waits only for the acknowledgement response for all requests, which isn't safe for writes. Fix that by defining and using a different callback function that marks write requests done only when the ONDISK notification arrives. 
This resolves: http://tracker.ceph.com/issues/5146 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3296db5d6ac6..6e377a0459c1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1681,14 +1681,17 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_osd_read_callback(obj_request); break; case CEPH_OSD_OP_WRITE: + rbd_assert(!msg); rbd_osd_write_callback(obj_request); break; case CEPH_OSD_OP_STAT: rbd_osd_stat_callback(obj_request); break; + case CEPH_OSD_OP_WATCH: + rbd_assert(!msg); + /* fall through */ case CEPH_OSD_OP_CALL: case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: rbd_osd_trivial_callback(obj_request); break; default: @@ -1701,6 +1704,24 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_obj_request_complete(obj_request); } +/* + * This is called twice: once (with unsafe == true) when the + * request message is first handed to the messenger for delivery; + * and the second time (with unsafe == false) after we get + * confirmation the change is durable on the osd. We ignore the + * first, and let the "normal" callback routine handle the second. + */ +static void rbd_osd_req_unsafe_callback(struct ceph_osd_request *osd_req, + bool unsafe) +{ + dout("%s: osd_req %p unsafe %s op 0x%hx\n", __func__, osd_req, + unsafe ? 
"true" : "false", osd_req->r_ops[0].op); + + rbd_assert(osd_req->r_flags & CEPH_OSD_FLAG_WRITE); + if (!unsafe) + rbd_osd_req_callback(osd_req, NULL); +} + static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = obj_request->img_request; @@ -1753,12 +1774,13 @@ static struct ceph_osd_request *rbd_osd_req_create( if (!osd_req) return NULL; /* ENOMEM */ - if (write_request) + if (write_request) { osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - else + osd_req->r_unsafe_callback = rbd_osd_req_unsafe_callback; + } else { osd_req->r_flags = CEPH_OSD_FLAG_READ; - - osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_callback = rbd_osd_req_callback; + } osd_req->r_priv = obj_request; osd_req->r_oid_len = strlen(obj_request->object_name);