]> git.karo-electronics.de Git - karo-tx-linux.git/blob - fs/ceph/messenger.c
ceph: negotiate authentication protocol; implement AUTH_NONE protocol
[karo-tx-linux.git] / fs / ceph / messenger.c
1 #include "ceph_debug.h"
2
3 #include <linux/crc32c.h>
4 #include <linux/ctype.h>
5 #include <linux/highmem.h>
6 #include <linux/inet.h>
7 #include <linux/kthread.h>
8 #include <linux/net.h>
9 #include <linux/socket.h>
10 #include <linux/string.h>
11 #include <net/tcp.h>
12
13 #include "super.h"
14 #include "messenger.h"
15 #include "decode.h"
16
17 /*
18  * Ceph uses the messenger to exchange ceph_msg messages with other
19  * hosts in the system.  The messenger provides ordered and reliable
20  * delivery.  We tolerate TCP disconnects by reconnecting (with
21  * exponential backoff) in the case of a fault (disconnection, bad
22  * crc, protocol error).  Acks allow sent messages to be discarded by
23  * the sender.
24  */
25
26 /* static tag bytes (protocol control messages) */
27 static char tag_msg = CEPH_MSGR_TAG_MSG;
28 static char tag_ack = CEPH_MSGR_TAG_ACK;
29 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
30
31
32 static void queue_con(struct ceph_connection *con);
33 static void con_work(struct work_struct *);
34 static void ceph_fault(struct ceph_connection *con);
35
36 const char *ceph_name_type_str(int t)
37 {
38         switch (t) {
39         case CEPH_ENTITY_TYPE_MON: return "mon";
40         case CEPH_ENTITY_TYPE_MDS: return "mds";
41         case CEPH_ENTITY_TYPE_OSD: return "osd";
42         case CEPH_ENTITY_TYPE_CLIENT: return "client";
43         case CEPH_ENTITY_TYPE_ADMIN: return "admin";
44         default: return "???";
45         }
46 }
47
48 /*
49  * nicely render a sockaddr as a string.
50  */
51 #define MAX_ADDR_STR 20
52 static char addr_str[MAX_ADDR_STR][40];
53 static DEFINE_SPINLOCK(addr_str_lock);
54 static int last_addr_str;
55
56 const char *pr_addr(const struct sockaddr_storage *ss)
57 {
58         int i;
59         char *s;
60         struct sockaddr_in *in4 = (void *)ss;
61         unsigned char *quad = (void *)&in4->sin_addr.s_addr;
62         struct sockaddr_in6 *in6 = (void *)ss;
63
64         spin_lock(&addr_str_lock);
65         i = last_addr_str++;
66         if (last_addr_str == MAX_ADDR_STR)
67                 last_addr_str = 0;
68         spin_unlock(&addr_str_lock);
69         s = addr_str[i];
70
71         switch (ss->ss_family) {
72         case AF_INET:
73                 sprintf(s, "%u.%u.%u.%u:%u",
74                         (unsigned int)quad[0],
75                         (unsigned int)quad[1],
76                         (unsigned int)quad[2],
77                         (unsigned int)quad[3],
78                         (unsigned int)ntohs(in4->sin_port));
79                 break;
80
81         case AF_INET6:
82                 sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u",
83                         in6->sin6_addr.s6_addr16[0],
84                         in6->sin6_addr.s6_addr16[1],
85                         in6->sin6_addr.s6_addr16[2],
86                         in6->sin6_addr.s6_addr16[3],
87                         in6->sin6_addr.s6_addr16[4],
88                         in6->sin6_addr.s6_addr16[5],
89                         in6->sin6_addr.s6_addr16[6],
90                         in6->sin6_addr.s6_addr16[7],
91                         (unsigned int)ntohs(in6->sin6_port));
92                 break;
93
94         default:
95                 sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
96         }
97
98         return s;
99 }
100
101 static void encode_my_addr(struct ceph_messenger *msgr)
102 {
103         memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
104         ceph_encode_addr(&msgr->my_enc_addr);
105 }
106
107 /*
108  * work queue for all reading and writing to/from the socket.
109  */
110 struct workqueue_struct *ceph_msgr_wq;
111
112 int __init ceph_msgr_init(void)
113 {
114         ceph_msgr_wq = create_workqueue("ceph-msgr");
115         if (IS_ERR(ceph_msgr_wq)) {
116                 int ret = PTR_ERR(ceph_msgr_wq);
117                 pr_err("msgr_init failed to create workqueue: %d\n", ret);
118                 ceph_msgr_wq = NULL;
119                 return ret;
120         }
121         return 0;
122 }
123
124 void ceph_msgr_exit(void)
125 {
126         destroy_workqueue(ceph_msgr_wq);
127 }
128
129 /*
130  * socket callback functions
131  */
132
133 /* data available on socket, or listen socket received a connect */
134 static void ceph_data_ready(struct sock *sk, int count_unused)
135 {
136         struct ceph_connection *con =
137                 (struct ceph_connection *)sk->sk_user_data;
138         if (sk->sk_state != TCP_CLOSE_WAIT) {
139                 dout("ceph_data_ready on %p state = %lu, queueing work\n",
140                      con, con->state);
141                 queue_con(con);
142         }
143 }
144
145 /* socket has buffer space for writing */
146 static void ceph_write_space(struct sock *sk)
147 {
148         struct ceph_connection *con =
149                 (struct ceph_connection *)sk->sk_user_data;
150
151         /* only queue to workqueue if there is data we want to write. */
152         if (test_bit(WRITE_PENDING, &con->state)) {
153                 dout("ceph_write_space %p queueing write work\n", con);
154                 queue_con(con);
155         } else {
156                 dout("ceph_write_space %p nothing to write\n", con);
157         }
158
159         /* since we have our own write_space, clear the SOCK_NOSPACE flag */
160         clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
161 }
162
163 /* socket's state has changed */
164 static void ceph_state_change(struct sock *sk)
165 {
166         struct ceph_connection *con =
167                 (struct ceph_connection *)sk->sk_user_data;
168
169         dout("ceph_state_change %p state = %lu sk_state = %u\n",
170              con, con->state, sk->sk_state);
171
172         if (test_bit(CLOSED, &con->state))
173                 return;
174
175         switch (sk->sk_state) {
176         case TCP_CLOSE:
177                 dout("ceph_state_change TCP_CLOSE\n");
178         case TCP_CLOSE_WAIT:
179                 dout("ceph_state_change TCP_CLOSE_WAIT\n");
180                 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
181                         if (test_bit(CONNECTING, &con->state))
182                                 con->error_msg = "connection failed";
183                         else
184                                 con->error_msg = "socket closed";
185                         queue_con(con);
186                 }
187                 break;
188         case TCP_ESTABLISHED:
189                 dout("ceph_state_change TCP_ESTABLISHED\n");
190                 queue_con(con);
191                 break;
192         }
193 }
194
195 /*
196  * set up socket callbacks
197  */
198 static void set_sock_callbacks(struct socket *sock,
199                                struct ceph_connection *con)
200 {
201         struct sock *sk = sock->sk;
202         sk->sk_user_data = (void *)con;
203         sk->sk_data_ready = ceph_data_ready;
204         sk->sk_write_space = ceph_write_space;
205         sk->sk_state_change = ceph_state_change;
206 }
207
208
209 /*
210  * socket helpers
211  */
212
213 /*
214  * initiate connection to a remote socket.
215  */
216 static struct socket *ceph_tcp_connect(struct ceph_connection *con)
217 {
218         struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr;
219         struct socket *sock;
220         int ret;
221
222         BUG_ON(con->sock);
223         ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
224         if (ret)
225                 return ERR_PTR(ret);
226         con->sock = sock;
227         sock->sk->sk_allocation = GFP_NOFS;
228
229         set_sock_callbacks(sock, con);
230
231         dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
232
233         ret = sock->ops->connect(sock, paddr, sizeof(*paddr), O_NONBLOCK);
234         if (ret == -EINPROGRESS) {
235                 dout("connect %s EINPROGRESS sk_state = %u\n",
236                      pr_addr(&con->peer_addr.in_addr),
237                      sock->sk->sk_state);
238                 ret = 0;
239         }
240         if (ret < 0) {
241                 pr_err("connect %s error %d\n",
242                        pr_addr(&con->peer_addr.in_addr), ret);
243                 sock_release(sock);
244                 con->sock = NULL;
245                 con->error_msg = "connect error";
246         }
247
248         if (ret < 0)
249                 return ERR_PTR(ret);
250         return sock;
251 }
252
253 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
254 {
255         struct kvec iov = {buf, len};
256         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
257
258         return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
259 }
260
261 /*
262  * write something.  @more is true if caller will be sending more data
263  * shortly.
264  */
265 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
266                      size_t kvlen, size_t len, int more)
267 {
268         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
269
270         if (more)
271                 msg.msg_flags |= MSG_MORE;
272         else
273                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
274
275         return kernel_sendmsg(sock, &msg, iov, kvlen, len);
276 }
277
278
279 /*
280  * Shutdown/close the socket for the given connection.
281  */
282 static int con_close_socket(struct ceph_connection *con)
283 {
284         int rc;
285
286         dout("con_close_socket on %p sock %p\n", con, con->sock);
287         if (!con->sock)
288                 return 0;
289         set_bit(SOCK_CLOSED, &con->state);
290         rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
291         sock_release(con->sock);
292         con->sock = NULL;
293         clear_bit(SOCK_CLOSED, &con->state);
294         return rc;
295 }
296
/*
 * Helpers for resetting a connection: reset_connection() discards all
 * queued and already-sent outgoing messages and clears the *_seq state.
 */
