@@ -30,6 +30,9 @@
#include <odp_libconfig_internal.h>
#include <odp/api/plat/queue_inlines.h>
+/* No synchronization context */
+#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL
+
/* Number of priority levels */
#define NUM_PRIO 8
@@ -124,7 +127,8 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
/* Scheduler local data */
typedef struct ODP_ALIGNED_CACHE {
uint16_t thr;
- uint16_t pause;
+ uint8_t pause;
+ uint8_t sync_ctx;
uint16_t grp_round;
uint16_t spread_round;
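Note on the hunk above: pause shrinks to a byte and a one-byte sync_ctx field is added, so the thread-local scheduler data records which synchronization context type (parallel meaning none, atomic, or ordered) the thread currently holds without growing the struct. A minimal standalone sketch of the idea, using hypothetical names (local_state, NO_CONTEXT) rather than the real ODP types:

/* Minimal standalone sketch, not the ODP code: thread-local state that
 * records the sync context type currently held. Names are hypothetical. */
#include <stdint.h>
#include <stdio.h>

enum sync_type { SYNC_PARALLEL = 0, SYNC_ATOMIC = 1, SYNC_ORDERED = 2 };

/* Parallel doubles as "no context held", mirroring NO_SYNC_CONTEXT */
#define NO_CONTEXT SYNC_PARALLEL

struct local_state {
	uint8_t pause;    /* boolean flag, one byte is enough */
	uint8_t sync_ctx; /* sync type of the context held by this thread */
};

static _Thread_local struct local_state local = { 0, NO_CONTEXT };

int main(void)
{
	local.sync_ctx = SYNC_ATOMIC;
	printf("holding atomic: %d\n", local.sync_ctx == SYNC_ATOMIC);

	local.sync_ctx = NO_CONTEXT;
	printf("context held:   %d\n", local.sync_ctx != NO_CONTEXT);
	return 0;
}

Encoding "no context" as the parallel sync type keeps later release paths to a single byte comparison instead of separate atomic and ordered bookkeeping.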
@@ -241,9 +245,6 @@ static sched_global_t *sched;
/* Thread local scheduler context */
static __thread sched_local_t sched_local;
-/* Function prototypes */
-static inline void schedule_release_context(void);
-
static int read_config_file(sched_global_t *sched)
{
const char *str;
@@ -311,6 +312,7 @@ static void sched_local_init(void)
memset(&sched_local, 0, sizeof(sched_local_t));
sched_local.thr = odp_thread_id();
+ sched_local.sync_ctx = NO_SYNC_CONTEXT;
sched_local.stash.queue = ODP_QUEUE_INVALID;
sched_local.stash.qi = PRIO_QUEUE_EMPTY;
sched_local.ordered.src_queue = NULL_INDEX;
@@ -450,17 +452,6 @@ static int schedule_init_local(void)
return 0;
}
-static int schedule_term_local(void)
-{
- if (sched_local.stash.num_ev) {
- ODP_ERR("Locally pre-scheduled events exist.\n");
- return -1;
- }
-
- schedule_release_context();
- return 0;
-}
-
static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
{
odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
@@ -565,14 +556,9 @@ static int schedule_init_queue(uint32_t queue_index,
return 0;
}
-static inline int queue_is_atomic(uint32_t queue_index)
+static inline uint8_t sched_sync_type(uint32_t queue_index)
{
- return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
-static inline int queue_is_ordered(uint32_t queue_index)
-{
- return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED;
+ return sched->queue[queue_index].sync;
}
static void schedule_destroy_queue(uint32_t queue_index)
@@ -584,7 +570,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
sched->queue[queue_index].prio = 0;
sched->queue[queue_index].spread = 0;
- if (queue_is_ordered(queue_index) &&
+ if ((sched_sync_type(queue_index) == ODP_SCHED_SYNC_ORDERED) &&
odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
ODP_ERR("queue reorder incomplete\n");
@@ -623,21 +609,26 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
}
}
-static void schedule_release_atomic(void)
+static inline void release_atomic(void)
{
- uint32_t qi = sched_local.stash.qi;
+ uint32_t qi = sched_local.stash.qi;
+ int grp = sched->queue[qi].grp;
+ int prio = sched->queue[qi].prio;
+ int spread = sched->queue[qi].spread;
+ ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
- if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev == 0) {
- int grp = sched->queue[qi].grp;
- int prio = sched->queue[qi].prio;
- int spread = sched->queue[qi].spread;
- ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+ /* Release current atomic queue */
+ ring_enq(ring, sched->ring_mask, qi);
- /* Release current atomic queue */
- ring_enq(ring, sched->ring_mask, qi);
+ /* The sync context is no longer held */
+ sched_local.sync_ctx = NO_SYNC_CONTEXT;
+}
- sched_local.stash.qi = PRIO_QUEUE_EMPTY;
- }
+static void schedule_release_atomic(void)
+{
+ if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC &&
+ sched_local.stash.num_ev == 0)
+ release_atomic();
}
static inline int ordered_own_turn(uint32_t queue_index)
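In the hunk above, schedule_release_atomic() is split into an unconditional inline worker, release_atomic(), plus a thin public wrapper, and the guard changes from comparing the stashed queue index against PRIO_QUEUE_EMPTY to checking sync_ctx. A standalone sketch of that wrapper/worker pattern under those assumptions, with hypothetical stand-ins for the ODP internals:

/* Standalone sketch, hypothetical stand-ins for the ODP internals. The
 * worker assumes the caller verified the context; the public wrapper
 * keeps the cheap checks on the fast path. */
#include <stdint.h>
#include <stdio.h>

#define SYNC_ATOMIC 1
#define NO_CONTEXT  0

static _Thread_local struct {
	uint8_t  sync_ctx; /* context type currently held */
	uint32_t qi;       /* queue index held during atomic processing */
	int      num_ev;   /* locally stashed (pre-scheduled) events */
} local = { NO_CONTEXT, 0, 0 };

/* Worker: no checks, caller guarantees an atomic context is held */
static inline void release_atomic_ctx(void)
{
	/* Stands in for ring_enq(): make the queue schedulable again */
	printf("re-enqueue queue %u\n", (unsigned int)local.qi);
	local.sync_ctx = NO_CONTEXT;
}

/* Public call: release only when an atomic context is really held and
 * no pre-scheduled events remain in the local stash */
static void schedule_release_atomic_sketch(void)
{
	if (local.sync_ctx == SYNC_ATOMIC && local.num_ev == 0)
		release_atomic_ctx();
}

int main(void)
{
	local.sync_ctx = SYNC_ATOMIC;
	local.qi = 7;
	schedule_release_atomic_sketch();
	return 0;
}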
@@ -709,9 +700,11 @@ static inline void release_ordered(void)
}
sched_local.ordered.lock_called.all = 0;
- sched_local.ordered.src_queue = NULL_INDEX;
sched_local.ordered.in_order = 0;
+ /* The sync context is no longer held */
+ sched_local.sync_ctx = NO_SYNC_CONTEXT;
+
ordered_stash_release();
/* Next thread can continue processing */
@@ -720,23 +713,26 @@ static inline void release_ordered(void)
static void schedule_release_ordered(void)
{
- uint32_t queue_index;
-
- queue_index = sched_local.ordered.src_queue;
-
- if (odp_unlikely((queue_index == NULL_INDEX) ||
+ if (odp_unlikely((sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) ||
sched_local.stash.num_ev))
return;
release_ordered();
}
-static inline void schedule_release_context(void)
+static int schedule_term_local(void)
{
- if (sched_local.ordered.src_queue != NULL_INDEX)
- release_ordered();
- else
+ if (sched_local.stash.num_ev) {
+ ODP_ERR("Locally pre-scheduled events exist.\n");
+ return -1;
+ }
+
+ if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
schedule_release_atomic();
+ else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+ schedule_release_ordered();
+
+ return 0;
}
static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max)
@@ -758,13 +754,22 @@ static int schedule_ord_enq_multi(odp_queue_t dst_queue, void *buf_hdr[],
int num, int *ret)
{
int i;
- uint32_t stash_num = sched_local.ordered.stash_num;
- queue_entry_t *dst_qentry = qentry_from_handle(dst_queue);
- uint32_t src_queue = sched_local.ordered.src_queue;
+ uint32_t stash_num;
+ queue_entry_t *dst_qentry;
+ uint32_t src_queue;
- if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order)
+ /* This check runs on every queue enqueue operation, including plain
+ * (non-scheduled) queues. Return fast when no ordered context is held. */
+ if (odp_likely(sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED))
return 0;
+ if (sched_local.ordered.in_order)
+ return 0;
+
+ src_queue = sched_local.ordered.src_queue;
+ stash_num = sched_local.ordered.stash_num;
+ dst_qentry = qentry_from_handle(dst_queue);
+
if (ordered_own_turn(src_queue)) {
/* Own turn, so can do enqueue directly. */
sched_local.ordered.in_order = 1;
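The enqueue-path hunk above reorders the checks so that a single thread-local byte comparison on sync_ctx decides the common case before any ordered state is read; enqueues to plain queues return immediately. A standalone sketch of that fast-path ordering, again with hypothetical stand-ins:

/* Standalone sketch, hypothetical names: test the one-byte sync_ctx
 * first so plain-queue enqueues pay only a single thread-local load,
 * and read the ordered state only when it is actually needed. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SYNC_ORDERED 2

static _Thread_local struct {
	uint8_t  sync_ctx;  /* context type currently held */
	int      in_order;  /* already released into order */
	uint32_t src_queue; /* ordered source queue index */
} local;

/* Returns true when the enqueue has to go through reorder handling */
static bool needs_reorder(void)
{
	/* Fast path: most enqueues happen without an ordered context */
	if (local.sync_ctx != SYNC_ORDERED)
		return false;

	/* Events already released into order are enqueued directly */
	if (local.in_order)
		return false;

	/* Only now read the rest of the ordered state */
	printf("reorder against source queue %u\n",
	       (unsigned int)local.src_queue);
	return true;
}

int main(void)
{
	printf("plain queue:  reorder=%d\n", needs_reorder());

	local.sync_ctx = SYNC_ORDERED;
	local.src_queue = 3;
	printf("ordered ctx:  reorder=%d\n", needs_reorder());
	return 0;
}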
@@ -891,7 +896,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
for (i = 0; i < num_spread;) {
int num;
- int ordered;
+ uint8_t sync_ctx, ordered;
odp_queue_t handle;
ring_t *ring;
int pktin;
@@ -921,7 +926,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
continue;
}
- ordered = queue_is_ordered(qi);
+ sync_ctx = sched_sync_type(qi);
+ ordered = (sync_ctx == ODP_SCHED_SYNC_ORDERED);
/* When application's array is larger than max burst
* size, output all events directly there. Also, ordered
@@ -989,10 +995,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
/* Continue scheduling ordered queues */
ring_enq(ring, ring_mask, qi);
+ sched_local.sync_ctx = sync_ctx;
- } else if (queue_is_atomic(qi)) {
+ } else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
/* Hold queue during atomic access */
sched_local.stash.qi = qi;
+ sched_local.sync_ctx = sync_ctx;
} else {
/* Continue scheduling the queue */
ring_enq(ring, ring_mask, qi);
@@ -1042,7 +1050,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
return ret;
}
- schedule_release_context();
+ /* Release schedule context */
+ if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
+ release_atomic();
+ else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+ release_ordered();
if (odp_unlikely(sched_local.pause))
return 0;
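In do_schedule() above, the previously held context is now released by dispatching directly on sync_ctx rather than through the removed schedule_release_context() helper, and sync_ctx is set whenever an atomic or ordered queue is taken. A standalone sketch of that round-to-round handover, with hypothetical names:

/* Standalone sketch with hypothetical names: each schedule round first
 * releases whatever context the previous round left behind, keyed on
 * the one-byte sync_ctx, then records the context type it takes. */
#include <stdint.h>
#include <stdio.h>

enum { NO_CTX = 0, CTX_ATOMIC = 1, CTX_ORDERED = 2 };

static _Thread_local uint8_t sync_ctx = NO_CTX;

static void release_atomic_ctx(void)
{
	printf("atomic context released\n");
	sync_ctx = NO_CTX;
}

static void release_ordered_ctx(void)
{
	printf("ordered context released\n");
	sync_ctx = NO_CTX;
}

static void do_schedule_sketch(uint8_t next_ctx)
{
	/* Release the context held since the previous schedule call */
	if (sync_ctx == CTX_ATOMIC)
		release_atomic_ctx();
	else if (sync_ctx == CTX_ORDERED)
		release_ordered_ctx();

	/* Remember the context type of the queue scheduled this round */
	sync_ctx = next_ctx;
}

int main(void)
{
	do_schedule_sketch(CTX_ATOMIC);  /* takes an atomic context      */
	do_schedule_sketch(CTX_ORDERED); /* releases it, takes ordered   */
	do_schedule_sketch(NO_CTX);      /* releases the ordered context */
	return 0;
}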
@@ -1141,14 +1153,10 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
static inline void order_lock(void)
{
- uint32_t queue_index;
-
- queue_index = sched_local.ordered.src_queue;
-
- if (queue_index == NULL_INDEX)
+ if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
return;
- wait_for_order(queue_index);
+ wait_for_order(sched_local.ordered.src_queue);
}
static void order_unlock(void)
@@ -1160,6 +1168,9 @@ static void schedule_order_lock(uint32_t lock_index)
odp_atomic_u64_t *ord_lock;
uint32_t queue_index;
+ if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+ return;
+
queue_index = sched_local.ordered.src_queue;
ODP_ASSERT(queue_index != NULL_INDEX &&
@@ -1187,6 +1198,9 @@ static void schedule_order_unlock(uint32_t lock_index)
odp_atomic_u64_t *ord_lock;
uint32_t queue_index;
+ if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+ return;
+
queue_index = sched_local.ordered.src_queue;
ODP_ASSERT(queue_index != NULL_INDEX &&
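Finally, the two lock hunks above guard schedule_order_lock()/schedule_order_unlock() on sync_ctx, so a call made without an ordered context becomes a harmless no-op instead of touching stale ordered state. A standalone sketch of that guard, with hypothetical stand-ins:

/* Standalone sketch with hypothetical stand-ins: the ordered lock call
 * is a no-op unless the caller actually holds an ordered context. */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define SYNC_ORDERED 2
#define NULL_IDX     UINT32_MAX

static _Thread_local struct {
	uint8_t  sync_ctx;  /* context type currently held */
	uint32_t src_queue; /* valid only under an ordered context */
} local = { 0, NULL_IDX };

static void order_lock_sketch(uint32_t lock_index)
{
	/* Without an ordered context there is no order to wait for */
	if (local.sync_ctx != SYNC_ORDERED)
		return;

	/* Only under an ordered context is src_queue guaranteed valid */
	assert(local.src_queue != NULL_IDX);
	printf("waiting on lock %u of queue %u\n",
	       (unsigned int)lock_index, (unsigned int)local.src_queue);
}

int main(void)
{
	order_lock_sketch(0); /* no context held: silently returns */

	local.sync_ctx = SYNC_ORDERED;
	local.src_queue = 5;
	order_lock_sketch(0); /* ordered context: guard passes */
	return 0;
}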