diff mbox series

[RFC,23/35] rbd: Use ceph_databuf_enc_start/stop()

Message ID 20250313233341.1675324-24-dhowells@redhat.com
State New
Headers show
Series ceph, rbd, netfs: Make ceph fully use netfslib | expand

Commit Message

David Howells March 13, 2025, 11:33 p.m. UTC
Make rbd use ceph_databuf_enc_start() and ceph_databuf_enc_stop() when
filling out the request data.  Also use ceph_encode_*() rather than
ceph_databuf_encode_*() as the latter will do an iterator copy to deal with
page crossing and misalignment (the latter being something that the CPU
will handle on some arches).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 drivers/block/rbd.c | 64 ++++++++++++++++++++++-----------------------
 1 file changed, 31 insertions(+), 33 deletions(-)

Comments

Viacheslav Dubeyko March 19, 2025, 12:32 a.m. UTC | #1
On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Make rbd use ceph_databuf_enc_start() and ceph_databuf_enc_stop() when
> filling out the request data.  Also use ceph_encode_*() rather than
> ceph_databuf_encode_*() as the latter will do an iterator copy to deal with
> page crossing and misalignment (the latter being something that the CPU
> will handle on some arches).
> 
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  drivers/block/rbd.c | 64 ++++++++++++++++++++++-----------------------
>  1 file changed, 31 insertions(+), 33 deletions(-)
> 
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index a2674077edea..956fc4a8f1da 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -1970,19 +1970,19 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
>  				     int which, u64 objno, u8 new_state,
>  				     const u8 *current_state)
>  {
> -	struct ceph_databuf *dbuf;
> -	void *p, *start;
> +	struct ceph_databuf *request;
> +	void *p;
>  	int ret;
>  
>  	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
>  	if (ret)
>  		return ret;
>  
> -	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> -	if (!dbuf)
> +	request = ceph_databuf_req_alloc(1, 8 * 2 + 3 * 1, GFP_NOIO);

This 8 * 2 + 3 * 1 is too unclear for me. :) Could we introduce named constants
here?

> +	if (!request)
>  		return -ENOMEM;
>  
> -	p = start = kmap_ceph_databuf_page(dbuf, 0);
> +	p = ceph_databuf_enc_start(request);
>  	ceph_encode_64(&p, objno);
>  	ceph_encode_64(&p, objno + 1);
>  	ceph_encode_8(&p, new_state);
> @@ -1992,10 +1992,9 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
>  	} else {
>  		ceph_encode_8(&p, 0);
>  	}
> -	kunmap_local(p);
> -	ceph_databuf_added_data(dbuf, p - start);
> +	ceph_databuf_enc_stop(request, p);
>  
> -	osd_req_op_cls_request_databuf(req, which, dbuf);
> +	osd_req_op_cls_request_databuf(req, which, request);
>  	return 0;
>  }
>  
> @@ -2108,7 +2107,7 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
>  
>  static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
>  {
> -	struct ceph_databuf *dbuf;
> +	struct ceph_databuf *request;
>  
>  	/*
>  	 * The response data for a STAT call consists of:
> @@ -2118,12 +2117,12 @@ static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
>  	 *         le32 tv_nsec;
>  	 *     } mtime;
>  	 */
> -	dbuf = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> -	if (!dbuf)
> +	request = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);

Ditto. Why do we have 8 + sizeof(struct ceph_timespec) here?

Thanks,
Slava.

