From patchwork Wed Mar 9 12:33:21 2022
From: Jeff Layton <jlayton@kernel.org>
To: ceph-devel@vger.kernel.org, idryomov@gmail.com
Subject: [PATCH 1/3] libceph: add sparse read support to msgr2 crc state machine
Date: Wed, 9 Mar 2022 07:33:21 -0500
Message-Id: <20220309123323.20593-2-jlayton@kernel.org>
In-Reply-To: <20220309123323.20593-1-jlayton@kernel.org>
References: <20220309123323.20593-1-jlayton@kernel.org>
X-Mailing-List: ceph-devel@vger.kernel.org
X-Patchwork-Id: 549775

Add support for a new sparse_read ceph_connection operation. The idea is
that the client driver can define this operation and use it to do special
handling for incoming reads.

The alloc_msg routine will look at the request and determine whether the
reply is expected to be sparse. If it is, then we'll dispatch to a
different set of state machine states that will repeatedly call the
driver's sparse_read op to get length and placement info for reading the
extent map, and the extents themselves.

This necessitates adding some new fields to some other structs:

- The msg gets a new bool to track whether it's a sparse_read request.

- A new field is added to the cursor to track the amount remaining in the
  current extent. This is used to cap the read from the socket into the
  msg_data.

- Handling a revoke with all of this is particularly difficult, so I've
  added a new data_len_remain field to the v2 connection info, and then
  use that to skip that much on a revoke. We may want to expand the use
  of that to the normal read path as well, just for consistency's sake.
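For illustration only (not part of this patch): a minimal userspace sketch of
the sparse_read calling contract described above. The messenger keeps calling
the op until it returns 0 (done) or a negative errno; a return of 1 with *buf
set means "read *len bytes into that buffer", while 1 with *buf == NULL means
"read into the cursor". The toy_* names, the extent lengths, and the cursor
stand-in are all made up.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct toy_cursor { size_t pos; };	/* stand-in for ceph_msg_data_cursor */

static int call_nr;

/* Toy driver op: hand back a private header buffer first, then two extents. */
static int toy_sparse_read(struct toy_cursor *cursor, uint64_t *len, char **buf)
{
	static char hdr[4];

	(void)cursor;
	switch (call_nr++) {
	case 0:			/* extent count: read into a driver-private buffer */
		*len = sizeof(hdr);
		*buf = hdr;
		return 1;
	case 1:			/* first extent: 8 bytes, read into the cursor */
		*len = 8;
		*buf = NULL;
		return 1;
	case 2:			/* second extent: 16 bytes, read into the cursor */
		*len = 16;
		*buf = NULL;
		return 1;
	default:
		return 0;	/* nothing left: reading is complete */
	}
}

int main(void)
{
	struct toy_cursor cursor = { 0 };
	uint64_t len = 0;
	char *buf = NULL;
	int ret;

	/* The "messenger" keeps calling the op until 0 (done) or -errno. */
	while ((ret = toy_sparse_read(&cursor, &len, &buf)) > 0) {
		if (buf) {
			printf("read %llu bytes into the driver's buffer\n",
			       (unsigned long long)len);
		} else {
			printf("read %llu bytes into the cursor at %zu\n",
			       (unsigned long long)len, cursor.pos);
			cursor.pos += len;
		}
		buf = NULL;
	}
	return ret < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
}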
Signed-off-by: Jeff Layton --- include/linux/ceph/messenger.h | 31 +++++ net/ceph/messenger.c | 1 + net/ceph/messenger_v2.c | 215 +++++++++++++++++++++++++++++++-- 3 files changed, 238 insertions(+), 9 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index e7f2fb2fc207..e9c86d6de2e6 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -17,6 +17,7 @@ struct ceph_msg; struct ceph_connection; +struct ceph_msg_data_cursor; /* * Ceph defines these callbacks for handling connection events. @@ -70,6 +71,31 @@ struct ceph_connection_operations { int used_proto, int result, const int *allowed_protos, int proto_cnt, const int *allowed_modes, int mode_cnt); + + /** + * sparse_read: read sparse data + * @con: connection we're reading from + * @cursor: data cursor for reading extents + * @len: len of the data that msgr should read + * @buf: optional buffer to read into + * + * This should be called more than once, each time setting up to + * receive an extent into the current cursor position, and zeroing + * the holes between them. + * + * Returns 1 if there is more data to be read, 0 if reading is + * complete, or -errno if there was an error. + * + * If @buf is set on a 1 return, then the data should be read into + * the provided buffer. Otherwise, it should be read into the cursor. + * + * The sparse read operation is expected to initialize the cursor + * with a length covering up to the end of the last extent. + */ + int (*sparse_read)(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor, + u64 *len, char **buf); + }; /* use format string %s%lld */ @@ -207,6 +233,7 @@ struct ceph_msg_data_cursor { struct ceph_msg_data *data; /* current data item */ size_t resid; /* bytes not yet consumed */ + int sr_resid; /* residual sparse_read len */ bool last_piece; /* current is last piece */ bool need_crc; /* crc update needed */ union { @@ -252,6 +279,7 @@ struct ceph_msg { struct kref kref; bool more_to_follow; bool needs_out_seq; + bool sparse_read; int front_alloc_len; struct ceph_msgpool *pool; @@ -396,6 +424,7 @@ struct ceph_connection_v2_info { void *conn_bufs[16]; int conn_buf_cnt; + int data_len_remain; struct kvec in_sign_kvecs[8]; struct kvec out_sign_kvecs[8]; @@ -464,6 +493,8 @@ struct ceph_connection { struct page *bounce_page; u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ + int sparse_resid; + struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */ struct delayed_work work; /* send|recv work */ diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d3bb656308b4..bf4e7f5751ee 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1034,6 +1034,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, cursor->total_resid = length; cursor->data = msg->data; + cursor->sr_resid = 0; __ceph_msg_data_cursor_init(cursor); } diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index c6e5bfc717d5..845c2f093a02 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -52,14 +52,17 @@ #define FRAME_LATE_STATUS_COMPLETE 0xe #define FRAME_LATE_STATUS_ABORTED_MASK 0xf -#define IN_S_HANDLE_PREAMBLE 1 -#define IN_S_HANDLE_CONTROL 2 -#define IN_S_HANDLE_CONTROL_REMAINDER 3 -#define IN_S_PREPARE_READ_DATA 4 -#define IN_S_PREPARE_READ_DATA_CONT 5 -#define IN_S_PREPARE_READ_ENC_PAGE 6 -#define IN_S_HANDLE_EPILOGUE 7 -#define IN_S_FINISH_SKIP 8 +#define IN_S_HANDLE_PREAMBLE 1 +#define IN_S_HANDLE_CONTROL 2 +#define IN_S_HANDLE_CONTROL_REMAINDER 3 
+#define IN_S_PREPARE_READ_DATA 4 +#define IN_S_PREPARE_READ_DATA_CONT 5 +#define IN_S_PREPARE_READ_ENC_PAGE 6 +#define IN_S_PREPARE_SPARSE_DATA 7 +#define IN_S_PREPARE_SPARSE_DATA_HDR 8 +#define IN_S_PREPARE_SPARSE_DATA_CONT 9 +#define IN_S_HANDLE_EPILOGUE 10 +#define IN_S_FINISH_SKIP 11 #define OUT_S_QUEUE_DATA 1 #define OUT_S_QUEUE_DATA_CONT 2 @@ -1819,6 +1822,166 @@ static void prepare_read_data_cont(struct ceph_connection *con) con->v2.in_state = IN_S_HANDLE_EPILOGUE; } +static int prepare_sparse_read_cont(struct ceph_connection *con) +{ + int ret; + struct bio_vec bv; + char *buf = NULL; + struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor; + u64 len = 0; + + if (!iov_iter_is_bvec(&con->v2.in_iter)) + return -EIO; + + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + con->in_data_crc = crc32c(con->in_data_crc, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + + get_bvec_at(cursor, &bv); + memcpy_to_page(bv.bv_page, bv.bv_offset, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + } else { + con->in_data_crc = ceph_crc32c_page(con->in_data_crc, + con->v2.in_bvec.bv_page, + con->v2.in_bvec.bv_offset, + con->v2.in_bvec.bv_len); + } + + ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len); + cursor->sr_resid -= con->v2.in_bvec.bv_len; + dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__, + con->v2.in_bvec.bv_len, cursor->sr_resid); + WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid); + if (cursor->sr_resid) { + get_bvec_at(cursor, &bv); + if (bv.bv_len > cursor->sr_resid) + bv.bv_len = cursor->sr_resid; + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + con->v2.data_len_remain -= bv.bv_len; + WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT); + return 0; + } + + /* get next extent */ + ret = con->ops->sparse_read(con, cursor, &len, &buf); + if (ret <= 0) { + if (ret < 0) + return ret; + + reset_in_kvecs(con); + add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; + return 0; + } + + cursor->sr_resid = len; + get_bvec_at(cursor, &bv); + if (bv.bv_len > cursor->sr_resid) + bv.bv_len = cursor->sr_resid; + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + if (unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + con->v2.data_len_remain -= len; + return ret; +} + +static int prepare_sparse_read_header(struct ceph_connection *con) +{ + int ret; + char *buf = NULL; + struct bio_vec bv; + struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor; + u64 len = 0; + + if (!iov_iter_is_kvec(&con->v2.in_iter)) + return -EIO; + + /* On first call, we have no kvec so don't compute crc */ + if (con->v2.in_kvec_cnt) { + WARN_ON_ONCE(con->v2.in_kvec_cnt > 1); + con->in_data_crc = crc32c(con->in_data_crc, + con->v2.in_kvecs[0].iov_base, + con->v2.in_kvecs[0].iov_len); + } + + ret = con->ops->sparse_read(con, cursor, &len, &buf); + if (ret < 0) + return ret; + if (ret == 0) { + reset_in_kvecs(con); + add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; + return 0; + } + + /* No actual data? 
*/ + if (WARN_ON_ONCE(!ret)) + return -EIO; + + if (!buf) { + cursor->sr_resid = len; + get_bvec_at(cursor, &bv); + if (bv.bv_len > cursor->sr_resid) + bv.bv_len = cursor->sr_resid; + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + if (unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + con->v2.data_len_remain -= len; + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT; + return ret; + } + + WARN_ON_ONCE(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_HDR); + reset_in_kvecs(con); + add_in_kvec(con, buf, len); + con->v2.data_len_remain -= len; + return 0; +} + +static int prepare_sparse_read_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->in_msg; + + dout("%s: starting sparse read\n", __func__); + + if (WARN_ON_ONCE(!con->ops->sparse_read)) + return -EOPNOTSUPP; + + if (!con_secure(con)) + con->in_data_crc = -1; + + reset_in_kvecs(con); + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_HDR; + con->v2.data_len_remain = data_len(msg); + return prepare_sparse_read_header(con); +} + static int prepare_read_tail_plain(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; @@ -1839,7 +2002,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con) } if (data_len(msg)) { - con->v2.in_state = IN_S_PREPARE_READ_DATA; + if (msg->sparse_read) + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA; + else + con->v2.in_state = IN_S_PREPARE_READ_DATA; } else { add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); con->v2.in_state = IN_S_HANDLE_EPILOGUE; @@ -2893,6 +3059,15 @@ static int populate_in_iter(struct ceph_connection *con) prepare_read_enc_page(con); ret = 0; break; + case IN_S_PREPARE_SPARSE_DATA: + ret = prepare_sparse_read_data(con); + break; + case IN_S_PREPARE_SPARSE_DATA_HDR: + ret = prepare_sparse_read_header(con); + break; + case IN_S_PREPARE_SPARSE_DATA_CONT: + ret = prepare_sparse_read_cont(con); + break; case IN_S_HANDLE_EPILOGUE: ret = handle_epilogue(con); break; @@ -3485,6 +3660,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con) con->v2.in_state = IN_S_FINISH_SKIP; } +static void revoke_at_prepare_sparse_data(struct ceph_connection *con) +{ + int resid; /* current piece of data */ + int remaining; + + WARN_ON(con_secure(con)); + WARN_ON(!data_len(con->in_msg)); + WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter)); + resid = iov_iter_count(&con->v2.in_iter); + dout("%s con %p resid %d\n", __func__, con, resid); + + remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain; + con->v2.in_iter.count -= resid; + set_in_skip(con, resid + remaining); + con->v2.in_state = IN_S_FINISH_SKIP; +} + static void revoke_at_handle_epilogue(struct ceph_connection *con) { int resid; @@ -3501,6 +3693,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con) void ceph_con_v2_revoke_incoming(struct ceph_connection *con) { switch (con->v2.in_state) { + case IN_S_PREPARE_SPARSE_DATA: case IN_S_PREPARE_READ_DATA: revoke_at_prepare_read_data(con); break; @@ -3510,6 +3703,10 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con) case IN_S_PREPARE_READ_ENC_PAGE: revoke_at_prepare_read_enc_page(con); break; + case IN_S_PREPARE_SPARSE_DATA_HDR: + case IN_S_PREPARE_SPARSE_DATA_CONT: + revoke_at_prepare_sparse_data(con); + break; case IN_S_HANDLE_EPILOGUE: revoke_at_handle_epilogue(con); break; From patchwork Wed Mar 9 
12:33:22 2022
From: Jeff Layton <jlayton@kernel.org>
To: ceph-devel@vger.kernel.org, idryomov@gmail.com
Subject: [PATCH 2/3] libceph: add sparse read support to OSD client
Date: Wed, 9 Mar 2022 07:33:22 -0500
Message-Id: <20220309123323.20593-3-jlayton@kernel.org>
In-Reply-To: <20220309123323.20593-1-jlayton@kernel.org>
References: <20220309123323.20593-1-jlayton@kernel.org>
X-Mailing-List: ceph-devel@vger.kernel.org
X-Patchwork-Id: 549774

Add a new sparse_read operation for the OSD client, driven by its own
state machine. The messenger can call the sparse_read operation
repeatedly, and it will pass back the info needed to set up to read the
next extent of data, while zeroing out the sparse regions (holes) in
between.
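For illustration only (not part of this patch): a hedged userspace sketch of
the reply layout that the OSD client state machine below walks -- an le32
extent count, an array of {le64 off, le64 len} pairs, an le32 data length,
then the extent payloads packed back to back. The example_* names and sample
values are made up, and a little-endian host is assumed to keep it short.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct example_extent {
	uint64_t off;
	uint64_t len;
} __attribute__((packed));

int main(void)
{
	/* Example reply: two extents, 0x0~0x4 and 0x10~0x4, 8 data bytes. */
	unsigned char reply[4 + 2 * sizeof(struct example_extent) + 4 + 8];
	unsigned char *p = reply;
	uint32_t count = 2, data_len = 8;
	struct example_extent ext[2] = { { 0x0, 0x4 }, { 0x10, 0x4 } };

	memcpy(p, &count, 4);            p += 4;
	memcpy(p, ext, sizeof(ext));     p += sizeof(ext);
	memcpy(p, &data_len, 4);         p += 4;
	memcpy(p, "ABCDEFGH", 8);        /* extent payloads, holes omitted */

	/* Decode it again, in the order the state machine reads it. */
	p = reply;
	memcpy(&count, p, 4);            p += 4;
	printf("%u extents\n", count);
	for (uint32_t i = 0; i < count; i++) {
		struct example_extent e;

		memcpy(&e, p, sizeof(e));  p += sizeof(e);
		printf("extent %u: off 0x%llx len 0x%llx\n", i,
		       (unsigned long long)e.off, (unsigned long long)e.len);
	}
	memcpy(&data_len, p, 4);
	printf("%u bytes of packed data follow\n", data_len);
	return 0;
}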
Signed-off-by: Jeff Layton --- include/linux/ceph/osd_client.h | 38 ++++++++ net/ceph/osd_client.c | 163 ++++++++++++++++++++++++++++++-- 2 files changed, 194 insertions(+), 7 deletions(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 3431011f364d..42eb1628a66d 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -29,6 +29,43 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); #define CEPH_HOMELESS_OSD -1 +enum ceph_sparse_read_state { + CEPH_SPARSE_READ_HDR = 0, + CEPH_SPARSE_READ_EXTENTS, + CEPH_SPARSE_READ_DATA_LEN, + CEPH_SPARSE_READ_DATA, +}; + +/* A single extent in a SPARSE_READ reply */ +struct ceph_sparse_extent { + __le64 off; + __le64 len; +} __attribute__((packed)); + +/* + * A SPARSE_READ reply is a 32-bit count of extents, followed by an array of + * 64-bit offset/length pairs, and then all of the actual file data + * concatenated after it (sans holes). + * + * Unfortunately, we don't know how long the extent array is until we've + * started reading the data section of the reply, so for a real sparse read, we + * have to allocate the array after alloc_msg returns. + * + * For the common case of a single extent, we keep an embedded extent here so + * we can avoid the extra allocation. + */ +struct ceph_sparse_read { + enum ceph_sparse_read_state sr_state; /* state machine state */ + u64 sr_req_off; /* orig request offset */ + u64 sr_req_len; /* orig request length */ + u64 sr_pos; /* current pos in buffer */ + int sr_index; /* current extent index */ + __le32 sr_datalen; /* length of actual data */ + __le32 sr_count; /* extent count */ + struct ceph_sparse_extent *sr_extent; /* extent array */ + struct ceph_sparse_extent sr_emb_ext[1]; /* embedded extent */ +}; + /* a given osd we're communicating with */ struct ceph_osd { refcount_t o_ref; @@ -46,6 +83,7 @@ struct ceph_osd { unsigned long lru_ttl; struct list_head o_keepalive_item; struct mutex lock; + struct ceph_sparse_read o_sparse_read; }; #define CEPH_OSD_SLAB_OPS 2 diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1c5815530e0d..f519b5727ee3 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -376,6 +376,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, switch (op->op) { case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: ceph_osd_data_release(&op->extent.osd_data); @@ -706,6 +707,7 @@ static void get_num_data_items(struct ceph_osd_request *req, /* reply */ case CEPH_OSD_OP_STAT: case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_LIST_WATCHERS: *num_reply_data_items += 1; break; @@ -775,7 +777,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && - opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ); op->extent.offset = offset; op->extent.length = length; @@ -984,6 +986,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_STAT: break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: @@ -1080,7 +1083,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && - 
opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); + opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE && + opcode != CEPH_OSD_OP_SPARSE_READ); req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); @@ -2037,6 +2041,7 @@ static void setup_request_data(struct ceph_osd_request *req) &op->raw_data_in); break; case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; @@ -2443,6 +2448,21 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked) __submit_request(req, wrlocked); } +static void ceph_init_sparse_read(struct ceph_sparse_read *sr, struct ceph_osd_req_op *op) +{ + if (sr->sr_extent != sr->sr_emb_ext) + kfree(sr->sr_extent); + sr->sr_state = CEPH_SPARSE_READ_HDR; + sr->sr_req_off = op ? op->extent.offset : 0; + sr->sr_req_len = op ? op->extent.length : 0; + sr->sr_pos = sr->sr_req_off; + sr->sr_index = 0; + sr->sr_count = 0; + sr->sr_extent = sr->sr_emb_ext; + sr->sr_extent[0].off = 0; + sr->sr_extent[0].len = 0; +} + static void finish_request(struct ceph_osd_request *req) { struct ceph_osd_client *osdc = req->r_osdc; @@ -2452,8 +2472,10 @@ static void finish_request(struct ceph_osd_request *req) req->r_end_latency = ktime_get(); - if (req->r_osd) + if (req->r_osd) { + ceph_init_sparse_read(&req->r_osd->o_sparse_read, NULL); unlink_request(req->r_osd, req); + } atomic_dec(&osdc->num_requests); /* @@ -3655,6 +3677,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) struct MOSDOpReply m; u64 tid = le64_to_cpu(msg->hdr.tid); u32 data_len = 0; + u32 result_len = 0; + bool sparse = false; int ret; int i; @@ -3749,21 +3773,32 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) req->r_ops[i].rval = m.rval[i]; req->r_ops[i].outdata_len = m.outdata_len[i]; data_len += m.outdata_len[i]; + if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ) + sparse = true; } + + result_len = data_len; + if (sparse) { + struct ceph_sparse_read *sr = &osd->o_sparse_read; + + /* Fudge the result if this was a sparse read. */ + result_len = sr->sr_pos - sr->sr_req_off; + } + if (data_len != le32_to_cpu(msg->hdr.data_len)) { pr_err("sum of lens %u != %u for tid %llu\n", data_len, le32_to_cpu(msg->hdr.data_len), req->r_tid); goto fail_request; } - dout("%s req %p tid %llu result %d data_len %u\n", __func__, - req, req->r_tid, m.result, data_len); + dout("%s req %p tid %llu result %d data_len %u result_len %u\n", __func__, + req, req->r_tid, m.result, data_len, result_len); /* * Since we only ever request ONDISK, we should only ever get * one (type of) reply back. */ WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); - req->r_result = m.result ?: data_len; + req->r_result = m.result ?: result_len; finish_request(req); mutex_unlock(&osd->lock); up_read(&osdc->lock); @@ -5398,6 +5433,21 @@ static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_msg_put(msg); } +static struct ceph_osd_req_op * +sparse_read_op(struct ceph_osd_request *req) +{ + int i; + + if (!(req->r_flags & CEPH_OSD_FLAG_READ)) + return NULL; + + for (i = 0; i < req->r_num_ops; ++i) { + if (req->r_ops[i].op == CEPH_OSD_OP_SPARSE_READ) + return &req->r_ops[i]; + } + return NULL; +} + /* * Lookup and return message for incoming reply. 
Don't try to do * anything about a larger than preallocated data portion of the @@ -5414,6 +5464,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid = le64_to_cpu(hdr->tid); + struct ceph_osd_req_op *srop; down_read(&osdc->lock); if (!osd_registered(osd)) { @@ -5446,7 +5497,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply = m; } - if (data_len > req->r_reply->data_length) { + srop = sparse_read_op(req); + + if (!srop && (data_len > req->r_reply->data_length)) { pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", __func__, osd->o_osd, req->r_tid, data_len, req->r_reply->data_length); @@ -5456,6 +5509,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, } m = ceph_msg_get(req->r_reply); + m->sparse_read = srop; + if (srop) + ceph_init_sparse_read(&osd->o_sparse_read, srop); + dout("get_reply tid %lld %p\n", tid, m); out_unlock_session: @@ -5688,9 +5745,101 @@ static int osd_check_message_signature(struct ceph_msg *msg) return ceph_auth_check_message_signature(auth, msg); } +static void zero_len(struct ceph_msg_data_cursor *cursor, size_t len) +{ + while (len) { + struct page *page; + size_t poff, plen; + bool last = false; + + page = ceph_msg_data_next(cursor, &poff, &plen, &last); + if (plen > len) + plen = len; + zero_user_segment(page, poff, poff + plen); + len -= plen; + ceph_msg_data_advance(cursor, plen); + } +} + +static int osd_sparse_read(struct ceph_connection *con, + struct ceph_msg_data_cursor *cursor, + u64 *plen, char **pbuf) +{ + struct ceph_osd *o = con->private; + struct ceph_sparse_read *sr = &o->o_sparse_read; + u32 count = __le32_to_cpu(sr->sr_count); + u64 eoff, elen; + + switch (sr->sr_state) { + case CEPH_SPARSE_READ_HDR: + dout("[%d] request to read 0x%llx~0x%llx\n", o->o_osd, sr->sr_req_off, sr->sr_req_len); + /* number of extents */ + *plen = sizeof(sr->sr_count); + *pbuf = (char *)&sr->sr_count; + sr->sr_state = CEPH_SPARSE_READ_EXTENTS; + break; + case CEPH_SPARSE_READ_EXTENTS: + dout("[%d] got %u extents\n", o->o_osd, count); + + if (count > 0) { + if (count > 1) { + /* can't use the embedded extent array */ + sr->sr_extent = kmalloc_array(count, sizeof(*sr->sr_extent), + GFP_NOIO); + if (!sr->sr_extent) + return -ENOMEM; + } + *plen = count * sizeof(*sr->sr_extent); + *pbuf = (char *)sr->sr_extent; + sr->sr_state = CEPH_SPARSE_READ_DATA_LEN; + break; + } + /* No extents? 
Fall through to reading data len */ + fallthrough; + case CEPH_SPARSE_READ_DATA_LEN: + *plen = sizeof(sr->sr_datalen); + *pbuf = (char *)&sr->sr_datalen; + sr->sr_state = CEPH_SPARSE_READ_DATA; + break; + case CEPH_SPARSE_READ_DATA: + if (sr->sr_index >= count) + return 0; + if (sr->sr_index == 0) { + /* last extent */ + eoff = le64_to_cpu(sr->sr_extent[count - 1].off); + elen = le64_to_cpu(sr->sr_extent[count - 1].len); + + /* set up cursor to end of last extent */ + ceph_msg_data_cursor_init(cursor, con->in_msg, + eoff + elen - sr->sr_req_off); + } + + eoff = le64_to_cpu(sr->sr_extent[sr->sr_index].off); + elen = le64_to_cpu(sr->sr_extent[sr->sr_index].len); + + dout("[%d] ext %d off 0x%llx len 0x%llx\n", o->o_osd, sr->sr_index, eoff, elen); + + /* zero out anything from sr_pos to start of extent */ + if (sr->sr_pos < eoff) + zero_len(cursor, eoff - sr->sr_pos); + + /* Set position to end of extent */ + sr->sr_pos = eoff + elen; + + /* send back the new length */ + *plen = elen; + + /* Bump the array index */ + ++sr->sr_index; + break; + } + return 1; +} + static const struct ceph_connection_operations osd_con_ops = { .get = osd_get_con, .put = osd_put_con, + .sparse_read = osd_sparse_read, .alloc_msg = osd_alloc_msg, .dispatch = osd_dispatch, .fault = osd_fault,
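
For illustration only (not part of this patch): a hedged userspace sketch of
the placement step the code above performs -- copy each extent's packed
payload to its offset in the destination buffer, leave the holes zeroed, and
report back a length equal to the end of the last extent minus the request
offset (the "fudged" result, sr_pos - sr_req_off). The extent values, buffer
sizes, and names are illustrative; the kernel zeroes holes incrementally with
zero_len() rather than with one memset.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct example_extent { uint64_t off, len; };

int main(void)
{
	const uint64_t req_off = 0x0, req_len = 0x20;
	const struct example_extent ext[] = { { 0x0, 0x4 }, { 0x10, 0x4 } };
	const char packed[] = "AAAABBBB";	/* extent payloads, back to back */
	char dst[0x20];
	uint64_t pos = req_off, src = 0, result_len;

	memset(dst, 0, sizeof(dst));		/* holes read back as zeroes */
	for (size_t i = 0; i < sizeof(ext) / sizeof(ext[0]); i++) {
		memcpy(dst + (ext[i].off - req_off), packed + src, ext[i].len);
		src += ext[i].len;
		pos = ext[i].off + ext[i].len;	/* end of the last extent so far */
	}
	result_len = pos - req_off;		/* mirrors sr_pos - sr_req_off */
	printf("request 0x%llx~0x%llx -> result length 0x%llx\n",
	       (unsigned long long)req_off, (unsigned long long)req_len,
	       (unsigned long long)result_len);
	return 0;
}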