]> git.karo-electronics.de Git - mv-sheeva.git/blobdiff - net/ceph/messenger.c
Merge tag 'v2.6.37' of git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux-2.6
[mv-sheeva.git] / net / ceph / messenger.c
similarity index 89%
rename from fs/ceph/messenger.c
rename to net/ceph/messenger.c
index 2502d76fcec175ddb8500bcd1863c52add1208e5..b6ff4a1519ab664c43d9bff871dc95d8efb52daa 100644 (file)
@@ -1,4 +1,4 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/crc32c.h>
 #include <linux/ctype.h>
@@ -9,12 +9,14 @@
 #include <linux/slab.h>
 #include <linux/socket.h>
 #include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
 #include <net/tcp.h>
 
-#include "super.h"
-#include "messenger.h"
-#include "decode.h"
-#include "pagelist.h"
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/pagelist.h>
 
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
@@ -48,7 +50,7 @@ static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
 static DEFINE_SPINLOCK(addr_str_lock);
 static int last_addr_str;
 
-const char *pr_addr(const struct sockaddr_storage *ss)
+const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 {
        int i;
        char *s;
@@ -79,6 +81,7 @@ const char *pr_addr(const struct sockaddr_storage *ss)
 
        return s;
 }
+EXPORT_SYMBOL(ceph_pr_addr);
 
 static void encode_my_addr(struct ceph_messenger *msgr)
 {
@@ -91,27 +94,28 @@ static void encode_my_addr(struct ceph_messenger *msgr)
  */
 struct workqueue_struct *ceph_msgr_wq;
 
-int __init ceph_msgr_init(void)
+int ceph_msgr_init(void)
 {
        ceph_msgr_wq = create_workqueue("ceph-msgr");
-       if (IS_ERR(ceph_msgr_wq)) {
-               int ret = PTR_ERR(ceph_msgr_wq);
-               pr_err("msgr_init failed to create workqueue: %d\n", ret);
-               ceph_msgr_wq = NULL;
-               return ret;
+       if (!ceph_msgr_wq) {
+               pr_err("msgr_init failed to create workqueue\n");
+               return -ENOMEM;
        }
        return 0;
 }
+EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
        destroy_workqueue(ceph_msgr_wq);
 }
+EXPORT_SYMBOL(ceph_msgr_exit);
 
 void ceph_msgr_flush(void)
 {
        flush_workqueue(ceph_msgr_wq);
 }
+EXPORT_SYMBOL(ceph_msgr_flush);
 
 
 /*
@@ -221,19 +225,19 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
 
        set_sock_callbacks(sock, con);
 
-       dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
+       dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
                dout("connect %s EINPROGRESS sk_state = %u\n",
-                    pr_addr(&con->peer_addr.in_addr),
+                    ceph_pr_addr(&con->peer_addr.in_addr),
                     sock->sk->sk_state);
                ret = 0;
        }
        if (ret < 0) {
                pr_err("connect %s error %d\n",
-                      pr_addr(&con->peer_addr.in_addr), ret);
+                      ceph_pr_addr(&con->peer_addr.in_addr), ret);
                sock_release(sock);
                con->sock = NULL;
                con->error_msg = "connect error";
@@ -334,7 +338,8 @@ static void reset_connection(struct ceph_connection *con)
  */
 void ceph_con_close(struct ceph_connection *con)
 {
-       dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
+       dout("con_close %p peer %s\n", con,
+            ceph_pr_addr(&con->peer_addr.in_addr));
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
        clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
        clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
@@ -347,19 +352,21 @@ void ceph_con_close(struct ceph_connection *con)
        mutex_unlock(&con->mutex);
        queue_con(con);
 }
+EXPORT_SYMBOL(ceph_con_close);
 
 /*
  * Reopen a closed connection, with a new peer address.
  */
 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
 {
-       dout("con_open %p %s\n", con, pr_addr(&addr->in_addr));
+       dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
        set_bit(OPENING, &con->state);
        clear_bit(CLOSED, &con->state);
        memcpy(&con->peer_addr, addr, sizeof(*addr));
        con->delay = 0;      /* reset backoff memory */
        queue_con(con);
 }
+EXPORT_SYMBOL(ceph_con_open);
 
 /*
  * return true if this connection ever successfully opened
@@ -406,6 +413,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
 }
+EXPORT_SYMBOL(ceph_con_init);
 
 
 /*
@@ -529,8 +537,10 @@ static void prepare_write_message(struct ceph_connection *con)
        if (le32_to_cpu(m->hdr.data_len) > 0) {
                /* initialize page iterator */
                con->out_msg_pos.page = 0;
-               con->out_msg_pos.page_pos =
-                       le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+               if (m->pages)
+                       con->out_msg_pos.page_pos = m->page_alignment;
+               else
+                       con->out_msg_pos.page_pos = 0;
                con->out_msg_pos.data_pos = 0;
                con->out_msg_pos.did_page_crc = 0;
                con->out_more = 1;  /* data + footer will follow */
@@ -647,7 +657,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED);
+       con->out_connect.features = cpu_to_le64(msgr->supported_features);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -712,6 +722,31 @@ out:
        return ret;  /* done! */
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+       if (!bio) {
+               *iter = NULL;
+               *seg = 0;
+               return;
+       }
+       *iter = bio;
+       *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+       if (*bio_iter == NULL)
+               return;
+
+       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+       (*seg)++;
+       if (*seg == (*bio_iter)->bi_vcnt)
+               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -726,21 +761,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        size_t len;
        int crc = con->msgr->nocrc;
        int ret;