> +	if (!request)
>  		return -ENOMEM;
>  
>  	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
> -	osd_req_op_raw_data_in_databuf(osd_req, which, dbuf);
> +	osd_req_op_raw_data_in_databuf(osd_req, which, request);
>  	return 0;
>  }
>  
> @@ -2964,16 +2963,16 @@ static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
>  
>  static int setup_copyup_buf(struct rbd_obj_request *obj_req, u64 obj_overlap)
>  {
> -	struct ceph_databuf *dbuf;
> +	struct ceph_databuf *request;
>  
>  	rbd_assert(!obj_req->copyup_buf);
>  
> -	dbuf = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
> +	request = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
>  				      obj_overlap, GFP_NOIO);
> -	if (!dbuf)
> +	if (!request)
>  		return -ENOMEM;
>  
> -	obj_req->copyup_buf = dbuf;
> +	obj_req->copyup_buf = request;
>  	return 0;
>  }
>  
> @@ -4580,10 +4579,9 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
>  		if (!request)
>  			return -ENOMEM;
>  
> -		p = kmap_ceph_databuf_page(request, 0);
> -		memcpy(p, outbound, outbound_size);
> -		kunmap_local(p);
> -		ceph_databuf_added_data(request, outbound_size);
> +		p = ceph_databuf_enc_start(request);
> +		ceph_encode_copy(&p, outbound, outbound_size);
> +		ceph_databuf_enc_stop(request, p);
>  	}
>  
>  	reply = ceph_databuf_reply_alloc(1, inbound_size, GFP_KERNEL);
> @@ -4712,7 +4710,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
>  static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
>  			     struct ceph_object_id *oid,
>  			     struct ceph_object_locator *oloc,
> -			     struct ceph_databuf *dbuf, int len)
> +			     struct ceph_databuf *request, int len)
>  {
>  	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
>  	struct ceph_osd_request *req;
> @@ -4727,7 +4725,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
>  	req->r_flags = CEPH_OSD_FLAG_READ;
>  
>  	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, len, 0, 0);
> -	osd_req_op_extent_osd_databuf(req, 0, dbuf);
> +	osd_req_op_extent_osd_databuf(req, 0, request);
>  
>  	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
>  	if (ret)
> @@ -4750,16 +4748,16 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
>  				  bool first_time)
>  {
>  	struct rbd_image_header_ondisk *ondisk;
> -	struct ceph_databuf *dbuf = NULL;
> +	struct ceph_databuf *request = NULL;
>  	u32 snap_count = 0;
>  	u64 names_size = 0;
>  	u32 want_count;
>  	int ret;
>  
> -	dbuf = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
> -	if (!dbuf)
> +	request = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
> +	if (!request)
>  		return -ENOMEM;
> -	ondisk = kmap_ceph_databuf_page(dbuf, 0);
> +	ondisk = kmap_ceph_databuf_page(request, 0);
>  
>  	/*
>  	 * The complete header will include an array of its 64-bit
> @@ -4776,13 +4774,13 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
>  		size += names_size;
>  
>  		ret = -ENOMEM;
> -		if (size > dbuf->limit &&
> -		    ceph_databuf_reserve(dbuf, size - dbuf->limit,
> +		if (size > request->limit &&
> +		    ceph_databuf_reserve(request, size - request->limit,
>  					 GFP_KERNEL) < 0)
>  			goto out;
>  
>  		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
> -					&rbd_dev->header_oloc, dbuf, size);
> +					&rbd_dev->header_oloc, request, size);
>  		if (ret < 0)
>  			goto out;
>  		if ((size_t)ret < size) {
> @@ -4806,7 +4804,7 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
>  	ret = rbd_header_from_disk(header, ondisk, first_time);
>  out:
>  	kunmap_local(ondisk);
> -	ceph_databuf_release(dbuf);
> +	ceph_databuf_release(request);
>  	return ret;
>  }
>  
> @@ -5625,10 +5623,10 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
>  	if (!reply)
>  		goto out_free;
>  
> -	p = kmap_ceph_databuf_page(request, 0);
> +	p = ceph_databuf_enc_start(request);
>  	ceph_encode_64(&p, rbd_dev->spec->snap_id);
> -	kunmap_local(p);
> -	ceph_databuf_added_data(request, sizeof(__le64));
> +	ceph_databuf_enc_stop(request, p);
> +
>  	ret = __get_parent_info(rbd_dev, request, reply, pii);
>  	if (ret > 0)
>  		ret = __get_parent_info_legacy(rbd_dev, request, reply, pii);
> 
>
David Howells March 20, 2025, 2:59 p.m. UTC | #2
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > -	dbuf = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> > -	if (!dbuf)
> > +	request = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> 
> Ditto. Why do we have 8 + sizeof(struct ceph_timespec) here?

Because that's the size of the composite protocol element.

As to why it's using a total of plain integers and sizeofs rather than
constant macros, Ilya is the person to ask according to git blame;-).

I would probably prefer sizeof(__le64) here over 8, but I didn't want to
change it too far from the existing code.

If you want macro constants for these sorts of things, someone else who knows
the protocol better needs to do that.  You could probably write something to
generate them (akin to rpcgen).

David
Viacheslav Dubeyko March 20, 2025, 9:48 p.m. UTC | #3
On Thu, 2025-03-20 at 14:59 +0000, David Howells wrote:
> Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:
> 
> > > -	dbuf = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> > > -	if (!dbuf)
> > > +	request = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> > 
> > Ditto. Why do we have 8 + sizeof(struct ceph_timespec) here?
> 
> Because that's the size of the composite protocol element.
> 
> As to why it's using a total of plain integers and sizeofs rather than
> constant macros, Ilya is the person to ask according to git blame;-).
> 
> I would probably prefer sizeof(__le64) here over 8, but I didn't want to
> change it too far from the existing code.
> 
> If you want macro constants for these sorts of things, someone else who knows
> the protocol better needs to do that.  You could probably write something to
> generate them (akin to rpcgen).
> 

Yes, make sense. I totally agree with you. :)

Thanks,
Slava.
diff mbox series

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a2674077edea..956fc4a8f1da 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1970,19 +1970,19 @@  static int rbd_cls_object_map_update(struct ceph_osd_request *req,
 				     int which, u64 objno, u8 new_state,
 				     const u8 *current_state)
 {
-	struct ceph_databuf *dbuf;
-	void *p, *start;
+	struct ceph_databuf *request;
+	void *p;
 	int ret;
 
 	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
 	if (ret)
 		return ret;
 
-	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
-	if (!dbuf)
+	request = ceph_databuf_req_alloc(1, 8 * 2 + 3 * 1, GFP_NOIO);
+	if (!request)
 		return -ENOMEM;
 
-	p = start = kmap_ceph_databuf_page(dbuf, 0);
+	p = ceph_databuf_enc_start(request);
 	ceph_encode_64(&p, objno);
 	ceph_encode_64(&p, objno + 1);
 	ceph_encode_8(&p, new_state);
@@ -1992,10 +1992,9 @@  static int rbd_cls_object_map_update(struct ceph_osd_request *req,
 	} else {
 		ceph_encode_8(&p, 0);
 	}
-	kunmap_local(p);
-	ceph_databuf_added_data(dbuf, p - start);
+	ceph_databuf_enc_stop(request, p);
 
-	osd_req_op_cls_request_databuf(req, which, dbuf);
+	osd_req_op_cls_request_databuf(req, which, request);
 	return 0;
 }
 
@@ -2108,7 +2107,7 @@  static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
 
 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
 {
-	struct ceph_databuf *dbuf;
+	struct ceph_databuf *request;
 
 	/*
 	 * The response data for a STAT call consists of:
@@ -2118,12 +2117,12 @@  static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
 	 *         le32 tv_nsec;
 	 *     } mtime;
 	 */
-	dbuf = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
-	if (!dbuf)
+	request = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
+	if (!request)
 		return -ENOMEM;
 
 	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
-	osd_req_op_raw_data_in_databuf(osd_req, which, dbuf);
+	osd_req_op_raw_data_in_databuf(osd_req, which, request);
 	return 0;
 }
 
