diff mbox series

[RFC,15/18] ceph: Convert ceph_osdc_notify() reply to ceph_databuf

Message ID 20230804131327.2574082-16-dhowells@redhat.com
State New
Headers show
Series ceph, rbd: Collapse all the I/O types down to something iov_iter-based | expand

Commit Message

David Howells Aug. 4, 2023, 1:13 p.m. UTC
Convert the reply buffer of ceph_osdc_notify() to ceph_databuf rather than
an array of pages.

Signed-off-by: David Howells <dhowells@redhat.com>
---
 drivers/block/rbd.c             | 33 ++++++++++++++++++++-------------
 include/linux/ceph/osd_client.h |  7 ++-----
 net/ceph/osd_client.c           | 17 ++++-------------
 3 files changed, 26 insertions(+), 31 deletions(-)
diff mbox series

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 950b63eb41de..7a624e75ac7a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3455,8 +3455,7 @@  static void rbd_unlock(struct rbd_device *rbd_dev)
 
 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
 				enum rbd_notify_op notify_op,
-				struct page ***preply_pages,
-				size_t *preply_len)
+				struct ceph_databuf *reply)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
@@ -3474,13 +3473,13 @@  static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
 
 	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
 				&rbd_dev->header_oloc, buf, buf_size,
-				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+				RBD_NOTIFY_TIMEOUT, reply);
 }
 
 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
 			       enum rbd_notify_op notify_op)
 {
-	__rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
+	__rbd_notify_op_lock(rbd_dev, notify_op, NULL);
 }
 
 static void rbd_notify_acquired_lock(struct work_struct *work)
@@ -3501,23 +3500,26 @@  static void rbd_notify_released_lock(struct work_struct *work)
 
 static int rbd_request_lock(struct rbd_device *rbd_dev)
 {
-	struct page **reply_pages;
-	size_t reply_len;
+	struct ceph_databuf *reply;
 	bool lock_owner_responded = false;
 	int ret;
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
-	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
-				   &reply_pages, &reply_len);
+	reply = ceph_databuf_alloc(0, 0, GFP_KERNEL);
+	if (!reply)
+		return -ENOMEM;
+
+	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, reply);
 	if (ret && ret != -ETIMEDOUT) {
 		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
 		goto out;
 	}
 
-	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
-		void *p = page_address(reply_pages[0]);
-		void *const end = p + reply_len;
+	if (reply->length > 0 && reply->length <= PAGE_SIZE) {
+		void *s = kmap_ceph_databuf_page(reply, 0);
+		void *p = s;
+		void *const end = p + reply->length;
 		u32 n;
 
 		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
@@ -3529,10 +3531,12 @@  static int rbd_request_lock(struct rbd_device *rbd_dev)
 			p += 8 + 8; /* skip gid and cookie */
 
 			ceph_decode_32_safe(&p, end, len, e_inval);
-			if (!len)
+			if (!len) {
 				continue;
+			}
 
 			if (lock_owner_responded) {
+				kunmap_local(s);
 				rbd_warn(rbd_dev,
 					 "duplicate lock owners detected");
 				ret = -EIO;
@@ -3543,6 +3547,7 @@  static int rbd_request_lock(struct rbd_device *rbd_dev)
 			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
 						  &struct_v, &len);
 			if (ret) {
+				kunmap_local(s);
 				rbd_warn(rbd_dev,
 					 "failed to decode ResponseMessage: %d",
 					 ret);
@@ -3551,6 +3556,8 @@  static int rbd_request_lock(struct rbd_device *rbd_dev)
 
 			ret = ceph_decode_32(&p);
 		}
+
+		kunmap_local(s);
 	}
 
 	if (!lock_owner_responded) {
@@ -3559,7 +3566,7 @@  static int rbd_request_lock(struct rbd_device *rbd_dev)
 	}
 
 out:
-	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+	ceph_databuf_release(reply);
 	return ret;
 
 e_inval:
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 83c3073c44bb..3099f923c241 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -332,9 +332,7 @@  struct ceph_osd_linger_request {
 
 	struct ceph_databuf *request_pl;
 	struct ceph_databuf *notify_id_buf;
-
-	struct page ***preply_pages;
-	size_t *preply_len;
+	struct ceph_databuf *reply;
 };
 
 struct ceph_watch_item {
@@ -587,8 +585,7 @@  int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		     void *payload,
 		     u32 payload_len,
 		     u32 timeout,
-		     struct page ***preply_pages,
-		     size_t *preply_len);
+		     struct ceph_databuf *reply);
 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
 			  struct ceph_osd_linger_request *lreq);
 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 8cbe06d2e16d..0fe16fdc760f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -4530,7 +4530,7 @@  static void handle_watch_notify(struct ceph_osd_client *osdc,
 			    msg->num_data_items ? &msg->data[0] : NULL;
 
 			if (data) {
-				if (lreq->preply_pages) {
+				if (lreq->reply) {
 					WARN_ON(data->type !=
 							CEPH_MSG_DATA_PAGES);
 					*lreq->preply_pages = data->pages;
@@ -4828,10 +4828,7 @@  EXPORT_SYMBOL(ceph_osdc_notify_ack);
 /*
  * @timeout: in seconds
  *
- * @preply_{pages,len} are initialized both on success and error.
- * The caller is responsible for:
- *
- *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
+ * @reply should be an empty ceph_databuf.
  */
 int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		     struct ceph_object_id *oid,
@@ -4839,17 +4836,12 @@  int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		     void *payload,
 		     u32 payload_len,
 		     u32 timeout,
-		     struct page ***preply_pages,
-		     size_t *preply_len)
+		     struct ceph_databuf *reply)
 {
 	struct ceph_osd_linger_request *lreq;
 	int ret;
 
 	WARN_ON(!timeout);
-	if (preply_pages) {
-		*preply_pages = NULL;
-		*preply_len = 0;
-	}
 
 	lreq = linger_alloc(osdc);
 	if (!lreq)
@@ -4877,8 +4869,7 @@  int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		goto out_put_lreq;
 	}
 
-	lreq->preply_pages = preply_pages;
-	lreq->preply_len = preply_len;
+	lreq->reply = reply;
 
 	ceph_oid_copy(&lreq->t.base_oid, oid);
 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);