@@ -96,6 +96,8 @@ union queue_entry_u {
queue_entry_t *get_qentry(uint32_t queue_id);
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
+int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+ int systain, queue_entry_t *origin_qe, uint64_t order);
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
@@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
return 0;
}
+int ordered_queue_enq(queue_entry_t *queue,
+ odp_buffer_hdr_t *buf_hdr,
+ int sustain,
+ queue_entry_t *origin_qe,
+ uint64_t order)
+{
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *placeholder_buf = NULL;
+ int release_count, placeholder_count;
+ int sched = 0;
+
+ /* Need two locks for enq operations from ordered queues */
+ get_qe_locks(origin_qe, queue);
+
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
+ queue->s.status < QUEUE_STATUS_READY)) {
+ free_qe_locks(queue, origin_qe);
+ ODP_ERR("Bad queue status\n");
+ ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+ queue->s.name, origin_qe->s.name, buf_hdr);
+ return -1;
+ }
+
+ /* Remember that enq was called for this order */
+ sched_enq_called();
+
+ /* We can only complete this enq if we're in order */
+ if (order > origin_qe->s.order_out) {
+ reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ free_qe_locks(queue, origin_qe);
+ return 0;
+ }
+
+ /* Resolve order if requested */
+ if (!sustain) {
+ order_release(origin_qe, 1);
+ sched_order_resolved(buf_hdr);
+ }
+
+ /* Update queue status */
+ if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+ queue->s.status = QUEUE_STATUS_SCHED;
+ sched = 1;
+ }
+
+ /* We're in order, however the reorder queue may have other buffers
+ * sharing this order on it and this buffer must not be enqueued ahead
+ * of them. If the reorder queue is empty we can short-cut and
+ * simply add to the target queue directly.
+ */
+
+ if (!origin_qe->s.reorder_head) {
+ queue_add_chain(queue, buf_hdr);
+ free_qe_locks(queue, origin_qe);
+
+ /* Add queue to scheduling */
+ if (sched && schedule_queue(queue))
+ ODP_ABORT("schedule_queue failed\n");
+ return 0;
+ }
+
+ /* The reorder_queue is non-empty, so sort this buffer into it. Note
+ * that we force the sustain bit on here because we'll be removing
+ * this immediately and we already accounted for this order earlier.
+ */
+ reorder_enq(queue, order, origin_qe, buf_hdr, 1);
+
+ /* Pick up this element, and all others resolved by this enq,
+ * and add them to the target queue.
+ */
+ reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
+ &placeholder_buf, &release_count, &placeholder_count);
+
+ /* Move the list from the reorder queue to the target queue */
+ if (queue->s.head)
+ queue->s.tail->next = origin_qe->s.reorder_head;
+ else
+ queue->s.head = origin_qe->s.reorder_head;
+ queue->s.tail = reorder_prev;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+
+ /* Reflect resolved orders in the output sequence */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle any resolved orders for events destined for other
+ * queues, appending placeholder bufs as needed.
+ */
+ if (origin_qe != queue)
+ UNLOCK(&queue->s.lock);
+
+ /* Add queue to scheduling */
+ if (sched && schedule_queue(queue))
+ ODP_ABORT("schedule_queue failed\n");
+
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
+ 1, 0);
+ UNLOCK(&origin_qe->s.lock);
+
+ if (reorder_buf)
+ queue_enq_internal(reorder_buf);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+
+ return 0;
+}
+
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num, int sustain)
{
Add the new ordered_queue_enq() internal routine. This is done in two parts to make the diffs easier to follow. Part 1 adds the new routine while Part 2 replaces queue_enq() to use it. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 2 + platform/linux-generic/odp_queue.c | 118 +++++++++++++++++++++ 2 files changed, 120 insertions(+)