@@ -2964,16 +2963,16 @@  static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
 
 static int setup_copyup_buf(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
-	struct ceph_databuf *dbuf;
+	struct ceph_databuf *request;
 
 	rbd_assert(!obj_req->copyup_buf);
 
-	dbuf = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
+	request = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
 				      obj_overlap, GFP_NOIO);
-	if (!dbuf)
+	if (!request)
 		return -ENOMEM;
 
-	obj_req->copyup_buf = dbuf;
+	obj_req->copyup_buf = request;
 	return 0;
 }
 
@@ -4580,10 +4579,9 @@  static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
 		if (!request)
 			return -ENOMEM;
 
-		p = kmap_ceph_databuf_page(request, 0);
-		memcpy(p, outbound, outbound_size);
-		kunmap_local(p);
-		ceph_databuf_added_data(request, outbound_size);
+		p = ceph_databuf_enc_start(request);
+		ceph_encode_copy(&p, outbound, outbound_size);
+		ceph_databuf_enc_stop(request, p);
 	}
 
 	reply = ceph_databuf_reply_alloc(1, inbound_size, GFP_KERNEL);
@@ -4712,7 +4710,7 @@  static void rbd_free_disk(struct rbd_device *rbd_dev)
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 			     struct ceph_object_id *oid,
 			     struct ceph_object_locator *oloc,
