From fe38a2b67bc6b3a60da82a23e9082256a30e39d9 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 6 Mar 2013 23:39:39 -0600 Subject: [PATCH] libceph: start defining message data cursor This patch lays out the foundation for using generic routines to manage processing items of message data. For simplicity, we'll start with just the trail portion of a message, because it stands alone and is only present for outgoing data. First some basic concepts. We'll use the term "data item" to represent one of the ceph_msg_data structures associated with a message. There are currently four of those, with single-letter field names p, l, b, and t. A data item is further broken into "pieces" which always lie in a single page. A data item will include a "cursor" that will track state as the memory defined by the item is consumed by sending data from or receiving data into it. We define three routines to manipulate a data item's cursor: the "init" routine; the "next" routine; and the "advance" routine. The "init" routine initializes the cursor so it points at the beginning of the first piece in the item. The "next" routine returns the page, page offset, and length (limited by both the page and item size) of the next unconsumed piece in the item. It also indicates to the caller whether the piece being returned is the last one in the data item. The "advance" routine consumes the requested number of bytes in the item (advancing the cursor). This is used to record the number of bytes from the current piece that were actually sent or received by the network code. It returns an indication of whether the result means the current piece has been fully consumed. This is used by the message send code to determine whether it should calculate the CRC for the next piece processed. The trail of a message is implemented as a ceph pagelist. The routines defined for it will be usable for non-trail pagelist data as well. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- include/linux/ceph/messenger.h | 7 ++ net/ceph/messenger.c | 138 ++++++++++++++++++++++++++++++--- 2 files changed, 135 insertions(+), 10 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 5860dd0c2caf..14862438faff 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) } } +struct ceph_msg_data_cursor { + bool last_piece; /* now at last piece of data item */ + struct page *page; /* current page in pagelist */ + size_t offset; /* pagelist bytes consumed */ +}; + struct ceph_msg_data { enum ceph_msg_data_type type; union { @@ -112,6 +118,7 @@ struct ceph_msg_data { }; struct ceph_pagelist *pagelist; }; + struct ceph_msg_data_cursor cursor; /* pagelist only */ }; /* diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f256b4b174ad..b978cf8b27ff 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -21,6 +21,9 @@ #include #include +#define list_entry_next(pos, member) \ + list_entry(pos->member.next, typeof(*pos), member) + /* * Ceph uses the messenger to exchange ceph_msg messages with other * hosts in the system. The messenger provides ordered and reliable @@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg) } #endif +/* + * Message data is handled (sent or received) in pieces, where each + * piece resides on a single page. The network layer might not + * consume an entire piece at once. A data item's cursor keeps + * track of which piece is next to process and how much remains to + * be processed in that piece. It also tracks whether the current + * piece is the last one in the data item. + */ +static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + struct page *page; + + if (data->type != CEPH_MSG_DATA_PAGELIST) + return; + + pagelist = data->pagelist; + BUG_ON(!pagelist); + if (!pagelist->length) + return; /* pagelist can be assigned but empty */ + + BUG_ON(list_empty(&pagelist->head)); + page = list_first_entry(&pagelist->head, struct page, lru); + + cursor->page = page; + cursor->offset = 0; + cursor->last_piece = pagelist->length <= PAGE_SIZE; +} + +/* + * Return the page containing the next piece to process for a given + * data item, and supply the page offset and length of that piece. + * Indicate whether this is the last piece in this data item. + */ +static struct page *ceph_msg_data_next(struct ceph_msg_data *data, + size_t *page_offset, + size_t *length, + bool *last_piece) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + size_t piece_end; + + BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); + + pagelist = data->pagelist; + BUG_ON(!pagelist); + + BUG_ON(!cursor->page); + BUG_ON(cursor->offset >= pagelist->length); + + *last_piece = cursor->last_piece; + if (*last_piece) { + /* pagelist offset is always 0 */ + piece_end = pagelist->length & ~PAGE_MASK; + if (!piece_end) + piece_end = PAGE_SIZE; + } else { + piece_end = PAGE_SIZE; + } + *page_offset = cursor->offset & ~PAGE_MASK; + *length = piece_end - *page_offset; + + return data->cursor.page; +} + +/* + * Returns true if the result moves the cursor on to the next piece + * (the next page) of the pagelist. + */ +static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + + BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); + + pagelist = data->pagelist; + BUG_ON(!pagelist); + BUG_ON(!cursor->page); + BUG_ON(cursor->offset + bytes > pagelist->length); + BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE); + + /* Advance the cursor offset */ + + cursor->offset += bytes; + /* pagelist offset is always 0 */ + if (!bytes || cursor->offset & ~PAGE_MASK) + return false; /* more bytes to process in the current page */ + + /* Move on to the next page */ + + BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); + cursor->page = list_entry_next(cursor->page, lru); + + /* cursor offset is at page boundary; pagelist offset is always 0 */ + if (pagelist->length - cursor->offset <= PAGE_SIZE) + cursor->last_piece = true; + + return true; +} + static void prepare_message_data(struct ceph_msg *msg, struct ceph_msg_pos *msg_pos) { @@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg, init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg); #endif msg_pos->data_pos = 0; + + /* If there's a trail, initialize its cursor */ + + if (ceph_msg_has_trail(msg)) + ceph_msg_data_cursor_init(&msg->t); + msg_pos->did_page_crc = false; } @@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, msg_pos->data_pos += sent; msg_pos->page_pos += sent; + if (in_trail) { + bool need_crc; + + need_crc = ceph_msg_data_advance(&msg->t, sent); + BUG_ON(need_crc && sent != len); + } if (sent < len) return; @@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, msg_pos->page_pos = 0; msg_pos->page++; msg_pos->did_page_crc = false; - if (in_trail) { - BUG_ON(!ceph_msg_has_trail(msg)); - list_rotate_left(&msg->t.pagelist->head); - } else if (ceph_msg_has_pagelist(msg)) { + if (ceph_msg_has_pagelist(msg)) { list_rotate_left(&msg->l.pagelist->head); #ifdef CONFIG_BLOCK } else if (ceph_msg_has_bio(msg)) { @@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con) size_t length; int max_write = PAGE_SIZE; int bio_offset = 0; + bool use_cursor = false; + bool last_piece = true; /* preserve existing behavior */ in_trail = in_trail || msg_pos->data_pos >= trail_off; if (!in_trail) @@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con) if (in_trail) { BUG_ON(!ceph_msg_has_trail(msg)); - total_max_write = data_len - msg_pos->data_pos; - page = list_first_entry(&msg->t.pagelist->head, - struct page, lru); + use_cursor = true; + page = ceph_msg_data_next(&msg->t, &page_offset, + &length, &last_piece); } else if (ceph_msg_has_pages(msg)) { page = msg->p.pages[msg_pos->page]; } else if (ceph_msg_has_pagelist(msg)) { @@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con) } else { page = zero_page; } - length = min_t(int, max_write - msg_pos->page_pos, - total_max_write); + if (!use_cursor) + length = min_t(int, max_write - msg_pos->page_pos, + total_max_write); page_offset = msg_pos->page_pos + bio_offset; if (do_datacrc && !msg_pos->did_page_crc) { @@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con) msg_pos->did_page_crc = true; } ret = ceph_tcp_sendpage(con->sock, page, page_offset, - length, true); + length, last_piece); if (ret <= 0) goto out; -- 2.39.5