git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - net/ceph/osd_client.c
ceph: kill ceph_osdc_new_request() "num_reply" parameter
[karo-tx-linux.git] / net / ceph / osd_client.c
index eb9a4447876481e9a4110a1e68ea351ce3ec86d9..d3e75138506be171978777e52fcc16976b3c60a4 100644 (file)
@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-       switch (op) {
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_NOTIFY:
-               return 1;
-       default:
-               return 0;
-       }
-}
-
 static int op_has_extent(int op)
 {
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
 }
 
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-                       struct ceph_file_layout *layout,
-                       u64 snapid,
+int ceph_calc_raw_layout(struct ceph_file_layout *layout,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
 {
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
        int r;
 
-       reqhead->snapid = cpu_to_le64(snapid);
-
        /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
                                          &objoff, &objlen);
        if (r < 0)
                return r;
-       if (*plen < orig_len)
+       if (objlen < orig_len) {
+               *plen = objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
+       }
 
        if (op_has_extent(op->op)) {
+               u32 osize = le32_to_cpu(layout->fl_object_size);
                op->extent.offset = objoff;
                op->extent.length = objlen;
+               if (op->extent.truncate_size <= off - objoff) {
+                       op->extent.truncate_size = 0;
+               } else {
+                       op->extent.truncate_size -= off - objoff;
+                       if (op->extent.truncate_size > osize)
+                               op->extent.truncate_size = osize;
+               }
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static int calc_layout(struct ceph_osd_client *osdc,
-                      struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 *plen,
                       struct ceph_osd_request *req,
@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
        u64 bno;
        int r;
 
-       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                                plen, &bno, req, op);
+       r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
        if (r < 0)
                return r;
 
@@ -148,25 +137,19 @@ void ceph_osdc_release_request(struct kref *kref)
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
-               dout("%s revoking pages %p from con %p\n", __func__,
-                    req->r_pages, req->r_con_filling_msg);
+               dout("%s revoking msg %p from con %p\n", __func__,
+                    req->r_reply, req->r_con_filling_msg);
                ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
+               req->r_con_filling_msg = NULL;
        }
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
-#ifdef CONFIG_BLOCK
-       if (req->r_bio)
-               bio_put(req->r_bio);
-#endif
        ceph_put_snap_context(req->r_snapc);
-       if (req->r_trail) {
-               ceph_pagelist_release(req->r_trail);
-               kfree(req->r_trail);
-       }
+       ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
@@ -174,34 +157,14 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
-       int i = 0;
-
-       if (needs_trail)
-               *needs_trail = 0;
-       while (ops[i].op) {
-               if (needs_trail && op_needs_trail(ops[i].op))
-                       *needs_trail = 1;
-               i++;
-       }
-
-       return i;
-}
-
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-                                              int flags,
                                               struct ceph_snap_context *snapc,
-                                              struct ceph_osd_req_op *ops,
+                                              unsigned int num_op,
                                               bool use_mempool,
-                                              gfp_t gfp_flags,
-                                              struct page **pages,
-                                              struct bio *bio)
+                                              gfp_t gfp_flags)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int needs_trail;
-       int num_op = get_num_ops(ops, &needs_trail);
        size_t msg_size = sizeof(struct ceph_osd_request_head);
 
        msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +191,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
-       req->r_flags = flags;
-
-       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +203,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       /* allocate space for the trailing data */
-       if (needs_trail) {
-               req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-               if (!req->r_trail) {
-                       ceph_osdc_put_request(req);
-                       return NULL;
-               }
-               ceph_pagelist_init(req->r_trail);
-       }
+       ceph_pagelist_init(&req->r_trail);
 
        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +221,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        memset(msg->front.iov_base, 0, msg->front.iov_len);
 
        req->r_request = msg;
-       req->r_pages = pages;
-#ifdef CONFIG_BLOCK
-       if (bio) {
-               req->r_bio = bio;
-               bio_get(req->r_bio);
-       }
-#endif
 
        return req;
 }
