diff mbox

[API-NEXT,PATCHv4,8/8] linux-generic: queue: add ordered chain enq support

Message ID 1438557709-30355-9-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 2, 2015, 11:21 p.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   4 +
 platform/linux-generic/odp_queue.c                 | 149 +++++++++++++++------
 2 files changed, 114 insertions(+), 39 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index c459fce..30f061e 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -109,6 +109,10 @@  typedef union queue_entry_u queue_entry_t;
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
 	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	union {                              /* Multi-use secondary link */
+		struct odp_buffer_hdr_t *prev;
+		struct odp_buffer_hdr_t *link;
+	};
 	odp_buffer_bits_t        handle;     /* handle */
 	union {
 		uint32_t all;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a2460e7..8443ead 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -336,6 +336,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
 	int sched = 0;
 	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	odp_buffer_hdr_t *buf_tail;
 
 	/* Need two locks for enq operations from ordered queues */
 	if (origin_qe) {
@@ -404,17 +405,33 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 
 		/* We're in order, so account for this and proceed with enq */
 		origin_qe->s.order_out++;
+
+		/* if this element is linked, restore the linked chain */
+		buf_tail = buf_hdr->link;
+
+		if (buf_tail) {
+			buf_hdr->next = buf_tail;
+			buf_hdr->link = NULL;
+
+			/* find end of the chain */
+			while (buf_tail->next)
+				buf_tail = buf_tail->next;
+		} else {
+			buf_tail = buf_hdr;
+		}
+	} else {
+		buf_tail = buf_hdr;
 	}
 
 	if (queue->s.head == NULL) {
 		/* Empty queue */
 		queue->s.head = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	} else {
 		queue->s.tail->next = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
@@ -461,8 +478,23 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			next_buf = reorder_buf->next;
 
 			if (odp_likely(reorder_buf->target_qe == queue)) {
-				reorder_prev = reorder_buf;
-				reorder_buf  = next_buf;
+				/* promote any chain */
+				odp_buffer_hdr_t *reorder_link =
+					reorder_buf->link;
+
+				if (reorder_link) {
+					reorder_buf->next = reorder_link;
+					reorder_buf->link = NULL;
+					while (reorder_link->next)
+						reorder_link =
+							reorder_link->next;
+					reorder_link->next = next_buf;
+					reorder_prev = reorder_link;
+				} else {
+					reorder_prev = reorder_buf;
+				}
+
+				reorder_buf = next_buf;
 				release_count++;
 			} else if (!reorder_buf->target_qe) {
 				if (reorder_prev)
@@ -523,53 +555,89 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int sched = 0;
-	int i, j;
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
 	odp_buffer_hdr_t *tail;
 
+	/* Identify ordered chains in the input buffer list */
 	for (i = 0; i < num; i++) {
-		/* If any buffer is coming from an ordered queue, enqueue them
-		 * individually since in the general case each might originate
-		 * from a different ordered queue.  If any of these fail, the
-		 * return code tells the caller how many succeeded.
-		 */
-		if (buf_hdr[i]->origin_qe) {
-			for (j = 0; j < num; j++) {
-				if (queue_enq(queue, buf_hdr[j]))
-					return j;
-			}
-			return num;
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
+		if (i < num - 1)
+			buf_hdr[i]->next = buf_hdr[i + 1];
+	}
+
+	buf_hdr[num - 1] = NULL;
+
+	if (ordered_count) {
+		if (ordered_head[0] > 0) {
+			tail = buf_hdr[ordered_head[0] - 1];
+			tail->next = NULL;
+			ret_count = ordered_head[0];
+		} else {
+			tail = NULL;
+			ret_count = 0;
 		}
-		buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+	} else {
+		tail = buf_hdr[num - 1];
+		ret_count = num;
 	}
 
-	tail = buf_hdr[num-1];
+	/* Handle regular enq's at start of list */
+	if (tail) {
+		tail->next = NULL;
+		LOCK(&queue->s.lock);
+		if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			ODP_ERR("Bad queue status\n");
+			return -1;
+		}
 
-	LOCK(&queue->s.lock);
-	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+		/* Empty queue */
+		if (queue->s.head)
+			queue->s.tail->next = buf_hdr[0];
+		else
+			queue->s.head = buf_hdr[0];
+
+		queue->s.tail = tail;
+
+		if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+			queue->s.status = QUEUE_STATUS_SCHED;
+			sched = 1; /* retval: schedule queue */
+		}
 		UNLOCK(&queue->s.lock);
-		ODP_ERR("Bad queue status\n");
-		return -1;
+
+		/* Add queue to scheduling */
+		if (sched && schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
 	}
 
-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = buf_hdr[0];
-	else
-		queue->s.tail->next = buf_hdr[0];
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
 
-	queue->s.tail = tail;
+		if (i < ordered_count - 1)
+			buf_hdr[ordered_head[i + 1] - 1]->next = NULL;
 
-	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
-	}
-	UNLOCK(&queue->s.lock);
+		if (ordered_head[i] < eol - 1)
+			buf_hdr[ordered_head[i]]->link =
+				buf_hdr[ordered_head[i] + 1];
+		else
+			buf_hdr[ordered_head[i]]->link = NULL;
 
-	/* Add queue to scheduling */
-	if (sched && schedule_queue(queue))
-		ODP_ABORT("schedule_queue failed\n");
+		rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
 
-	return num; /* All events enqueued */
+		ret_count += rc;
+	}
+
+	return ret_count;
 }
 
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -598,6 +666,9 @@  int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 	queue   = queue_to_qentry(handle);
 	buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
+	/* No chains via this entry */
+	buf_hdr->link = NULL;
+
 	return queue->s.enqueue(queue, buf_hdr);
 }