diff mbox

[API-NEXT,PATCHv8,09/13] linux-generic: queue: add ordered chain enq support

Message ID 1438815321-12344-10-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 5, 2015, 10:55 p.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   4 +
 platform/linux-generic/odp_queue.c                 | 151 +++++++++++++++------
 2 files changed, 115 insertions(+), 40 deletions(-)

Comments

Bill Fischofer Aug. 6, 2015, 12:35 p.m. UTC | #1
On Thu, Aug 6, 2015 at 2:58 AM, Jianbo Liu <Jianbo.Liu@arm.com> wrote:

>
>
> > -----Original Message-----
> > From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of
> Bill
> > Fischofer
> > Sent: Thursday, August 06, 2015 6:55 AM
> > To: lng-odp@lists.linaro.org
> > Subject: [lng-odp] [API-NEXT PATCHv8 09/13] linux-generic: queue: add
> > ordered chain enq support
> >
> > Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
> > ---
> >  .../linux-generic/include/odp_buffer_internal.h    |   4 +
> >  platform/linux-generic/odp_queue.c                 | 151
> +++++++++++++++------
> >  2 files changed, 115 insertions(+), 40 deletions(-)
> >
> > diff --git a/platform/linux-generic/include/odp_buffer_internal.h
> > b/platform/linux-generic/include/odp_buffer_internal.h
> > index c459fce..30f061e 100644
> > --- a/platform/linux-generic/include/odp_buffer_internal.h
> > +++ b/platform/linux-generic/include/odp_buffer_internal.h
> .................
>
> > @@ -523,53 +555,89 @@ int queue_enq(queue_entry_t *queue,
> > odp_buffer_hdr_t *buf_hdr)
> >  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t
> > *buf_hdr[], int num)
> >  {
> >       int sched = 0;
> > -     int i, j;
> > +     int i, rc, ret_count = 0;
> > +     int ordered_head[num];
> > +     int ordered_count = 0;
> >       odp_buffer_hdr_t *tail;
> >
> > +     /* Identify ordered chains in the input buffer list */
> >       for (i = 0; i < num; i++) {
> > -             /* If any buffer is coming from an ordered queue, enqueue
> > them
> > -              * individually since in the general case each might
> originate
> > -              * from a different ordered queue.  If any of these fail,
> the
> > -              * return code tells the caller how many succeeded.
> > -              */
> > -             if (buf_hdr[i]->origin_qe) {
> > -                     for (j = 0; j < num; j++) {
> > -                             if (queue_enq(queue, buf_hdr[j]))
> > -                                     return j;
> > -                     }
> > -                     return num;
> > +             if (buf_hdr[i]->origin_qe)
> > +                     ordered_head[ordered_count++] = i;
> > +             if (i < num - 1)
> > +                     buf_hdr[i]->next = buf_hdr[i + 1];
> > +     }
> > +
> > +     buf_hdr[num - 1]->next = NULL;
> > +
> > +     if (ordered_count) {
> > +             if (ordered_head[0] > 0) {
> > +                     tail = buf_hdr[ordered_head[0] - 1];
> > +                     tail->next = NULL;
> > +                     ret_count = ordered_head[0];
> > +             } else {
> > +                     tail = NULL;
> > +                     ret_count = 0;
> >               }
> > -             buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
> > +     } else {
> > +             tail = buf_hdr[num - 1];
> > +             ret_count = num;
> >       }
> >
> > -     tail = buf_hdr[num-1];
> > +     /* Handle regular enq's at start of list */
> > +     if (tail) {
> > +             tail->next = NULL;
>
> The above line is not necessary since tail->next is already NULL.
>

Good catch.

>
> > +             LOCK(&queue->s.lock);
> > +             if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY))
> > {
> > +                     UNLOCK(&queue->s.lock);
> > +                     ODP_ERR("Bad queue status\n");
> > +                     return -1;
> > +             }
> >
> > -     LOCK(&queue->s.lock);
> > -     if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> > +             /* Empty queue */
> > +             if (queue->s.head)
> > +                     queue->s.tail->next = buf_hdr[0];
> > +             else
> > +                     queue->s.head = buf_hdr[0];
> > +
> > +             queue->s.tail = tail;
> > +
> > +             if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> > +                     queue->s.status = QUEUE_STATUS_SCHED;
> > +                     sched = 1; /* retval: schedule queue */
> > +             }
> >               UNLOCK(&queue->s.lock);
> > -             ODP_ERR("Bad queue status\n");
> > -             return -1;
> > +
> > +             /* Add queue to scheduling */
> > +             if (sched && schedule_queue(queue))
> > +                     ODP_ABORT("schedule_queue failed\n");
> >       }
> >
> > -     /* Empty queue */
> > -     if (queue->s.head == NULL)
> > -             queue->s.head = buf_hdr[0];
> > -     else
> > -             queue->s.tail->next = buf_hdr[0];
> > +     /* Handle ordered chains in the list */
> > +     for (i = 0; i < ordered_count; i++) {
> > +             int eol = i < ordered_count - 1 ? ordered_head[i + 1] :
> num;
> > +             int list_count = eol - i;
> >
> > -     queue->s.tail = tail;
> > +             if (i < ordered_count - 1)
> > +                     buf_hdr[ordered_head[i + 1] - 1]->next = NULL;
>
> Is it better to use eol instead of ordered_head[i + 1]?
>

The compiler should optimize this, but agree it might be cleaner.

>
> >
> > -     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> > -             queue->s.status = QUEUE_STATUS_SCHED;
> > -             sched = 1; /* retval: schedule queue */
> > -     }
> > -     UNLOCK(&queue->s.lock);
> > +             if (ordered_head[i] < eol - 1)
> > +                     buf_hdr[ordered_head[i]]->link =
> > +                             buf_hdr[ordered_head[i] + 1];
> > +             else
> > +                     buf_hdr[ordered_head[i]]->link = NULL;
> >
> > -     /* Add queue to scheduling */
> > -     if (sched && schedule_queue(queue))
> > -             ODP_ABORT("schedule_queue failed\n");
> > +             rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
> > +             if (rc < 0)
> > +                     return ret_count;
> > +
> > +             if (rc < list_count)
> > +                     return ret_count + rc;
> >
> > -     return num; /* All events enqueued */
> > +             ret_count += rc;
> > +     }
> > +
> > +     return ret_count;
> >  }
> >
> >  int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[],
> > int num)
> > @@ -598,6 +666,9 @@ int odp_queue_enq(odp_queue_t handle,
> > odp_event_t ev)
> >       queue   = queue_to_qentry(handle);
> >       buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
> >
> > +     /* No chains via this entry */
> > +     buf_hdr->link = NULL;
> > +
> >       return queue->s.enqueue(queue, buf_hdr);
> >  }
> >
> > --
> > 2.1.4
> >
> > _______________________________________________
> > lng-odp mailing list
> > lng-odp@lists.linaro.org
> > https://lists.linaro.org/mailman/listinfo/lng-odp
>
> -- IMPORTANT NOTICE: The contents of this email and any attachments are
> confidential and may also be privileged. If you are not the intended
> recipient, please notify the sender immediately and do not disclose the
> contents to any other person, use it for any purpose, or store or copy the
> information in any medium.  Thank you.
>
> ARM Limited, Registered office 110 Fulbourn Road, Cambridge CB1 9NJ,
> Registered in England & Wales, Company No:  2557590
> ARM Holdings plc, Registered office 110 Fulbourn Road, Cambridge CB1 9NJ,
> Registered in England & Wales, Company No:  2548782
>
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index c459fce..30f061e 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -109,6 +109,10 @@  typedef union queue_entry_u queue_entry_t;
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
 	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	union {                              /* Multi-use secondary link */
+		struct odp_buffer_hdr_t *prev;
+		struct odp_buffer_hdr_t *link;
+	};
 	odp_buffer_bits_t        handle;     /* handle */
 	union {
 		uint32_t all;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a2460e7..e56da71 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -336,6 +336,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
 	int sched = 0;
 	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	odp_buffer_hdr_t *buf_tail;
 
 	/* Need two locks for enq operations from ordered queues */
 	if (origin_qe) {
@@ -404,17 +405,33 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 
 		/* We're in order, so account for this and proceed with enq */
 		origin_qe->s.order_out++;
+
+		/* if this element is linked, restore the linked chain */
+		buf_tail = buf_hdr->link;
+
+		if (buf_tail) {
+			buf_hdr->next = buf_tail;
+			buf_hdr->link = NULL;
+
+			/* find end of the chain */
+			while (buf_tail->next)
+				buf_tail = buf_tail->next;
+		} else {
+			buf_tail = buf_hdr;
+		}
+	} else {
+		buf_tail = buf_hdr;
 	}
 
-	if (queue->s.head == NULL) {
+	if (!queue->s.head) {
 		/* Empty queue */
 		queue->s.head = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	} else {
 		queue->s.tail->next = buf_hdr;
-		queue->s.tail = buf_hdr;
-		buf_hdr->next = NULL;
+		queue->s.tail = buf_tail;
+		buf_tail->next = NULL;
 	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
@@ -461,8 +478,23 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			next_buf = reorder_buf->next;
 
 			if (odp_likely(reorder_buf->target_qe == queue)) {
-				reorder_prev = reorder_buf;
-				reorder_buf  = next_buf;
+				/* promote any chain */
+				odp_buffer_hdr_t *reorder_link =
+					reorder_buf->link;
+
+				if (reorder_link) {
+					reorder_buf->next = reorder_link;
+					reorder_buf->link = NULL;
+					while (reorder_link->next)
+						reorder_link =
+							reorder_link->next;
+					reorder_link->next = next_buf;
+					reorder_prev = reorder_link;
+				} else {
+					reorder_prev = reorder_buf;
+				}
+
+				reorder_buf = next_buf;
 				release_count++;
 			} else if (!reorder_buf->target_qe) {
 				if (reorder_prev)
@@ -523,53 +555,89 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int sched = 0;
-	int i, j;
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
 	odp_buffer_hdr_t *tail;
 
+	/* Identify ordered chains in the input buffer list */
 	for (i = 0; i < num; i++) {
-		/* If any buffer is coming from an ordered queue, enqueue them
-		 * individually since in the general case each might originate
-		 * from a different ordered queue.  If any of these fail, the
-		 * return code tells the caller how many succeeded.
-		 */
-		if (buf_hdr[i]->origin_qe) {
-			for (j = 0; j < num; j++) {
-				if (queue_enq(queue, buf_hdr[j]))
-					return j;
-			}
-			return num;
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
+		if (i < num - 1)
+			buf_hdr[i]->next = buf_hdr[i + 1];
+	}
+
+	buf_hdr[num - 1]->next = NULL;
+
+	if (ordered_count) {
+		if (ordered_head[0] > 0) {
+			tail = buf_hdr[ordered_head[0] - 1];
+			tail->next = NULL;
+			ret_count = ordered_head[0];
+		} else {
+			tail = NULL;
+			ret_count = 0;
 		}
-		buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+	} else {
+		tail = buf_hdr[num - 1];
+		ret_count = num;
 	}
 
-	tail = buf_hdr[num-1];
+	/* Handle regular enq's at start of list */
+	if (tail) {
+		tail->next = NULL;
+		LOCK(&queue->s.lock);
+		if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			ODP_ERR("Bad queue status\n");
+			return -1;
+		}
 
-	LOCK(&queue->s.lock);
-	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+		/* Empty queue */
+		if (queue->s.head)
+			queue->s.tail->next = buf_hdr[0];
+		else
+			queue->s.head = buf_hdr[0];
+
+		queue->s.tail = tail;
+
+		if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+			queue->s.status = QUEUE_STATUS_SCHED;
+			sched = 1; /* retval: schedule queue */
+		}
 		UNLOCK(&queue->s.lock);
-		ODP_ERR("Bad queue status\n");
-		return -1;
+
+		/* Add queue to scheduling */
+		if (sched && schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
 	}
 
-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = buf_hdr[0];
-	else
-		queue->s.tail->next = buf_hdr[0];
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
 
-	queue->s.tail = tail;
+		if (i < ordered_count - 1)
+			buf_hdr[ordered_head[i + 1] - 1]->next = NULL;
 
-	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
-	}
-	UNLOCK(&queue->s.lock);
+		if (ordered_head[i] < eol - 1)
+			buf_hdr[ordered_head[i]]->link =
+				buf_hdr[ordered_head[i] + 1];
+		else
+			buf_hdr[ordered_head[i]]->link = NULL;
 
-	/* Add queue to scheduling */
-	if (sched && schedule_queue(queue))
-		ODP_ABORT("schedule_queue failed\n");
+		rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
 
-	return num; /* All events enqueued */
+		ret_count += rc;
+	}
+
+	return ret_count;
 }
 
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -598,6 +666,9 @@  int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 	queue   = queue_to_qentry(handle);
 	buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
+	/* No chains via this entry */
+	buf_hdr->link = NULL;
+
 	return queue->s.enqueue(queue, buf_hdr);
 }