301 static void ceph_msg_remove(struct ceph_msg *msg)
302 {
303         list_del_init(&msg->list_head);
304         ceph_msg_put(msg);
305 }
306 static void ceph_msg_remove_list(struct list_head *head)
307 {
308         while (!list_empty(head)) {
309                 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
310                                                         list_head);
311                 ceph_msg_remove(msg);
312         }
313 }
314
315 static void reset_connection(struct ceph_connection *con)
316 {
317         /* reset connection, out_queue, msg_ and connect_seq */
318         /* discard existing out_queue and msg_seq */
319         mutex_lock(&con->out_mutex);
320         ceph_msg_remove_list(&con->out_queue);
321         ceph_msg_remove_list(&con->out_sent);
322
323         con->connect_seq = 0;
324         con->out_seq = 0;
325         con->out_msg = NULL;
326         con->in_seq = 0;
327         mutex_unlock(&con->out_mutex);
328 }
329
330 /*
331  * mark a peer down.  drop any open connections.
332  */
333 void ceph_con_close(struct ceph_connection *con)
334 {
335         dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
336         set_bit(CLOSED, &con->state);  /* in case there's queued work */
337         clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
338         reset_connection(con);
339         queue_con(con);
340 }
341
342 /*
343  * Reopen a closed connection, with a new peer address.
344  */
345 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
346 {
347         dout("con_open %p %s\n", con, pr_addr(&addr->in_addr));
348         set_bit(OPENING, &con->state);
349         clear_bit(CLOSED, &con->state);
350         memcpy(&con->peer_addr, addr, sizeof(*addr));
351         queue_con(con);
352 }
353
354 /*
355  * generic get/put
356  */
357 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
358 {
359         dout("con_get %p nref = %d -> %d\n", con,
360              atomic_read(&con->nref), atomic_read(&con->nref) + 1);
361         if (atomic_inc_not_zero(&con->nref))
362                 return con;
363         return NULL;
364 }
365
366 void ceph_con_put(struct ceph_connection *con)
367 {
368         dout("con_put %p nref = %d -> %d\n", con,
369              atomic_read(&con->nref), atomic_read(&con->nref) - 1);
370         BUG_ON(atomic_read(&con->nref) == 0);
371         if (atomic_dec_and_test(&con->nref)) {
372                 BUG_ON(con->sock);
373                 kfree(con);
374         }
375 }
376
377 /*
378  * initialize a new connection.
379  */
380 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
381 {
382         dout("con_init %p\n", con);
383         memset(con, 0, sizeof(*con));
384         atomic_set(&con->nref, 1);
385         con->msgr = msgr;
386         mutex_init(&con->out_mutex);
387         INIT_LIST_HEAD(&con->out_queue);
388         INIT_LIST_HEAD(&con->out_sent);
389         INIT_DELAYED_WORK(&con->work, con_work);
390 }
391
392
393 /*
394  * We maintain a global counter to order connection attempts.  Get
395  * a unique seq greater than @gt.
396  */
397 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
398 {
399         u32 ret;
400
401         spin_lock(&msgr->global_seq_lock);
402         if (msgr->global_seq < gt)
403                 msgr->global_seq = gt;
404         ret = ++msgr->global_seq;
405         spin_unlock(&msgr->global_seq_lock);
406         return ret;
407 }
408
409
410 /*
411  * Prepare footer for currently outgoing message, and finish things
412  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
413  */
414 static void prepare_write_message_footer(struct ceph_connection *con, int v)
415 {
416         struct ceph_msg *m = con->out_msg;
417
418         dout("prepare_write_message_footer %p\n", con);
419         con->out_kvec_is_msg = true;
420         con->out_kvec[v].iov_base = &m->footer;
421         con->out_kvec[v].iov_len = sizeof(m->footer);
422         con->out_kvec_bytes += sizeof(m->footer);
423         con->out_kvec_left++;
424         con->out_more = m->more_to_follow;
425         con->out_msg = NULL;   /* we're done with this one */
426 }
427
428 /*
429  * Prepare headers for the next outgoing message.
430  */
431 static void prepare_write_message(struct ceph_connection *con)
432 {
433         struct ceph_msg *m;
434         int v = 0;
435
436         con->out_kvec_bytes = 0;
437         con->out_kvec_is_msg = true;
438
439         /* Sneak an ack in there first?  If we can get it into the same
440          * TCP packet that's a good thing. */
441         if (con->in_seq > con->in_seq_acked) {
442                 con->in_seq_acked = con->in_seq;
443                 con->out_kvec[v].iov_base = &tag_ack;
444                 con->out_kvec[v++].iov_len = 1;
445                 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
446                 con->out_kvec[v].iov_base = &con->out_temp_ack;
447                 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
448                 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
449         }
450
451         /* move message to sending/sent list */
452         m = list_first_entry(&con->out_queue,
453                        struct ceph_msg, list_head);
454         list_move_tail(&m->list_head, &con->out_sent);
455         con->out_msg = m;   /* we don't bother taking a reference here. */
456
457         m->hdr.seq = cpu_to_le64(++con->out_seq);
458
459         dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
460              m, con->out_seq, le16_to_cpu(m->hdr.type),
461              le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
462              le32_to_cpu(m->hdr.data_len),
463              m->nr_pages);
464         BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
465
466         /* tag + hdr + front + middle */
467         con->out_kvec[v].iov_base = &tag_msg;
468         con->out_kvec[v++].iov_len = 1;
469         con->out_kvec[v].iov_base = &m->hdr;
470         con->out_kvec[v++].iov_len = sizeof(m->hdr);
471         con->out_kvec[v++] = m->front;
472         if (m->middle)
473                 con->out_kvec[v++] = m->middle->vec;
474         con->out_kvec_left = v;
475         con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
476                 (m->middle ? m->middle->vec.iov_len : 0);
477         con->out_kvec_cur = con->out_kvec;
478
479         /* fill in crc (except data pages), footer */
480         con->out_msg->hdr.crc =
481                 cpu_to_le32(crc32c(0, (void *)&m->hdr,
482                                       sizeof(m->hdr) - sizeof(m->hdr.crc)));
483         con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
484         con->out_msg->footer.front_crc =
485                 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
486         if (m->middle)
487                 con->out_msg->footer.middle_crc =
488                         cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
489                                            m->middle->vec.iov_len));
490         else
491                 con->out_msg->footer.middle_crc = 0;
492         con->out_msg->footer.data_crc = 0;
493         dout("prepare_write_message front_crc %u data_crc %u\n",
494              le32_to_cpu(con->out_msg->footer.front_crc),
495              le32_to_cpu(con->out_msg->footer.middle_crc));
496
497         /* is there a data payload? */
498         if (le32_to_cpu(m->hdr.data_len) > 0) {
499                 /* initialize page iterator */
500                 con->out_msg_pos.page = 0;
501                 con->out_msg_pos.page_pos =
502                         le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
503                 con->out_msg_pos.data_pos = 0;
504                 con->out_msg_pos.did_page_crc = 0;
505                 con->out_more = 1;  /* data + footer will follow */
506         } else {
507                 /* no, queue up footer too and be done */
508                 prepare_write_message_footer(con, v);
509         }
510
511         set_bit(WRITE_PENDING, &con->state);
512 }
513
514 /*
515  * Prepare an ack.
516  */
517 static void prepare_write_ack(struct ceph_connection *con)
518 {
519         dout("prepare_write_ack %p %llu -> %llu\n", con,
520              con->in_seq_acked, con->in_seq);
521         con->in_seq_acked = con->in_seq;
522
523         con->out_kvec[0].iov_base = &tag_ack;
524         con->out_kvec[0].iov_len = 1;
525         con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
526         con->out_kvec[1].iov_base = &con->out_temp_ack;
527         con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
528         con->out_kvec_left = 2;
529         con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
530         con->out_kvec_cur = con->out_kvec;
531         con->out_more = 1;  /* more will follow.. eventually.. */
532         set_bit(WRITE_PENDING, &con->state);
533 }
534
535 /*
536  * Prepare to write keepalive byte.
537  */
538 static void prepare_write_keepalive(struct ceph_connection *con)
539 {
540         dout("prepare_write_keepalive %p\n", con);
541         con->out_kvec[0].iov_base = &tag_keepalive;
542         con->out_kvec[0].iov_len = 1;
543         con->out_kvec_left = 1;
544         con->out_kvec_bytes = 1;
545         con->out_kvec_cur = con->out_kvec;
546         set_bit(WRITE_PENDING, &con->state);
547 }
548
549 /*
550  * Connection negotiation.
551  */
552
553 static void prepare_connect_authorizer(struct ceph_connection *con)
554 {
555         void *auth_buf;
556         int auth_len = 0;
557         int auth_protocol = 0;
558
559         if (con->ops->get_authorizer)
560                 con->ops->get_authorizer(con, &auth_buf, &auth_len,
561                                          &auth_protocol, &con->auth_reply_buf,
562                                          &con->auth_reply_buf_len,
563                                          con->auth_retry);
564
565         con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
566         con->out_connect.authorizer_len = cpu_to_le32(auth_len);
567
568         con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
569         con->out_kvec[con->out_kvec_left].iov_len = auth_len;
570         con->out_kvec_left++;
571         con->out_kvec_bytes += auth_len;
572 }
573
574 /*
575  * We connected to a peer and are saying hello.
576  */
577 static void prepare_write_banner(struct ceph_messenger *msgr,
578                                  struct ceph_connection *con)
579 {
580         int len = strlen(CEPH_BANNER);
581
582         con->out_kvec[0].iov_base = CEPH_BANNER;
583         con->out_kvec[0].iov_len = len;
584         con->out_kvec[1].iov_base = &msgr->my_enc_addr;
585         con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
586         con->out_kvec_left = 2;
587         con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
588         con->out_kvec_cur = con->out_kvec;
589         con->out_more = 0;
590         set_bit(WRITE_PENDING, &con->state);
591 }
592
593 static void prepare_write_connect(struct ceph_messenger *msgr,
594                                   struct ceph_connection *con,
595                                   int after_banner)
596 {
597         unsigned global_seq = get_global_seq(con->msgr, 0);
598         int proto;
599
600         switch (con->peer_name.type) {
601         case CEPH_ENTITY_TYPE_MON:
602                 proto = CEPH_MONC_PROTOCOL;
603                 break;
604         case CEPH_ENTITY_TYPE_OSD:
605                 proto = CEPH_OSDC_PROTOCOL;
606                 break;
607         case CEPH_ENTITY_TYPE_MDS:
608                 proto = CEPH_MDSC_PROTOCOL;
609                 break;
610         default:
611                 BUG();
612         }
613
614         dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
615              con->connect_seq, global_seq, proto);
616
617         con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
618         con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
619         con->out_connect.global_seq = cpu_to_le32(global_seq);
620         con->out_connect.protocol_version = cpu_to_le32(proto);
621         con->out_connect.flags = 0;
622         if (test_bit(LOSSYTX, &con->state))
623                 con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
624
625         if (!after_banner) {
626                 con->out_kvec_left = 0;
627                 con->out_kvec_bytes = 0;
628         }
629         con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
630         con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
631         con->out_kvec_left++;
632         con->out_kvec_bytes += sizeof(con->out_connect);
633         con->out_kvec_cur = con->out_kvec;
634         con->out_more = 0;
635         set_bit(WRITE_PENDING, &con->state);
636
637         prepare_connect_authorizer(con);
638 }
639
640
641 /*
642  * write as much of pending kvecs to the socket as we can.
643  *  1 -> done
644  *  0 -> socket full, but more to do
645  * <0 -> error
646  */
647 static int write_partial_kvec(struct ceph_connection *con)
648 {
649         int ret;
650
651         dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
652         while (con->out_kvec_bytes > 0) {
653                 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
654                                        con->out_kvec_left, con->out_kvec_bytes,
655                                        con->out_more);
656                 if (ret <= 0)
657                         goto out;
658                 con->out_kvec_bytes -= ret;
659                 if (con->out_kvec_bytes == 0)
660                         break;            /* done */
661                 while (ret > 0) {
662                         if (ret >= con->out_kvec_cur->iov_len) {
663                                 ret -= con->out_kvec_cur->iov_len;
664                                 con->out_kvec_cur++;
665                                 con->out_kvec_left--;
666                         } else {
667                                 con->out_kvec_cur->iov_len -= ret;
668                                 con->out_kvec_cur->iov_base += ret;
669                                 ret = 0;
670                                 break;
671                         }
672                 }
673         }
674         con->out_kvec_left = 0;
675         con->out_kvec_is_msg = false;
676         ret = 1;
677 out:
678         dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
679              con->out_kvec_bytes, con->out_kvec_left, ret);
680         return ret;  /* done! */
681 }
682
683 /*
684  * Write as much message data payload as we can.  If we finish, queue
685  * up the footer.
686  *  1 -> done, footer is now queued in out_kvec[].
687  *  0 -> socket full, but more to do
688  * <0 -> error
689  */
690 static int write_partial_msg_pages(struct ceph_connection *con)
691 {
692         struct ceph_msg *msg = con->out_msg;
693         unsigned data_len = le32_to_cpu(msg->hdr.data_len);
694         size_t len;
695         int crc = con->msgr->nocrc;
696         int ret;
697
698         dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
699              con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
700              con->out_msg_pos.page_pos);
701
702         while (con->out_msg_pos.page < con->out_msg->nr_pages) {
703                 struct page *page = NULL;
704                 void *kaddr = NULL;
705
706                 /*
707                  * if we are calculating the data crc (the default), we need
708                  * to map the page.  if our pages[] has been revoked, use the
709                  * zero page.
710                  */
711                 if (msg->pages) {
712                         page = msg->pages[con->out_msg_pos.page];
713                         if (crc)
714                                 kaddr = kmap(page);
715                 } else {
716                         page = con->msgr->zero_page;
717                         if (crc)
718                                 kaddr = page_address(con->msgr->zero_page);
719                 }
720                 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
721                           (int)(data_len - con->out_msg_pos.data_pos));
722                 if (crc && !con->out_msg_pos.did_page_crc) {
723                         void *base = kaddr + con->out_msg_pos.page_pos;
724                         u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
725
726                         BUG_ON(kaddr == NULL);
727                         con->out_msg->footer.data_crc =
728                                 cpu_to_le32(crc32c(tmpcrc, base, len));
729                         con->out_msg_pos.did_page_crc = 1;
730                 }
731
732                 ret = kernel_sendpage(con->sock, page,
733                                       con->out_msg_pos.page_pos, len,
734                                       MSG_DONTWAIT | MSG_NOSIGNAL |
735                                       MSG_MORE);
736
737                 if (crc && msg->pages)
738                         kunmap(page);
739
740                 if (ret <= 0)
741                         goto out;
742
743                 con->out_msg_pos.data_pos += ret;
744                 con->out_msg_pos.page_pos += ret;
745                 if (ret == len) {
746                         con->out_msg_pos.page_pos = 0;
747                         con->out_msg_pos.page++;
748                         con->out_msg_pos.did_page_crc = 0;
749                 }
750         }
751
752         dout("write_partial_msg_pages %p msg %p done\n", con, msg);
753
754         /* prepare and queue up footer, too */
755         if (!crc)
756                 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
757         con->out_kvec_bytes = 0;
758         con->out_kvec_left = 0;
759         con->out_kvec_cur = con->out_kvec;
760         prepare_write_message_footer(con, 0);
761         ret = 1;
762 out:
763         return ret;
764 }
765
766 /*
767  * write some zeros
768  */
769 static int write_partial_skip(struct ceph_connection *con)
770 {
771         int ret;
772
773         while (con->out_skip > 0) {
774                 struct kvec iov = {
775                         .iov_base = page_address(con->msgr->zero_page),
776                         .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
777                 };
778
779                 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
780                 if (ret <= 0)
781                         goto out;
782                 con->out_skip -= ret;
783         }
784         ret = 1;
785 out:
786         return ret;
787 }
788
789 /*
790  * Prepare to read connection handshake, or an ack.
791  */
792 static void prepare_read_banner(struct ceph_connection *con)
793 {
794         dout("prepare_read_banner %p\n", con);
795         con->in_base_pos = 0;
796 }
797
798 static void prepare_read_connect(struct ceph_connection *con)
799 {
800         dout("prepare_read_connect %p\n", con);
801         con->in_base_pos = 0;
802 }
803
804 static void prepare_read_connect_retry(struct ceph_connection *con)
805 {
806         dout("prepare_read_connect_retry %p\n", con);
807         con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->actual_peer_addr)
808                 + sizeof(con->peer_addr_for_me);
809 }
810
811 static void prepare_read_ack(struct ceph_connection *con)
812 {
813         dout("prepare_read_ack %p\n", con);
814         con->in_base_pos = 0;
815 }
816
817 static void prepare_read_tag(struct ceph_connection *con)
818 {
819         dout("prepare_read_tag %p\n", con);
820         con->in_base_pos = 0;
821         con->in_tag = CEPH_MSGR_TAG_READY;
822 }
823
824 /*
825  * Prepare to read a message.
826  */
827 static int prepare_read_message(struct ceph_connection *con)
828 {
829         dout("prepare_read_message %p\n", con);
830         BUG_ON(con->in_msg != NULL);
831         con->in_base_pos = 0;
832         con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
833         return 0;
834 }
835
836
837 static int read_partial(struct ceph_connection *con,
838                         int *to, int size, void *object)
839 {
840         *to += size;
841         while (con->in_base_pos < *to) {
842                 int left = *to - con->in_base_pos;
843                 int have = size - left;
844                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
845                 if (ret <= 0)
846                         return ret;
847                 con->in_base_pos += ret;
848         }
849         return 1;
850 }
851
852
853 /*
854  * Read all or part of the connect-side handshake on a new connection
855  */
856 static int read_partial_banner(struct ceph_connection *con)
857 {
858         int ret, to = 0;
859
860         dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
861
862         /* peer's banner */
863         ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
864         if (ret <= 0)
865                 goto out;
866         ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
867                            &con->actual_peer_addr);
868         if (ret <= 0)
869                 goto out;
870         ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
871                            &con->peer_addr_for_me);
872         if (ret <= 0)
873                 goto out;
874 out:
875         return ret;
876 }
877
878 static int read_partial_connect(struct ceph_connection *con)
879 {
880         int ret, to = 0;
881
882         dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
883
884         ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
885         if (ret <= 0)
886                 goto out;
887         ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
888                            con->auth_reply_buf);
889         if (ret <= 0)
890                 goto out;
891
892         dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
893              con, (int)con->in_reply.tag,
894              le32_to_cpu(con->in_reply.connect_seq),
895              le32_to_cpu(con->in_reply.global_seq));
896 out:
897         return ret;
898
899 }
900
901 /*
902  * Verify the hello banner looks okay.
903  */
904 static int verify_hello(struct ceph_connection *con)
905 {
906         if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
907                 pr_err("connect to %s got bad banner\n",
908                        pr_addr(&con->peer_addr.in_addr));
909                 con->error_msg = "protocol error, bad banner";
910                 return -1;
911         }
912         return 0;
913 }
914
/*
 * Return true if @ss holds an all-zero IPv4 or IPv6 address.  Any other
 * address family is never considered blank.
 */
