@@ -386,146 +386,37 @@ odp_queue_t odp_queue_lookup(const char *name)
return ODP_QUEUE_INVALID;
}
-
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
{
- int sched = 0;
queue_entry_t *origin_qe;
uint64_t order;
- odp_buffer_hdr_t *buf_tail;
get_queue_order(&origin_qe, &order, buf_hdr);
- /* Need two locks for enq operations from ordered queues */
- if (origin_qe) {
- get_qe_locks(origin_qe, queue);
- if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
- UNLOCK(&queue->s.lock);
- if (origin_qe != queue)
- UNLOCK(&origin_qe->s.lock);
- ODP_ERR("Bad origin queue status\n");
- ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
- queue->s.name, origin_qe->s.name, buf_hdr);
- return -1;
- }
- } else {
- LOCK(&queue->s.lock);
- }
+ /* Handle enqueues from ordered queues separately */
+ if (origin_qe)
+ return ordered_queue_enq(queue, buf_hdr, sustain,
+ origin_qe, order);
+
+ LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
- if (origin_qe && origin_qe != queue)
- UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
- /* We can only complete the enq if we're in order */
- if (origin_qe) {
- sched_enq_called();
- if (order > origin_qe->s.order_out) {
- reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
-
- /* This enq can't complete until order is restored, so
- * we're done here.
- */
- UNLOCK(&queue->s.lock);
- if (origin_qe != queue)
- UNLOCK(&origin_qe->s.lock);
- return 0;
- }
-
- /* We're in order, so account for this and proceed with enq */
- if (!sustain) {
- order_release(origin_qe, 1);
- sched_order_resolved(buf_hdr);
- }
-
- /* if this element is linked, restore the linked chain */
- buf_tail = buf_hdr->link;
-
- if (buf_tail) {
- buf_hdr->next = buf_tail;
- buf_hdr->link = NULL;
-
- /* find end of the chain */
- while (buf_tail->next)
- buf_tail = buf_tail->next;
- } else {
- buf_tail = buf_hdr;
- }
- } else {
- buf_tail = buf_hdr;
- }
-
- if (!queue->s.head) {
- /* Empty queue */
- queue->s.head = buf_hdr;
- queue->s.tail = buf_tail;
- buf_tail->next = NULL;
- } else {
- queue->s.tail->next = buf_hdr;
- queue->s.tail = buf_tail;
- buf_tail->next = NULL;
- }
+ queue_add(queue, buf_hdr);
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
- sched = 1; /* retval: schedule queue */
- }
-
- /*
- * If we came from an ordered queue, check to see if our successful
- * enq has unblocked other buffers in the origin's reorder queue.
- */
- if (origin_qe) {
- odp_buffer_hdr_t *reorder_buf;
- odp_buffer_hdr_t *next_buf;
- odp_buffer_hdr_t *reorder_prev;
- odp_buffer_hdr_t *placeholder_buf;
- int deq_count, release_count, placeholder_count;
-
- deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
- &reorder_prev, &placeholder_buf,
- &release_count, &placeholder_count);
-
- /* Add released buffers to the queue as well */
- if (deq_count > 0) {
- queue->s.tail->next = origin_qe->s.reorder_head;
- queue->s.tail = reorder_prev;
- origin_qe->s.reorder_head = reorder_prev->next;
- reorder_prev->next = NULL;
- }
-
- /* Reflect resolved orders in the output sequence */
- order_release(origin_qe, release_count + placeholder_count);
-
- /* Now handle any unblocked complete buffers destined for
- * other queues, appending placeholder bufs as needed.
- */
- if (origin_qe != queue)
- UNLOCK(&queue->s.lock);
- reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
- 1, 0);
- UNLOCK(&origin_qe->s.lock);
-
- if (reorder_buf)
- queue_enq_internal(reorder_buf);
-
- /* Free all placeholder bufs that are now released */
- while (placeholder_buf) {
- next_buf = placeholder_buf->next;
- odp_buffer_free(placeholder_buf->handle.handle);
- placeholder_buf = next_buf;
- }
- } else {
UNLOCK(&queue->s.lock);
+ if (schedule_queue(queue))
+ ODP_ABORT("schedule_queue failed\n");
+ return 0;
}
- /* Add queue to scheduling */
- if (sched && schedule_queue(queue))
- ODP_ABORT("schedule_queue failed\n");
-
+ UNLOCK(&queue->s.lock);
return 0;
}
@@ -667,7 +558,8 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
get_queue_order(&origin_qe, &order, buf_hdr[0]);
if (origin_qe) {
buf_hdr[0]->link = buf_hdr[0]->next;
- rc = queue_enq(queue, buf_hdr[0], sustain);
+ rc = ordered_queue_enq(queue, buf_hdr[0], sustain,
+ origin_qe, order);
return rc == 0 ? num : rc;
}
Restructure queue_enq() to streamline it while using the new ordered_queue_enq() routine to handle ordered queues. This change is made in two parts to make the diffs easier to follow. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- platform/linux-generic/odp_queue.c | 134 ++++--------------------------------- 1 file changed, 13 insertions(+), 121 deletions(-)