@@ -109,6 +109,10 @@ typedef union queue_entry_u queue_entry_t;
/* Common buffer header */
typedef struct odp_buffer_hdr_t {
struct odp_buffer_hdr_t *next; /* next buf in a list */
+ union { /* Multi-use secondary link */
+ struct odp_buffer_hdr_t *prev;
+ struct odp_buffer_hdr_t *link;
+ };
odp_buffer_bits_t handle; /* handle */
union {
uint32_t all;
@@ -336,6 +336,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
{
int sched = 0;
queue_entry_t *origin_qe = buf_hdr->origin_qe;
+ odp_buffer_hdr_t *buf_tail;
/* Need two locks for enq operations from ordered queues */
if (origin_qe) {
@@ -404,17 +405,33 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
/* We're in order, so account for this and proceed with enq */
origin_qe->s.order_out++;
+
+ /* if this element is linked, restore the linked chain */
+ buf_tail = buf_hdr->link;
+
+ if (buf_tail) {
+ buf_hdr->next = buf_tail;
+ buf_hdr->link = NULL;
+
+ /* find end of the chain */
+ while (buf_tail->next)
+ buf_tail = buf_tail->next;
+ } else {
+ buf_tail = buf_hdr;
+ }
+ } else {
+ buf_tail = buf_hdr;
}
- if (queue->s.head == NULL) {
+ if (!queue->s.head) {
/* Empty queue */
queue->s.head = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
} else {
queue->s.tail->next = buf_hdr;
- queue->s.tail = buf_hdr;
- buf_hdr->next = NULL;
+ queue->s.tail = buf_tail;
+ buf_tail->next = NULL;
}
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
@@ -461,8 +478,23 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
next_buf = reorder_buf->next;
if (odp_likely(reorder_buf->target_qe == queue)) {
- reorder_prev = reorder_buf;
- reorder_buf = next_buf;
+ /* promote any chain */
+ odp_buffer_hdr_t *reorder_link =
+ reorder_buf->link;
+
+ if (reorder_link) {
+ reorder_buf->next = reorder_link;
+ reorder_buf->link = NULL;
+ while (reorder_link->next)
+ reorder_link =
+ reorder_link->next;
+ reorder_link->next = next_buf;
+ reorder_prev = reorder_link;
+ } else {
+ reorder_prev = reorder_buf;
+ }
+
+ reorder_buf = next_buf;
release_count++;
} else if (!reorder_buf->target_qe) {
if (reorder_prev)
@@ -523,53 +555,83 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
{
int sched = 0;
- int i, j;
+ int i, rc, ret_count = 0;
+ int ordered_head[num];
+ int ordered_count = 0;
odp_buffer_hdr_t *tail;
+ /* Identify ordered chains in the input buffer list */
for (i = 0; i < num; i++) {
- /* If any buffer is coming from an ordered queue, enqueue them
- * individually since in the general case each might originate
- * from a different ordered queue. If any of these fail, the
- * return code tells the caller how many succeeded.
- */
- if (buf_hdr[i]->origin_qe) {
- for (j = 0; j < num; j++) {
- if (queue_enq(queue, buf_hdr[j]))
- return j;
- }
- return num;
+ if (buf_hdr[i]->origin_qe)
+ ordered_head[ordered_count++] = i;
+
+ buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+ }
+
+ if (ordered_count) {
+ if (ordered_head[0] > 0) {
+ tail = buf_hdr[ordered_head[0] - 1];
+ tail->next = NULL;
+ ret_count = ordered_head[0];
+ } else {
+ tail = NULL;
+ ret_count = 0;
}
- buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+ } else {
+ tail = buf_hdr[num - 1];
+ ret_count = num;
}
- tail = buf_hdr[num-1];
+ /* Handle regular enq's at start of list */
+ if (tail) {
+ LOCK(&queue->s.lock);
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
+
+ /* Handle empty queue */
+ if (queue->s.head)
+ queue->s.tail->next = buf_hdr[0];
+ else
+ queue->s.head = buf_hdr[0];
- LOCK(&queue->s.lock);
- if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ queue->s.tail = tail;
+
+ if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+ queue->s.status = QUEUE_STATUS_SCHED;
+ sched = 1; /* retval: schedule queue */
+ }
UNLOCK(&queue->s.lock);
- ODP_ERR("Bad queue status\n");
- return -1;
+
+ /* Add queue to scheduling */
+ if (sched && schedule_queue(queue))
+ ODP_ABORT("schedule_queue failed\n");
}
- /* Empty queue */
- if (queue->s.head == NULL)
- queue->s.head = buf_hdr[0];
- else
- queue->s.tail->next = buf_hdr[0];
+ /* Handle ordered chains in the list */
+ for (i = 0; i < ordered_count; i++) {
+ int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+ int list_count = eol - i;
- queue->s.tail = tail;
+ if (i < ordered_count - 1)
+ buf_hdr[eol - 1]->next = NULL;
- if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
- queue->s.status = QUEUE_STATUS_SCHED;
- sched = 1; /* retval: schedule queue */
- }
- UNLOCK(&queue->s.lock);
+ buf_hdr[ordered_head[i]]->link =
+ list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
- /* Add queue to scheduling */
- if (sched && schedule_queue(queue))
- ODP_ABORT("schedule_queue failed\n");
+ rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+ if (rc < 0)
+ return ret_count;
+
+ if (rc < list_count)
+ return ret_count + rc;
- return num; /* All events enqueued */
+ ret_count += rc;
+ }
+
+ return ret_count;
}
int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -598,6 +660,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
queue = queue_to_qentry(handle);
buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ /* No chains via this entry */
+ buf_hdr->link = NULL;
+
return queue->s.enqueue(queue, buf_hdr);
}
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 4 + platform/linux-generic/odp_queue.c | 145 +++++++++++++++------ 2 files changed, 109 insertions(+), 40 deletions(-)