static bool addr_is_blank(struct sockaddr_storage *ss)
{
        if (ss->ss_family == AF_INET) {
                const struct sockaddr_in *v4 = (struct sockaddr_in *)ss;

                return v4->sin_addr.s_addr == 0;
        }

        if (ss->ss_family == AF_INET6) {
                const struct in6_addr *a6 =
                        &((struct sockaddr_in6 *)ss)->sin6_addr;

                /* blank only if every 32-bit word of the address is zero */
                return (a6->s6_addr32[0] | a6->s6_addr32[1] |
                        a6->s6_addr32[2] | a6->s6_addr32[3]) == 0;
        }

        return false;
}
929
/*
 * Return the port stored in @ss in host byte order, or 0 if the address
 * family is neither AF_INET nor AF_INET6.
 */
static int addr_port(struct sockaddr_storage *ss)
{
        if (ss->ss_family == AF_INET)
                return ntohs(((struct sockaddr_in *)ss)->sin_port);
        if (ss->ss_family == AF_INET6)
                return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
        return 0;
}
940
/*
 * Store port @p (host byte order) into @ss in network byte order.
 * Only AF_INET and AF_INET6 addresses are touched; any other family is
 * left unchanged.
 */
static void addr_set_port(struct sockaddr_storage *ss, int p)
{
        switch (ss->ss_family) {
        case AF_INET:
                ((struct sockaddr_in *)ss)->sin_port = htons(p);
                break;  /* do not fall through into the IPv6 store */
        case AF_INET6:
                ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
                break;
        }
}
950
951 /*
952  * Parse an ip[:port] list into an addr array.  Use the default
953  * monitor port if a port isn't specified.
954  */
955 int ceph_parse_ips(const char *c, const char *end,
956                    struct ceph_entity_addr *addr,
957                    int max_count, int *count)
958 {
959         int i;
960         const char *p = c;
961
962         dout("parse_ips on '%.*s'\n", (int)(end-c), c);
963         for (i = 0; i < max_count; i++) {
964                 const char *ipend;
965                 struct sockaddr_storage *ss = &addr[i].in_addr;
966                 struct sockaddr_in *in4 = (void *)ss;
967                 struct sockaddr_in6 *in6 = (void *)ss;
968                 int port;
969
970                 memset(ss, 0, sizeof(*ss));
971                 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
972                              ',', &ipend)) {
973                         ss->ss_family = AF_INET;
974                 } else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
975                                     ',', &ipend)) {
976                         ss->ss_family = AF_INET6;
977                 } else {
978                         goto bad;
979                 }
980                 p = ipend;
981
982                 /* port? */
983                 if (p < end && *p == ':') {
984                         port = 0;
985                         p++;
986                         while (p < end && *p >= '0' && *p <= '9') {
987                                 port = (port * 10) + (*p - '0');
988                                 p++;
989                         }
990                         if (port > 65535 || port == 0)
991                                 goto bad;
992                 } else {
993                         port = CEPH_MON_PORT;
994                 }
995
996                 addr_set_port(ss, port);
997
998                 dout("parse_ips got %s\n", pr_addr(ss));
999
1000                 if (p == end)
1001                         break;
1002                 if (*p != ',')
1003                         goto bad;
1004                 p++;
1005         }
1006
1007         if (p != end)
1008                 goto bad;
1009
1010         if (count)
1011                 *count = i + 1;
1012         return 0;
1013
1014 bad:
1015         pr_err("parse_ips bad ip '%s'\n", c);
1016         return -EINVAL;
1017 }
1018
1019 static int process_banner(struct ceph_connection *con)
1020 {
1021         dout("process_banner on %p\n", con);
1022
1023         if (verify_hello(con) < 0)
1024                 return -1;
1025
1026         ceph_decode_addr(&con->actual_peer_addr);
1027         ceph_decode_addr(&con->peer_addr_for_me);
1028
1029         /*
1030          * Make sure the other end is who we wanted.  note that the other
1031          * end may not yet know their ip address, so if it's 0.0.0.0, give
1032          * them the benefit of the doubt.
1033          */
1034         if (!ceph_entity_addr_is_local(&con->peer_addr,
1035                                        &con->actual_peer_addr) &&
1036             !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
1037               con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
1038                 pr_err("wrong peer, want %s/%d, "
1039                        "got %s/%d, wtf\n",
1040                        pr_addr(&con->peer_addr.in_addr),
1041                        con->peer_addr.nonce,
1042                        pr_addr(&con->actual_peer_addr.in_addr),
1043                        con->actual_peer_addr.nonce);
1044                 con->error_msg = "protocol error, wrong peer";
1045                 return -1;
1046         }
1047
1048         /*
1049          * did we learn our address?
1050          */
1051         if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
1052                 int port = addr_port(&con->msgr->inst.addr.in_addr);
1053
1054                 memcpy(&con->msgr->inst.addr.in_addr,
1055                        &con->peer_addr_for_me.in_addr,
1056                        sizeof(con->peer_addr_for_me.in_addr));
1057                 addr_set_port(&con->msgr->inst.addr.in_addr, port);
1058                 encode_my_addr(con->msgr);
1059                 dout("process_banner learned my addr is %s\n",
1060                      pr_addr(&con->msgr->inst.addr.in_addr));
1061         }
1062
1063         set_bit(NEGOTIATING, &con->state);
1064         prepare_read_connect(con);
1065         return 0;
1066 }
1067
1068 static int process_connect(struct ceph_connection *con)
1069 {
1070         dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1071
1072         switch (con->in_reply.tag) {
1073         case CEPH_MSGR_TAG_BADPROTOVER:
1074                 dout("process_connect got BADPROTOVER my %d != their %d\n",
1075                      le32_to_cpu(con->out_connect.protocol_version),
1076                      le32_to_cpu(con->in_reply.protocol_version));
1077                 pr_err("%s%lld %s protocol version mismatch,"
1078                        " my %d != server's %d\n",
1079                        ENTITY_NAME(con->peer_name),
1080                        pr_addr(&con->peer_addr.in_addr),
1081                        le32_to_cpu(con->out_connect.protocol_version),
1082                        le32_to_cpu(con->in_reply.protocol_version));
1083                 con->error_msg = "protocol version mismatch";
1084                 if (con->ops->bad_proto)
1085                         con->ops->bad_proto(con);
1086                 reset_connection(con);
1087                 set_bit(CLOSED, &con->state);  /* in case there's queued work */
1088                 return -1;
1089
1090         case CEPH_MSGR_TAG_BADAUTHORIZER:
1091                 con->auth_retry++;
1092                 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
1093                      con->auth_retry);
1094                 if (con->auth_retry == 2) {
1095                         con->error_msg = "connect authorization failure";
1096                         reset_connection(con);
1097                         set_bit(CLOSED, &con->state);
1098                         return -1;
1099                 }
1100                 con->auth_retry = 1;
1101                 prepare_write_connect(con->msgr, con, 0);
1102                 prepare_read_connect_retry(con);
1103                 break;
1104
1105         case CEPH_MSGR_TAG_RESETSESSION:
1106                 /*
1107                  * If we connected with a large connect_seq but the peer
1108                  * has no record of a session with us (no connection, or
1109                  * connect_seq == 0), they will send RESETSESION to indicate
1110                  * that they must have reset their session, and may have
1111                  * dropped messages.
1112                  */
1113                 dout("process_connect got RESET peer seq %u\n",
1114                      le32_to_cpu(con->in_connect.connect_seq));
1115                 pr_err("%s%lld %s connection reset\n",
1116                        ENTITY_NAME(con->peer_name),
1117                        pr_addr(&con->peer_addr.in_addr));
1118                 reset_connection(con);
1119                 prepare_write_connect(con->msgr, con, 0);
1120                 prepare_read_connect(con);
1121
1122                 /* Tell ceph about it. */
1123                 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1124                 if (con->ops->peer_reset)
1125                         con->ops->peer_reset(con);
1126                 break;
1127
1128         case CEPH_MSGR_TAG_RETRY_SESSION:
1129                 /*
1130                  * If we sent a smaller connect_seq than the peer has, try
1131                  * again with a larger value.
1132                  */
1133                 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
1134                      le32_to_cpu(con->out_connect.connect_seq),
1135                      le32_to_cpu(con->in_connect.connect_seq));
1136                 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1137                 prepare_write_connect(con->msgr, con, 0);
1138                 prepare_read_connect(con);
1139                 break;
1140
1141         case CEPH_MSGR_TAG_RETRY_GLOBAL:
1142                 /*
1143                  * If we sent a smaller global_seq than the peer has, try
1144                  * again with a larger value.
1145                  */
1146                 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1147                      con->peer_global_seq,
1148                      le32_to_cpu(con->in_connect.global_seq));
1149                 get_global_seq(con->msgr,
1150                                le32_to_cpu(con->in_connect.global_seq));
1151                 prepare_write_connect(con->msgr, con, 0);
1152                 prepare_read_connect(con);
1153                 break;
1154
1155         case CEPH_MSGR_TAG_READY:
1156                 clear_bit(CONNECTING, &con->state);
1157                 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1158                 con->connect_seq++;
1159                 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1160                      con->peer_global_seq,
1161                      le32_to_cpu(con->in_reply.connect_seq),
1162                      con->connect_seq);
1163                 WARN_ON(con->connect_seq !=
1164                         le32_to_cpu(con->in_reply.connect_seq));
1165
1166                 con->delay = 0;  /* reset backoff memory */
1167                 prepare_read_tag(con);
1168                 break;
1169
1170         case CEPH_MSGR_TAG_WAIT:
1171                 /*
1172                  * If there is a connection race (we are opening
1173                  * connections to each other), one of us may just have
1174                  * to WAIT.  This shouldn't happen if we are the
1175                  * client.
1176                  */
1177                 pr_err("process_connect peer connecting WAIT\n");
1178
1179         default:
1180                 pr_err("connect protocol error, will retry\n");
1181                 con->error_msg = "protocol error, garbage tag during connect";
1182                 return -1;
1183         }
1184         return 0;
1185 }
1186
1187
1188 /*
1189  * read (part of) an ack
1190  */
1191 static int read_partial_ack(struct ceph_connection *con)
1192 {
1193         int to = 0;
1194
1195         return read_partial(con, &to, sizeof(con->in_temp_ack),
1196                             &con->in_temp_ack);
1197 }
1198
1199
1200 /*
1201  * We can finally discard anything that's been acked.
1202  */
1203 static void process_ack(struct ceph_connection *con)
1204 {
1205         struct ceph_msg *m;
1206         u64 ack = le64_to_cpu(con->in_temp_ack);
1207         u64 seq;
1208
1209         mutex_lock(&con->out_mutex);
1210         while (!list_empty(&con->out_sent)) {
1211                 m = list_first_entry(&con->out_sent, struct ceph_msg,
1212                                      list_head);
1213                 seq = le64_to_cpu(m->hdr.seq);
1214                 if (seq > ack)
1215                         break;
1216                 dout("got ack for seq %llu type %d at %p\n", seq,
1217                      le16_to_cpu(m->hdr.type), m);
1218                 ceph_msg_remove(m);
1219         }
1220         mutex_unlock(&con->out_mutex);
1221         prepare_read_tag(con);
1222 }
1223
1224
1225
1226
1227
1228
1229 /*
1230  * read (part of) a message.
1231  */
1232 static int read_partial_message(struct ceph_connection *con)
1233 {
1234         struct ceph_msg *m = con->in_msg;
1235         void *p;
1236         int ret;
1237         int to, want, left;
1238         unsigned front_len, middle_len, data_len, data_off;
1239         int datacrc = con->msgr->nocrc;
1240
1241         dout("read_partial_message con %p msg %p\n", con, m);
1242
1243         /* header */
1244         while (con->in_base_pos < sizeof(con->in_hdr)) {
1245                 left = sizeof(con->in_hdr) - con->in_base_pos;
1246                 ret = ceph_tcp_recvmsg(con->sock,
1247                                        (char *)&con->in_hdr + con->in_base_pos,
1248                                        left);
1249                 if (ret <= 0)
1250                         return ret;
1251                 con->in_base_pos += ret;
1252                 if (con->in_base_pos == sizeof(con->in_hdr)) {
1253                         u32 crc = crc32c(0, (void *)&con->in_hdr,
1254                                  sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1255                         if (crc != le32_to_cpu(con->in_hdr.crc)) {
1256                                 pr_err("read_partial_message bad hdr "
1257                                        " crc %u != expected %u\n",
1258                                        crc, con->in_hdr.crc);
1259                                 return -EBADMSG;
1260                         }
1261                 }
1262         }
1263
1264         front_len = le32_to_cpu(con->in_hdr.front_len);
1265         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1266                 return -EIO;
1267         middle_len = le32_to_cpu(con->in_hdr.middle_len);
1268         if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1269                 return -EIO;
1270         data_len = le32_to_cpu(con->in_hdr.data_len);
1271         if (data_len > CEPH_MSG_MAX_DATA_LEN)
1272                 return -EIO;
1273
1274         /* allocate message? */
1275         if (!con->in_msg) {
1276                 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1277                      con->in_hdr.front_len, con->in_hdr.data_len);
1278                 con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
1279                 if (!con->in_msg) {
1280                         /* skip this message */
1281                         dout("alloc_msg returned NULL, skipping message\n");
1282                         con->in_base_pos = -front_len - middle_len - data_len -
1283                                 sizeof(m->footer);
1284                         con->in_tag = CEPH_MSGR_TAG_READY;
1285                         return 0;
1286                 }
1287                 if (IS_ERR(con->in_msg)) {
1288                         ret = PTR_ERR(con->in_msg);
1289                         con->in_msg = NULL;
1290                         con->error_msg = "out of memory for incoming message";
1291                         return ret;
1292                 }
1293                 m = con->in_msg;
1294                 m->front.iov_len = 0;    /* haven't read it yet */
1295                 memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
1296         }
1297
1298         /* front */
1299         while (m->front.iov_len < front_len) {
1300                 BUG_ON(m->front.iov_base == NULL);
1301                 left = front_len - m->front.iov_len;
1302                 ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
1303                                        m->front.iov_len, left);
1304                 if (ret <= 0)
1305                         return ret;
1306                 m->front.iov_len += ret;
1307                 if (m->front.iov_len == front_len)
1308                         con->in_front_crc = crc32c(0, m->front.iov_base,
1309                                                       m->front.iov_len);
1310         }
1311
1312         /* middle */
1313         while (middle_len > 0 && (!m->middle ||
1314                                   m->middle->vec.iov_len < middle_len)) {
1315                 if (m->middle == NULL) {
1316                         ret = -EOPNOTSUPP;
1317                         if (con->ops->alloc_middle)
1318                                 ret = con->ops->alloc_middle(con, m);
1319                         if (ret < 0) {
1320                                 dout("alloc_middle failed, skipping payload\n");
1321                                 con->in_base_pos = -middle_len - data_len
1322                                         - sizeof(m->footer);
1323                                 ceph_msg_put(con->in_msg);
1324                                 con->in_msg = NULL;
1325                                 con->in_tag = CEPH_MSGR_TAG_READY;
1326                                 return 0;
1327                         }
1328                         m->middle->vec.iov_len = 0;
1329                 }
1330                 left = middle_len - m->middle->vec.iov_len;
1331                 ret = ceph_tcp_recvmsg(con->sock,
1332                                        (char *)m->middle->vec.iov_base +
1333                                        m->middle->vec.iov_len, left);
1334                 if (ret <= 0)
1335                         return ret;
1336                 m->middle->vec.iov_len += ret;
1337                 if (m->middle->vec.iov_len == middle_len)
1338                         con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
1339                                                       m->middle->vec.iov_len);
1340         }
1341
1342         /* (page) data */
1343         data_off = le16_to_cpu(m->hdr.data_off);
1344         if (data_len == 0)
1345                 goto no_data;
1346
1347         if (m->nr_pages == 0) {
1348                 con->in_msg_pos.page = 0;
1349                 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1350                 con->in_msg_pos.data_pos = 0;
1351                 /* find pages for data payload */
1352                 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1353                 ret = -1;
1354                 if (con->ops->prepare_pages)
1355                         ret = con->ops->prepare_pages(con, m, want);
1356                 if (ret < 0) {
1357                         dout("%p prepare_pages failed, skipping payload\n", m);
1358                         con->in_base_pos = -data_len - sizeof(m->footer);
1359                         ceph_msg_put(con->in_msg);
1360                         con->in_msg = NULL;
1361                         con->in_tag = CEPH_MSGR_TAG_READY;
1362                         return 0;
1363                 }
1364                 BUG_ON(m->nr_pages < want);
1365         }
1366         while (con->in_msg_pos.data_pos < data_len) {
1367                 left = min((int)(data_len - con->in_msg_pos.data_pos),
1368                            (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1369                 BUG_ON(m->pages == NULL);
1370                 p = kmap(m->pages[con->in_msg_pos.page]);
1371                 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1372                                        left);
1373                 if (ret > 0 && datacrc)
1374                         con->in_data_crc =
1375                                 crc32c(con->in_data_crc,
1376                                           p + con->in_msg_pos.page_pos, ret);
1377                 kunmap(m->pages[con->in_msg_pos.page]);
1378                 if (ret <= 0)
1379                         return ret;
1380                 con->in_msg_pos.data_pos += ret;
1381                 con->in_msg_pos.page_pos += ret;
1382                 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1383                         con->in_msg_pos.page_pos = 0;
1384                         con->in_msg_pos.page++;
1385                 }
1386         }
1387
1388 no_data:
1389         /* footer */
1390         to = sizeof(m->hdr) + sizeof(m->footer);
1391         while (con->in_base_pos < to) {
1392                 left = to - con->in_base_pos;
1393                 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1394                                        (con->in_base_pos - sizeof(m->hdr)),
1395                                        left);
1396                 if (ret <= 0)
1397                         return ret;
1398                 con->in_base_pos += ret;
1399         }
1400         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1401              m, front_len, m->footer.front_crc, middle_len,
1402              m->footer.middle_crc, data_len, m->footer.data_crc);
1403
1404         /* crc ok? */
1405         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1406                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1407                        m, con->in_front_crc, m->footer.front_crc);
1408                 return -EBADMSG;
1409         }
1410         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1411                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1412                        m, con->in_middle_crc, m->footer.middle_crc);
1413                 return -EBADMSG;
1414         }
1415         if (datacrc &&
1416             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1417             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1418                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1419                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1420                 return -EBADMSG;
1421         }
1422
1423         return 1; /* done! */
1424 }
1425
1426 /*
1427  * Process message.  This happens in the worker thread.  The callback should
1428  * be careful not to do anything that waits on other incoming messages or it
1429  * may deadlock.
1430  */
1431 static void process_message(struct ceph_connection *con)
1432 {
1433         struct ceph_msg *msg = con->in_msg;
1434
1435         con->in_msg = NULL;
1436
1437         /* if first message, set peer_name */
1438         if (con->peer_name.type == 0)
1439                 con->peer_name = msg->hdr.src.name;
1440
1441         mutex_lock(&con->out_mutex);
1442         con->in_seq++;
1443         mutex_unlock(&con->out_mutex);
1444
1445         dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1446              msg, le64_to_cpu(msg->hdr.seq),
1447              ENTITY_NAME(msg->hdr.src.name),
1448              le16_to_cpu(msg->hdr.type),
1449              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1450              le32_to_cpu(msg->hdr.front_len),
1451              le32_to_cpu(msg->hdr.data_len),
1452              con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1453         con->ops->dispatch(con, msg);
1454         prepare_read_tag(con);
1455 }
1456
1457
1458 /*
1459  * Write something to the socket.  Called in a worker thread when the
1460  * socket appears to be writeable and we have something ready to send.
1461  */
1462 static int try_write(struct ceph_connection *con)
1463 {
1464         struct ceph_messenger *msgr = con->msgr;
1465         int ret = 1;
1466
1467         dout("try_write start %p state %lu nref %d\n", con, con->state,
1468              atomic_read(&con->nref));
1469
1470         mutex_lock(&con->out_mutex);
1471 more:
1472         dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1473
1474         /* open the socket first? */
1475         if (con->sock == NULL) {
1476                 /*
1477                  * if we were STANDBY and are reconnecting _this_
1478                  * connection, bump connect_seq now.  Always bump
1479                  * global_seq.
1480                  */
1481                 if (test_and_clear_bit(STANDBY, &con->state))
1482                         con->connect_seq++;
1483
1484                 prepare_write_banner(msgr, con);
1485                 prepare_write_connect(msgr, con, 1);
1486                 prepare_read_banner(con);
1487                 set_bit(CONNECTING, &con->state);
1488                 clear_bit(NEGOTIATING, &con->state);
1489
1490                 con->in_tag = CEPH_MSGR_TAG_READY;
1491                 dout("try_write initiating connect on %p new state %lu\n",
1492                      con, con->state);
1493                 con->sock = ceph_tcp_connect(con);
1494                 if (IS_ERR(con->sock)) {
1495                         con->sock = NULL;
1496                         con->error_msg = "connect error";
1497                         ret = -1;
1498                         goto out;
1499                 }
1500         }
1501
1502 more_kvec:
1503         /* kvec data queued? */
1504         if (con->out_skip) {
1505                 ret = write_partial_skip(con);
1506                 if (ret <= 0)
1507                         goto done;
1508                 if (ret < 0) {
1509                         dout("try_write write_partial_skip err %d\n", ret);
1510                         goto done;
1511                 }
1512         }
1513         if (con->out_kvec_left) {
1514                 ret = write_partial_kvec(con);
1515                 if (ret <= 0)
1516                         goto done;
1517                 if (ret < 0) {
1518                         dout("try_write write_partial_kvec err %d\n", ret);
1519                         goto done;
1520                 }
1521         }
1522
1523         /* msg pages? */
1524         if (con->out_msg) {
1525                 ret = write_partial_msg_pages(con);
1526                 if (ret == 1)
1527                         goto more_kvec;  /* we need to send the footer, too! */
1528                 if (ret == 0)
1529                         goto done;
1530                 if (ret < 0) {
1531                         dout("try_write write_partial_msg_pages err %d\n",
1532                              ret);
1533                         goto done;
1534                 }
1535         }
1536
1537         if (!test_bit(CONNECTING, &con->state)) {
1538                 /* is anything else pending? */
1539                 if (!list_empty(&con->out_queue)) {
1540                         prepare_write_message(con);
1541                         goto more;
1542                 }
1543                 if (con->in_seq > con->in_seq_acked) {
1544                         prepare_write_ack(con);
1545                         goto more;
1546                 }
1547                 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
1548                         prepare_write_keepalive(con);
1549                         goto more;
1550                 }
1551         }
1552
1553         /* Nothing to do! */
1554         clear_bit(WRITE_PENDING, &con->state);
1555         dout("try_write nothing else to write.\n");
1556 done:
1557         ret = 0;
1558 out:
1559         mutex_unlock(&con->out_mutex);
1560         dout("try_write done on %p\n", con);
1561         return ret;
1562 }
1563
1564
1565
1566 /*
1567  * Read what we can from the socket.
1568  */
1569 static int try_read(struct ceph_connection *con)
1570 {
1571         struct ceph_messenger *msgr;
1572         int ret = -1;
1573
1574         if (!con->sock)
1575                 return 0;
1576
1577         if (test_bit(STANDBY, &con->state))
1578                 return 0;
1579
1580         dout("try_read start on %p\n", con);
1581         msgr = con->msgr;
1582
1583 more:
1584         dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1585              con->in_base_pos);
1586         if (test_bit(CONNECTING, &con->state)) {
1587                 if (!test_bit(NEGOTIATING, &con->state)) {
1588                         dout("try_read connecting\n");
1589                         ret = read_partial_banner(con);
1590                         if (ret <= 0)
1591                                 goto done;
1592                         if (process_banner(con) < 0) {
1593                                 ret = -1;
1594                                 goto out;
1595                         }
1596                 }
1597                 ret = read_partial_connect(con);
1598                 if (ret <= 0)
1599                         goto done;
1600                 if (process_connect(con) < 0) {
1601                         ret = -1;
1602                         goto out;
1603                 }
1604                 goto more;
1605         }
1606
1607         if (con->in_base_pos < 0) {
1608                 /*
1609                  * skipping + discarding content.
1610                  *
1611                  * FIXME: there must be a better way to do this!
1612                  */
1613                 static char buf[1024];
1614                 int skip = min(1024, -con->in_base_pos);
1615                 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
1616                 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
1617                 if (ret <= 0)
1618                         goto done;
1619                 con->in_base_pos += ret;
1620                 if (con->in_base_pos)
1621                         goto more;
1622         }
1623         if (con->in_tag == CEPH_MSGR_TAG_READY) {
1624                 /*
1625                  * what's next?
1626                  */
1627                 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
1628                 if (ret <= 0)
1629                         goto done;
1630                 dout("try_read got tag %d\n", (int)con->in_tag);
1631                 switch (con->in_tag) {
1632                 case CEPH_MSGR_TAG_MSG:
1633                         prepare_read_message(con);
1634                         break;
1635                 case CEPH_MSGR_TAG_ACK:
1636                         prepare_read_ack(con);
1637                         break;
1638                 case CEPH_MSGR_TAG_CLOSE:
1639                         set_bit(CLOSED, &con->state);   /* fixme */
1640                         goto done;
1641                 default:
1642                         goto bad_tag;
1643                 }
1644         }
1645         if (con->in_tag == CEPH_MSGR_TAG_MSG) {
1646                 ret = read_partial_message(con);
1647                 if (ret <= 0) {
1648                         switch (ret) {
1649                         case -EBADMSG:
1650                                 con->error_msg = "bad crc";
1651                                 ret = -EIO;
1652                                 goto out;
1653                         case -EIO:
1654                                 con->error_msg = "io error";
1655                                 goto out;
1656                         default:
1657                                 goto done;
1658                         }
1659                 }
1660                 if (con->in_tag == CEPH_MSGR_TAG_READY)
1661                         goto more;
1662                 process_message(con);
1663                 goto more;
1664         }
1665         if (con->in_tag == CEPH_MSGR_TAG_ACK) {
1666                 ret = read_partial_ack(con);
1667                 if (ret <= 0)
1668                         goto done;
1669                 process_ack(con);
1670                 goto more;
1671         }
1672
1673 done:
1674         ret = 0;
1675 out:
1676         dout("try_read done on %p\n", con);
1677         return ret;
1678
1679 bad_tag:
1680         pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
1681         con->error_msg = "protocol error, garbage tag";
1682         ret = -1;
1683         goto out;
1684 }
1685
1686
1687 /*
1688  * Atomically queue work on a connection.  Bump @con reference to
1689  * avoid races with connection teardown.
1690  *
1691  * There is some trickery going on with QUEUED and BUSY because we
1692  * only want a _single_ thread operating on each connection at any
1693  * point in time, but we want to use all available CPUs.
1694  *
1695  * The worker thread only proceeds if it can atomically set BUSY.  It
1696  * clears QUEUED and does its thing.  When it thinks it's done, it
1697  * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
1698  * (tries again to set BUSY).
1699  *
1700  * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
1701  * try to queue work.  If that fails (work is already queued, or BUSY)
1702  * we give up (work also already being done or is queued) but leave QUEUED
1703  * set so that the worker thread will loop if necessary.
1704  */
1705 static void queue_con(struct ceph_connection *con)
1706 {
1707         if (test_bit(DEAD, &con->state)) {
1708                 dout("queue_con %p ignoring: DEAD\n",
1709                      con);
1710                 return;
1711         }
1712
1713         if (!con->ops->get(con)) {
1714                 dout("queue_con %p ref count 0\n", con);
1715                 return;
1716         }
1717
1718         set_bit(QUEUED, &con->state);
1719         if (test_bit(BUSY, &con->state)) {
1720                 dout("queue_con %p - already BUSY\n", con);
1721                 con->ops->put(con);
1722         } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
1723                 dout("queue_con %p - already queued\n", con);
1724                 con->ops->put(con);
1725         } else {
1726                 dout("queue_con %p\n", con);
1727         }
1728 }
1729
1730 /*
1731  * Do some work on a connection.  Drop a connection ref when we're done.
1732  */
1733 static void con_work(struct work_struct *work)
1734 {
1735         struct ceph_connection *con = container_of(work, struct ceph_connection,
1736                                                    work.work);
1737         int backoff = 0;
1738
1739 more:
1740         if (test_and_set_bit(BUSY, &con->state) != 0) {
1741                 dout("con_work %p BUSY already set\n", con);
1742                 goto out;
1743         }
1744         dout("con_work %p start, clearing QUEUED\n", con);
1745         clear_bit(QUEUED, &con->state);
1746
1747         if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1748                 dout("con_work CLOSED\n");
1749                 con_close_socket(con);
1750                 goto done;
1751         }
1752         if (test_and_clear_bit(OPENING, &con->state)) {
1753                 /* reopen w/ new peer */
1754                 dout("con_work OPENING\n");
1755                 con_close_socket(con);
1756         }
1757
1758         if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1759             try_read(con) < 0 ||
1760             try_write(con) < 0) {
1761                 backoff = 1;
1762                 ceph_fault(con);     /* error/fault path */
1763         }
1764
1765 done:
1766         clear_bit(BUSY, &con->state);
1767         dout("con->state=%lu\n", con->state);
1768         if (test_bit(QUEUED, &con->state)) {
1769                 if (!backoff) {
1770                         dout("con_work %p QUEUED reset, looping\n", con);
1771                         goto more;
1772                 }
1773                 dout("con_work %p QUEUED reset, but just faulted\n", con);
1774                 clear_bit(QUEUED, &con->state);
1775         }
1776         dout("con_work %p done\n", con);
1777
1778 out:
1779         con->ops->put(con);
1780 }
1781
1782
1783 /*
1784  * Generic error/fault handler.  A retry mechanism is used with
1785  * exponential backoff
1786  */
1787 static void ceph_fault(struct ceph_connection *con)
1788 {
1789         pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1790                pr_addr(&con->peer_addr.in_addr), con->error_msg);
1791         dout("fault %p state %lu to peer %s\n",
1792              con, con->state, pr_addr(&con->peer_addr.in_addr));
1793
1794         if (test_bit(LOSSYTX, &con->state)) {
1795                 dout("fault on LOSSYTX channel\n");
1796                 goto out;
1797         }
1798
1799         clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
1800
1801         con_close_socket(con);
1802         con->in_msg = NULL;
1803
1804         /* If there are no messages in the queue, place the connection
1805          * in a STANDBY state (i.e., don't try to reconnect just yet). */
1806         mutex_lock(&con->out_mutex);
1807         if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
1808                 dout("fault setting STANDBY\n");
1809                 set_bit(STANDBY, &con->state);
1810                 mutex_unlock(&con->out_mutex);
1811                 goto out;
1812         }
1813
1814         /* Requeue anything that hasn't been acked, and retry after a
1815          * delay. */
1816         list_splice_init(&con->out_sent, &con->out_queue);
1817         mutex_unlock(&con->out_mutex);
1818
1819         if (con->delay == 0)
1820                 con->delay = BASE_DELAY_INTERVAL;
1821         else if (con->delay < MAX_DELAY_INTERVAL)
1822                 con->delay *= 2;
1823
1824         /* explicitly schedule work to try to reconnect again later. */
1825         dout("fault queueing %p delay %lu\n", con, con->delay);
1826         con->ops->get(con);
1827         if (queue_delayed_work(ceph_msgr_wq, &con->work,
1828                                round_jiffies_relative(con->delay)) == 0)
1829                 con->ops->put(con);
1830
1831 out:
1832         if (con->ops->fault)
1833                 con->ops->fault(con);
1834 }
1835
1836
1837
1838 /*
1839  * create a new messenger instance
1840  */
1841 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1842 {
1843         struct ceph_messenger *msgr;
1844
1845         msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
1846         if (msgr == NULL)
1847                 return ERR_PTR(-ENOMEM);
1848
1849         spin_lock_init(&msgr->global_seq_lock);
1850
1851         /* the zero page is needed if a request is "canceled" while the message
1852          * is being written over the socket */
1853         msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
1854         if (!msgr->zero_page) {
1855                 kfree(msgr);
1856                 return ERR_PTR(-ENOMEM);
1857         }
1858         kmap(msgr->zero_page);
1859
1860         if (myaddr)
1861                 msgr->inst.addr = *myaddr;
1862
1863         /* select a random nonce */
1864         get_random_bytes(&msgr->inst.addr.nonce,
1865                          sizeof(msgr->inst.addr.nonce));
1866         encode_my_addr(msgr);
1867
1868         dout("messenger_create %p\n", msgr);
1869         return msgr;
1870 }
1871
1872 void ceph_messenger_destroy(struct ceph_messenger *msgr)
1873 {
1874         dout("destroy %p\n", msgr);
1875         kunmap(msgr->zero_page);
1876         __free_page(msgr->zero_page);
1877         kfree(msgr);
1878         dout("destroyed messenger %p\n", msgr);
1879 }
1880
1881 /*
1882  * Queue up an outgoing message on the given connection.
1883  */
1884 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1885 {
1886         if (test_bit(CLOSED, &con->state)) {
1887                 dout("con_send %p closed, dropping %p\n", con, msg);
1888                 ceph_msg_put(msg);
1889                 return;
1890         }
1891
1892         /* set src+dst */
1893         msg->hdr.src.name = con->msgr->inst.name;
1894         msg->hdr.src.addr = con->msgr->my_enc_addr;
1895         msg->hdr.orig_src = msg->hdr.src;
1896         msg->hdr.dst_erank = con->peer_addr.erank;
1897
1898         /* queue */
1899         mutex_lock(&con->out_mutex);
1900         BUG_ON(!list_empty(&msg->list_head));
1901         list_add_tail(&msg->list_head, &con->out_queue);
1902         dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
1903              ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
1904              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1905              le32_to_cpu(msg->hdr.front_len),
1906              le32_to_cpu(msg->hdr.middle_len),
1907              le32_to_cpu(msg->hdr.data_len));
1908         mutex_unlock(&con->out_mutex);
1909
1910         /* if there wasn't anything waiting to send before, queue
1911          * new work */
1912         if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1913                 queue_con(con);
1914 }
1915
1916 /*
1917  * Revoke a message that was previously queued for send
1918  */
1919 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1920 {
1921         mutex_lock(&con->out_mutex);
1922         if (!list_empty(&msg->list_head)) {
1923                 dout("con_revoke %p msg %p\n", con, msg);
1924                 list_del_init(&msg->list_head);
1925                 ceph_msg_put(msg);
1926                 msg->hdr.seq = 0;
1927                 if (con->out_msg == msg)
1928                         con->out_msg = NULL;
1929                 if (con->out_kvec_is_msg) {
1930                         con->out_skip = con->out_kvec_bytes;
1931                         con->out_kvec_is_msg = false;
1932                 }
1933         } else {
1934                 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
1935         }
1936         mutex_unlock(&con->out_mutex);
1937 }
1938
1939 /*
1940  * Queue a keepalive byte to ensure the tcp connection is alive.
1941  */
1942 void ceph_con_keepalive(struct ceph_connection *con)
1943 {
1944         if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
1945             test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1946                 queue_con(con);
1947 }
1948
1949
1950 /*
1951  * construct a new message with given type, size
1952  * the new msg has a ref count of 1.
1953  */
1954 struct ceph_msg *ceph_msg_new(int type, int front_len,
1955                               int page_len, int page_off, struct page **pages)
1956 {
1957         struct ceph_msg *m;
1958
1959         m = kmalloc(sizeof(*m), GFP_NOFS);
1960         if (m == NULL)
1961                 goto out;
1962         atomic_set(&m->nref, 1);
1963         INIT_LIST_HEAD(&m->list_head);
1964
1965         m->hdr.type = cpu_to_le16(type);
1966         m->hdr.front_len = cpu_to_le32(front_len);
1967         m->hdr.middle_len = 0;
1968         m->hdr.data_len = cpu_to_le32(page_len);
1969         m->hdr.data_off = cpu_to_le16(page_off);
1970         m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
1971         m->footer.front_crc = 0;
1972         m->footer.middle_crc = 0;
1973         m->footer.data_crc = 0;
1974         m->front_max = front_len;
1975         m->front_is_vmalloc = false;
1976         m->more_to_follow = false;
1977         m->pool = NULL;
1978
1979         /* front */
1980         if (front_len) {
1981                 if (front_len > PAGE_CACHE_SIZE) {
1982                         m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
1983                                                       PAGE_KERNEL);
1984                         m->front_is_vmalloc = true;
1985                 } else {
1986                         m->front.iov_base = kmalloc(front_len, GFP_NOFS);
1987                 }
1988                 if (m->front.iov_base == NULL) {
1989                         pr_err("msg_new can't allocate %d bytes\n",
1990                              front_len);
1991                         goto out2;
1992                 }
1993         } else {
1994                 m->front.iov_base = NULL;
1995         }
1996         m->front.iov_len = front_len;
1997
1998         /* middle */
1999         m->middle = NULL;
2000
2001         /* data */
2002         m->nr_pages = calc_pages_for(page_off, page_len);
2003         m->pages = pages;
2004
2005         dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
2006              m->nr_pages);
2007         return m;
2008
2009 out2:
2010         ceph_msg_put(m);
2011 out:
2012         pr_err("msg_new can't create type %d len %d\n", type, front_len);
2013         return ERR_PTR(-ENOMEM);
2014 }
2015
2016 /*
2017  * Generic message allocator, for incoming messages.
2018  */
2019 struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2020                                 struct ceph_msg_header *hdr)
2021 {
2022         int type = le16_to_cpu(hdr->type);
2023         int front_len = le32_to_cpu(hdr->front_len);
2024         struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
2025
2026         if (!msg) {
2027                 pr_err("unable to allocate msg type %d len %d\n",
2028                        type, front_len);
2029                 return ERR_PTR(-ENOMEM);
2030         }
2031         return msg;
2032 }
2033
2034 /*
2035  * Allocate "middle" portion of a message, if it is needed and wasn't
2036  * allocated by alloc_msg.  This allows us to read a small fixed-size
2037  * per-type header in the front and then gracefully fail (i.e.,
2038  * propagate the error to the caller based on info in the front) when
2039  * the middle is too large.
2040  */
2041 int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2042 {
2043         int type = le16_to_cpu(msg->hdr.type);
2044         int middle_len = le32_to_cpu(msg->hdr.middle_len);
2045
2046         dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
2047              ceph_msg_type_name(type), middle_len);
2048         BUG_ON(!middle_len);
2049         BUG_ON(msg->middle);
2050
2051         msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
2052         if (!msg->middle)
2053                 return -ENOMEM;
2054         return 0;
2055 }
2056
2057
2058 /*
2059  * Free a generically kmalloc'd message.
2060  */
2061 void ceph_msg_kfree(struct ceph_msg *m)
2062 {
2063         dout("msg_kfree %p\n", m);
2064         if (m->front_is_vmalloc)
2065                 vfree(m->front.iov_base);
2066         else
2067                 kfree(m->front.iov_base);
2068         kfree(m);
2069 }
2070
2071 /*
2072  * Drop a msg ref.  Destroy as needed.
2073  */
2074 void ceph_msg_put(struct ceph_msg *m)
2075 {
2076         dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
2077              atomic_read(&m->nref)-1);
2078         if (atomic_read(&m->nref) <= 0) {
2079                 pr_err("bad ceph_msg_put on %p %llu %d=%s %d+%d\n",
2080                        m, le64_to_cpu(m->hdr.seq),
2081                        le16_to_cpu(m->hdr.type),
2082                        ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
2083                        le32_to_cpu(m->hdr.front_len),
2084                        le32_to_cpu(m->hdr.data_len));
2085                 WARN_ON(1);
2086         }
2087         if (atomic_dec_and_test(&m->nref)) {
2088                 dout("ceph_msg_put last one on %p\n", m);
2089                 WARN_ON(!list_empty(&m->list_head));
2090
2091                 /* drop middle, data, if any */
2092                 if (m->middle) {
2093                         ceph_buffer_put(m->middle);
2094                         m->middle = NULL;
2095                 }
2096                 m->nr_pages = 0;
2097                 m->pages = NULL;
2098
2099                 if (m->pool)
2100                         ceph_msgpool_put(m->pool, m);
2101                 else
2102                         ceph_msg_kfree(m);
2103         }
2104 }