@@ -52,8 +52,6 @@ typedef int odp_schedule_group_t;
#define ODP_SCHED_GROUP_NAME_LEN 32
-typedef int odp_schedule_olock_t;
-
/**
* @}
*/
@@ -140,7 +140,10 @@ typedef struct odp_buffer_hdr_t {
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
uint64_t order; /* sequence for ordered queues */
queue_entry_t *origin_qe; /* ordered queue origin */
- queue_entry_t *target_qe; /* ordered queue target */
+ union {
+ queue_entry_t *target_qe; /* ordered queue target */
+ uint64_t sync; /* for ordered synchronization */
+ };
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
@@ -81,6 +81,8 @@ struct queue_entry_s {
uint64_t order_out;
odp_buffer_hdr_t *reorder_head;
odp_buffer_hdr_t *reorder_tail;
+ odp_atomic_u64_t sync_in;
+ odp_atomic_u64_t sync_out;
};
typedef union queue_entry_u {
@@ -22,6 +22,7 @@
#include <odp_debug_internal.h>
#include <odp/hints.h>
#include <odp/sync.h>
+#include <odp_spin_internal.h>
#ifdef USE_TICKETLOCK
#include <odp/ticketlock.h>
@@ -122,6 +123,8 @@ int odp_queue_init_global(void)
/* init locks */
queue_entry_t *queue = get_qentry(i);
LOCK_INIT(&queue->s.lock);
+ odp_atomic_init_u64(&queue->s.sync_in, 0);
+ odp_atomic_init_u64(&queue->s.sync_out, 0);
queue->s.handle = queue_from_id(i);
}
@@ -404,8 +407,10 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
}
/* We're in order, so account for this and proceed with enq */
- if (!buf_hdr->flags.sustain)
+ if (!buf_hdr->flags.sustain) {
origin_qe->s.order_out++;
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+ }
/* if this element is linked, restore the linked chain */
buf_tail = buf_hdr->link;
@@ -524,6 +529,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
/* Reflect the above two in the output sequence */
origin_qe->s.order_out += release_count + placeholder_count;
+ odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
+ release_count + placeholder_count);
/* Now handle any unblocked buffers destined for other queues */
UNLOCK(&queue->s.lock);
@@ -689,7 +696,8 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
buf_hdr->next = NULL;
if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
buf_hdr->origin_qe = queue;
- buf_hdr->order = queue->s.order_in++;
+ buf_hdr->order = queue->s.order_in++;
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr->flags.sustain = 0;
} else {
buf_hdr->origin_qe = NULL;
@@ -737,6 +745,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
buf_hdr[i]->origin_qe = queue;
buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->sync =
+ odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr[i]->flags.sustain = 0;
} else {
buf_hdr[i]->origin_qe = NULL;
@@ -829,8 +839,10 @@ int odp_schedule_release_ordered(odp_event_t ev)
*/
if (buf_hdr->order <= origin_qe->s.order_out + 1) {
buf_hdr->origin_qe = NULL;
- if (!buf_hdr->flags.sustain)
+ if (!buf_hdr->flags.sustain) {
origin_qe->s.order_out++;
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+ }
/* check if this release allows us to unblock waiters */
reorder_buf = origin_qe->s.reorder_head;
@@ -911,8 +923,38 @@ int odp_schedule_order_copy(odp_event_t src_event, odp_event_t dst_event)
dst->origin_qe = origin_qe;
dst->order = src->order;
+ dst->sync = src->sync;
src->flags.sustain = 1;
UNLOCK(&origin_qe->s.lock);
return 0;
}
+
+void odp_schedule_order_lock(odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Wait until we are in order. Note that sync_out will be incremented
+ * both by unlocks as well as order resolution, so we're OK if only
+ * some events in the ordered flow need to lock.
+ */
+ while (buf_hdr->sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+ odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Get a new sync order for reusability, and release the lock. Note
+ * that this must be done in this sequence to prevent race conditions
+ * where the next waiter could lock and unlock before we're able to
+ * get a new sync order since that would cause order inversion on
+ * subsequent locks we may perform on this event in this ordered
+ * context.
+ */
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../include/odp/plat/schedule_types.h | 2 - .../linux-generic/include/odp_buffer_internal.h | 5 ++- .../linux-generic/include/odp_queue_internal.h | 2 + platform/linux-generic/odp_queue.c | 48 ++++++++++++++++++++-- 4 files changed, 51 insertions(+), 6 deletions(-)