@@ -108,7 +108,7 @@ typedef union queue_entry_u queue_entry_t;
/* Common buffer header */
typedef struct odp_buffer_hdr_t {
- struct odp_buffer_hdr_t *next; /* next buf in a list */
+ struct odp_buffer_hdr_t *next; /* next buf in a list -- must remain 1st field */
union { /* Multi-use secondary link */
struct odp_buffer_hdr_t *prev;
struct odp_buffer_hdr_t *link;
@@ -23,6 +23,7 @@ extern "C" {
#include <odp_align_internal.h>
#include <odp/packet_io.h>
#include <odp/align.h>
+#include <odp/hints.h>
#define USE_TICKETLOCK
@@ -99,6 +100,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
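+/* Ordered-aware enqueue entry points for ODP_QUEUE_TYPE_PKTOUT */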
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+ odp_buffer_hdr_t *buf_hdr[], int num);
+
int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num);
@@ -143,6 +148,117 @@ static inline int queue_prio(queue_entry_t *qe)
return qe->s.param.sched.prio;
}
+static inline void reorder_enq(queue_entry_t *queue,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t *buf_hdr)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev =
+ (odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+ while (reorder_buf && buf_hdr->order >= reorder_buf->order) {
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ }
+
+ buf_hdr->next = reorder_buf;
+ reorder_prev->next = buf_hdr;
+
+ if (!reorder_buf)
+ origin_qe->s.reorder_tail = buf_hdr;
+
+ buf_hdr->target_qe = queue;
+}
+
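+/* Release count order slots by advancing origin_qe's order_out
+ * sequence and its atomic sync_out counter in step.
+ */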
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+ origin_qe->s.order_out += count;
+ odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
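+/* Scan origin_qe's reorder list for bufs that are now in order. Bufs
+ * destined for queue are left chained for the caller to enqueue,
+ * placeholder bufs are unlinked onto *placeholder_buf_return for
+ * freeing, and the scan stops at a sequence gap or a buf bound for
+ * another queue, which is returned in *reorder_buf_return.
+ */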
+static inline void reorder_deq(queue_entry_t *queue,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t **reorder_buf_return,
+ odp_buffer_hdr_t **reorder_prev_return,
+ odp_buffer_hdr_t **placeholder_buf_return,
+ uint32_t *release_count_return,
+ uint32_t *placeholder_count_return)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev = NULL;
+ odp_buffer_hdr_t *placeholder_buf = NULL;
+ odp_buffer_hdr_t *next_buf;
+ uint32_t release_count = 0;
+ uint32_t placeholder_count = 0;
+
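+ /* A buf counts as in order if its sequence falls within order_out
+ * plus the slots this scan is itself about to release, so a run of
+ * consecutive bufs is drained in a single pass.
+ */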
+ while (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out +
+ release_count + placeholder_count) {
+ /*
+ * Elements on the reorder list fall into one of
+ * three categories:
+ *
+ * 1. Those destined for the same queue. These
+ * can be enq'd now if they were waiting to
+ * be unblocked by this enq.
+ *
+ * 2. Those representing placeholders for events
+ * whose ordering was released by a prior
+ * odp_schedule_release_ordered() call. These
+ * can now just be freed.
+ *
+ * 3. Those representing events destined for another
+ * queue. These cannot be consolidated with this
+ * enq since they have a different target.
+ *
+ * Detecting an element with an order sequence gap, an
+ * element in category 3, or running out of elements
+ * stops the scan.
+ */
+ next_buf = reorder_buf->next;
+
+ if (odp_likely(reorder_buf->target_qe == queue)) {
+ /* promote any chain: splice linked bufs in after their head */
+ odp_buffer_hdr_t *reorder_link =
+ reorder_buf->link;
+
+ if (reorder_link) {
+ reorder_buf->next = reorder_link;
+ reorder_buf->link = NULL;
+ while (reorder_link->next)
+ reorder_link = reorder_link->next;
+ reorder_link->next = next_buf;
+ reorder_prev = reorder_link;
+ } else {
+ reorder_prev = reorder_buf;
+ }
+
+ if (!reorder_buf->flags.sustain)
+ release_count++;
+ reorder_buf = next_buf;
+ } else if (!reorder_buf->target_qe) {
+ if (reorder_prev)
+ reorder_prev->next = next_buf;
+ else
+ origin_qe->s.reorder_head = next_buf;
+
+ reorder_buf->next = placeholder_buf;
+ placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ placeholder_count++;
+ } else {
+ break;
+ }
+ }
+
+ *reorder_buf_return = reorder_buf;
+ *reorder_prev_return = reorder_prev;
+ *placeholder_buf_return = placeholder_buf;
+ *release_count_return = release_count;
+ *placeholder_count_return = placeholder_count;
+}
+
void queue_destroy_finalize(queue_entry_t *qe);
#ifdef __cplusplus
@@ -77,9 +77,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.dequeue_multi = pktin_deq_multi;
break;
case ODP_QUEUE_TYPE_PKTOUT:
- queue->s.enqueue = pktout_enqueue;
+ queue->s.enqueue = queue_pktout_enq;
queue->s.dequeue = pktout_dequeue;
- queue->s.enqueue_multi = pktout_enq_multi;
+ queue->s.enqueue_multi = queue_pktout_enq_multi;
queue->s.dequeue_multi = pktout_deq_multi;
break;
default:
@@ -369,34 +369,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
/* We can only complete the enq if we're in order */
if (origin_qe) {
if (buf_hdr->order > origin_qe->s.order_out) {
- odp_buffer_hdr_t *reorder_buf =
- origin_qe->s.reorder_head;
-
- if (!reorder_buf) {
- buf_hdr->next = NULL;
- origin_qe->s.reorder_head = buf_hdr;
- origin_qe->s.reorder_tail = buf_hdr;
- } else {
- odp_buffer_hdr_t *reorder_prev = NULL;
-
- while (buf_hdr->order >= reorder_buf->order) {
- reorder_prev = reorder_buf;
- reorder_buf = reorder_buf->next;
- if (!reorder_buf)
- break;
- }
-
- buf_hdr->next = reorder_buf;
- if (reorder_prev)
- reorder_prev->next = buf_hdr;
- else
- origin_qe->s.reorder_head = buf_hdr;
-
- if (!reorder_buf)
- origin_qe->s.reorder_tail = buf_hdr;
- }
-
- buf_hdr->target_qe = queue;
+ reorder_enq(queue, origin_qe, buf_hdr);
/* This enq can't complete until order is restored, so
* we're done here.
@@ -407,10 +380,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
}
/* We're in order, so account for this and proceed with enq */
- if (!buf_hdr->flags.sustain) {
- origin_qe->s.order_out++;
- odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
- }
+ if (!buf_hdr->flags.sustain)
+ order_release(origin_qe, 1);
/* if this element is linked, restore the linked chain */
buf_tail = buf_hdr->link;
@@ -450,74 +421,16 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
* enq has unblocked other buffers in the origin's reorder queue.
*/
if (origin_qe) {
- odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
- odp_buffer_hdr_t *reorder_prev = NULL;
- odp_buffer_hdr_t *placeholder_buf = NULL;
+ odp_buffer_hdr_t *reorder_buf;
odp_buffer_hdr_t *next_buf;
- uint32_t release_count = 0;
- uint32_t placeholder_count = 0;
-
- while (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out +
- release_count + placeholder_count) {
- /*
- * Elements on the reorder list fall into one of
- * three categories:
- *
- * 1. Those destined for the same queue. These
- * can be enq'd now if they were waiting to
- * be unblocked by this enq.
- *
- * 2. Those representing placeholders for events
- * whose ordering was released by a prior
- * odp_schedule_release_ordered() call. These
- * can now just be freed.
- *
- * 3. Those representing events destined for another
- * queue. These cannot be consolidated with this
- * enq since they have a different target.
- *
- * Detecting an element with an order sequence gap, an
- * element in category 3, or running out of elements
- * stops the scan.
- */
- next_buf = reorder_buf->next;
-
- if (odp_likely(reorder_buf->target_qe == queue)) {
- /* promote any chain */
- odp_buffer_hdr_t *reorder_link =
- reorder_buf->link;
-
- if (reorder_link) {
- reorder_buf->next = reorder_link;
- reorder_buf->link = NULL;
- while (reorder_link->next)
- reorder_link =
- reorder_link->next;
- reorder_link->next = next_buf;
- reorder_prev = reorder_link;
- } else {
- reorder_prev = reorder_buf;
- }
-
- if (!reorder_buf->flags.sustain)
- release_count++;
- reorder_buf = next_buf;
- } else if (!reorder_buf->target_qe) {
- if (reorder_prev)
- reorder_prev->next = next_buf;
- else
- origin_qe->s.reorder_head = next_buf;
-
- reorder_buf->next = placeholder_buf;
- placeholder_buf = reorder_buf;
-
- reorder_buf = next_buf;
- placeholder_count++;
- } else {
- break;
- }
- }
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *placeholder_buf;
+ uint32_t release_count;
+ uint32_t placeholder_count;
+
+ reorder_deq(queue, origin_qe,
+ &reorder_buf, &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
/* Add released buffers to the queue as well */
if (release_count > 0) {
@@ -528,18 +441,18 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
}
/* Reflect the above two in the output sequence */
- origin_qe->s.order_out += release_count + placeholder_count;
- odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
- release_count + placeholder_count);
+ order_release(origin_qe, release_count + placeholder_count);
/* Now handle any unblocked buffers destined for other queues */
UNLOCK(&queue->s.lock);
+
if (reorder_buf &&
reorder_buf->order <= origin_qe->s.order_out)
origin_qe->s.reorder_head = reorder_buf->next;
else
reorder_buf = NULL;
UNLOCK(&origin_qe->s.lock);
+
if (reorder_buf)
odp_queue_enq(reorder_buf->target_qe->s.handle,
(odp_event_t)reorder_buf->handle.handle);
@@ -547,7 +460,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
/* Free all placeholder bufs that are now released */
while (placeholder_buf) {
next_buf = placeholder_buf->next;
- odp_buffer_free(buf_hdr->handle.handle);
+ odp_buffer_free(placeholder_buf->handle.handle);
placeholder_buf = next_buf;
}
} else {
@@ -799,6 +712,191 @@ odp_event_t odp_queue_deq(odp_queue_t handle)
return ODP_EVENT_INVALID;
}
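+/* Enqueue to a pktout queue with ordered-origin awareness. Bufs that
+ * arrive ahead of their order are parked on the origin's reorder
+ * list; in-order bufs are transmitted and may unblock parked
+ * successors.
+ */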
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+ int rc, sustain;
+
+ /* Special processing needed only if we came from an ordered queue */
+ if (!origin_qe)
+ return pktout_enqueue(queue, buf_hdr);
+
+ /* Must lock origin_qe for ordered processing */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* We can only complete the enq if we're in order */
+ if (buf_hdr->order > origin_qe->s.order_out) {
+ reorder_enq(queue, origin_qe, buf_hdr);
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
+ /* Perform our enq since we're in order.
+ * Note: Don't hold the origin_qe lock across an I/O operation!
+ * We also cache the sustain flag since the I/O operation may free
+ * the buffer, leaving us unable to reference it afterwards.
+ */
+ UNLOCK(&origin_qe->s.lock);
+ sustain = buf_hdr->flags.sustain;
+
+ /* Handle any chained buffers (internal calls) */
+ if (buf_hdr->link) {
+ odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+ odp_buffer_hdr_t *next_buf;
+ int num = 0;
+
+ next_buf = buf_hdr->link;
+ buf_hdr->link = NULL;
+
+ while (next_buf) {
+ buf_hdrs[num++] = next_buf;
+ next_buf = next_buf->next;
+ }
+
+ rc = pktout_enq_multi(queue, buf_hdrs, num);
+ if (rc < num)
+ return -1;
+ } else {
+ rc = pktout_enqueue(queue, buf_hdr);
+ if (rc)
+ return rc;
+ }
+
+ /* Reacquire the lock following the I/O send. Note that we're still
+ * guaranteed to be in order here since we haven't released
+ * order yet.
+ */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+
+ /* Account for this ordered enq */
+ if (!sustain)
+ order_release(origin_qe, 1);
+
+ /* Now check to see if our successful enq has unblocked other buffers
+ * in the origin's reorder queue.
+ */
+ odp_buffer_hdr_t *reorder_buf;
+ odp_buffer_hdr_t *next_buf;
+ odp_buffer_hdr_t *reorder_prev;
+ odp_buffer_hdr_t *xmit_buf;
+ odp_buffer_hdr_t *placeholder_buf;
+ uint32_t release_count;
+ uint32_t placeholder_count;
+
+ reorder_deq(queue, origin_qe,
+ &reorder_buf, &reorder_prev, &placeholder_buf,
+ &release_count, &placeholder_count);
+
+ /* Send released buffers as well */
+ if (release_count > 0) {
+ xmit_buf = origin_qe->s.reorder_head;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ UNLOCK(&origin_qe->s.lock);
+
+ do {
+ next_buf = xmit_buf->next;
+ pktout_enqueue(queue, xmit_buf);
+ xmit_buf = next_buf;
+ } while (xmit_buf);
+
+ /* Reacquire the origin_qe lock to continue */
+ LOCK(&origin_qe->s.lock);
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ }
+
+ /* Update the order sequence to reflect the deq'd elements */
+ order_release(origin_qe, release_count + placeholder_count);
+
+ /* Now handle sends to other queues that are ready to go */
+ if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
+ origin_qe->s.reorder_head = reorder_buf->next;
+ else
+ reorder_buf = NULL;
+
+ /* We're fully done with the origin_qe at last */
+ UNLOCK(&origin_qe->s.lock);
+
+ /* Now send the next buffer to its target queue */
+ if (reorder_buf)
+ odp_queue_enq(reorder_buf->target_qe->s.handle,
+ (odp_event_t)reorder_buf->handle.handle);
+
+ /* Free all placeholder bufs that are now released */
+ while (placeholder_buf) {
+ next_buf = placeholder_buf->next;
+ odp_buffer_free(placeholder_buf->handle.handle);
+ placeholder_buf = next_buf;
+ }
+
+ return 0;
+}
+
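+/* Multi-buf variant of queue_pktout_enq(). The unordered prefix is
+ * sent directly; each ordered buf then heads a chain passed to
+ * queue_pktout_enq() via its link field.
+ */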
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ int i, rc, ret_count = 0;
+ int ordered_head[num];
+ int ordered_count = 0;
+
+ /* Identify ordered chains in the input buffer list */
+ for (i = 0; i < num; i++) {
+ if (buf_hdr[i]->origin_qe)
+ ordered_head[ordered_count++] = i;
+
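+ /* Pre-link the whole list; chains are cut at ordered heads below */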
+ buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+ }
+
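+ /* Bufs ahead of the first ordered head need no order handling */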
+ ret_count = ordered_count ? ordered_head[0] : num;
+
+ /* Handle regular enq's at start of list */
+ if (ret_count) {
+ rc = pktout_enq_multi(queue, buf_hdr, ret_count);
+ if (rc < ret_count)
+ return rc;
+ }
+
+ /* Handle ordered chains in the list */
+ for (i = 0; i < ordered_count; i++) {
+ int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+ int list_count = eol - ordered_head[i];
+
+ if (i < ordered_count - 1)
+ buf_hdr[eol - 1]->next = NULL;
+
+ buf_hdr[ordered_head[i]]->link =
+ list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+ rc = queue_pktout_enq(queue, buf_hdr[ordered_head[i]]);
+ if (rc < 0)
+ return ret_count;
+
+ if (rc < list_count)
+ return ret_count + rc;
+
+ ret_count += rc;
+ }
+
+ return ret_count;
+}
void queue_lock(queue_entry_t *queue)
{
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h |   2 +-
 .../linux-generic/include/odp_queue_internal.h  | 116 ++++++++
 platform/linux-generic/odp_queue.c              | 308 ++++++++++++++------
 3 files changed, 320 insertions(+), 106 deletions(-)