@@ -42,13 +42,6 @@ struct queue_entry_s {
odp_buffer_hdr_t *tail;
int status;
- struct {
- odp_atomic_u64_t ctx; /**< Current ordered context id */
- odp_atomic_u64_t next_ctx; /**< Next unallocated context id */
- /** Array of ordered locks */
- odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
- } ordered ODP_ALIGNED_CACHE;
-
queue_enq_fn_t enqueue ODP_ALIGNED_CACHE;
queue_deq_fn_t dequeue;
queue_enq_multi_fn_t enqueue_multi;
@@ -57,16 +57,6 @@ static inline odp_queue_t queue_from_id(uint32_t queue_id)
return _odp_cast_scalar(odp_queue_t, queue_id + 1);
}
-static inline int queue_is_atomic(queue_entry_t *qe)
-{
- return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
-static inline int queue_is_ordered(queue_entry_t *qe)
-{
- return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
-}
-
queue_entry_t *get_qentry(uint32_t queue_id)
{
return &queue_tbl->queue[queue_id];
@@ -278,13 +268,6 @@ static int queue_destroy(odp_queue_t handle)
ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
return -1;
}
- if (queue_is_ordered(queue) &&
- odp_atomic_load_u64(&queue->s.ordered.ctx) !=
- odp_atomic_load_u64(&queue->s.ordered.next_ctx)) {
- UNLOCK(&queue->s.lock);
- ODP_ERR("queue \"%s\" reorder incomplete\n", queue->s.name);
- return -1;
- }
switch (queue->s.status) {
case QUEUE_STATUS_READY:
@@ -610,20 +593,9 @@ static int queue_init(queue_entry_t *queue, const char *name,
if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
return -1;
- if (param->type == ODP_QUEUE_TYPE_SCHED) {
+ if (param->type == ODP_QUEUE_TYPE_SCHED)
queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
- if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
- unsigned i;
-
- odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
- odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
-
- for (i = 0; i < queue->s.param.sched.lock_count; i++)
- odp_atomic_init_u64(&queue->s.ordered.lock[i],
- 0);
- }
- }
queue->s.type = queue->s.param.type;
queue->s.enqueue = queue_int_enq;
@@ -719,16 +691,6 @@ int sched_cb_queue_grp(uint32_t queue_index)
return qe->s.param.sched.group;
}
-int sched_cb_queue_is_ordered(uint32_t queue_index)
-{
- return queue_is_ordered(get_qentry(queue_index));
-}
-
-int sched_cb_queue_is_atomic(uint32_t queue_index)
-{
- return queue_is_atomic(get_qentry(queue_index));
-}
-
odp_queue_t sched_cb_queue_handle(uint32_t queue_index)
{
return queue_from_id(queue_index);
@@ -65,8 +65,11 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
/* Maximum number of pktio poll commands */
#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+/* Not a valid index */
+#define NULL_INDEX ((uint32_t)-1)
+
/* Not a valid poll command */
-#define PKTIO_CMD_INVALID ((uint32_t)-1)
+#define PKTIO_CMD_INVALID NULL_INDEX
/* Pktio command is free */
#define PKTIO_CMD_FREE PKTIO_CMD_INVALID
@@ -90,7 +93,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
#define PRIO_QUEUE_MASK (PRIO_QUEUE_RING_SIZE - 1)
/* Priority queue empty, not a valid queue index. */
-#define PRIO_QUEUE_EMPTY ((uint32_t)-1)
+#define PRIO_QUEUE_EMPTY NULL_INDEX
/* For best performance, the number of queues should be a power of two. */
ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
@@ -127,7 +130,7 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
/* Storage for stashed enqueue operation arguments */
typedef struct {
odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
- queue_entry_t *queue;
+ uint32_t queue_index;
int num;
} ordered_stash_t;
@@ -152,7 +155,8 @@ typedef struct {
odp_queue_t queue;
odp_event_t ev_stash[MAX_DEQ];
struct {
- queue_entry_t *src_queue; /**< Source queue entry */
+ /* Source queue index */
+ uint32_t src_queue;
uint64_t ctx; /**< Ordered context id */
int stash_num; /**< Number of stashed enqueue operations */
uint8_t in_order; /**< Order status */
@@ -197,6 +201,19 @@ typedef struct {
uint32_t cmd_index;
} pktio_cmd_t;
+/* Order context of a queue */
+typedef struct {
+ /* Current ordered context id */
+ odp_atomic_u64_t ctx ODP_ALIGNED_CACHE;
+
+ /* Next unallocated context id */
+ odp_atomic_u64_t next_ctx;
+
+ /* Array of ordered locks */
+ odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
+
+} order_context_t ODP_ALIGNED_CACHE;
+
typedef struct {
pri_mask_t pri_mask[NUM_PRIO];
odp_spinlock_t mask_lock;
@@ -230,6 +247,8 @@ typedef struct {
int grp;
int prio;
int queue_per_prio;
+ int sync;
+ unsigned order_lock_count;
} queue[ODP_CONFIG_QUEUES];
struct {
@@ -237,6 +256,8 @@ typedef struct {
int num_cmd;
} pktio[NUM_PKTIO];
+ order_context_t order[ODP_CONFIG_QUEUES];
+
} sched_global_t;
/* Global scheduler context */
@@ -259,6 +280,7 @@ static void sched_local_init(void)
sched_local.thr = odp_thread_id();
sched_local.queue = ODP_QUEUE_INVALID;
sched_local.queue_index = PRIO_QUEUE_EMPTY;
+ sched_local.ordered.src_queue = NULL_INDEX;
id = sched_local.thr & (QUEUES_PER_PRIO - 1);
@@ -488,16 +510,35 @@ static void pri_clr_queue(uint32_t queue_index, int prio)
static int schedule_init_queue(uint32_t queue_index,
const odp_schedule_param_t *sched_param)
{
+ int i;
int prio = sched_param->prio;
pri_set_queue(queue_index, prio);
sched->queue[queue_index].grp = sched_param->group;
sched->queue[queue_index].prio = prio;
sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
+ sched->queue[queue_index].sync = sched_param->sync;
+ sched->queue[queue_index].order_lock_count = sched_param->lock_count;
+
+ odp_atomic_init_u64(&sched->order[queue_index].ctx, 0);
+ odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
+
+ for (i = 0; i < CONFIG_QUEUE_MAX_ORD_LOCKS; i++)
+ odp_atomic_init_u64(&sched->order[queue_index].lock[i], 0);
return 0;
}
+static inline int queue_is_atomic(uint32_t queue_index)
+{
+ return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC;
+}
+
+static inline int queue_is_ordered(uint32_t queue_index)
+{
+ return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED;
+}
+
static void schedule_destroy_queue(uint32_t queue_index)
{
int prio = sched->queue[queue_index].prio;
@@ -506,6 +547,11 @@ static void schedule_destroy_queue(uint32_t queue_index)
sched->queue[queue_index].grp = 0;
sched->queue[queue_index].prio = 0;
sched->queue[queue_index].queue_per_prio = 0;
+
+ if (queue_is_ordered(queue_index) &&
+ odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
+ odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
+ ODP_ERR("queue reorder incomplete\n");
}
static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
@@ -606,20 +652,20 @@ static void schedule_release_atomic(void)
}
}
-static inline int ordered_own_turn(queue_entry_t *queue)
+static inline int ordered_own_turn(uint32_t queue_index)
{
uint64_t ctx;
- ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx);
+ ctx = odp_atomic_load_acq_u64(&sched->order[queue_index].ctx);
return ctx == sched_local.ordered.ctx;
}
-static inline void wait_for_order(queue_entry_t *queue)
+static inline void wait_for_order(uint32_t queue_index)
{
/* Busy loop to synchronize ordered processing */
while (1) {
- if (ordered_own_turn(queue))
+ if (ordered_own_turn(queue_index))
break;
odp_cpu_pause();
}
@@ -635,52 +681,54 @@ static inline void ordered_stash_release(void)
int i;
for (i = 0; i < sched_local.ordered.stash_num; i++) {
- queue_entry_t *queue;
+ queue_entry_t *queue_entry;
+ uint32_t queue_index;
odp_buffer_hdr_t **buf_hdr;
int num;
- queue = sched_local.ordered.stash[i].queue;
+ queue_index = sched_local.ordered.stash[i].queue_index;
+ queue_entry = get_qentry(queue_index);
buf_hdr = sched_local.ordered.stash[i].buf_hdr;
num = sched_local.ordered.stash[i].num;
- queue_fn->enq_multi(qentry_to_int(queue), buf_hdr, num);
+ queue_fn->enq_multi(qentry_to_int(queue_entry), buf_hdr, num);
}
sched_local.ordered.stash_num = 0;
}
static inline void release_ordered(void)
{
+ uint32_t qi;
unsigned i;
- queue_entry_t *queue;
- queue = sched_local.ordered.src_queue;
+ qi = sched_local.ordered.src_queue;
- wait_for_order(queue);
+ wait_for_order(qi);
/* Release all ordered locks */
- for (i = 0; i < queue->s.param.sched.lock_count; i++) {
+ for (i = 0; i < sched->queue[qi].order_lock_count; i++) {
if (!sched_local.ordered.lock_called.u8[i])
- odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
+ odp_atomic_store_rel_u64(&sched->order[qi].lock[i],
sched_local.ordered.ctx + 1);
}
sched_local.ordered.lock_called.all = 0;
- sched_local.ordered.src_queue = NULL;
+ sched_local.ordered.src_queue = NULL_INDEX;
sched_local.ordered.in_order = 0;
ordered_stash_release();
/* Next thread can continue processing */
- odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1);
+ odp_atomic_add_rel_u64(&sched->order[qi].ctx, 1);
}
static void schedule_release_ordered(void)
{
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = sched_local.ordered.src_queue;
+ queue_index = sched_local.ordered.src_queue;
- if (odp_unlikely(!queue || sched_local.num))
+ if (odp_unlikely((queue_index == NULL_INDEX) || sched_local.num))
return;
release_ordered();
@@ -688,7 +736,7 @@ static void schedule_release_ordered(void)
static inline void schedule_release_context(void)
{
- if (sched_local.ordered.src_queue != NULL)
+ if (sched_local.ordered.src_queue != NULL_INDEX)
release_ordered();
else
schedule_release_atomic();
@@ -715,9 +763,9 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
int i;
uint32_t stash_num = sched_local.ordered.stash_num;
queue_entry_t *dst_queue = qentry_from_int(q_int);
- queue_entry_t *src_queue = sched_local.ordered.src_queue;
+ uint32_t src_queue = sched_local.ordered.src_queue;
- if (!sched_local.ordered.src_queue || sched_local.ordered.in_order)
+ if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order)
return 0;
if (ordered_own_turn(src_queue)) {
@@ -740,7 +788,7 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
return 0;
}
- sched_local.ordered.stash[stash_num].queue = dst_queue;
+ sched_local.ordered.stash[stash_num].queue_index = dst_queue->s.index;
sched_local.ordered.stash[stash_num].num = num;
for (i = 0; i < num; i++)
sched_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
@@ -803,7 +851,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
prio > ODP_SCHED_PRIO_DEFAULT))
max_deq = MAX_DEQ / 2;
- ordered = sched_cb_queue_is_ordered(qi);
+ ordered = queue_is_ordered(qi);
/* Do not cache ordered events locally to improve
* parallelism. Ordered context can only be released
@@ -835,21 +883,18 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
if (ordered) {
uint64_t ctx;
- queue_entry_t *queue;
odp_atomic_u64_t *next_ctx;
- queue = get_qentry(qi);
- next_ctx = &queue->s.ordered.next_ctx;
-
+ next_ctx = &sched->order[qi].next_ctx;
ctx = odp_atomic_fetch_inc_u64(next_ctx);
sched_local.ordered.ctx = ctx;
- sched_local.ordered.src_queue = queue;
+ sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */
ring_enq(ring, PRIO_QUEUE_MASK, qi);
- } else if (sched_cb_queue_is_atomic(qi)) {
+ } else if (queue_is_atomic(qi)) {
/* Hold queue during atomic access */
sched_local.queue_index = qi;
} else {
@@ -1041,14 +1086,14 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
static inline void order_lock(void)
{
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = sched_local.ordered.src_queue;
+ queue_index = sched_local.ordered.src_queue;
- if (!queue)
+ if (queue_index == NULL_INDEX)
return;
- wait_for_order(queue);
+ wait_for_order(queue_index);
}
static void order_unlock(void)
@@ -1058,14 +1103,15 @@ static void order_unlock(void)
static void schedule_order_lock(unsigned lock_index)
{
odp_atomic_u64_t *ord_lock;
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = sched_local.ordered.src_queue;
+ queue_index = sched_local.ordered.src_queue;
- ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count &&
+ ODP_ASSERT(queue_index != NULL_INDEX &&
+ lock_index <= sched->queue[queue_index].order_lock_count &&
!sched_local.ordered.lock_called.u8[lock_index]);
- ord_lock = &queue->s.ordered.lock[lock_index];
+ ord_lock = &sched->order[queue_index].lock[lock_index];
/* Busy loop to synchronize ordered processing */
while (1) {
@@ -1084,13 +1130,14 @@ static void schedule_order_lock(unsigned lock_index)
static void schedule_order_unlock(unsigned lock_index)
{
odp_atomic_u64_t *ord_lock;
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = sched_local.ordered.src_queue;
+ queue_index = sched_local.ordered.src_queue;
- ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
+ ODP_ASSERT(queue_index != NULL_INDEX &&
+ lock_index <= sched->queue[queue_index].order_lock_count);
- ord_lock = &queue->s.ordered.lock[lock_index];
+ ord_lock = &sched->order[queue_index].lock[lock_index];
ODP_ASSERT(sched_local.ordered.ctx == odp_atomic_load_u64(ord_lock));
@@ -71,6 +71,8 @@ typedef struct {
/* Maximum number of pktio poll commands */
#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+/* Not a valid index */
+#define NULL_INDEX ((uint32_t)-1)
/* Pktio command is free */
#define PKTIO_CMD_FREE ((uint32_t)-1)
@@ -117,6 +119,19 @@ typedef struct {
/* Forward declaration */
typedef struct sched_thread_local sched_thread_local_t;
+/* Order context of a queue */
+typedef struct {
+ /* Current ordered context id */
+ odp_atomic_u64_t ctx ODP_ALIGNED_CACHE;
+
+ /* Next unallocated context id */
+ odp_atomic_u64_t next_ctx;
+
+ /* Array of ordered locks */
+ odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
+
+} order_context_t ODP_ALIGNED_CACHE;
+
typedef struct {
odp_shm_t selfie;
@@ -139,6 +154,8 @@ typedef struct {
/* Quick reference to per thread context */
sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX];
+
+ order_context_t order[ODP_CONFIG_QUEUES];
} sched_global_t;
/* Per thread events cache */
@@ -154,7 +171,7 @@ typedef struct {
/* Storage for stashed enqueue operation arguments */
typedef struct {
odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
- queue_entry_t *queue;
+ uint32_t queue_index;
int num;
} ordered_stash_t;
@@ -195,7 +212,8 @@ struct sched_thread_local {
sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
struct {
- queue_entry_t *src_queue; /**< Source queue entry */
+ /* Source queue index */
+ uint32_t src_queue;
uint64_t ctx; /**< Ordered context id */
int stash_num; /**< Number of stashed enqueue operations */
uint8_t in_order; /**< Order status */
@@ -314,6 +332,7 @@ static void sched_thread_local_reset(void)
thread_local.thread = odp_thread_id();
thread_local.cache.queue = ODP_QUEUE_INVALID;
+ thread_local.ordered.src_queue = NULL_INDEX;
odp_rwlock_init(&thread_local.lock);
@@ -395,7 +414,7 @@ static int schedule_term_local(void)
static int init_sched_queue(uint32_t queue_index,
const odp_schedule_param_t *sched_param)
{
- int prio, group, thread;
+ int prio, group, thread, i;
sched_prio_t *P;
sched_group_t *G;
sched_thread_local_t *local;
@@ -428,6 +447,12 @@ static int init_sched_queue(uint32_t queue_index,
memcpy(&sched->queues[queue_index],
sched_param, sizeof(odp_schedule_param_t));
+ odp_atomic_init_u64(&sched->order[queue_index].ctx, 0);
+ odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
+
+ for (i = 0; i < CONFIG_QUEUE_MAX_ORD_LOCKS; i++)
+ odp_atomic_init_u64(&sched->order[queue_index].lock[i], 0);
+
/* Update all threads in this schedule group to
* start check this queue index upon scheduling.
*/
@@ -502,6 +527,11 @@ static void destroy_sched_queue(uint32_t queue_index)
__destroy_sched_queue(G, queue_index);
odp_rwlock_write_unlock(&G->lock);
+
+ if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
+ odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
+ odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
+ ODP_ERR("queue reorder incomplete\n");
}
static int pktio_cmd_queue_hash(int pktio, int pktin)
@@ -1070,20 +1100,20 @@ static void schedule_release_atomic(void)
}
}
-static inline int ordered_own_turn(queue_entry_t *queue)
+static inline int ordered_own_turn(uint32_t queue_index)
{
uint64_t ctx;
- ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx);
+ ctx = odp_atomic_load_acq_u64(&sched->order[queue_index].ctx);
return ctx == thread_local.ordered.ctx;
}
-static inline void wait_for_order(queue_entry_t *queue)
+static inline void wait_for_order(uint32_t queue_index)
{
/* Busy loop to synchronize ordered processing */
while (1) {
- if (ordered_own_turn(queue))
+ if (ordered_own_turn(queue_index))
break;
odp_cpu_pause();
}
@@ -1099,52 +1129,55 @@ static inline void ordered_stash_release(void)
int i;
for (i = 0; i < thread_local.ordered.stash_num; i++) {
- queue_entry_t *queue;
+ queue_entry_t *queue_entry;
+ uint32_t queue_index;
odp_buffer_hdr_t **buf_hdr;
int num;
- queue = thread_local.ordered.stash[i].queue;
+ queue_index = thread_local.ordered.stash[i].queue_index;
+ queue_entry = get_qentry(queue_index);
buf_hdr = thread_local.ordered.stash[i].buf_hdr;
num = thread_local.ordered.stash[i].num;
- queue_fn->enq_multi(qentry_to_int(queue), buf_hdr, num);
+ queue_fn->enq_multi(qentry_to_int(queue_entry), buf_hdr, num);
}
thread_local.ordered.stash_num = 0;
}
static inline void release_ordered(void)
{
+ uint32_t qi;
unsigned i;
- queue_entry_t *queue;
- queue = thread_local.ordered.src_queue;
+ qi = thread_local.ordered.src_queue;
- wait_for_order(queue);
+ wait_for_order(qi);
/* Release all ordered locks */
- for (i = 0; i < queue->s.param.sched.lock_count; i++) {
+ for (i = 0; i < sched->queues[qi].lock_count; i++) {
if (!thread_local.ordered.lock_called.u8[i])
- odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
+ odp_atomic_store_rel_u64(&sched->order[qi].lock[i],
thread_local.ordered.ctx + 1);
}
thread_local.ordered.lock_called.all = 0;
- thread_local.ordered.src_queue = NULL;
+ thread_local.ordered.src_queue = NULL_INDEX;
thread_local.ordered.in_order = 0;
ordered_stash_release();
/* Next thread can continue processing */
- odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1);
+ odp_atomic_add_rel_u64(&sched->order[qi].ctx, 1);
}
static void schedule_release_ordered(void)
{
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = thread_local.ordered.src_queue;
+ queue_index = thread_local.ordered.src_queue;
- if (odp_unlikely(!queue || thread_local.cache.count))
+ if (odp_unlikely((queue_index == NULL_INDEX) ||
+ thread_local.cache.count))
return;
release_ordered();
@@ -1152,7 +1185,7 @@ static void schedule_release_ordered(void)
static inline void schedule_release_context(void)
{
- if (thread_local.ordered.src_queue != NULL)
+ if (thread_local.ordered.src_queue != NULL_INDEX)
release_ordered();
else
schedule_release_atomic();
@@ -1164,9 +1197,9 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
int i;
uint32_t stash_num = thread_local.ordered.stash_num;
queue_entry_t *dst_queue = qentry_from_int(q_int);
- queue_entry_t *src_queue = thread_local.ordered.src_queue;
+ uint32_t src_queue = thread_local.ordered.src_queue;
- if (!thread_local.ordered.src_queue || thread_local.ordered.in_order)
+ if ((src_queue == NULL_INDEX) || thread_local.ordered.in_order)
return 0;
if (ordered_own_turn(src_queue)) {
@@ -1189,7 +1222,7 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
return 0;
}
- thread_local.ordered.stash[stash_num].queue = dst_queue;
+ thread_local.ordered.stash[stash_num].queue_index = dst_queue->s.index;
thread_local.ordered.stash[stash_num].num = num;
for (i = 0; i < num; i++)
thread_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
@@ -1202,14 +1235,14 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
static void order_lock(void)
{
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = thread_local.ordered.src_queue;
+ queue_index = thread_local.ordered.src_queue;
- if (!queue)
+ if (queue_index == NULL_INDEX)
return;
- wait_for_order(queue);
+ wait_for_order(queue_index);
}
static void order_unlock(void)
@@ -1219,14 +1252,15 @@ static void order_unlock(void)
static void schedule_order_lock(unsigned lock_index)
{
odp_atomic_u64_t *ord_lock;
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = thread_local.ordered.src_queue;
+ queue_index = thread_local.ordered.src_queue;
- ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count &&
+ ODP_ASSERT(queue_index != NULL_INDEX &&
+ lock_index <= sched->queues[queue_index].lock_count &&
!thread_local.ordered.lock_called.u8[lock_index]);
- ord_lock = &queue->s.ordered.lock[lock_index];
+ ord_lock = &sched->order[queue_index].lock[lock_index];
/* Busy loop to synchronize ordered processing */
while (1) {
@@ -1245,13 +1279,14 @@ static void schedule_order_lock(unsigned lock_index)
static void schedule_order_unlock(unsigned lock_index)
{
odp_atomic_u64_t *ord_lock;
- queue_entry_t *queue;
+ uint32_t queue_index;
- queue = thread_local.ordered.src_queue;
+ queue_index = thread_local.ordered.src_queue;
- ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
+ ODP_ASSERT(queue_index != NULL_INDEX &&
+ lock_index <= sched->queues[queue_index].lock_count);
- ord_lock = &queue->s.ordered.lock[lock_index];
+ ord_lock = &sched->order[queue_index].lock[lock_index];
ODP_ASSERT(thread_local.ordered.ctx == odp_atomic_load_u64(ord_lock));
@@ -1275,7 +1310,7 @@ static inline bool is_ordered_queue(unsigned int queue_index)
static void schedule_save_context(uint32_t queue_index, void *ptr)
{
- queue_entry_t *queue = ptr;
+ (void)ptr;
if (is_atomic_queue(queue_index)) {
thread_local.atomic = &sched->availables[queue_index];
@@ -1283,11 +1318,11 @@ static void schedule_save_context(uint32_t queue_index, void *ptr)
uint64_t ctx;
odp_atomic_u64_t *next_ctx;
- next_ctx = &queue->s.ordered.next_ctx;
+ next_ctx = &sched->order[queue_index].next_ctx;
ctx = odp_atomic_fetch_inc_u64(next_ctx);
thread_local.ordered.ctx = ctx;
- thread_local.ordered.src_queue = queue;
+ thread_local.ordered.src_queue = queue_index;
}
}
Moved ordered queue context structure from queue internal structure to the scheduler. Ordering is a scheduler feature and thus all data and code about ordering should be in scheduler implementation. This removes most dependencies to qentry from the scheduler. Remaining dependecies are due to queue interface definition, which is not changed in this patch. Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 7 -- platform/linux-generic/odp_queue.c | 40 +----- platform/linux-generic/odp_schedule.c | 135 ++++++++++++++------- platform/linux-generic/odp_schedule_iquery.c | 113 +++++++++++------ 4 files changed, 166 insertions(+), 129 deletions(-) -- 2.13.0