diff mbox series

[RFC,21/35] libceph: Make notify code use ceph_databuf_enc_start/stop

Message ID 20250313233341.1675324-22-dhowells@redhat.com
State New
Headers show
Series ceph, rbd, netfs: Make ceph fully use netfslib | expand

Commit Message

David Howells March 13, 2025, 11:33 p.m. UTC
Make the ceph_osdc_notify*() functions use ceph_databuf_enc_start() and
ceph_databuf_enc_stop() when filling out the request data.  Also use
ceph_encode_*() rather than ceph_databuf_encode_*() as the latter will do
an iterator copy to deal with page crossing and misalignment (the latter
being something that the CPU will handle on some arches).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 net/ceph/osd_client.c | 55 +++++++++++++++++++++----------------------
 1 file changed, 27 insertions(+), 28 deletions(-)

Comments

Viacheslav Dubeyko March 18, 2025, 8:12 p.m. UTC | #1
On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Make the ceph_osdc_notify*() functions use ceph_databuf_enc_start() and
> ceph_databuf_enc_stop() when filling out the request data.  Also use
> ceph_encode_*() rather than ceph_databuf_encode_*() as the latter will do
> an iterator copy to deal with page crossing and misalignment (the latter
> being something that the CPU will handle on some arches).
> 
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  net/ceph/osd_client.c | 55 +++++++++++++++++++++----------------------
>  1 file changed, 27 insertions(+), 28 deletions(-)
> 
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 0ac439e7e730..1a0cb2cdcc52 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -4759,7 +4759,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
>  {
>  	struct ceph_osd_req_op *op;
>  	struct ceph_databuf *dbuf;
> -	int ret;
> +	void *p;
> +
> +	if (!payload)
> +		payload_len = 0;
>  
>  	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
>  
> @@ -4767,18 +4770,13 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
>  	if (!dbuf)
>  		return -ENOMEM;
>  
> -	ret = ceph_databuf_encode_64(dbuf, notify_id);
> -	ret |= ceph_databuf_encode_64(dbuf, cookie);
> -	if (payload) {
> -		ret |= ceph_databuf_encode_32(dbuf, payload_len);
> -		ret |= ceph_databuf_append(dbuf, payload, payload_len);
> -	} else {
> -		ret |= ceph_databuf_encode_32(dbuf, 0);
> -	}
> -	if (ret) {
> -		ceph_databuf_release(dbuf);
> -		return -ENOMEM;
> -	}
> +	p = ceph_databuf_enc_start(dbuf);
> +	ceph_encode_64(&p, notify_id);
> +	ceph_encode_64(&p, cookie);
> +	ceph_encode_32(&p, payload_len);
> +	if (payload)
> +		ceph_encode_copy(&p, payload, payload_len);
> +	ceph_databuf_enc_stop(dbuf, p);
>  
>  	ceph_osd_databuf_init(&op->notify_ack.request_data, dbuf);
>  	op->indata_len = ceph_databuf_len(dbuf);
> @@ -4840,8 +4838,12 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
>  		     size_t *preply_len)
>  {
>  	struct ceph_osd_linger_request *lreq;
> +	void *p;
>  	int ret;
>  
> +	if (WARN_ON_ONCE(payload_len > PAGE_SIZE - 3 * 4))

Why PAGE_SIZE - 3 * 4? Could make this more clear here?

> +		return -EIO;
> +
>  	WARN_ON(!timeout);
>  	if (preply_pages) {
>  		*preply_pages = NULL;
> @@ -4852,20 +4854,19 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
>  	if (!lreq)
>  		return -ENOMEM;
>  
> -	lreq->request_pl = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> +	lreq->request_pl = ceph_databuf_req_alloc(0, 3 * 4 + payload_len,

The same question... :)

Thanks,
Slava.

> +						  GFP_NOIO);
>  	if (!lreq->request_pl) {
>  		ret = -ENOMEM;
>  		goto out_put_lreq;
>  	}
>  
> -	ret = ceph_databuf_encode_32(lreq->request_pl, 1); /* prot_ver */
> -	ret |= ceph_databuf_encode_32(lreq->request_pl, timeout);
> -	ret |= ceph_databuf_encode_32(lreq->request_pl, payload_len);
> -	ret |= ceph_databuf_append(lreq->request_pl, payload, payload_len);
> -	if (ret) {
> -		ret = -ENOMEM;
> -		goto out_put_lreq;
> -	}
> +	p = ceph_databuf_enc_start(lreq->request_pl);
> +	ceph_encode_32(&p, 1); /* prot_ver */
> +	ceph_encode_32(&p, timeout);
> +	ceph_encode_32(&p, payload_len);
> +	ceph_encode_copy(&p, payload, payload_len);
> +	ceph_databuf_enc_stop(lreq->request_pl, p);
>  
>  	/* for notify_id */
>  	lreq->notify_id_buf = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_NOIO);
> @@ -5217,7 +5218,7 @@ int osd_req_op_copy_from_init(struct ceph_osd_request *req,
>  {
>  	struct ceph_osd_req_op *op;
>  	struct ceph_databuf *dbuf;
> -	void *p, *end;
> +	void *p;
>  
>  	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_KERNEL);
>  	if (!dbuf)
> @@ -5230,15 +5231,13 @@ int osd_req_op_copy_from_init(struct ceph_osd_request *req,
>  	op->copy_from.flags = copy_from_flags;
>  	op->copy_from.src_fadvise_flags = src_fadvise_flags;
>  
> -	p = kmap_ceph_databuf_page(dbuf, 0);
> -	end = p + PAGE_SIZE;
> +	p = ceph_databuf_enc_start(dbuf);
>  	ceph_encode_string(&p, src_oid->name, src_oid->name_len);
>  	encode_oloc(&p, src_oloc);
>  	ceph_encode_32(&p, truncate_seq);
>  	ceph_encode_64(&p, truncate_size);
> -	op->indata_len = PAGE_SIZE - (end - p);
> -	ceph_databuf_added_data(dbuf, op->indata_len);
> -	kunmap_local(p);
> +	ceph_databuf_enc_stop(dbuf, p);
> +	op->indata_len = ceph_databuf_len(dbuf);
>  
>  	ceph_osd_databuf_init(&op->copy_from.osd_data, dbuf);
>  	return 0;
>
David Howells March 18, 2025, 10:36 p.m. UTC | #2
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > @@ -4852,20 +4854,19 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
> >  	if (!lreq)
> >  		return -ENOMEM;
> >  
> > -	lreq->request_pl = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> > +	lreq->request_pl = ceph_databuf_req_alloc(0, 3 * 4 + payload_len,
> 
> The same question... :)
> 
> Thanks,
> Slava.
> 
> > +						  GFP_NOIO);
> >  	if (!lreq->request_pl) {
> >  		ret = -ENOMEM;
> >  		goto out_put_lreq;
> >  	}
> >  
> > -	ret = ceph_databuf_encode_32(lreq->request_pl, 1); /* prot_ver */
> > -	ret |= ceph_databuf_encode_32(lreq->request_pl, timeout);
> > -	ret |= ceph_databuf_encode_32(lreq->request_pl, payload_len);
> > -	ret |= ceph_databuf_append(lreq->request_pl, payload, payload_len);
> > -	if (ret) {
> > -		ret = -ENOMEM;
> > -		goto out_put_lreq;
> > -	}
> > +	p = ceph_databuf_enc_start(lreq->request_pl);
> > +	ceph_encode_32(&p, 1); /* prot_ver */
> > +	ceph_encode_32(&p, timeout);
> > +	ceph_encode_32(&p, payload_len);
> > +	ceph_encode_copy(&p, payload, payload_len);
> > +	ceph_databuf_enc_stop(lreq->request_pl, p);

I think the answer is obvious from that.  You have 3 protocol LE32 words plus
the payload.  Previously, ceph just allocated a whole page, whether or not we
needed anywhere near that much, and then would dynamically add pages as it
went along if one was insufficient.  By allocating up front, we get to make
use of the bulk allocator.

However, if we don't need all that much space, it affords us the opportunity
to make use of a page fragment allocator.

David
diff mbox series

Patch

diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 0ac439e7e730..1a0cb2cdcc52 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -4759,7 +4759,10 @@  static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 {
 	struct ceph_osd_req_op *op;
 	struct ceph_databuf *dbuf;
-	int ret;
+	void *p;
+
+	if (!payload)
+		payload_len = 0;
 
 	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
@@ -4767,18 +4770,13 @@  static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 	if (!dbuf)
 		return -ENOMEM;
 
-	ret = ceph_databuf_encode_64(dbuf, notify_id);
-	ret |= ceph_databuf_encode_64(dbuf, cookie);
-	if (payload) {
-		ret |= ceph_databuf_encode_32(dbuf, payload_len);
-		ret |= ceph_databuf_append(dbuf, payload, payload_len);
-	} else {
-		ret |= ceph_databuf_encode_32(dbuf, 0);
-	}
-	if (ret) {
-		ceph_databuf_release(dbuf);
-		return -ENOMEM;
-	}
+	p = ceph_databuf_enc_start(dbuf);
+	ceph_encode_64(&p, notify_id);
+	ceph_encode_64(&p, cookie);
+	ceph_encode_32(&p, payload_len);
+	if (payload)
+		ceph_encode_copy(&p, payload, payload_len);
+	ceph_databuf_enc_stop(dbuf, p);
 
 	ceph_osd_databuf_init(&op->notify_ack.request_data, dbuf);
 	op->indata_len = ceph_databuf_len(dbuf);
@@ -4840,8 +4838,12 @@  int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		     size_t *preply_len)
 {
 	struct ceph_osd_linger_request *lreq;
+	void *p;
 	int ret;
 
+	if (WARN_ON_ONCE(payload_len > PAGE_SIZE - 3 * 4))
+		return -EIO;
+
 	WARN_ON(!timeout);
 	if (preply_pages) {
 		*preply_pages = NULL;
@@ -4852,20 +4854,19 @@  int ceph_osdc_notify(struct ceph_osd_client *osdc,
 	if (!lreq)
 		return -ENOMEM;
 
-	lreq->request_pl = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
+	lreq->request_pl = ceph_databuf_req_alloc(0, 3 * 4 + payload_len,
+						  GFP_NOIO);
 	if (!lreq->request_pl) {
 		ret = -ENOMEM;
 		goto out_put_lreq;
 	}
 
-	ret = ceph_databuf_encode_32(lreq->request_pl, 1); /* prot_ver */
-	ret |= ceph_databuf_encode_32(lreq->request_pl, timeout);
-	ret |= ceph_databuf_encode_32(lreq->request_pl, payload_len);
-	ret |= ceph_databuf_append(lreq->request_pl, payload, payload_len);
-	if (ret) {
-		ret = -ENOMEM;
-		goto out_put_lreq;
-	}
+	p = ceph_databuf_enc_start(lreq->request_pl);
+	ceph_encode_32(&p, 1); /* prot_ver */
+	ceph_encode_32(&p, timeout);
+	ceph_encode_32(&p, payload_len);
+	ceph_encode_copy(&p, payload, payload_len);
+	ceph_databuf_enc_stop(lreq->request_pl, p);
 
 	/* for notify_id */
 	lreq->notify_id_buf = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_NOIO);
@@ -5217,7 +5218,7 @@  int osd_req_op_copy_from_init(struct ceph_osd_request *req,
 {
 	struct ceph_osd_req_op *op;
 	struct ceph_databuf *dbuf;
-	void *p, *end;
+	void *p;
 
 	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_KERNEL);
 	if (!dbuf)
@@ -5230,15 +5231,13 @@  int osd_req_op_copy_from_init(struct ceph_osd_request *req,
 	op->copy_from.flags = copy_from_flags;
 	op->copy_from.src_fadvise_flags = src_fadvise_flags;
 
-	p = kmap_ceph_databuf_page(dbuf, 0);
-	end = p + PAGE_SIZE;
+	p = ceph_databuf_enc_start(dbuf);
 	ceph_encode_string(&p, src_oid->name, src_oid->name_len);
 	encode_oloc(&p, src_oloc);
 	ceph_encode_32(&p, truncate_seq);
 	ceph_encode_64(&p, truncate_size);
-	op->indata_len = PAGE_SIZE - (end - p);
-	ceph_databuf_added_data(dbuf, op->indata_len);
-	kunmap_local(p);
+	ceph_databuf_enc_stop(dbuf, p);
+	op->indata_len = ceph_databuf_len(dbuf);
 
 	ceph_osd_databuf_init(&op->copy_from.osd_data, dbuf);
 	return 0;