@@ -284,18 +284,38 @@ static inline int reorder_deq(queue_entry_t *queue,
return deq_count;
}
-static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+static inline void reorder_complete(queue_entry_t *origin_qe,
+ odp_buffer_hdr_t **reorder_buf_return,
+ odp_buffer_hdr_t **placeholder_buf,
+ int placeholder_append)
{
- odp_buffer_hdr_t *next_buf = reorder_buf->next;
- uint64_t order = reorder_buf->order;
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *next_buf;
+
+ *reorder_buf_return = NULL;
+ if (!placeholder_append)
+ *placeholder_buf = NULL;
- while (reorder_buf->flags.sustain &&
- next_buf && next_buf->order == order) {
- reorder_buf = next_buf;
+ while (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out) {
next_buf = reorder_buf->next;
- }
- return !reorder_buf->flags.sustain;
+ if (!reorder_buf->target_qe) {
+ origin_qe->s.reorder_head = next_buf;
+ reorder_buf->next = *placeholder_buf;
+ *placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ order_release(origin_qe, 1);
+ } else if (reorder_buf->flags.sustain) {
+ reorder_buf = next_buf;
+ } else {
+ *reorder_buf_return = origin_qe->s.reorder_head;
+ origin_qe->s.reorder_head =
+ origin_qe->s.reorder_head->next;
+ break;
+ }
+ }
}
static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
@@ -457,18 +457,10 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
order_release(origin_qe, release_count + placeholder_count);
/* Now handle any unblocked complete buffers destined for
- * other queues. Note that these must be complete because
- * otherwise another thread is working on it and is
- * responsible for resolving order when it is complete.
+ * other queues, appending placeholder bufs as needed.
*/
UNLOCK(&queue->s.lock);
-
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
UNLOCK(&origin_qe->s.lock);
if (reorder_buf)
@@ -826,12 +818,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
order_release(origin_qe, release_count + placeholder_count);
/* Now handle sends to other queues that are ready to go */
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
/* We're fully done with the origin_qe at last */
UNLOCK(&origin_qe->s.lock);
@@ -909,23 +896,28 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
order_release(origin_qe, 1);
/* Check if this release allows us to unblock waiters.
- * Note that we can only process complete waiters since
- * if the sustain bit is set for a buffer this means that
- * some other thread is working on it and will be
- * responsible for resolving order when it is complete.
+ * At the point of this call, the reorder list may contain
+ * zero or more placeholders that need to be freed, followed
+ * by zero or one complete reorder buffer chain.
*/
- reorder_buf = origin_qe->s.reorder_head;
-
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf,
+ &placeholder_buf_hdr, 0);
+ /* Now safe to unlock */
UNLOCK(&origin_qe->s.lock);
+
+ /* If reorder_buf has a target, do the enq now */
if (reorder_buf)
queue_enq_internal(reorder_buf);
+
+ while (placeholder_buf_hdr) {
+ odp_buffer_hdr_t *placeholder_next =
+ placeholder_buf_hdr->next;
+
+ odp_buffer_free(placeholder_buf_hdr->handle.handle);
+ placeholder_buf_hdr = placeholder_next;
+ }
+
return 0;
}
Fix a race condition that could cause events to be stuck in an ordered queue's reorder queue. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 36 ++++++++++++---- platform/linux-generic/odp_queue.c | 48 +++++++++------------- 2 files changed, 48 insertions(+), 36 deletions(-)