diff mbox

[API-NEXT,PATCHv3,6/6] linux-generic: schedule: implement odp_schedule_release_ordered()

Message ID 1438310507-10750-7-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer July 31, 2015, 2:41 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_queue.c | 169 +++++++++++++++++++++++++++++++++----
 1 file changed, 152 insertions(+), 17 deletions(-)

Comments

Maxim Uvarov July 31, 2015, 2:48 p.m. UTC | #1
On 07/31/15 05:41, Bill Fischofer wrote:
> Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
> ---
>   platform/linux-generic/odp_queue.c | 169 +++++++++++++++++++++++++++++++++----
>   1 file changed, 152 insertions(+), 17 deletions(-)
>
> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
> index 4d0e1b4..a2460e7 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -14,6 +14,7 @@
>   #include <odp_buffer_inlines.h>
>   #include <odp_internal.h>
>   #include <odp/shared_memory.h>
> +#include <odp/schedule.h>
>   #include <odp_schedule_internal.h>
>   #include <odp/config.h>
>   #include <odp_packet_io_internal.h>
> @@ -401,6 +402,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
>   			return 0;
>   		}
>   
> +		/* We're in order, so account for this and proceed with enq */
>   		origin_qe->s.order_out++;
>   	}
>   
> @@ -426,16 +428,56 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
>   	 */
>   	if (origin_qe) {
>   		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
> -		odp_buffer_hdr_t *reorder_prev;
> +		odp_buffer_hdr_t *reorder_prev = NULL;
> +		odp_buffer_hdr_t *placeholder_buf = NULL;
> +		odp_buffer_hdr_t *next_buf;
>   		uint32_t          release_count = 0;
> +		uint32_t          placeholder_count = 0;
>   
>   		while (reorder_buf &&
> -		       reorder_buf->target_qe == queue &&
> -		       reorder_buf->order <=
> -		       origin_qe->s.order_out + release_count) {
> -			release_count++;
> -			reorder_prev = reorder_buf;
> -			reorder_buf  = reorder_buf->next;
> +		       reorder_buf->order <= origin_qe->s.order_out +
> +		       release_count + placeholder_count) {
> +			/*
> +			 * Elements on the reorder list fall into one of
> +			 * three categories:
> +			 *
> +			 * 1. Those destined for the same queue.  These
> +			 *    can be enq'd now if they were waiting to
> +			 *    be unblocked by this enq.
> +			 *
> +			 * 2. Those representing placeholders for events
> +			 *    whose ordering was released by a prior
> +			 *    odp_schedule_release_ordered() call.  These
> +			 *    can now just be freed.
> +			 *
> +			 * 3. Those representing events destined for another
> +			 *    queue. These cannot be consolidated with this
> +			 *    enq since they have a different target.
> +			 *
> +			 * Detecting an element with an order sequence gap, an
> +			 * element in category 3, or running out of elements
> +			 * stops the scan.
> +			 */
> +			next_buf = reorder_buf->next;
> +
> +			if (odp_likely(reorder_buf->target_qe == queue)) {
> +				reorder_prev = reorder_buf;
> +				reorder_buf  = next_buf;
> +				release_count++;
> +			} else if (!reorder_buf->target_qe) {
> +				if (reorder_prev)
> +					reorder_prev->next = next_buf;
> +				else
> +					origin_qe->s.reorder_head = next_buf;
> +
> +				reorder_buf->next = placeholder_buf;
> +				placeholder_buf = reorder_buf;
> +
> +				reorder_buf = next_buf;
> +				placeholder_count++;
> +			} else {
> +				break;
> +			}
>   		}
>   
>   		/* Add released buffers to the queue as well */
> @@ -444,19 +486,28 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
>   			queue->s.tail             = reorder_prev;
>   			origin_qe->s.reorder_head = reorder_prev->next;
>   			reorder_prev->next        = NULL;
> -			origin_qe->s.order_out   += release_count;
>   		}
>   
> -		/* Now handle unblocked buffers destined for other queues */
> +		/* Reflect the above two in the output sequence */
> +		origin_qe->s.order_out += release_count + placeholder_count;
> +
> +		/* Now handle any unblocked buffers destined for other queues */
> +		UNLOCK(&queue->s.lock);
>   		if (reorder_buf &&
> -		    reorder_buf->order <= origin_qe->s.order_out) {
> -			UNLOCK(&origin_qe->s.lock);
> -			UNLOCK(&queue->s.lock);
> -			if (schedule_enq(reorder_buf->target_qe, origin_qe))
> -				ODP_ABORT("schedule_enq failed\n");
> -		} else {
> -			UNLOCK(&origin_qe->s.lock);
> -			UNLOCK(&queue->s.lock);
> +		    reorder_buf->order <= origin_qe->s.order_out)
> +			origin_qe->s.reorder_head = reorder_buf->next;
> +		else
> +			reorder_buf = NULL;
> +		UNLOCK(&origin_qe->s.lock);
> +		if (reorder_buf)
> +			odp_queue_enq(reorder_buf->target_qe->s.handle,
> +				      (odp_event_t)reorder_buf->handle.handle);
> +
> +		/* Free all placeholder bufs that are now released */
> +		while (placeholder_buf) {
> +			next_buf = placeholder_buf->next;
> +			odp_buffer_free(buf_hdr->handle.handle);
> +			placeholder_buf = next_buf;
>   		}
>   	} else {
>   		UNLOCK(&queue->s.lock);
> @@ -685,3 +736,87 @@ void odp_queue_param_init(odp_queue_param_t *params)
>   {
>   	memset(params, 0, sizeof(odp_queue_param_t));
>   }
> +
> +/* This routine exists here rather than in odp_schedule
> + * because it operates on queue interenal structures
> + */
> +int odp_schedule_release_ordered(odp_event_t ev)
> +{
> +	odp_buffer_t placeholder_buf;
> +	odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *reorder_prev;
placeholder_buf_hdr looks like too long a name. How about changing it to
tmp_buf or simply buf?
> +	odp_buffer_hdr_t *buf_hdr =
> +		odp_buf_to_hdr(odp_buffer_from_event(ev));
> +	queue_entry_t *origin_qe = buf_hdr->origin_qe;
> +
> +	/* Can't release if we didn't originate from an ordered queue */
> +	if (!origin_qe)
> +		return -1;
> +
> +	LOCK(&origin_qe->s.lock);
> +
> +	/* If we are the first or second element beyond the next in order,
> +	 * we can release immediately since there can be no confusion about
> +	 * intermediate elements
> +	 */
> +	if (buf_hdr->order <= origin_qe->s.order_out + 1) {
> +		buf_hdr->origin_qe = NULL;
> +		origin_qe->s.order_out++;
> +
> +		/* check if this release allows us to unblock waiters */
> +		reorder_buf = origin_qe->s.reorder_head;
> +		if (reorder_buf &&
> +		    reorder_buf->order <= origin_qe->s.order_out)
> +			origin_qe->s.reorder_head = reorder_buf->next;
> +		else
> +			reorder_buf = NULL;
> +		UNLOCK(&origin_qe->s.lock);
> +		if (reorder_buf)
> +			odp_queue_enq(reorder_buf->target_qe->s.handle,
> +				      (odp_event_t)reorder_buf->handle.handle);
> +		return 0;
> +	}
> +
> +	/* If we are beyond the second element in the expected order, we need
> +	 * a placeholder to represent our "place in line". Just use an element
> +	 * from the same pool the buffer being released is from.
> +	 */
> +	placeholder_buf = odp_buffer_alloc(buf_hdr->pool_hdl);
> +
> +	/* Can't release if no placeholder is available */
> +	if (placeholder_buf == ODP_BUFFER_INVALID) {
> +		UNLOCK(&origin_qe->s.lock);
> +		return -1;
> +	}
> +
> +	placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
> +	reorder_buf = origin_qe->s.reorder_head;
> +
Just a style comment: you can avoid many of the <tabs> in your code, which
should make it more readable.

> +	if (!reorder_buf) {
> +		placeholder_buf_hdr->next = NULL;
> +		origin_qe->s.reorder_head = placeholder_buf_hdr;
> +		origin_qe->s.reorder_tail = placeholder_buf_hdr;

placeholder_buf_hdr->target_qe = NULL;
UNLOCK(&origin_qe->s.lock);
return 0;
}

> +	} else {
so that here you can shift the code one tab to the left.
> +		reorder_prev = NULL;
> +
Setting it to NULL can be moved to the top.
> +		while (buf_hdr->order > reorder_buf->order) {
> +			reorder_prev = reorder_buf;
> +			reorder_buf  = reorder_buf->next;
> +			if (!reorder_buf)
> +				break;
> +		}
> +
> +		placeholder_buf_hdr->next = reorder_buf;
> +		if (reorder_prev)
> +			reorder_prev->next = placeholder_buf_hdr;
> +		else
> +			origin_qe->s.reorder_head = placeholder_buf_hdr;
> +
> +		if (!reorder_buf)
> +			origin_qe->s.reorder_tail = placeholder_buf_hdr;
> +	}
> +
> +	placeholder_buf_hdr->target_qe = NULL;
> +
> +	UNLOCK(&origin_qe->s.lock);
> +	return 0;
> +}
diff mbox