+       int total_max_write;
+       int in_trail = 0;
+       size_t trail_len = (msg->trail ? msg->trail->length : 0);
 
        dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
             con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
             con->out_msg_pos.page_pos);
 
-       while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+       if (msg->bio && !msg->bio_iter)
+               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+       while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
                void *kaddr = NULL;
+               int max_write = PAGE_SIZE;
+               int page_shift = 0;
+
+               total_max_write = data_len - trail_len -
+                       con->out_msg_pos.data_pos;
 
                /*
                 * if we are calculating the data crc (the default), we need
                 * to map the page.  if our pages[] has been revoked, use the
                 * zero page.
                 */
-               if (msg->pages) {
+
+               /* have we reached the trail part of the data? */
+               if (con->out_msg_pos.data_pos >= data_len - trail_len) {
+                       in_trail = 1;
+
+                       total_max_write = data_len - con->out_msg_pos.data_pos;
+
+                       page = list_first_entry(&msg->trail->head,
+                                               struct page, lru);
+                       if (crc)
+                               kaddr = kmap(page);
+                       max_write = PAGE_SIZE;
+               } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
                        if (crc)
                                kaddr = kmap(page);
@@ -749,13 +809,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                                                struct page, lru);
                        if (crc)
                                kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+               } else if (msg->bio) {
+                       struct bio_vec *bv;
+
+                       bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+                       page = bv->bv_page;
+                       page_shift = bv->bv_offset;
+                       if (crc)
+                               kaddr = kmap(page) + page_shift;
+                       max_write = bv->bv_len;
+#endif
                } else {
                        page = con->msgr->zero_page;
                        if (crc)
                                kaddr = page_address(con->msgr->zero_page);
                }
-               len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
-                         (int)(data_len - con->out_msg_pos.data_pos));
+               len = min_t(int, max_write - con->out_msg_pos.page_pos,
+                           total_max_write);
+
                if (crc && !con->out_msg_pos.did_page_crc) {
                        void *base = kaddr + con->out_msg_pos.page_pos;
                        u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +837,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                                cpu_to_le32(crc32c(tmpcrc, base, len));
                        con->out_msg_pos.did_page_crc = 1;
                }
-
                ret = kernel_sendpage(con->sock, page,
-                                     con->out_msg_pos.page_pos, len,
+                                     con->out_msg_pos.page_pos + page_shift,
+                                     len,
                                      MSG_DONTWAIT | MSG_NOSIGNAL |
                                      MSG_MORE);
 
-               if (crc && (msg->pages || msg->pagelist))
+               if (crc &&
+                   (msg->pages || msg->pagelist || msg->bio || in_trail))
                        kunmap(page);
 
                if (ret <= 0)
@@ -783,9 +856,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                        con->out_msg_pos.page_pos = 0;
                        con->out_msg_pos.page++;
                        con->out_msg_pos.did_page_crc = 0;
-                       if (msg->pagelist)
+                       if (in_trail)
+                               list_move_tail(&page->lru,
+                                              &msg->trail->head);
+                       else if (msg->pagelist)
                                list_move_tail(&page->lru,
                                               &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+                       else if (msg->bio)
+                               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
                }
        }
 
