@@ -82,7 +82,9 @@ int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]);
void sched_cb_pktio_stop_finalize(int pktio_index);
odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
void sched_cb_queue_destroy_finalize(uint32_t queue_index);
-int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num);
+void sched_cb_queue_set_status(uint32_t queue_index, int status);
+int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num,
+ int update_status);
int sched_cb_queue_empty(uint32_t queue_index);
/* API functions */
@@ -299,6 +299,17 @@ void sched_cb_queue_destroy_finalize(uint32_t queue_index)
UNLOCK(queue);
}
+void sched_cb_queue_set_status(uint32_t queue_index, int status)
+{
+ queue_entry_t *queue = get_qentry(queue_index);
+
+ LOCK(queue);
+
+ queue->s.status = status;
+
+ UNLOCK(queue);
+}
+
static int queue_destroy(odp_queue_t handle)
{
queue_entry_t *queue;
@@ -493,7 +504,7 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
}
static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
- int num)
+ int num, int update_status)
{
int status_sync = sched_fn->status_sync;
int num_deq;
@@ -515,7 +526,7 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
if (num_deq == 0) {
/* Already empty queue */
- if (queue->s.status == QUEUE_STATUS_SCHED) {
+ if (update_status && queue->s.status == QUEUE_STATUS_SCHED) {
queue->s.status = QUEUE_STATUS_NOTSCHED;
if (status_sync)
@@ -542,7 +553,7 @@ static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
{
queue_entry_t *queue = qentry_from_int(q_int);
- return deq_multi(queue, buf_hdr, num);
+ return deq_multi(queue, buf_hdr, num, 0);
}
static odp_buffer_hdr_t *queue_int_deq(queue_t q_int)
@@ -551,7 +562,7 @@ static odp_buffer_hdr_t *queue_int_deq(queue_t q_int)
odp_buffer_hdr_t *buf_hdr = NULL;
int ret;
- ret = deq_multi(queue, &buf_hdr, 1);
+ ret = deq_multi(queue, &buf_hdr, 1, 0);
if (ret == 1)
return buf_hdr;
@@ -661,11 +672,12 @@ static int queue_info(odp_queue_t handle, odp_queue_info_t *info)
return 0;
}
-int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num)
+int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num,
+ int update_status)
{
queue_entry_t *qe = get_qentry(queue_index);
- return deq_multi(qe, (odp_buffer_hdr_t **)ev, num);
+ return deq_multi(qe, (odp_buffer_hdr_t **)ev, num, update_status);
}
int sched_cb_queue_empty(uint32_t queue_index)
@@ -50,39 +50,15 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
/* Size of poll weight table */
#define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)
-/* Packet input poll cmd queues */
-#define PKTIO_CMD_QUEUES 4
-
-/* Mask for wrapping command queues */
-#define PKTIO_CMD_QUEUE_MASK (PKTIO_CMD_QUEUES - 1)
-
-/* Maximum number of packet input queues per command */
-#define MAX_PKTIN 16
-
/* Maximum number of packet IO interfaces */
#define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
-/* Maximum number of pktio poll commands */
-#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+/* Maximum pktin index. Needs to fit into 8 bits. */
+#define MAX_PKTIN_INDEX 255
/* Not a valid index */
#define NULL_INDEX ((uint32_t)-1)
-/* Not a valid poll command */
-#define PKTIO_CMD_INVALID NULL_INDEX
-
-/* Pktio command is free */
-#define PKTIO_CMD_FREE PKTIO_CMD_INVALID
-
-/* Packet IO poll queue ring size. In worst case, all pktios have all pktins
- * enabled and one poll command is created per pktin queue. The ring size must
- * be larger than or equal to NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can
- * hold all poll commands in the worst case. */
-#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
-
-/* Mask for wrapping around pktio poll command index */
-#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
-
/* Priority queue ring size. In worst case, all event queues are scheduled
* queues and have the same priority. The ring size must be larger than or
* equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
@@ -90,7 +66,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
#define PRIO_QUEUE_RING_SIZE (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
/* Mask for wrapping around priority queue index */
-#define PRIO_QUEUE_MASK (PRIO_QUEUE_RING_SIZE - 1)
+#define RING_MASK (PRIO_QUEUE_RING_SIZE - 1)
/* Priority queue empty, not a valid queue index. */
#define PRIO_QUEUE_EMPTY NULL_INDEX
@@ -103,15 +79,6 @@ ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
ODP_STATIC_ASSERT(CHECK_IS_POWER2(PRIO_QUEUE_RING_SIZE),
"Ring_size_is_not_power_of_two");
-/* Ring size must be power of two, so that PKTIO_RING_MASK can be used. */
-ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_RING_SIZE),
- "pktio_ring_size_is_not_power_of_two");
-
-/* Number of commands queues must be power of two, so that PKTIO_CMD_QUEUE_MASK
- * can be used. */
-ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_CMD_QUEUES),
- "pktio_cmd_queues_is_not_power_of_two");
-
/* Mask of queues per priority */
typedef uint8_t pri_mask_t;
@@ -150,7 +117,6 @@ typedef struct {
int index;
int pause;
uint16_t round;
- uint16_t pktin_polls;
uint32_t queue_index;
odp_queue_t queue;
odp_event_t ev_stash[MAX_DEQ];
@@ -183,24 +149,6 @@ typedef struct ODP_ALIGNED_CACHE {
} prio_queue_t;
-/* Packet IO queue */
-typedef struct ODP_ALIGNED_CACHE {
- /* Ring header */
- ring_t ring;
-
- /* Ring data: pktio poll command indexes */
- uint32_t cmd_index[PKTIO_RING_SIZE];
-
-} pktio_queue_t;
-
-/* Packet IO poll command */
-typedef struct {
- int pktio_index;
- int num_pktin;
- int pktin[MAX_PKTIN];
- uint32_t cmd_index;
-} pktio_cmd_t;
-
/* Order context of a queue */
typedef struct ODP_ALIGNED_CACHE {
/* Current ordered context id */
@@ -220,16 +168,6 @@ typedef struct {
prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
- odp_spinlock_t poll_cmd_lock;
- /* Number of commands in a command queue */
- uint16_t num_pktio_cmd[PKTIO_CMD_QUEUES];
-
- /* Packet IO command queues */
- pktio_queue_t pktio_q[PKTIO_CMD_QUEUES];
-
- /* Packet IO poll commands */
- pktio_cmd_t pktio_cmd[NUM_PKTIO_CMD];
-
odp_shm_t shm;
uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO];
@@ -244,22 +182,34 @@ typedef struct {
} sched_grp[NUM_SCHED_GRPS];
struct {
- int grp;
- int prio;
- int queue_per_prio;
- int sync;
- uint32_t order_lock_count;
+ uint8_t grp;
+ uint8_t prio;
+ uint8_t queue_per_prio;
+ uint8_t sync;
+ uint8_t order_lock_count;
+ uint8_t poll_pktin;
+ uint8_t pktio_index;
+ uint8_t pktin_index;
} queue[ODP_CONFIG_QUEUES];
struct {
- /* Number of active commands for a pktio interface */
- int num_cmd;
+ int num_pktin;
} pktio[NUM_PKTIO];
+ odp_spinlock_t pktio_lock;
order_context_t order[ODP_CONFIG_QUEUES];
} sched_global_t;
+/* Check that queue[] variables are large enough */
+ODP_STATIC_ASSERT(NUM_SCHED_GRPS <= 256, "Group_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(NUM_PRIO <= 256, "Prio_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(QUEUES_PER_PRIO <= 256,
+ "Queues_per_prio_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(CONFIG_QUEUE_MAX_ORD_LOCKS <= 256,
+ "Ordered_lock_count_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(NUM_PKTIO <= 256, "Pktio_index_does_not_fit_8_bits");
+
/* Global scheduler context */
static sched_global_t *sched;
@@ -337,16 +287,9 @@ static int schedule_init_global(void)
}
}
- odp_spinlock_init(&sched->poll_cmd_lock);
- for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
- ring_init(&sched->pktio_q[i].ring);
-
- for (j = 0; j < PKTIO_RING_SIZE; j++)
- sched->pktio_q[i].cmd_index[j] = PKTIO_CMD_INVALID;
- }
-
- for (i = 0; i < NUM_PKTIO_CMD; i++)
- sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
+ odp_spinlock_init(&sched->pktio_lock);
+ for (i = 0; i < NUM_PKTIO; i++)
+ sched->pktio[i].num_pktin = 0;
odp_spinlock_init(&sched->grp_lock);
odp_atomic_init_u32(&sched->grp_epoch, 0);
@@ -384,14 +327,14 @@ static int schedule_term_global(void)
ring_t *ring = &sched->prio_q[grp][i][j].ring;
uint32_t qi;
- while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+ while ((qi = ring_deq(ring, RING_MASK)) !=
RING_EMPTY) {
odp_event_t events[1];
int num;
num = sched_cb_queue_deq_multi(qi,
events,
- 1);
+ 1, 1);
if (num < 0)
queue_destroy_finalize(qi);
@@ -519,6 +462,9 @@ static int schedule_init_queue(uint32_t queue_index,
sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
sched->queue[queue_index].sync = sched_param->sync;
sched->queue[queue_index].order_lock_count = sched_param->lock_count;
+ sched->queue[queue_index].poll_pktin = 0;
+ sched->queue[queue_index].pktio_index = 0;
+ sched->queue[queue_index].pktin_index = 0;
odp_atomic_init_u64(&sched->order[queue_index].ctx, 0);
odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
@@ -554,88 +500,39 @@ static void schedule_destroy_queue(uint32_t queue_index)
ODP_ERR("queue reorder incomplete\n");
}
-static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
-{
- return PKTIO_CMD_QUEUE_MASK & (pktio_index ^ pktin_idx);
-}
-
-static inline pktio_cmd_t *alloc_pktio_cmd(void)
-{
- int i;
- pktio_cmd_t *cmd = NULL;
-
- odp_spinlock_lock(&sched->poll_cmd_lock);
-
- /* Find next free command */
- for (i = 0; i < NUM_PKTIO_CMD; i++) {
- if (sched->pktio_cmd[i].cmd_index == PKTIO_CMD_FREE) {
- cmd = &sched->pktio_cmd[i];
- cmd->cmd_index = i;
- break;
- }
- }
-
- odp_spinlock_unlock(&sched->poll_cmd_lock);
-
- return cmd;
-}
-
-static inline void free_pktio_cmd(pktio_cmd_t *cmd)
+static int schedule_sched_queue(uint32_t queue_index)
{
- odp_spinlock_lock(&sched->poll_cmd_lock);
-
- cmd->cmd_index = PKTIO_CMD_FREE;
+ int grp = sched->queue[queue_index].grp;
+ int prio = sched->queue[queue_index].prio;
+ int queue_per_prio = sched->queue[queue_index].queue_per_prio;
+ ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
- odp_spinlock_unlock(&sched->poll_cmd_lock);
+ ring_enq(ring, RING_MASK, queue_index);
+ return 0;
}
static void schedule_pktio_start(int pktio_index, int num_pktin,
- int pktin_idx[], odp_queue_t odpq[] ODP_UNUSED)
+ int pktin_idx[], odp_queue_t queue[])
{
- int i, idx;
- pktio_cmd_t *cmd;
-
- if (num_pktin > MAX_PKTIN)
- ODP_ABORT("Too many input queues for scheduler\n");
+ int i;
+ uint32_t qi;
- sched->pktio[pktio_index].num_cmd = num_pktin;
+ sched->pktio[pktio_index].num_pktin = num_pktin;
- /* Create a pktio poll command per queue */
for (i = 0; i < num_pktin; i++) {
+ qi = queue_to_index(queue[i]);
+ sched->queue[qi].poll_pktin = 1;
+ sched->queue[qi].pktio_index = pktio_index;
+ sched->queue[qi].pktin_index = pktin_idx[i];
- cmd = alloc_pktio_cmd();
-
- if (cmd == NULL)
- ODP_ABORT("Scheduler out of pktio commands\n");
-
- idx = poll_cmd_queue_idx(pktio_index, pktin_idx[i]);
+ ODP_ASSERT(pktin_idx[i] <= MAX_PKTIN_INDEX);
- odp_spinlock_lock(&sched->poll_cmd_lock);
- sched->num_pktio_cmd[idx]++;
- odp_spinlock_unlock(&sched->poll_cmd_lock);
-
- cmd->pktio_index = pktio_index;
- cmd->num_pktin = 1;
- cmd->pktin[0] = pktin_idx[i];
- ring_enq(&sched->pktio_q[idx].ring, PKTIO_RING_MASK,
- cmd->cmd_index);
+ /* Start polling */
+ sched_cb_queue_set_status(qi, QUEUE_STATUS_SCHED);
+ schedule_sched_queue(qi);
}
}
-static int schedule_pktio_stop(int pktio_index, int first_pktin)
-{
- int num;
- int idx = poll_cmd_queue_idx(pktio_index, first_pktin);
-
- odp_spinlock_lock(&sched->poll_cmd_lock);
- sched->num_pktio_cmd[idx]--;
- sched->pktio[pktio_index].num_cmd--;
- num = sched->pktio[pktio_index].num_cmd;
- odp_spinlock_unlock(&sched->poll_cmd_lock);
-
- return num;
-}
-
static void schedule_release_atomic(void)
{
uint32_t qi = sched_local.queue_index;
@@ -647,7 +544,7 @@ static void schedule_release_atomic(void)
ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
/* Release current atomic queue */
- ring_enq(ring, PRIO_QUEUE_MASK, qi);
+ ring_enq(ring, RING_MASK, qi);
sched_local.queue_index = PRIO_QUEUE_EMPTY;
}
}
@@ -797,6 +694,37 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
return 1;
}
+static inline int queue_is_pktin(uint32_t queue_index)
+{
+ return sched->queue[queue_index].poll_pktin;
+}
+
+static inline int poll_pktin(uint32_t qi)
+{
+ int pktio_index, pktin_index, num, num_pktin;
+
+ pktio_index = sched->queue[qi].pktio_index;
+ pktin_index = sched->queue[qi].pktin_index;
+
+ num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index);
+
+ /* The pktio has been stopped or closed. Call stop_finalize once we have
+ * stopped polling all pktin queues of the pktio. */
+ if (odp_unlikely(num < 0)) {
+ odp_spinlock_lock(&sched->pktio_lock);
+ sched->pktio[pktio_index].num_pktin--;
+ num_pktin = sched->pktio[pktio_index].num_pktin;
+ odp_spinlock_unlock(&sched->pktio_lock);
+
+ sched_cb_queue_set_status(qi, QUEUE_STATUS_NOTSCHED);
+
+ if (num_pktin == 0)
+ sched_cb_pktio_stop_finalize(pktio_index);
+ }
+
+ return num;
+}
+
static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
unsigned int max_num, int grp, int first)
{
@@ -820,6 +748,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
int ordered;
odp_queue_t handle;
ring_t *ring;
+ int pktin;
if (id >= QUEUES_PER_PRIO)
id = 0;
@@ -834,7 +763,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
/* Get queue index from the priority queue */
ring = &sched->prio_q[grp][prio][id].ring;
- qi = ring_deq(ring, PRIO_QUEUE_MASK);
+ qi = ring_deq(ring, RING_MASK);
/* Priority queue empty */
if (qi == RING_EMPTY) {
@@ -857,8 +786,10 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
if (ordered && max_num < MAX_DEQ)
max_deq = max_num;
+ pktin = queue_is_pktin(qi);
+
num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash,
- max_deq);
+ max_deq, !pktin);
if (num < 0) {
/* Destroyed queue. Continue scheduling the same
@@ -868,6 +799,20 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
}
if (num == 0) {
+ /* Poll packet input. Continue scheduling a queue that is
+ * connected to packet input. Move to the next priority to
+ * avoid starvation of other priorities. Stop scheduling the
+ * queue when the pktio has been stopped. */
+ if (pktin) {
+ int num_pkt = poll_pktin(qi);
+
+ if (odp_likely(num_pkt >= 0)) {
+ ring_enq(ring, RING_MASK, qi);
+ break;
+ }
+ }
+
/* Remove empty queue from scheduling. Continue
* scheduling the same priority queue. */
continue;
@@ -890,14 +835,14 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */
- ring_enq(ring, PRIO_QUEUE_MASK, qi);
+ ring_enq(ring, RING_MASK, qi);
} else if (queue_is_atomic(qi)) {
/* Hold queue during atomic access */
sched_local.queue_index = qi;
} else {
/* Continue scheduling the queue */
- ring_enq(ring, PRIO_QUEUE_MASK, qi);
+ ring_enq(ring, RING_MASK, qi);
}
/* Output the source queue handle */
@@ -919,7 +864,7 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
{
int i, num_grp;
int ret;
- int id, first, grp_id;
+ int first, grp_id;
uint16_t round;
uint32_t epoch;
@@ -972,66 +917,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
grp_id = 0;
}
- /*
- * Poll packet input when there are no events
- * * Each thread starts the search for a poll command from its
- * preferred command queue. If the queue is empty, it moves to other
- * queues.
- * * Most of the times, the search stops on the first command found to
- * optimize multi-threaded performance. A small portion of polls
- * have to do full iteration to avoid packet input starvation when
- * there are less threads than command queues.
- */
- id = sched_local.thr & PKTIO_CMD_QUEUE_MASK;
-
- for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
- PKTIO_CMD_QUEUE_MASK)) {
- ring_t *ring;
- uint32_t cmd_index;
- pktio_cmd_t *cmd;
-
- if (odp_unlikely(sched->num_pktio_cmd[id] == 0))
- continue;
-
- ring = &sched->pktio_q[id].ring;
- cmd_index = ring_deq(ring, PKTIO_RING_MASK);
-
- if (odp_unlikely(cmd_index == RING_EMPTY))
- continue;
-
- cmd = &sched->pktio_cmd[cmd_index];
-
- /* Poll packet input */
- if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio_index,
- cmd->num_pktin,
- cmd->pktin))){
- /* Pktio stopped or closed. Remove poll command and call
- * stop_finalize when all commands of the pktio has
- * been removed. */
- if (schedule_pktio_stop(cmd->pktio_index,
- cmd->pktin[0]) == 0)
- sched_cb_pktio_stop_finalize(cmd->pktio_index);
-
- free_pktio_cmd(cmd);
- } else {
- /* Continue scheduling the pktio */
- ring_enq(ring, PKTIO_RING_MASK, cmd_index);
-
- /* Do not iterate through all pktin poll command queues
- * every time. */
- if (odp_likely(sched_local.pktin_polls & 0xf))
- break;
- }
- }
-
- sched_local.pktin_polls++;
return 0;
}
-
-static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
- odp_event_t out_ev[],
- unsigned int max_num)
+static inline int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
+ odp_event_t out_ev[], unsigned int max_num)
{
odp_time_t next, wtime;
int first = 1;
@@ -1387,17 +1277,6 @@ static void schedule_prefetch(int num ODP_UNUSED)
{
}
-static int schedule_sched_queue(uint32_t queue_index)
-{
- int grp = sched->queue[queue_index].grp;
- int prio = sched->queue[queue_index].prio;
- int queue_per_prio = sched->queue[queue_index].queue_per_prio;
- ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
-
- ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
- return 0;
-}
-
static int schedule_num_grps(void)
{
return NUM_SCHED_GRPS;
@@ -291,7 +291,7 @@ static int schedule_term_global(void)
odp_event_t events[1];
if (sched->availables[i])
- count = sched_cb_queue_deq_multi(i, events, 1);
+ count = sched_cb_queue_deq_multi(i, events, 1, 1);
if (count < 0)
sched_cb_queue_destroy_finalize(i);
@@ -1527,7 +1527,7 @@ static inline int consume_queue(int prio, unsigned int queue_index)
max = 1;
count = sched_cb_queue_deq_multi(
- queue_index, cache->stash, max);
+ queue_index, cache->stash, max, 1);
if (count < 0) {
DO_SCHED_UNLOCK();
@@ -559,7 +559,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
}
qi = cmd->s.index;
- num = sched_cb_queue_deq_multi(qi, events, 1);
+ num = sched_cb_queue_deq_multi(qi, events, 1, 1);
if (num > 0) {
sched_local.cmd = cmd;