@@ -335,8 +335,7 @@ static inline int reorder_deq(queue_entry_t *queue,
static inline void reorder_complete(queue_entry_t *origin_qe,
odp_buffer_hdr_t **reorder_buf_return,
odp_buffer_hdr_t **placeholder_buf,
- int placeholder_append,
- int order_released)
+ int placeholder_append)
{
odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
odp_buffer_hdr_t *next_buf;
@@ -356,7 +355,7 @@ static inline void reorder_complete(queue_entry_t *origin_qe,
reorder_buf = next_buf;
order_release(origin_qe, 1);
- } else if (!order_released && reorder_buf->flags.sustain) {
+ } else if (reorder_buf->flags.sustain) {
reorder_buf = next_buf;
} else {
*reorder_buf_return = origin_qe->s.reorder_head;
@@ -39,6 +39,11 @@
#include <string.h>
+#define RESOLVE_ORDER 0
+#define SUSTAIN_ORDER 1
+
+#define NOAPPEND 0
+#define APPEND 1
typedef struct queue_table_t {
queue_entry_t queue[ODP_CONFIG_QUEUES];
@@ -521,8 +526,7 @@ int ordered_queue_enq(queue_entry_t *queue,
if (sched && schedule_queue(queue))
ODP_ABORT("schedule_queue failed\n");
- reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
- 1, 0);
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
UNLOCK(&origin_qe->s.lock);
if (reorder_buf)
@@ -606,7 +610,8 @@ int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
for (i = 0; i < num; i++)
buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
- return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
+ return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr,
+ num, SUSTAIN_ORDER);
}
int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
@@ -620,7 +625,7 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
/* No chains via this entry */
buf_hdr->link = NULL;
- return queue->s.enqueue(queue, buf_hdr, 1);
+ return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER);
}
int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
@@ -660,7 +665,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
buf_hdr->sync[i] =
odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
}
- buf_hdr->flags.sustain = 0;
+ buf_hdr->flags.sustain = SUSTAIN_ORDER;
} else {
buf_hdr->origin_qe = NULL;
}
@@ -713,7 +718,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
odp_atomic_fetch_inc_u64
(&queue->s.sync_in[j]);
}
- buf_hdr[i]->flags.sustain = 0;
+ buf_hdr[i]->flags.sustain = SUSTAIN_ORDER;
} else {
buf_hdr[i]->origin_qe = NULL;
}
@@ -879,7 +884,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
order_release(origin_qe, release_count + placeholder_count);
/* Now handle sends to other queues that are ready to go */
- reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1, 0);
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
/* We're fully done with the origin_qe at last */
UNLOCK(&origin_qe->s.lock);
@@ -947,13 +952,43 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
odp_buffer_t placeholder_buf;
odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
- /* Must tlock the origin queue to process the release */
+ /* Must lock the origin queue to process the release */
LOCK(&origin_qe->s.lock);
- /* If we are in the order we can release immediately since there can
- * be no confusion about intermediate elements
+ /* If we are in order we can release immediately since there can be no
+ * confusion about intermediate elements
*/
if (order <= origin_qe->s.order_out) {
+ reorder_buf = origin_qe->s.reorder_head;
+
+ /* We're in order, however there may be one or more events on
+ * the reorder queue that are part of this order. If that is
+ * the case, remove them and let ordered_queue_enq() handle
+ * them and resolve the order for us.
+ */
+ if (reorder_buf && reorder_buf->order == order) {
+ odp_buffer_hdr_t *reorder_head = reorder_buf;
+
+ next_buf = reorder_buf->next;
+
+ while (next_buf && next_buf->order == order) {
+ reorder_buf = next_buf;
+ next_buf = next_buf->next;
+ }
+
+ origin_qe->s.reorder_head = reorder_buf->next;
+ reorder_buf->next = NULL;
+
+ UNLOCK(&origin_qe->s.lock);
+ reorder_head->link = reorder_buf->next;
+ return ordered_queue_enq(reorder_head->target_qe,
+ reorder_head, RESOLVE_ORDER,
+ origin_qe, order);
+ }
+
+ /* Reorder queue has no elements for this order, so it's safe
+ * to resolve order here
+ */
order_release(origin_qe, 1);
/* Check if this release allows us to unblock waiters. At the
@@ -965,7 +1000,7 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
* element(s) on the reorder queue
*/
reorder_complete(origin_qe, &reorder_buf,
- &placeholder_buf_hdr, 0, 1);
+ &placeholder_buf_hdr, NOAPPEND);
/* Now safe to unlock */
UNLOCK(&origin_qe->s.lock);
Resolve the corner case of releasing order for an order that still has events on the reorder queue. This also allows the reorder_complete() routine to be streamlined. This patch resolves Bug https://bugs.linaro.org/show_bug.cgi?id=1879 Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 5 +- platform/linux-generic/odp_queue.c | 57 +++++++++++++++++----- 2 files changed, 48 insertions(+), 14 deletions(-)