-			     struct ceph_databuf *dbuf, int len)
+			     struct ceph_databuf *request, int len)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_request *req;
@@ -4727,7 +4725,7 @@  static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
 	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, len, 0, 0);
-	osd_req_op_extent_osd_databuf(req, 0, dbuf);
+	osd_req_op_extent_osd_databuf(req, 0, request);
 
 	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
 	if (ret)
@@ -4750,16 +4748,16 @@  static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
 				  bool first_time)
 {
 	struct rbd_image_header_ondisk *ondisk;
-	struct ceph_databuf *dbuf = NULL;
+	struct ceph_databuf *request = NULL;
 	u32 snap_count = 0;
 	u64 names_size = 0;
 	u32 want_count;
 	int ret;
 
-	dbuf = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
-	if (!dbuf)
+	request = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
+	if (!request)
 		return -ENOMEM;
-	ondisk = kmap_ceph_databuf_page(dbuf, 0);
+	ondisk = kmap_ceph_databuf_page(request, 0);
 
 	/*
 	 * The complete header will include an array of its 64-bit
@@ -4776,13 +4774,13 @@  static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
 		size += names_size;
 
 		ret = -ENOMEM;
-		if (size > dbuf->limit &&
-		    ceph_databuf_reserve(dbuf, size - dbuf->limit,
+		if (size > request->limit &&
+		    ceph_databuf_reserve(request, size - request->limit,
 					 GFP_KERNEL) < 0)
 			goto out;
 
 		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
-					&rbd_dev->header_oloc, dbuf, size);
+					&rbd_dev->header_oloc, request, size);
 		if (ret < 0)
 			goto out;
 		if ((size_t)ret < size) {
@@ -4806,7 +4804,7 @@  static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
 	ret = rbd_header_from_disk(header, ondisk, first_time);
 out:
 	kunmap_local(ondisk);
-	ceph_databuf_release(dbuf);
+	ceph_databuf_release(request);
 	return ret;
 }
 
@@ -5625,10 +5623,10 @@  static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
 	if (!reply)
 		goto out_free;
 
-	p = kmap_ceph_databuf_page(request, 0);
+	p = ceph_databuf_enc_start(request);
 	ceph_encode_64(&p, rbd_dev->spec->snap_id);
-	kunmap_local(p);
-	ceph_databuf_added_data(request, sizeof(__le64));
+	ceph_databuf_enc_stop(request, p);
+
 	ret = __get_parent_info(rbd_dev, request, reply, pii);
 	if (ret > 0)
 		ret = __get_parent_info_legacy(rbd_dev, request, reply, pii);