From patchwork Sat Aug  8 03:03:12 2015
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Bill Fischofer <bill.fischofer@linaro.org>
X-Patchwork-Id: 52069
From: Bill Fischofer <bill.fischofer@linaro.org>
To: lng-odp@lists.linaro.org
Date: Fri,  7 Aug 2015 22:03:12 -0500
Message-Id: <1439002992-29285-15-git-send-email-bill.fischofer@linaro.org>
X-Mailer: git-send-email 2.1.4
In-Reply-To: <1439002992-29285-1-git-send-email-bill.fischofer@linaro.org>
References: <1439002992-29285-1-git-send-email-bill.fischofer@linaro.org>
Subject: [lng-odp] [API-NEXT PATCHv10 14/14] linux-generic: queue: add
 ordered support for pktout queues
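
Route enqueues to ODP_QUEUE_TYPE_PKTOUT queues through the new
queue_pktout_enq()/queue_pktout_enq_multi() routines so that events
originating from an ordered queue are transmitted in their original
queue order. The reorder-list manipulation previously open-coded in
queue_enq() is factored into the shared inline helpers reorder_enq(),
order_release(), and reorder_deq() so that both paths can reuse it.

Illustrative sketch only (not part of the patch): the worker-side code
this enables is a plain schedule-and-enqueue loop, with all order
restoration handled inside the implementation. The function name
worker_fn and the pktio argument below are hypothetical; the pktout
handle is assumed to come from odp_pktio_outq_getdef().

	#include <odp.h>

	/* Forward scheduled events to a pktio's default output queue.
	 * Events may be completed out of order across workers, but
	 * queue_pktout_enq() holds back any event whose sequence is
	 * not yet next on its origin queue, so the wire sees the
	 * original order.
	 */
	static int worker_fn(odp_pktio_t pktio)
	{
		odp_queue_t pktout = odp_pktio_outq_getdef(pktio);
		odp_event_t ev;

		for (;;) {
			ev = odp_schedule(NULL, ODP_SCHED_WAIT);

			if (odp_queue_enq(pktout, ev) < 0)
				odp_event_free(ev);
		}

		return 0;
	}
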
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h |   2 +-
 .../linux-generic/include/odp_queue_internal.h  | 116 ++++++++
 platform/linux-generic/odp_queue.c              | 308 ++++++++++++++-------
 3 files changed, 320 insertions(+), 106 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ddd2642..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -108,7 +108,7 @@ typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
-	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
 	union {                              /* Multi-use secondary link */
 		struct odp_buffer_hdr_t *prev;
 		struct odp_buffer_hdr_t *link;
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index aa36df5..66aa887 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@ extern "C" {
 #include
 #include
 #include
+#include
 
 #define USE_TICKETLOCK
 
@@ -99,6 +100,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+			   odp_buffer_hdr_t *buf_hdr[], int num);
+
 int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
 int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
			   int num);
@@ -143,6 +148,117 @@ static inline int queue_prio(queue_entry_t *qe)
 	return qe->s.param.sched.prio;
 }
 
+static inline void reorder_enq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t *buf_hdr)
+{
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev =
+		(odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+	while (reorder_buf && buf_hdr->order >= reorder_buf->order) {
+		reorder_prev = reorder_buf;
+		reorder_buf = reorder_buf->next;
+	}
+
+	buf_hdr->next = reorder_buf;
+	reorder_prev->next = buf_hdr;
+
+	if (!reorder_buf)
+		origin_qe->s.reorder_tail = buf_hdr;
+
+	buf_hdr->target_qe = queue;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+	origin_qe->s.order_out += count;
+	odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
+static inline void reorder_deq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t **reorder_buf_return,
+			       odp_buffer_hdr_t **reorder_prev_return,
+			       odp_buffer_hdr_t **placeholder_buf_return,
+			       uint32_t *release_count_return,
+			       uint32_t *placeholder_count_return)
+{
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev = NULL;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	odp_buffer_hdr_t *next_buf;
+	uint32_t release_count = 0;
+	uint32_t placeholder_count = 0;
+
+	while (reorder_buf &&
+	       reorder_buf->order <= origin_qe->s.order_out +
+	       release_count + placeholder_count) {
+		/*
+		 * Elements on the reorder list fall into one of
+		 * three categories:
+		 *
+		 * 1. Those destined for the same queue.  These
+		 *    can be enq'd now if they were waiting to
+		 *    be unblocked by this enq.
+		 *
+		 * 2. Those representing placeholders for events
+		 *    whose ordering was released by a prior
+		 *    odp_schedule_release_ordered() call.  These
+		 *    can now just be freed.
+		 *
+		 * 3. Those representing events destined for another
+		 *    queue. These cannot be consolidated with this
+		 *    enq since they have a different target.
+		 *
+		 * Detecting an element with an order sequence gap, an
+		 * element in category 3, or running out of elements
+		 * stops the scan.
+		 */
+		next_buf = reorder_buf->next;
+
+		if (odp_likely(reorder_buf->target_qe == queue)) {
+			/* promote any chain */
+			odp_buffer_hdr_t *reorder_link =
+				reorder_buf->link;
+
+			if (reorder_link) {
+				reorder_buf->next = reorder_link;
+				reorder_buf->link = NULL;
+				while (reorder_link->next)
+					reorder_link = reorder_link->next;
+				reorder_link->next = next_buf;
+				reorder_prev = reorder_link;
+			} else {
+				reorder_prev = reorder_buf;
+			}
+
+			if (!reorder_buf->flags.sustain)
+				release_count++;
+			reorder_buf = next_buf;
+		} else if (!reorder_buf->target_qe) {
+			if (reorder_prev)
+				reorder_prev->next = next_buf;
+			else
+				origin_qe->s.reorder_head = next_buf;
+
+			reorder_buf->next = placeholder_buf;
+			placeholder_buf = reorder_buf;
+
+			reorder_buf = next_buf;
+			placeholder_count++;
+		} else {
+			break;
+		}
+	}
+
+	*reorder_buf_return = reorder_buf;
+	*reorder_prev_return = reorder_prev;
+	*placeholder_buf_return = placeholder_buf;
+	*release_count_return = release_count;
+	*placeholder_count_return = placeholder_count;
+}
+
 void queue_destroy_finalize(queue_entry_t *qe);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 2d999aa..674717a 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -77,9 +77,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
 		queue->s.dequeue_multi = pktin_deq_multi;
 		break;
 	case ODP_QUEUE_TYPE_PKTOUT:
-		queue->s.enqueue = pktout_enqueue;
+		queue->s.enqueue = queue_pktout_enq;
 		queue->s.dequeue = pktout_dequeue;
-		queue->s.enqueue_multi = pktout_enq_multi;
+		queue->s.enqueue_multi = queue_pktout_enq_multi;
 		queue->s.dequeue_multi = pktout_deq_multi;
 		break;
 	default:
@@ -369,34 +369,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	/* We can only complete the enq if we're in order */
 	if (origin_qe) {
 		if (buf_hdr->order > origin_qe->s.order_out) {
-			odp_buffer_hdr_t *reorder_buf =
-				origin_qe->s.reorder_head;
-
-			if (!reorder_buf) {
-				buf_hdr->next = NULL;
-				origin_qe->s.reorder_head = buf_hdr;
-				origin_qe->s.reorder_tail = buf_hdr;
-			} else {
-				odp_buffer_hdr_t *reorder_prev = NULL;
-
-				while (buf_hdr->order >= reorder_buf->order) {
-					reorder_prev = reorder_buf;
-					reorder_buf = reorder_buf->next;
-					if (!reorder_buf)
-						break;
-				}
-
-				buf_hdr->next = reorder_buf;
-				if (reorder_prev)
-					reorder_prev->next = buf_hdr;
-				else
-					origin_qe->s.reorder_head = buf_hdr;
-
-				if (!reorder_buf)
-					origin_qe->s.reorder_tail = buf_hdr;
-			}
-
-			buf_hdr->target_qe = queue;
+			reorder_enq(queue, origin_qe, buf_hdr);
 
 			/* This enq can't complete until order is restored, so
 			 * we're done here.
@@ -407,10 +380,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		}
 
 		/* We're in order, so account for this and proceed with enq */
-		if (!buf_hdr->flags.sustain) {
-			origin_qe->s.order_out++;
-			odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
-		}
+		if (!buf_hdr->flags.sustain)
+			order_release(origin_qe, 1);
 
 		/* if this element is linked, restore the linked chain */
 		buf_tail = buf_hdr->link;
@@ -450,74 +421,16 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	 * enq has unblocked other buffers in the origin's reorder queue.
 	 */
 	if (origin_qe) {
-		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
-		odp_buffer_hdr_t *reorder_prev = NULL;
-		odp_buffer_hdr_t *placeholder_buf = NULL;
+		odp_buffer_hdr_t *reorder_buf;
 		odp_buffer_hdr_t *next_buf;
-		uint32_t release_count = 0;
-		uint32_t placeholder_count = 0;
-
-		while (reorder_buf &&
-		       reorder_buf->order <= origin_qe->s.order_out +
-		       release_count + placeholder_count) {
-			/*
-			 * Elements on the reorder list fall into one of
-			 * three categories:
-			 *
-			 * 1. Those destined for the same queue.  These
-			 *    can be enq'd now if they were waiting to
-			 *    be unblocked by this enq.
-			 *
-			 * 2. Those representing placeholders for events
-			 *    whose ordering was released by a prior
-			 *    odp_schedule_release_ordered() call.  These
-			 *    can now just be freed.
-			 *
-			 * 3. Those representing events destined for another
-			 *    queue. These cannot be consolidated with this
-			 *    enq since they have a different target.
-			 *
-			 * Detecting an element with an order sequence gap, an
-			 * element in category 3, or running out of elements
-			 * stops the scan.
-			 */
-			next_buf = reorder_buf->next;
-
-			if (odp_likely(reorder_buf->target_qe == queue)) {
-				/* promote any chain */
-				odp_buffer_hdr_t *reorder_link =
-					reorder_buf->link;
-
-				if (reorder_link) {
-					reorder_buf->next = reorder_link;
-					reorder_buf->link = NULL;
-					while (reorder_link->next)
-						reorder_link =
-							reorder_link->next;
-					reorder_link->next = next_buf;
-					reorder_prev = reorder_link;
-				} else {
-					reorder_prev = reorder_buf;
-				}
-
-				if (!reorder_buf->flags.sustain)
-					release_count++;
-				reorder_buf = next_buf;
-			} else if (!reorder_buf->target_qe) {
-				if (reorder_prev)
-					reorder_prev->next = next_buf;
-				else
-					origin_qe->s.reorder_head = next_buf;
-
-				reorder_buf->next = placeholder_buf;
-				placeholder_buf = reorder_buf;
-
-				reorder_buf = next_buf;
-				placeholder_count++;
-			} else {
-				break;
-			}
-		}
+		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *placeholder_buf;
+		uint32_t release_count;
+		uint32_t placeholder_count;
+
+		reorder_deq(queue, origin_qe,
+			    &reorder_buf, &reorder_prev, &placeholder_buf,
+			    &release_count, &placeholder_count);
 
 		/* Add released buffers to the queue as well */
 		if (release_count > 0) {
@@ -528,18 +441,18 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		}
 
 		/* Reflect the above two in the output sequence */
-		origin_qe->s.order_out += release_count + placeholder_count;
-		odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
-					 release_count + placeholder_count);
+		order_release(origin_qe, release_count + placeholder_count);
 
 		/* Now handle any unblocked buffers destined for other queues */
 		UNLOCK(&queue->s.lock);
+
 		if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
 			origin_qe->s.reorder_head = reorder_buf->next;
 		else
 			reorder_buf = NULL;
 		UNLOCK(&origin_qe->s.lock);
+
 		if (reorder_buf)
 			odp_queue_enq(reorder_buf->target_qe->s.handle,
 				      (odp_event_t)reorder_buf->handle.handle);
@@ -547,7 +460,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		/* Free all placeholder bufs that are now released */
 		while (placeholder_buf) {
 			next_buf = placeholder_buf->next;
-			odp_buffer_free(buf_hdr->handle.handle);
+			odp_buffer_free(placeholder_buf->handle.handle);
 			placeholder_buf = next_buf;
 		}
 	} else {
@@ -799,6 +712,191 @@ odp_event_t odp_queue_deq(odp_queue_t handle)
 	return ODP_EVENT_INVALID;
 }
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	int rc, sustain;
+
+	/* Special processing needed only if we came from an ordered queue */
+	if (!origin_qe)
+		return pktout_enqueue(queue, buf_hdr);
+
+	/* Must lock origin_qe for ordered processing */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* We can only complete the enq if we're in order */
+	if (buf_hdr->order > origin_qe->s.order_out) {
+		reorder_enq(queue, origin_qe, buf_hdr);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		UNLOCK(&origin_qe->s.lock);
+		return 0;
+	}
+
+	/* Perform our enq since we're in order.
+	 * Note: Don't hold the origin_qe lock across an I/O operation!
+	 * Note that we also cache the sustain flag since the buffer may
+	 * be freed by the I/O operation so we can't reference it afterwards.
+	 */
+	UNLOCK(&origin_qe->s.lock);
+	sustain = buf_hdr->flags.sustain;
+
+	/* Handle any chained buffers (internal calls) */
+	if (buf_hdr->link) {
+		odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+		odp_buffer_hdr_t *next_buf;
+		int num = 0;
+
+		next_buf = buf_hdr->link;
+		buf_hdr->link = NULL;
+
+		while (next_buf) {
+			buf_hdrs[num++] = next_buf;
+			next_buf = next_buf->next;
+		}
+
+		rc = pktout_enq_multi(queue, buf_hdrs, num);
+		if (rc < num)
+			return -1;
+	} else {
+		rc = pktout_enqueue(queue, buf_hdr);
+		if (!rc)
+			return rc;
+	}
+
+	/* Reacquire the lock following the I/O send. Note that we're still
+	 * guaranteed to be in order here since we haven't released
+	 * order yet.
+	 */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* Account for this ordered enq */
+	if (!sustain)
+		order_release(origin_qe, 1);
+
+	/* Now check to see if our successful enq has unblocked other buffers
+	 * in the origin's reorder queue.
+	 */
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *xmit_buf;
+	odp_buffer_hdr_t *placeholder_buf;
+	uint32_t release_count;
+	uint32_t placeholder_count;
+
+	reorder_deq(queue, origin_qe,
+		    &reorder_buf, &reorder_prev, &placeholder_buf,
+		    &release_count, &placeholder_count);
+
+	/* Send released buffers as well */
+	if (release_count > 0) {
+		xmit_buf = origin_qe->s.reorder_head;
+		origin_qe->s.reorder_head = reorder_prev->next;
+		reorder_prev->next = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		do {
+			next_buf = xmit_buf->next;
+			pktout_enqueue(queue, xmit_buf);
+			xmit_buf = next_buf;
+		} while (xmit_buf);
+
+		/* Reacquire the origin_qe lock to continue */
+		LOCK(&origin_qe->s.lock);
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	}
+
+	/* Update the order sequence to reflect the deq'd elements */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle sends to other queues that are ready to go */
+	if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
+		origin_qe->s.reorder_head = reorder_buf->next;
+	else
+		reorder_buf = NULL;
+
+	/* We're fully done with the origin_qe at last */
+	UNLOCK(&origin_qe->s.lock);
+
+	/* Now send the next buffer to its target queue */
+	if (reorder_buf)
+		odp_queue_enq(reorder_buf->target_qe->s.handle,
+			      (odp_event_t)reorder_buf->handle.handle);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+			   int num)
+{
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
+
+	/* Identify ordered chains in the input buffer list */
+	for (i = 0; i < num; i++) {
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
+
+		buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+	}
+
+	ret_count = ordered_count ? ordered_head[0] : num;
+
+	/* Handle regular enq's at start of list */
+	if (ret_count) {
+		rc = pktout_enq_multi(queue, buf_hdr, ret_count);
+		if (rc < ret_count)
+			return rc;
+	}
+
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
+
+		if (i < ordered_count - 1)
+			buf_hdr[eol - 1]->next = NULL;
+
+		buf_hdr[ordered_head[i]]->link =
+			list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+		rc = queue_pktout_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
+
+		ret_count += rc;
+	}
+
+	return ret_count;
+}
 
 void queue_lock(queue_entry_t *queue)
 {
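
Illustrative note (not part of the patch): queue_pktout_enq_multi()
splits a burst into a plain prefix handled by pktout_enq_multi() plus
one chain per ordered buffer, where each chain runs from an ordered
head up to (but not including) the next head. The standalone sketch
below mimics just that grouping for a hypothetical 7-buffer burst in
which buffers 2 and 4 carry an origin queue; the array contents are
made up for illustration.

	#include <stdio.h>

	int main(void)
	{
		/* 1 marks a buffer whose origin_qe is set (hypothetical) */
		int has_origin[] = { 0, 0, 1, 0, 1, 0, 0 };
		int num = 7;
		int ordered_head[7];
		int ordered_count = 0;
		int i;

		/* Same head-identification pass as the patch */
		for (i = 0; i < num; i++)
			if (has_origin[i])
				ordered_head[ordered_count++] = i;

		/* Buffers ahead of the first head take the plain
		 * pktout_enq_multi() path */
		printf("plain enq: bufs [0..%d)\n",
		       ordered_count ? ordered_head[0] : num);

		/* Each remaining chain is handed to queue_pktout_enq()
		 * through the head's link pointer */
		for (i = 0; i < ordered_count; i++) {
			int eol = i < ordered_count - 1 ?
				ordered_head[i + 1] : num;

			printf("ordered chain: head %d, linked [%d..%d)\n",
			       ordered_head[i], ordered_head[i] + 1, eol);
		}
		return 0;
	}

For the burst above this prints a plain prefix of buffers 0-1, a chain
headed by buffer 2 linking buffer 3, and a chain headed by buffer 4
linking buffers 5-6.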