@@ -938,7 +1018,7 @@ static int verify_hello(struct ceph_connection *con)
 {
        if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
                pr_err("connect to %s got bad banner\n",
-                      pr_addr(&con->peer_addr.in_addr));
+                      ceph_pr_addr(&con->peer_addr.in_addr));
                con->error_msg = "protocol error, bad banner";
                return -1;
        }
@@ -1041,7 +1121,7 @@ int ceph_parse_ips(const char *c, const char *end,
 
                addr_set_port(ss, port);
 
-               dout("parse_ips got %s\n", pr_addr(ss));
+               dout("parse_ips got %s\n", ceph_pr_addr(ss));
 
                if (p == end)
                        break;
@@ -1061,6 +1141,7 @@ bad:
        pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
        return -EINVAL;
 }
+EXPORT_SYMBOL(ceph_parse_ips);
 
 static int process_banner(struct ceph_connection *con)
 {
@@ -1082,9 +1163,9 @@ static int process_banner(struct ceph_connection *con)
            !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
                pr_warning("wrong peer, want %s/%d, got %s/%d\n",
-                          pr_addr(&con->peer_addr.in_addr),
+                          ceph_pr_addr(&con->peer_addr.in_addr),
                           (int)le32_to_cpu(con->peer_addr.nonce),
-                          pr_addr(&con->actual_peer_addr.in_addr),
+                          ceph_pr_addr(&con->actual_peer_addr.in_addr),
                           (int)le32_to_cpu(con->actual_peer_addr.nonce));
                con->error_msg = "wrong peer at address";
                return -1;
@@ -1102,7 +1183,7 @@ static int process_banner(struct ceph_connection *con)
                addr_set_port(&con->msgr->inst.addr.in_addr, port);
                encode_my_addr(con->msgr);
                dout("process_banner learned my addr is %s\n",
-                    pr_addr(&con->msgr->inst.addr.in_addr));
+                    ceph_pr_addr(&con->msgr->inst.addr.in_addr));
        }
 
        set_bit(NEGOTIATING, &con->state);
@@ -1123,8 +1204,8 @@ static void fail_protocol(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = CEPH_FEATURE_SUPPORTED;
-       u64 req_feat = CEPH_FEATURE_REQUIRED;
+       u64 sup_feat = con->msgr->supported_features;
+       u64 req_feat = con->msgr->required_features;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1134,7 +1215,7 @@ static int process_connect(struct ceph_connection *con)
                pr_err("%s%lld %s feature set mismatch,"
                       " my %llx < server's %llx, missing %llx\n",
                       ENTITY_NAME(con->peer_name),
-                      pr_addr(&con->peer_addr.in_addr),
+                      ceph_pr_addr(&con->peer_addr.in_addr),
                       sup_feat, server_feat, server_feat & ~sup_feat);
                con->error_msg = "missing required protocol features";
                fail_protocol(con);
@@ -1144,7 +1225,7 @@ static int process_connect(struct ceph_connection *con)
                pr_err("%s%lld %s protocol version mismatch,"
                       " my %d != server's %d\n",
                       ENTITY_NAME(con->peer_name),
-                      pr_addr(&con->peer_addr.in_addr),
+                      ceph_pr_addr(&con->peer_addr.in_addr),
                       le32_to_cpu(con->out_connect.protocol_version),
                       le32_to_cpu(con->in_reply.protocol_version));
                con->error_msg = "protocol version mismatch";
@@ -1178,7 +1259,7 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->in_connect.connect_seq));
                pr_err("%s%lld %s connection reset\n",
                       ENTITY_NAME(con->peer_name),
-                      pr_addr(&con->peer_addr.in_addr));
+                      ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
                prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
@@ -1223,7 +1304,7 @@ static int process_connect(struct ceph_connection *con)
                        pr_err("%s%lld %s protocol feature mismatch,"
                               " my required %llx > server's %llx, need %llx\n",
                               ENTITY_NAME(con->peer_name),
-                              pr_addr(&con->peer_addr.in_addr),
+                              ceph_pr_addr(&con->peer_addr.in_addr),
                               req_feat, server_feat, req_feat & ~server_feat);
                        con->error_msg = "missing required protocol features";
                        fail_protocol(con);
