@@ -14,6 +14,7 @@
#include <odp_buffer_inlines.h>
#include <odp_internal.h>
#include <odp/shared_memory.h>
+#include <odp/schedule.h>
#include <odp_schedule_internal.h>
#include <odp/config.h>
#include <odp_packet_io_internal.h>
@@ -401,6 +402,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
return 0;
}
+ /* We're in order, so account for this and proceed with enq */
origin_qe->s.order_out++;
}
@@ -426,16 +428,56 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
*/
if (origin_qe) {
odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
- odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *reorder_prev = NULL;
+ odp_buffer_hdr_t *placeholder_buf = NULL;
+ odp_buffer_hdr_t *next_buf;
uint32_t release_count = 0;
+ uint32_t placeholder_count = 0;
while (reorder_buf &&
- reorder_buf->target_qe == queue &&
- reorder_buf->order <=
- origin_qe->s.order_out + release_count) {
- release_count++;
- reorder_prev = reorder_buf;
- reorder_buf = reorder_buf->next;
+ reorder_buf->order <= origin_qe->s.order_out +
+ release_count + placeholder_count) {
+ /*
+ * Elements on the reorder list fall into one of
+ * three categories:
+ *
+ * 1. Those destined for the same queue. These
+ * can be enq'd now if they were waiting to
+ * be unblocked by this enq.
+ *
+ * 2. Those representing placeholders for events
+ * whose ordering was released by a prior
+ * odp_schedule_release_ordered() call. These
+ * can now just be freed.
+ *
+ * 3. Those representing events destined for another
+ * queue. These cannot be consolidated with this
+ * enq since they have a different target.
+ *
+ * Detecting an element with an order sequence gap, an
+ * element in category 3, or running out of elements
+ * stops the scan.
+ */
+ next_buf = reorder_buf->next;
+
+ if (odp_likely(reorder_buf->target_qe == queue)) {
+ reorder_prev = reorder_buf;
+ reorder_buf = next_buf;
+ release_count++;
+ } else if (!reorder_buf->target_qe) {
+ if (reorder_prev)
+ reorder_prev->next = next_buf;
+ else
+ origin_qe->s.reorder_head = next_buf;
+
+ reorder_buf->next = placeholder_buf;
+ placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ placeholder_count++;
+ } else {
+ break;
+ }
}
/* Add released buffers to the queue as well */
@@ -444,19 +486,28 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
queue->s.tail = reorder_prev;
origin_qe->s.reorder_head = reorder_prev->next;
reorder_prev->next = NULL;
- origin_qe->s.order_out += release_count;
}
- /* Now handle unblocked buffers destined for other queues */
+ /* Reflect the above two in the output sequence */
+ origin_qe->s.order_out += release_count + placeholder_count;
+
+ /* Now handle any unblocked buffers destined for other queues */
+ UNLOCK(&queue->s.lock);
if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out) {
- UNLOCK(&origin_qe->s.lock);
- UNLOCK(&queue->s.lock);
- if (schedule_enq(reorder_buf->target_qe, origin_qe))
- ODP_ABORT("schedule_enq failed\n");
- } else {
- UNLOCK(&origin_qe->s.lock);
- UNLOCK(&queue->s.lock);
+ reorder_buf->order <= origin_qe->s.order_out)
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+ UNLOCK(&origin_qe->s.lock);
+ if (reorder_buf)
+ odp_queue_enq(reorder_buf->target_qe->s.handle,
+ (odp_event_t)reorder_buf->handle.handle);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(buf_hdr->handle.handle);
+ placeholder_buf = next_buf;
}
} else {
UNLOCK(&queue->s.lock);
@@ -685,3 +736,87 @@ void odp_queue_param_init(odp_queue_param_t *params)
{
memset(params, 0, sizeof(odp_queue_param_t));
}
+
+/* This routine exists here rather than in odp_schedule
+ * because it operates on queue interenal structures
+ */
+int odp_schedule_release_ordered(odp_event_t ev)
+{
+ odp_buffer_t placeholder_buf;
+ odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *reorder_prev;
+ odp_buffer_hdr_t *buf_hdr =
+ odp_buf_to_hdr(odp_buffer_from_event(ev));
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Can't release if we didn't originate from an ordered queue */
+ if (!origin_qe)
+ return -1;
+
+ LOCK(&origin_qe->s.lock);
+
+ /* If we are the first or second element beyond the next in order,
+ * we can release immediately since there can be no confusion about
+ * intermediate elements
+ */
+ if (buf_hdr->order <= origin_qe->s.order_out + 1) {
+ buf_hdr->origin_qe = NULL;
+ origin_qe->s.order_out++;
+
+ /* check if this release allows us to unblock waiters */
+ reorder_buf = origin_qe->s.reorder_head;
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out)
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+ UNLOCK(&origin_qe->s.lock);
+ if (reorder_buf)
+ odp_queue_enq(reorder_buf->target_qe->s.handle,
+ (odp_event_t)reorder_buf->handle.handle);
+ return 0;
+ }
+
+ /* If we are beyond the second element in the expected order, we need
+ * a placeholder to represent our "place in line". Just use an element
+ * from the same pool the buffer being released is from.
+ */
+ placeholder_buf = odp_buffer_alloc(buf_hdr->pool_hdl);
+
+ /* Can't release if no placeholder is available */
+ if (placeholder_buf == ODP_BUFFER_INVALID) {
+ UNLOCK(&origin_qe->s.lock);
+ return -1;
+ }
+
+ placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+ reorder_buf = origin_qe->s.reorder_head;
+
+ if (!reorder_buf) {
+ placeholder_buf_hdr->next = NULL;
+ origin_qe->s.reorder_head = placeholder_buf_hdr;
+ origin_qe->s.reorder_tail = placeholder_buf_hdr;
+ } else {
+ reorder_prev = NULL;
+
+ while (buf_hdr->order > reorder_buf->order) {
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ if (!reorder_buf)
+ break;
+ }
+
+ placeholder_buf_hdr->next = reorder_buf;
+ if (reorder_prev)
+ reorder_prev->next = placeholder_buf_hdr;
+ else
+ origin_qe->s.reorder_head = placeholder_buf_hdr;
+
+ if (!reorder_buf)
+ origin_qe->s.reorder_tail = placeholder_buf_hdr;
+ }
+
+ placeholder_buf_hdr->target_qe = NULL;
+
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+}
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- platform/linux-generic/odp_queue.c | 169 +++++++++++++++++++++++++++++++++---- 1 file changed, 152 insertions(+), 17 deletions(-)