diff mbox

[API-NEXT,PATCHv4,3/8] linux-generic: queue: add ordered_queue_enq() routine - part 1

Message ID 1447166821-24585-4-git-send-email-bill.fischofer@linaro.org
State Accepted
Commit 4478c004cdbf645fe9ecb9945b2e98b0b564eb13
Headers show

Commit Message

Bill Fischofer Nov. 10, 2015, 2:46 p.m. UTC
Add the new ordered_queue_enq() internal routine. This is done in two
parts to make the diffs easier to follow. Part 1 adds the new routine
while Part 2 replaces queue_enq() to use it.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |   2 +
 platform/linux-generic/odp_queue.c                 | 118 +++++++++++++++++++++
 2 files changed, 120 insertions(+)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 32e3288..1bd365b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -96,6 +96,8 @@  union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
+int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+		      int systain, queue_entry_t *origin_qe, uint64_t order);
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index bcc8190..a545927 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -529,6 +529,124 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 	return 0;
 }
 
+int ordered_queue_enq(queue_entry_t *queue,
+		      odp_buffer_hdr_t *buf_hdr,
+		      int sustain,
+		      queue_entry_t *origin_qe,
+		      uint64_t order)
+{
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	int               release_count, placeholder_count;
+	int               sched = 0;
+
+	/* Need two locks for enq operations from ordered queues */
+	get_qe_locks(origin_qe, queue);
+
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
+			 queue->s.status < QUEUE_STATUS_READY)) {
+		free_qe_locks(queue, origin_qe);
+		ODP_ERR("Bad queue status\n");
+		ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+			queue->s.name, origin_qe->s.name, buf_hdr);
+		return -1;
+	}
+
+	/* Remember that enq was called for this order */
+	sched_enq_called();
+
+	/* We can only complete this enq if we're in order */
+	if (order > origin_qe->s.order_out) {
+		reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		free_qe_locks(queue, origin_qe);
+		return 0;
+	}
+
+	/* Resolve order if requested */
+	if (!sustain) {
+		order_release(origin_qe, 1);
+		sched_order_resolved(buf_hdr);
+	}
+
+	/* Update queue status */
+	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+		queue->s.status = QUEUE_STATUS_SCHED;
+		sched = 1;
+	}
+
+	/* We're in order, however the reorder queue may have other buffers
+	 * sharing this order on it and this buffer must not be enqueued ahead
+	 * of them. If the reorder queue is empty we can short-cut and
+	 * simply add to the target queue directly.
+	 */
+
+	if (!origin_qe->s.reorder_head) {
+		queue_add_chain(queue, buf_hdr);
+		free_qe_locks(queue, origin_qe);
+
+		/* Add queue to scheduling */
+		if (sched && schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
+		return 0;
+	}
+
+	/* The reorder_queue is non-empty, so sort this buffer into it.  Note
+	 * that we force the sustain bit on here because we'll be removing
+	 * this immediately and we already accounted for this order earlier.
+	 */
+	reorder_enq(queue, order, origin_qe, buf_hdr, 1);
+
+	/* Pick up this element, and all others resolved by this enq,
+	 * and add them to the target queue.
+	 */
+	reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
+		    &placeholder_buf, &release_count, &placeholder_count);
+
+	/* Move the list from the reorder queue to the target queue */
+	if (queue->s.head)
+		queue->s.tail->next = origin_qe->s.reorder_head;
+	else
+		queue->s.head       = origin_qe->s.reorder_head;
+	queue->s.tail               = reorder_prev;
+	origin_qe->s.reorder_head   = reorder_prev->next;
+	reorder_prev->next          = NULL;
+
+	/* Reflect resolved orders in the output sequence */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle any resolved orders for events destined for other
+	 * queues, appending placeholder bufs as needed.
+	 */
+	if (origin_qe != queue)
+		UNLOCK(&queue->s.lock);
+
+	/* Add queue to scheduling */
+	if (sched && schedule_queue(queue))
+		ODP_ABORT("schedule_queue failed\n");
+
+	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
+			 1, 0);
+	UNLOCK(&origin_qe->s.lock);
+
+	if (reorder_buf)
+		queue_enq_internal(reorder_buf);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		    int num, int sustain)
 {