Patch

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4d0e1b4..a2460e7 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@ 
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -401,6 +402,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			return 0;
 		}
 
+		/* We're in order, so account for this and proceed with enq */
 		origin_qe->s.order_out++;
 	}
 
@@ -426,16 +428,56 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	 */
 	if (origin_qe) {
 		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
-		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *reorder_prev = NULL;
+		odp_buffer_hdr_t *placeholder_buf = NULL;
+		odp_buffer_hdr_t *next_buf;
 		uint32_t          release_count = 0;
+		uint32_t          placeholder_count = 0;
 
 		while (reorder_buf &&
-		       reorder_buf->target_qe == queue &&
-		       reorder_buf->order <=
-		       origin_qe->s.order_out + release_count) {
-			release_count++;
-			reorder_prev = reorder_buf;
-			reorder_buf  = reorder_buf->next;
+		       reorder_buf->order <= origin_qe->s.order_out +
+		       release_count + placeholder_count) {
+			/*
+			 * Elements on the reorder list fall into one of
+			 * three categories:
+			 *
+			 * 1. Those destined for the same queue.  These
+			 *    can be enq'd now if they were waiting to
+			 *    be unblocked by this enq.
+			 *
+			 * 2. Those representing placeholders for events
+			 *    whose ordering was released by a prior
+			 *    odp_schedule_release_ordered() call.  These
+			 *    can now just be freed.
+			 *
+			 * 3. Those representing events destined for another
+			 *    queue. These cannot be consolidated with this
+			 *    enq since they have a different target.
+			 *
+			 * Detecting an element with an order sequence gap, an
+			 * element in category 3, or running out of elements
+			 * stops the scan.
+			 */
+			next_buf = reorder_buf->next;
+
+			if (odp_likely(reorder_buf->target_qe == queue)) {
+				reorder_prev = reorder_buf;
+				reorder_buf  = next_buf;
+				release_count++;
+			} else if (!reorder_buf->target_qe) {
+				if (reorder_prev)
+					reorder_prev->next = next_buf;
+				else
+					origin_qe->s.reorder_head = next_buf;
+
+				reorder_buf->next = placeholder_buf;
+				placeholder_buf = reorder_buf;
+
+				reorder_buf = next_buf;
+				placeholder_count++;
+			} else {
+				break;
+			}
 		}
 
 		/* Add released buffers to the queue as well */
@@ -444,19 +486,28 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			queue->s.tail             = reorder_prev;
 			origin_qe->s.reorder_head = reorder_prev->next;
 			reorder_prev->next        = NULL;
-			origin_qe->s.order_out   += release_count;
 		}
 