@@ -1305,8 +1386,7 @@ static int read_partial_message_section(struct ceph_connection *con,
                                        struct kvec *section,
                                        unsigned int sec_len, u32 *crc)
 {
-       int left;
-       int ret;
+       int ret, left;
 
        BUG_ON(!section);
 
@@ -1329,16 +1409,86 @@ static int read_partial_message_section(struct ceph_connection *con,
 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                                struct ceph_msg_header *hdr,
                                int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+                                     struct page **pages,
+                                     unsigned data_len, int datacrc)
+{
+       void *p;
+       int ret;
+       int left;
+
+       left = min((int)(data_len - con->in_msg_pos.data_pos),
+                  (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+       /* (page) data */
+       BUG_ON(pages == NULL);
+       p = kmap(pages[con->in_msg_pos.page]);
+       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+                              left);
+       if (ret > 0 && datacrc)
+               con->in_data_crc =
+                       crc32c(con->in_data_crc,
+                                 p + con->in_msg_pos.page_pos, ret);
+       kunmap(pages[con->in_msg_pos.page]);
+       if (ret <= 0)
+               return ret;
+       con->in_msg_pos.data_pos += ret;
+       con->in_msg_pos.page_pos += ret;
+       if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+               con->in_msg_pos.page_pos = 0;
+               con->in_msg_pos.page++;
+       }
+
+       return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+                                   struct bio **bio_iter, int *bio_seg,
+                                   unsigned data_len, int datacrc)
+{
+       struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+       void *p;
+       int ret, left;
+
+       if (IS_ERR(bv))
+               return PTR_ERR(bv);
+
+       left = min((int)(data_len - con->in_msg_pos.data_pos),
+                  (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+       p = kmap(bv->bv_page) + bv->bv_offset;
+
+       ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+                              left);
+       if (ret > 0 && datacrc)
+               con->in_data_crc =
+                       crc32c(con->in_data_crc,
+                                 p + con->in_msg_pos.page_pos, ret);
+       kunmap(bv->bv_page);
+       if (ret <= 0)
+               return ret;
+       con->in_msg_pos.data_pos += ret;
+       con->in_msg_pos.page_pos += ret;
+       if (con->in_msg_pos.page_pos == bv->bv_len) {
+               con->in_msg_pos.page_pos = 0;
+               iter_bio_next(bio_iter, bio_seg);
+       }
+
+       return ret;
+}
+#endif
+
 /*
  * read (part of) a message.
  */
 static int read_partial_message(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->in_msg;
-       void *p;
        int ret;
        int to, left;
-       unsigned front_len, middle_len, data_len, data_off;
+       unsigned front_len, middle_len, data_len;
        int datacrc = con->msgr->nocrc;
        int skip;
        u64 seq;
@@ -1374,19 +1524,17 @@ static int read_partial_message(struct ceph_connection *con)
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
-       data_off = le16_to_cpu(con->in_hdr.data_off);
 
        /* verify seq# */
        seq = le64_to_cpu(con->in_hdr.seq);
        if ((s64)seq - (s64)con->in_seq < 1) {
-               pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+               pr_info("skipping %s%lld %s seq %lld expected %lld\n",
                        ENTITY_NAME(con->peer_name),
-                       pr_addr(&con->peer_addr.in_addr),
+                       ceph_pr_addr(&con->peer_addr.in_addr),
                        seq, con->in_seq + 1);
                con->in_base_pos = -front_len - middle_len - data_len -
                        sizeof(m->footer);
                con->in_tag = CEPH_MSGR_TAG_READY;
-               con->in_seq++;
                return 0;
        } else if ((s64)seq - (s64)con->in_seq > 1) {
                pr_err("read_partial_message bad seq %lld expected %lld\n",
@@ -1422,7 +1570,10 @@ static int read_partial_message(struct ceph_connection *con)
                        m->middle->vec.iov_len = 0;
 
                con->in_msg_pos.page = 0;
-               con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+               if (m->pages)
+                       con->in_msg_pos.page_pos = m->page_alignment;
+               else
+                       con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.data_pos = 0;
        }
 
@@ -1440,27 +1591,29 @@ static int read_partial_message(struct ceph_connection *con)
                if (ret <= 0)
                        return ret;
        }
+#ifdef CONFIG_BLOCK
+       if (m->bio && !m->bio_iter)
+               init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
 
        /* (page) data */
        while (con->in_msg_pos.data_pos < data_len) {
-               left = min((int)(data_len - con->in_msg_pos.data_pos),
-                          (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-               BUG_ON(m->pages == NULL);
-               p = kmap(m->pages[con->in_msg_pos.page]);
-               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                                      left);
-               if (ret > 0 && datacrc)
-                       con->in_data_crc =
-                               crc32c(con->in_data_crc,
-                                         p + con->in_msg_pos.page_pos, ret);
-               kunmap(m->pages[con->in_msg_pos.page]);
-               if (ret <= 0)
-                       return ret;
-               con->in_msg_pos.data_pos += ret;
-               con->in_msg_pos.page_pos += ret;
-               if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-                       con->in_msg_pos.page_pos = 0;
-                       con->in_msg_pos.page++;
+               if (m->pages) {
+                       ret = read_partial_message_pages(con, m->pages,
+                                                data_len, datacrc);
+                       if (ret <= 0)
+                               return ret;
+#ifdef CONFIG_BLOCK
+               } else if (m->bio) {
+
+                       ret = read_partial_message_bio(con,
+                                                &m->bio_iter, &m->bio_seg,
+                                                data_len, datacrc);
+                       if (ret <= 0)
+                               return ret;
+#endif
+               } else {
+                       BUG_ON(1);
                }
        }
 
@@ -1874,9 +2027,9 @@ out:
 static void ceph_fault(struct ceph_connection *con)
 {
        pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
-              pr_addr(&con->peer_addr.in_addr), con->error_msg);
+              ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
        dout("fault %p state %lu to peer %s\n",
-            con, con->state, pr_addr(&con->peer_addr.in_addr));
+            con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
        if (test_bit(LOSSYTX, &con->state)) {
                dout("fault on LOSSYTX channel\n");
@@ -1936,7 +2089,9 @@ out:
 /*
  * create a new messenger instance
  */
-struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
+struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
+                                            u32 supported_features,
+                                            u32 required_features)
 {
        struct ceph_messenger *msgr;
 
@@ -1944,6 +2099,9 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        if (msgr == NULL)
                return ERR_PTR(-ENOMEM);
 
+       msgr->supported_features = supported_features;
+       msgr->required_features = required_features;
+
        spin_lock_init(&msgr->global_seq_lock);
 
        /* the zero page is needed if a request is "canceled" while the message
@@ -1966,6 +2124,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        dout("messenger_create %p\n", msgr);
        return msgr;
 }
+EXPORT_SYMBOL(ceph_messenger_create);
 
 void ceph_messenger_destroy(struct ceph_messenger *msgr)
 {
@@ -1975,6 +2134,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        kfree(msgr);
        dout("destroyed messenger %p\n", msgr);
 }
+EXPORT_SYMBOL(ceph_messenger_destroy);
 
 /*
  * Queue up an outgoing message on the given connection.
@@ -2011,6 +2171,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                queue_con(con);
 }
+EXPORT_SYMBOL(ceph_con_send);
 
 /*
  * Revoke a message that was previously queued for send
@@ -2076,6 +2237,7 @@ void ceph_con_keepalive(struct ceph_connection *con)
            test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                queue_con(con);
 }
+EXPORT_SYMBOL(ceph_con_keepalive);
 
 
 /*
@@ -2134,8 +2296,13 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 
        /* data */
        m->nr_pages = 0;
+       m->page_alignment = 0;
        m->pages = NULL;
        m->pagelist = NULL;
+       m->bio = NULL;
+       m->bio_iter = NULL;
+       m->bio_seg = 0;
+       m->trail = NULL;
 
        dout("ceph_msg_new %p front %d\n", m, front_len);
        return m;
@@ -2146,6 +2313,7 @@ out:
        pr_err("msg_new can't create type %d front %d\n", type, front_len);
        return NULL;
 }
+EXPORT_SYMBOL(ceph_msg_new);
 
 /*
  * Allocate "middle" portion of a message, if it is needed and wasn't
@@ -2198,6 +2366,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                               type, front_len);
                        return NULL;
                }
+               msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
        memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
@@ -2250,11 +2419,14 @@ void ceph_msg_last_put(struct kref *kref)
                m->pagelist = NULL;
        }
 
+       m->trail = NULL;
+
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
        else
                ceph_msg_kfree(m);
 }
+EXPORT_SYMBOL(ceph_msg_last_put);
 
 void ceph_msg_dump(struct ceph_msg *msg)
 {
@@ -2275,3 +2447,4 @@ void ceph_msg_dump(struct ceph_msg *msg)
                       DUMP_PREFIX_OFFSET, 16, 1,
                       &msg->footer, sizeof(msg->footer), true);
 }
+EXPORT_SYMBOL(ceph_msg_dump);