@@ -304,29 +248,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
-               BUG_ON(!req->r_trail);
-
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               ceph_pagelist_append(req->r_trail, src->xattr.name,
+               ceph_pagelist_append(&req->r_trail, src->xattr.name,
                                     src->xattr.name_len);
-               ceph_pagelist_append(req->r_trail, src->xattr.val,
+               ceph_pagelist_append(&req->r_trail, src->xattr.val,
                                     src->xattr.value_len);
                break;
        case CEPH_OSD_OP_CALL:
-               BUG_ON(!req->r_trail);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-               ceph_pagelist_append(req->r_trail, src->cls.class_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
-               ceph_pagelist_append(req->r_trail, src->cls.method_name,
+               ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
-               ceph_pagelist_append(req->r_trail, src->cls.indata,
+               ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +279,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                        __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
                        __le32 timeout = cpu_to_le32(src->watch.timeout);
 
-                       BUG_ON(!req->r_trail);
-
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &prot_ver, sizeof(prot_ver));
-                       ceph_pagelist_append(req->r_trail,
+                       ceph_pagelist_append(&req->r_trail,
                                                &timeout, sizeof(timeout));
                }
        case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +303,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 *plen,
+                            u64 off, u64 len, unsigned int num_op,
                             struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc,
-                            struct timespec *mtime,
-                            const char *oid,
-                            int oid_len)
+                            struct ceph_snap_context *snapc, u64 snap_id,
+                            struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
        struct ceph_osd_op *op;
        void *p;
-       int num_op = get_num_ops(src_ops, NULL);
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int flags = req->r_flags;
        u64 data_len = 0;
        int i;
 
+       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
        head = msg->front.iov_base;
+       head->snapid = cpu_to_le64(snap_id);
        op = (void *)(head + 1);
        p = (void *)(op + num_op);
 
@@ -393,23 +331,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
        head->flags = cpu_to_le32(flags);
        if (flags & CEPH_OSD_FLAG_WRITE)
                ceph_encode_timespec(&head->mtime, mtime);
+       BUG_ON(num_op > (unsigned int) ((u16) -1));
        head->num_ops = cpu_to_le16(num_op);
 
-
        /* fill in oid */
-       head->object_len = cpu_to_le32(oid_len);
-       memcpy(p, oid, oid_len);
-       p += oid_len;
+       head->object_len = cpu_to_le32(req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
 
        src_op = src_ops;
-       while (src_op->op) {
-               osd_req_encode_op(req, op, src_op);
-               src_op++;
-               op++;
-       }
+       while (num_op--)
+               osd_req_encode_op(req, op++, src_op++);
 
-       if (req->r_trail)
-               data_len += req->r_trail->length;
+       data_len += req->r_trail.length;
 
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +356,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
-               req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+               req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
        } else if (data_len) {
                req->r_request->hdr.data_off = 0;
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -459,34 +393,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
-                                              bool use_mempool, int num_reply,
+                                              bool use_mempool,
                                               int page_align)
 {
-       struct ceph_osd_req_op ops[3];
+       struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
+       unsigned int num_op = 1;
        int r;
 
+       memset(&ops, 0, sizeof ops);
+
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
        ops[0].extent.truncate_size = truncate_size;
-       ops[0].payload_len = 0;
 
        if (do_sync) {
                ops[1].op = CEPH_OSD_OP_STARTSYNC;
-               ops[1].payload_len = 0;
-               ops[2].op = 0;
-       } else
-               ops[1].op = 0;
-
-       req = ceph_osdc_alloc_request(osdc, flags,
-                                        snapc, ops,
-                                        use_mempool,
-                                        GFP_NOFS, NULL, NULL);
+               num_op++;
+       }
+
+       req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+                                       GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
+       req->r_flags = flags;
 
        /* calculate max write size */
-       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(vino, layout, off, plen, req, ops);
        if (r < 0)
                return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
@@ -496,10 +429,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;
 
-       ceph_osdc_build_request(req, off, plen, ops,
-                               snapc,
-                               mtime,
-                               req->r_oid, req->r_oid_len);
+       ceph_osdc_build_request(req, off, *plen, num_op, ops,
+                               snapc, vino.snap, mtime);
 
        return req;
 }
@@ -739,31 +670,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
-       int ret = 0;
+       struct ceph_entity_addr *peer_addr;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
-               ret = -ENODEV;
-       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-                         &osd->o_con.peer_addr,
-                         sizeof(osd->o_con.peer_addr)) == 0 &&
-                  !ceph_con_opened(&osd->o_con)) {
+
+               return -ENODEV;
+       }
+
+       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+                       !ceph_con_opened(&osd->o_con)) {
+               struct ceph_osd_request *req;
+
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
-               ret = -EAGAIN;
-       } else {
-               ceph_con_close(&osd->o_con);
-               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-                             &osdc->osdmap->osd_addr[osd->o_osd]);
-               osd->o_incarnation++;
+
+               return -EAGAIN;
        }
-       return ret;
+
+       ceph_con_close(&osd->o_con);
+       ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+       osd->o_incarnation++;
+
+       return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1706,7 +1641,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
 #endif
-       req->r_request->trail = req->r_trail;
+       req->r_request->trail = &req->r_trail;
 
        register_request(osdc, req);
 
@@ -1902,7 +1837,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
-                                   false, 1, page_align);
+                                   false, page_align);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1931,8 +1866,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         u64 off, u64 len,
                         u32 truncate_seq, u64 truncate_size,
                         struct timespec *mtime,
-                        struct page **pages, int num_pages,
-                        int flags, int do_sync, bool nofail)
+                        struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
        int rc = 0;
@@ -1941,11 +1875,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        BUG_ON(vino.snap != CEPH_NOSNAP);
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
                                    CEPH_OSD_OP_WRITE,
-                                   flags | CEPH_OSD_FLAG_ONDISK |
-                                           CEPH_OSD_FLAG_WRITE,
-                                   snapc, do_sync,
+                                   CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+                                   snapc, 0,
                                    truncate_seq, truncate_size, mtime,
-                                   nofail, 1, page_align);
+                                   true, page_align);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1954,7 +1887,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        dout("writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
-       rc = ceph_osdc_start_request(osdc, req, nofail);
+       rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
 
@@ -2047,7 +1980,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (data_len > 0) {
                int want = calc_pages_for(req->r_page_alignment, data_len);
 
-               if (unlikely(req->r_num_pages < want)) {
+               if (req->r_pages && unlikely(req->r_num_pages < want)) {
                        pr_warning("tid %lld reply has %d bytes %d pages, we"
                                   " had only %d pages ready\n", tid, data_len,
                                   want, req->r_num_pages);