Message ID | 1438815321-12344-10-git-send-email-bill.fischofer@linaro.org |
---|---|
State | New |
Headers | show |
On Thu, Aug 6, 2015 at 2:58 AM, Jianbo Liu <Jianbo.Liu@arm.com> wrote: > > > > -----Original Message----- > > From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of > Bill > > Fischofer > > Sent: Thursday, August 06, 2015 6:55 AM > > To: lng-odp@lists.linaro.org > > Subject: [lng-odp] [API-NEXT PATCHv8 09/13] linux-generic: queue: add > > ordered chain enq support > > > > Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> > > --- > > .../linux-generic/include/odp_buffer_internal.h | 4 + > > platform/linux-generic/odp_queue.c | 151 > +++++++++++++++------ > > 2 files changed, 115 insertions(+), 40 deletions(-) > > > > diff --git a/platform/linux-generic/include/odp_buffer_internal.h > > b/platform/linux-generic/include/odp_buffer_internal.h > > index c459fce..30f061e 100644 > > --- a/platform/linux-generic/include/odp_buffer_internal.h > > +++ b/platform/linux-generic/include/odp_buffer_internal.h > ................. > > > @@ -523,53 +555,89 @@ int queue_enq(queue_entry_t *queue, > > odp_buffer_hdr_t *buf_hdr) > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t > > *buf_hdr[], int num) > > { > > int sched = 0; > > - int i, j; > > + int i, rc, ret_count = 0; > > + int ordered_head[num]; > > + int ordered_count = 0; > > odp_buffer_hdr_t *tail; > > > > + /* Identify ordered chains in the input buffer list */ > > for (i = 0; i < num; i++) { > > - /* If any buffer is coming from an ordered queue, enqueue > > them > > - * individually since in the general case each might > originate > > - * from a different ordered queue. If any of these fail, > the > > - * return code tells the caller how many succeeded. > > - */ > > - if (buf_hdr[i]->origin_qe) { > > - for (j = 0; j < num; j++) { > > - if (queue_enq(queue, buf_hdr[j])) > > - return j; > > - } > > - return num; > > + if (buf_hdr[i]->origin_qe) > > + ordered_head[ordered_count++] = i; > > + if (i < num - 1) > > + buf_hdr[i]->next = buf_hdr[i + 1]; > > + } > > + > > + buf_hdr[num - 1]->next = NULL; > > + > > + if (ordered_count) { > > + if (ordered_head[0] > 0) { > > + tail = buf_hdr[ordered_head[0] - 1]; > > + tail->next = NULL; > > + ret_count = ordered_head[0]; > > + } else { > > + tail = NULL; > > + ret_count = 0; > > } > > - buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1]; > > + } else { > > + tail = buf_hdr[num - 1]; > > + ret_count = num; > > } > > > > - tail = buf_hdr[num-1]; > > + /* Handle regular enq's at start of list */ > > + if (tail) { > > + tail->next = NULL; > > The above line is not necessary since tail->next is already NULL. > Good catch. > > > + LOCK(&queue->s.lock); > > + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) > > { > > + UNLOCK(&queue->s.lock); > > + ODP_ERR("Bad queue status\n"); > > + return -1; > > + } > > > > - LOCK(&queue->s.lock); > > - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > > + /* Empty queue */ > > + if (queue->s.head) > > + queue->s.tail->next = buf_hdr[0]; > > + else > > + queue->s.head = buf_hdr[0]; > > + > > + queue->s.tail = tail; > > + > > + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > + queue->s.status = QUEUE_STATUS_SCHED; > > + sched = 1; /* retval: schedule queue */ > > + } > > UNLOCK(&queue->s.lock); > > - ODP_ERR("Bad queue status\n"); > > - return -1; > > + > > + /* Add queue to scheduling */ > > + if (sched && schedule_queue(queue)) > > + ODP_ABORT("schedule_queue failed\n"); > > } > > > > - /* Empty queue */ > > - if (queue->s.head == NULL) > > - queue->s.head = buf_hdr[0]; > > - else > > - queue->s.tail->next = buf_hdr[0]; > > + /* Handle ordered chains in the list */ > > + for (i = 0; i < ordered_count; i++) { > > + int eol = i < ordered_count - 1 ? ordered_head[i + 1] : > num; > > + int list_count = eol - i; > > > > - queue->s.tail = tail; > > + if (i < ordered_count - 1) > > + buf_hdr[ordered_head[i + 1] - 1]->next = NULL; > > Is it better to use eol instead of ordered_head[i + 1]? > The compiler should optimize this, but agree it might be cleaner. > > > > > - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > - queue->s.status = QUEUE_STATUS_SCHED; > > - sched = 1; /* retval: schedule queue */ > > - } > > - UNLOCK(&queue->s.lock); > > + if (ordered_head[i] < eol - 1) > > + buf_hdr[ordered_head[i]]->link = > > + buf_hdr[ordered_head[i] + 1]; > > + else > > + buf_hdr[ordered_head[i]]->link = NULL; > > > > - /* Add queue to scheduling */ > > - if (sched && schedule_queue(queue)) > > - ODP_ABORT("schedule_queue failed\n"); > > + rc = queue_enq(queue, buf_hdr[ordered_head[i]]); > > + if (rc < 0) > > + return ret_count; > > + > > + if (rc < list_count) > > + return ret_count + rc; > > > > - return num; /* All events enqueued */ > > + ret_count += rc; > > + } > > + > > + return ret_count; > > } > > > > int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], > > int num) > > @@ -598,6 +666,9 @@ int odp_queue_enq(odp_queue_t handle, > > odp_event_t ev) > > queue = queue_to_qentry(handle); > > buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev)); > > > > + /* No chains via this entry */ > > + buf_hdr->link = NULL; > > + > > return queue->s.enqueue(queue, buf_hdr); > > } > > > > -- > > 2.1.4 > > > > _______________________________________________ > > lng-odp mailing list > > lng-odp@lists.linaro.org > > https://lists.linaro.org/mailman/listinfo/lng-odp > > -- IMPORTANT NOTICE: The contents of this email and any attachments are > confidential and may also be privileged. If you are not the intended > recipient, please notify the sender immediately and do not disclose the > contents to any other person, use it for any purpose, or store or copy the > information in any medium. Thank you. > > ARM Limited, Registered office 110 Fulbourn Road, Cambridge CB1 9NJ, > Registered in England & Wales, Company No: 2557590 > ARM Holdings plc, Registered office 110 Fulbourn Road, Cambridge CB1 9NJ, > Registered in England & Wales, Company No: 2548782 >
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index c459fce..30f061e 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -109,6 +109,10 @@ typedef union queue_entry_u queue_entry_t; /* Common buffer header */ typedef struct odp_buffer_hdr_t { struct odp_buffer_hdr_t *next; /* next buf in a list */ + union { /* Multi-use secondary link */ + struct odp_buffer_hdr_t *prev; + struct odp_buffer_hdr_t *link; + }; odp_buffer_bits_t handle; /* handle */ union { uint32_t all; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index a2460e7..e56da71 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -336,6 +336,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) { int sched = 0; queue_entry_t *origin_qe = buf_hdr->origin_qe; + odp_buffer_hdr_t *buf_tail; /* Need two locks for enq operations from ordered queues */ if (origin_qe) { @@ -404,17 +405,33 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) /* We're in order, so account for this and proceed with enq */ origin_qe->s.order_out++; + + /* if this element is linked, restore the linked chain */ + buf_tail = buf_hdr->link; + + if (buf_tail) { + buf_hdr->next = buf_tail; + buf_hdr->link = NULL; + + /* find end of the chain */ + while (buf_tail->next) + buf_tail = buf_tail->next; + } else { + buf_tail = buf_hdr; + } + } else { + buf_tail = buf_hdr; } - if (queue->s.head == NULL) { + if (!queue->s.head) { /* Empty queue */ queue->s.head = buf_hdr; - queue->s.tail = buf_hdr; - buf_hdr->next = NULL; + queue->s.tail = buf_tail; + buf_tail->next = NULL; } else { queue->s.tail->next = buf_hdr; - queue->s.tail = buf_hdr; - buf_hdr->next = NULL; + queue->s.tail = buf_tail; + buf_tail->next = NULL; } if (queue->s.status == QUEUE_STATUS_NOTSCHED) { @@ -461,8 +478,23 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) next_buf = reorder_buf->next; if (odp_likely(reorder_buf->target_qe == queue)) { - reorder_prev = reorder_buf; - reorder_buf = next_buf; + /* promote any chain */ + odp_buffer_hdr_t *reorder_link = + reorder_buf->link; + + if (reorder_link) { + reorder_buf->next = reorder_link; + reorder_buf->link = NULL; + while (reorder_link->next) + reorder_link = + reorder_link->next; + reorder_link->next = next_buf; + reorder_prev = reorder_link; + } else { + reorder_prev = reorder_buf; + } + + reorder_buf = next_buf; release_count++; } else if (!reorder_buf->target_qe) { if (reorder_prev) @@ -523,53 +555,89 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { int sched = 0; - int i, j; + int i, rc, ret_count = 0; + int ordered_head[num]; + int ordered_count = 0; odp_buffer_hdr_t *tail; + /* Identify ordered chains in the input buffer list */ for (i = 0; i < num; i++) { - /* If any buffer is coming from an ordered queue, enqueue them - * individually since in the general case each might originate - * from a different ordered queue. If any of these fail, the - * return code tells the caller how many succeeded. - */ - if (buf_hdr[i]->origin_qe) { - for (j = 0; j < num; j++) { - if (queue_enq(queue, buf_hdr[j])) - return j; - } - return num; + if (buf_hdr[i]->origin_qe) + ordered_head[ordered_count++] = i; + if (i < num - 1) + buf_hdr[i]->next = buf_hdr[i + 1]; + } + + buf_hdr[num - 1]->next = NULL; + + if (ordered_count) { + if (ordered_head[0] > 0) { + tail = buf_hdr[ordered_head[0] - 1]; + tail->next = NULL; + ret_count = ordered_head[0]; + } else { + tail = NULL; + ret_count = 0; } - buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1]; + } else { + tail = buf_hdr[num - 1]; + ret_count = num; } - tail = buf_hdr[num-1]; + /* Handle regular enq's at start of list */ + if (tail) { + tail->next = NULL; + LOCK(&queue->s.lock); + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + UNLOCK(&queue->s.lock); + ODP_ERR("Bad queue status\n"); + return -1; + } - LOCK(&queue->s.lock); - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + /* Empty queue */ + if (queue->s.head) + queue->s.tail->next = buf_hdr[0]; + else + queue->s.head = buf_hdr[0]; + + queue->s.tail = tail; + + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { + queue->s.status = QUEUE_STATUS_SCHED; + sched = 1; /* retval: schedule queue */ + } UNLOCK(&queue->s.lock); - ODP_ERR("Bad queue status\n"); - return -1; + + /* Add queue to scheduling */ + if (sched && schedule_queue(queue)) + ODP_ABORT("schedule_queue failed\n"); } - /* Empty queue */ - if (queue->s.head == NULL) - queue->s.head = buf_hdr[0]; - else - queue->s.tail->next = buf_hdr[0]; + /* Handle ordered chains in the list */ + for (i = 0; i < ordered_count; i++) { + int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num; + int list_count = eol - i; - queue->s.tail = tail; + if (i < ordered_count - 1) + buf_hdr[ordered_head[i + 1] - 1]->next = NULL; - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { - queue->s.status = QUEUE_STATUS_SCHED; - sched = 1; /* retval: schedule queue */ - } - UNLOCK(&queue->s.lock); + if (ordered_head[i] < eol - 1) + buf_hdr[ordered_head[i]]->link = + buf_hdr[ordered_head[i] + 1]; + else + buf_hdr[ordered_head[i]]->link = NULL; - /* Add queue to scheduling */ - if (sched && schedule_queue(queue)) - ODP_ABORT("schedule_queue failed\n"); + rc = queue_enq(queue, buf_hdr[ordered_head[i]]); + if (rc < 0) + return ret_count; + + if (rc < list_count) + return ret_count + rc; - return num; /* All events enqueued */ + ret_count += rc; + } + + return ret_count; } int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) @@ -598,6 +666,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) queue = queue_to_qentry(handle); buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev)); + /* No chains via this entry */ + buf_hdr->link = NULL; + return queue->s.enqueue(queue, buf_hdr); }
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 4 + platform/linux-generic/odp_queue.c | 151 +++++++++++++++------ 2 files changed, 115 insertions(+), 40 deletions(-)