From patchwork Thu Aug 25 13:31:21 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jeff Layton X-Patchwork-Id: 599999 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 8A85DC04AA5 for ; Thu, 25 Aug 2022 13:32:17 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S242823AbiHYNcO (ORCPT ); Thu, 25 Aug 2022 09:32:14 -0400 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:36882 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S242759AbiHYNb5 (ORCPT ); Thu, 25 Aug 2022 09:31:57 -0400 Received: from ams.source.kernel.org (ams.source.kernel.org [145.40.68.75]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 985BBB5A46 for ; Thu, 25 Aug 2022 06:31:50 -0700 (PDT) Received: from smtp.kernel.org (relay.kernel.org [52.25.139.140]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ams.source.kernel.org (Postfix) with ESMTPS id A3DA5B829AB for ; Thu, 25 Aug 2022 13:31:48 +0000 (UTC) Received: by smtp.kernel.org (Postfix) with ESMTPSA id D5F5FC433B5; Thu, 25 Aug 2022 13:31:46 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=kernel.org; s=k20201202; t=1661434307; bh=qsYmVRBvFGeV52iB0K+q12rlC8X9h7fd+eXe6rtNRNQ=; h=From:To:Cc:Subject:Date:In-Reply-To:References:From; b=i2DGJr0ex07M4lgS9looTnAZaNGOGx/jwS/1fG0amt814kVj8oNonGyh6ndyKLv1r lff066nxFH0FzTc/Wri2KO3ypTGCwFOjQiFfhRK2n0zZjOgtgiHO3pj+MJCvp3V7bl RkOmDpf+gX10cxEwmFoWC/Tz/lWWhpuScgUWI71DlWNxCr37J65iV913fKgUj824MO jeZ3Bedn/ua9QILXe66NjAttFYCA2XsXgFddez9E220vQydL7oeMRDjLaUNUMmpCYd KM/oO7IEYxNWBHI7W41Cetgj4Ls2tee/tjxbtV2+otLHi68iu9X+jWQy5LAIOh2Toa tTmsgaOEM2BQg== From: Jeff Layton To: xiubli@redhat.com, idryomov@gmail.com Cc: lhenriques@suse.de, ceph-devel@vger.kernel.org Subject: [PATCH v15 18/29] ceph: add read/modify/write to ceph_sync_write Date: Thu, 25 Aug 2022 09:31:21 -0400 Message-Id: <20220825133132.153657-19-jlayton@kernel.org> X-Mailer: git-send-email 2.37.2 In-Reply-To: <20220825133132.153657-1-jlayton@kernel.org> References: <20220825133132.153657-1-jlayton@kernel.org> MIME-Version: 1.0 Precedence: bulk List-ID: X-Mailing-List: ceph-devel@vger.kernel.org When doing a synchronous write on an encrypted inode, we have no guarantee that the caller is writing crypto block-aligned data. When that happens, we must do a read/modify/write cycle. First, expand the range to cover complete blocks. If we had to change the original pos or length, issue a read to fill the first and/or last pages, and fetch the version of the object from the result. We then copy data into the pages as usual, encrypt the result and issue a write prefixed by an assertion that the version hasn't changed. If it has changed then we restart the whole thing again. If there is no object at that position in the file (-ENOENT), we prefix the write on an exclusive create of the object instead. Reviewed-by: Xiubo Li Signed-off-by: Jeff Layton --- fs/ceph/file.c | 316 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 288 insertions(+), 28 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index bfba2e17cd0a..064af05da955 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1561,18 +1561,16 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_vino vino; + struct ceph_osd_client *osdc = &fsc->client->osdc; struct ceph_osd_request *req; struct page **pages; u64 len; int num_pages; int written = 0; - int flags; int ret; bool check_caps = false; struct timespec64 mtime = current_time(inode); size_t count = iov_iter_count(from); - size_t off; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; @@ -1592,29 +1590,234 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); - flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; - while ((len = iov_iter_count(from)) > 0) { size_t left; int n; + u64 write_pos = pos; + u64 write_len = len; + u64 objnum, objoff; + u32 xlen; + u64 assert_ver = 0; + bool rmw; + bool first, last; + struct iov_iter saved_iter = *from; + size_t off; + + ceph_fscrypt_adjust_off_and_len(inode, &write_pos, &write_len); + + /* clamp the length to the end of first object */ + ceph_calc_file_object_mapping(&ci->i_layout, write_pos, + write_len, &objnum, &objoff, + &xlen); + write_len = xlen; + + /* adjust len downward if it goes beyond current object */ + if (pos + len > write_pos + write_len) + len = write_pos + write_len - pos; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, 0, 1, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, - ci->i_truncate_size, - false); - if (IS_ERR(req)) { - ret = PTR_ERR(req); - break; - } + /* + * If we had to adjust the length or position to align with a + * crypto block, then we must do a read/modify/write cycle. We + * use a version assertion to redrive the thing if something + * changes in between. + */ + first = pos != write_pos; + last = (pos + len) != (write_pos + write_len); + rmw = first || last; - num_pages = calc_pages_for(pos, len); + dout("sync_write ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n", + ci->i_vino.ino, pos, len, write_pos, write_len, rmw ? "" : "no "); + + /* + * The data is emplaced into the page as it would be if it were in + * an array of pagecache pages. + */ + num_pages = calc_pages_for(write_pos, write_len); pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); - goto out; + break; + } + + /* Do we need to preload the pages? */ + if (rmw) { + u64 first_pos = write_pos; + u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE; + u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE; + struct ceph_osd_req_op *op; + + /* We should only need to do this for encrypted inodes */ + WARN_ON_ONCE(!IS_ENCRYPTED(inode)); + + /* No need to do two reads if first and last blocks are same */ + if (first && last_pos == first_pos) + last = false; + + /* + * Allocate a read request for one or two extents, depending + * on how the request was aligned. + */ + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, first ? first_pos : last_pos, + &read_len, 0, (first && last) ? 2 : 1, + CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ceph_release_page_vector(pages, num_pages); + ret = PTR_ERR(req); + break; + } + + /* Something is misaligned! */ + if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) { + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + ret = -EIO; + break; + } + + /* Add extent for first block? */ + op = &req->r_ops[0]; + + if (first) { + osd_req_op_extent_osd_data_pages(req, 0, pages, + CEPH_FSCRYPT_BLOCK_SIZE, + offset_in_page(first_pos), + false, false); + /* We only expect a single extent here */ + ret = __ceph_alloc_sparse_ext_map(op, 1); + if (ret) { + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + break; + } + } + + /* Add extent for last block */ + if (last) { + /* Init the other extent if first extent has been used */ + if (first) { + op = &req->r_ops[1]; + osd_req_op_extent_init(req, 1, CEPH_OSD_OP_SPARSE_READ, + last_pos, CEPH_FSCRYPT_BLOCK_SIZE, + ci->i_truncate_size, + ci->i_truncate_seq); + } + + ret = __ceph_alloc_sparse_ext_map(op, 1); + if (ret) { + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + break; + } + + osd_req_op_extent_osd_data_pages(req, first ? 1 : 0, + &pages[num_pages - 1], + CEPH_FSCRYPT_BLOCK_SIZE, + offset_in_page(last_pos), + false, false); + } + + ceph_osdc_start_request(osdc, req); + ret = ceph_osdc_wait_request(osdc, req); + + /* FIXME: length field is wrong if there are 2 extents */ + ceph_update_read_metrics(&fsc->mdsc->metric, + req->r_start_latency, + req->r_end_latency, + read_len, ret); + + /* Ok if object is not already present */ + if (ret == -ENOENT) { + /* + * If there is no object, then we can't assert + * on its version. Set it to 0, and we'll use an + * exclusive create instead. + */ + ceph_osdc_put_request(req); + ret = 0; + + /* + * zero out the soon-to-be uncopied parts of the + * first and last pages. + */ + if (first) + zero_user_segment(pages[0], 0, + offset_in_page(first_pos)); + if (last) + zero_user_segment(pages[num_pages - 1], + offset_in_page(last_pos), + PAGE_SIZE); + } else { + if (ret < 0) { + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + break; + } + + op = &req->r_ops[0]; + if (op->extent.sparse_ext_cnt == 0) { + if (first) + zero_user_segment(pages[0], 0, + offset_in_page(first_pos)); + else + zero_user_segment(pages[num_pages - 1], + offset_in_page(last_pos), + PAGE_SIZE); + } else if (op->extent.sparse_ext_cnt != 1 || + ceph_sparse_ext_map_end(op) != + CEPH_FSCRYPT_BLOCK_SIZE) { + ret = -EIO; + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + break; + } + + if (first && last) { + op = &req->r_ops[1]; + if (op->extent.sparse_ext_cnt == 0) { + zero_user_segment(pages[num_pages - 1], + offset_in_page(last_pos), + PAGE_SIZE); + } else if (op->extent.sparse_ext_cnt != 1 || + ceph_sparse_ext_map_end(op) != + CEPH_FSCRYPT_BLOCK_SIZE) { + ret = -EIO; + ceph_osdc_put_request(req); + ceph_release_page_vector(pages, num_pages); + break; + } + } + + /* Grab assert version. It must be non-zero. */ + assert_ver = req->r_version; + WARN_ON_ONCE(ret > 0 && assert_ver == 0); + + ceph_osdc_put_request(req); + if (first) { + ret = ceph_fscrypt_decrypt_block_inplace(inode, + pages[0], + CEPH_FSCRYPT_BLOCK_SIZE, + offset_in_page(first_pos), + first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); + if (ret < 0) { + ceph_release_page_vector(pages, num_pages); + break; + } + } + if (last) { + ret = ceph_fscrypt_decrypt_block_inplace(inode, + pages[num_pages - 1], + CEPH_FSCRYPT_BLOCK_SIZE, + offset_in_page(last_pos), + last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); + if (ret < 0) { + ceph_release_page_vector(pages, num_pages); + break; + } + } + } } left = len; @@ -1622,35 +1825,90 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, for (n = 0; n < num_pages; n++) { size_t plen = min_t(size_t, left, PAGE_SIZE - off); + /* copy the data */ ret = copy_page_from_iter(pages[n], off, plen, from); - off = 0; if (ret != plen) { ret = -EFAULT; break; } + off = 0; left -= ret; } - if (ret < 0) { + dout("sync_write write failed with %d\n", ret); ceph_release_page_vector(pages, num_pages); - goto out; + break; } - req->r_inode = inode; + if (IS_ENCRYPTED(inode)) { + ret = ceph_fscrypt_encrypt_pages(inode, pages, + write_pos, write_len, + GFP_KERNEL); + if (ret < 0) { + dout("encryption failed with %d\n", ret); + ceph_release_page_vector(pages, num_pages); + break; + } + } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, - offset_in_page(pos), - false, true); + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, write_pos, &write_len, + rmw ? 1 : 0, rmw ? 2 : 1, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE, + snapc, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + ceph_release_page_vector(pages, num_pages); + break; + } + dout("sync_write write op %lld~%llu\n", write_pos, write_len); + osd_req_op_extent_osd_data_pages(req, rmw ? 1 : 0, pages, write_len, + offset_in_page(write_pos), false, + true); + req->r_inode = inode; req->r_mtime = mtime; - ceph_osdc_start_request(&fsc->client->osdc, req); - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + /* Set up the assertion */ + if (rmw) { + /* + * Set up the assertion. If we don't have a version number, + * then the object doesn't exist yet. Use an exclusive create + * instead of a version assertion in that case. + */ + if (assert_ver) { + osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0); + req->r_ops[0].assert_ver.ver = assert_ver; + } else { + osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE, + CEPH_OSD_OP_FLAG_EXCL); + } + } + + ceph_osdc_start_request(osdc, req); + ret = ceph_osdc_wait_request(osdc, req); ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, req->r_end_latency, len, ret); -out: ceph_osdc_put_request(req); if (ret != 0) { + dout("sync_write osd write returned %d\n", ret); + /* Version changed! Must re-do the rmw cycle */ + if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) || + (!assert_ver && ret == -EEXIST)) { + /* We should only ever see this on a rmw */ + WARN_ON_ONCE(!rmw); + + /* The version should never go backward */ + WARN_ON_ONCE(ret == -EOVERFLOW); + + *from = saved_iter; + + /* FIXME: limit number of times we loop? */ + continue; + } ceph_set_error_write(ci); break; } @@ -1658,6 +1916,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ceph_clear_error_write(ci); pos += len; written += len; + dout("sync_write written %d\n", written); if (pos > i_size_read(inode)) { check_caps = ceph_inode_set_size(inode, pos); if (check_caps) @@ -1672,6 +1931,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ret = written; iocb->ki_pos = pos; } + dout("sync_write returning %d\n", ret); return ret; }