-		/* Now handle unblocked buffers destined for other queues */
+		/* Reflect the above two in the output sequence */
+		origin_qe->s.order_out += release_count + placeholder_count;
+
+		/* Now handle any unblocked buffers destined for other queues */
+		UNLOCK(&queue->s.lock);
 		if (reorder_buf &&
-		    reorder_buf->order <= origin_qe->s.order_out) {
-			UNLOCK(&origin_qe->s.lock);
-			UNLOCK(&queue->s.lock);
-			if (schedule_enq(reorder_buf->target_qe, origin_qe))
-				ODP_ABORT("schedule_enq failed\n");
-		} else {
-			UNLOCK(&origin_qe->s.lock);
-			UNLOCK(&queue->s.lock);
+		    reorder_buf->order <= origin_qe->s.order_out)
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+		if (reorder_buf)
+			odp_queue_enq(reorder_buf->target_qe->s.handle,
+				      (odp_event_t)reorder_buf->handle.handle);
+
+		/* Free all placeholder bufs that are now released */
+		while (placeholder_buf) {
+			next_buf = placeholder_buf->next;
+			odp_buffer_free(buf_hdr->handle.handle);
+			placeholder_buf = next_buf;
 		}
 	} else {
 		UNLOCK(&queue->s.lock);
@@ -685,3 +736,128 @@  void odp_queue_param_init(odp_queue_param_t *params)
 {
 	memset(params, 0, sizeof(odp_queue_param_t));
 }
+
+/**
+ * Release the ordering obligation of an event from an ordered queue.
+ *
+ * This routine exists here rather than in odp_schedule because it
+ * operates on queue internal structures.
+ *
+ * If the event is the first or second element beyond the next in order for
+ * its origin queue, the output sequence is advanced immediately and any
+ * reorder-list entries that this unblocks are resolved. Otherwise a
+ * placeholder buffer carrying the event's order is inserted into the origin
+ * queue's reorder list, which is kept sorted by order.
+ *
+ * @param ev  Event whose ordering is to be released
+ *
+ * @retval 0   Ordering was released
+ * @retval -1  Event did not originate from an ordered queue, or no
+ *             placeholder buffer could be allocated
+ */
+int odp_schedule_release_ordered(odp_event_t ev)
+{
+	odp_buffer_t placeholder_buf;
+	odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
+	odp_buffer_hdr_t *reorder_prev = NULL;
+	odp_buffer_hdr_t *released_buf = NULL;
+	odp_buffer_hdr_t *buf_hdr =
+		odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Can't release if we didn't originate from an ordered queue */
+	if (!origin_qe)
+		return -1;
+
+	LOCK(&origin_qe->s.lock);
+
+	/* If we are the first or second element beyond the next in order,
+	 * we can release immediately since there can be no confusion about
+	 * intermediate elements
+	 */
+	if (buf_hdr->order <= origin_qe->s.order_out + 1) {
+		buf_hdr->origin_qe = NULL;
+		origin_qe->s.order_out++;
+
+		/* Placeholders (target_qe == NULL) at the head of the reorder
+		 * list that are now in order have nothing to enqueue: account
+		 * for their sequence number and collect them so they can be
+		 * freed after the lock is dropped.
+		 */
+		reorder_buf = origin_qe->s.reorder_head;
+		while (reorder_buf && !reorder_buf->target_qe &&
+		       reorder_buf->order <= origin_qe->s.order_out) {
+			origin_qe->s.reorder_head = reorder_buf->next;
+			origin_qe->s.order_out++;
+			reorder_buf->next = released_buf;
+			released_buf = reorder_buf;
+			reorder_buf = origin_qe->s.reorder_head;
+		}
+
+		/* check if this release allows us to unblock a waiter */
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out)
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		if (reorder_buf)
+			odp_queue_enq(reorder_buf->target_qe->s.handle,
+				      (odp_event_t)reorder_buf->handle.handle);
+
+		/* Free the placeholders retired above */
+		while (released_buf) {
+			next_buf = released_buf->next;
+			odp_buffer_free(released_buf->handle.handle);
+			released_buf = next_buf;
+		}
+		return 0;
+	}
+
+	/* If we are beyond the second element in the expected order, we need
+	 * a placeholder to represent our "place in line". Just use an element
+	 * from the same pool the buffer being released is from.
+	 */
+	placeholder_buf = odp_buffer_alloc(buf_hdr->pool_hdl);
+
+	/* Can't release if no placeholder is available */
+	if (placeholder_buf == ODP_BUFFER_INVALID) {
+		UNLOCK(&origin_qe->s.lock);
+		return -1;
+	}
+
+	/* The placeholder stands in for the released event: it carries the
+	 * event's order (the reorder list is sorted and scanned by it) and
+	 * has no target queue.
+	 */
+	placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+	placeholder_buf_hdr->order = buf_hdr->order;
+	placeholder_buf_hdr->target_qe = NULL;
+
+	/* Find the insertion point that keeps the reorder list sorted; an
+	 * empty list leaves both reorder_buf and reorder_prev NULL.
+	 */
+	reorder_buf = origin_qe->s.reorder_head;
+	while (reorder_buf && buf_hdr->order > reorder_buf->order) {
+		reorder_prev = reorder_buf;
+		reorder_buf  = reorder_buf->next;
+	}
+
+	placeholder_buf_hdr->next = reorder_buf;
+	if (reorder_prev)
+		reorder_prev->next = placeholder_buf_hdr;
+	else
+		origin_qe->s.reorder_head = placeholder_buf_hdr;
+
+	if (!reorder_buf)
+		origin_qe->s.reorder_tail = placeholder_buf_hdr;
+
+	/* The event itself no longer takes part in ordering, exactly as in
+	 * the immediate-release path above.
+	 */
+	buf_hdr->origin_qe = NULL;
+
+	UNLOCK(&origin_qe->s.lock);
+	return 0;
+}