@@ -58,6 +58,8 @@ union queue_entry_u {
int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num);
int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num);
int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num);
+int _odp_queue_enq_sp(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num);
+queue_entry_t *qentry_from_ext(odp_queue_t handle);
/* Round up memory size to next cache line size to
* align all memory addresses on cache line boundary.
@@ -14,9 +14,15 @@ extern "C" {
#include <odp/api/queue.h>
#include <odp_queue_if.h>
#include <odp/api/schedule.h>
+#include <odp_forward_typedefs_internal.h>
-typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue,
- int in_queue_idx[]);
+/* Number of ordered locks per queue */
+#define SCHEDULE_ORDERED_LOCKS_PER_QUEUE 2
+
+typedef void (*schedule_pktio_start_fn_t)(int pktio_index,
+ int num_in_queue,
+ int in_queue_idx[],
+ odp_queue_t odpq[]);
typedef int (*schedule_thr_add_fn_t)(odp_schedule_group_t group, int thr);
typedef int (*schedule_thr_rem_fn_t)(odp_schedule_group_t group, int thr);
typedef int (*schedule_num_grps_fn_t)(void);
@@ -66,6 +72,7 @@ extern const schedule_fn_t *sched_fn;
/* Interface for the scheduler */
int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
+int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]);
void sched_cb_pktio_stop_finalize(int pktio_index);
odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
void sched_cb_queue_destroy_finalize(uint32_t queue_index);
@@ -22,8 +22,12 @@
* constants, but not ODP_SCHED_PRIO_NUM. The current API for this
* is odp_schedule_num_prio(). The other schedulers also define
* this internally as NUM_PRIO.
+ *
+ * One additional priority level for idle pktin queues.
+ * This is only for internal use and not visible to the user.
*/
-#define ODP_SCHED_PRIO_NUM 8
+#define ODP_SCHED_PRIO_PKTIN 8
+#define ODP_SCHED_PRIO_NUM 9
typedef struct {
union {
@@ -55,14 +59,18 @@ typedef uint32_t ringidx_t;
#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1)
typedef struct {
- struct llnode node; /* must be first */
+ struct llnode node;
sched_queue_t *schedq;
#ifdef CONFIG_QSCHST_LOCK
odp_ticketlock_t qschlock;
#endif
qschedstate_t qschst;
- uint16_t pop_deficit;
- uint16_t qschst_type;
+ uint8_t pop_deficit;
+ uint8_t qschst_type;
+ uint8_t pktio_idx;
+ uint8_t rx_queue;
+ uint16_t xoffset;
+ uint8_t sched_prio;
ringidx_t prod_read SPLIT_PC;
ringidx_t prod_write;
ringidx_t prod_mask;
@@ -80,6 +88,7 @@ typedef struct {
#define cons_ring prod_ring
#define cons_type qschst_type
#endif
+ odp_schedule_group_t sched_grp;
} sched_elem_t ODP_ALIGNED_CACHE;
/* Number of scheduling groups */
@@ -106,6 +115,8 @@ typedef struct {
typedef struct {
/* Atomic queue currently being processed or NULL */
sched_elem_t *atomq;
+ /* Schedq the currently processed queue was popped from */
+ sched_queue_t *src_schedq;
/* Current reorder context or NULL */
reorder_context_t *rctx;
uint8_t pause;
@@ -113,8 +124,6 @@ typedef struct {
uint8_t tidx;
uint8_t pad;
uint32_t dequeued; /* Number of events dequeued from atomic queue */
- uint16_t pktin_next; /* Next pktin tag to poll */
- uint16_t pktin_poll_cnts;
uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */
uint16_t num_schedq;
uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */
@@ -133,7 +142,7 @@ typedef struct {
void sched_update_enq(sched_elem_t *q, uint32_t actual);
void sched_update_enq_sp(sched_elem_t *q, uint32_t actual);
-sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio);
-void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio);
+sched_queue_t *sched_queue_add(odp_schedule_group_t grp, uint32_t prio);
+void sched_queue_rem(odp_schedule_group_t grp, uint32_t prio);
#endif /* ODP_SCHEDULE_SCALABLE_H */
@@ -115,6 +115,8 @@ reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool,
unsigned lock_count);
int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin);
bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn);
+bool rwin_reserve_sc(reorder_window_t *rwin, uint32_t *sn);
+void rwin_unreserve_sc(reorder_window_t *rwin, uint32_t sn);
void rctx_init(reorder_context_t *rctx, uint16_t idx,
reorder_window_t *rwin, uint32_t sn);
void rctx_release(reorder_context_t *rctx);
@@ -469,9 +469,11 @@ int odp_pktio_start(odp_pktio_t hdl)
unsigned i;
unsigned num = entry->s.num_in_queue;
int index[num];
+ odp_queue_t odpq[num];
for (i = 0; i < num; i++) {
index[i] = i;
+ odpq[i] = entry->s.in_queue[i].queue;
if (entry->s.in_queue[i].queue == ODP_QUEUE_INVALID) {
ODP_ERR("No input queue\n");
@@ -479,7 +481,7 @@ int odp_pktio_start(odp_pktio_t hdl)
}
}
- sched_fn->pktio_start(pktio_to_id(hdl), num, index);
+ sched_fn->pktio_start(pktio_to_id(hdl), num, index, odpq);
}
return res;
@@ -668,6 +670,49 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num)
return nbr;
}
+int sched_cb_pktin_poll_one(int pktio_index,
+ int rx_queue,
+ odp_event_t evt_tbl[QUEUE_MULTI_MAX])
+{
+ int num_rx, num_pkts, i;
+ pktio_entry_t *entry = pktio_entry_by_index(pktio_index);
+ odp_packet_t pkt;
+ odp_packet_hdr_t *pkt_hdr;
+ odp_buffer_hdr_t *buf_hdr;
+ odp_packet_t packets[QUEUE_MULTI_MAX];
+ queue_t queue;
+
+ if (odp_unlikely(entry->s.state != PKTIO_STATE_STARTED)) {
+ if (entry->s.state < PKTIO_STATE_ACTIVE ||
+ entry->s.state == PKTIO_STATE_STOP_PENDING)
+ return -1;
+
+ ODP_DBG("interface not started\n");
+ return 0;
+ }
+
+ ODP_ASSERT((unsigned)rx_queue < entry->s.num_in_queue);
+ num_pkts = entry->s.ops->recv(entry, rx_queue,
+ packets, QUEUE_MULTI_MAX);
+
+ num_rx = 0;
+ for (i = 0; i < num_pkts; i++) {
+ pkt = packets[i];
+ pkt_hdr = odp_packet_hdr(pkt);
+ if (odp_unlikely(pkt_hdr->p.input_flags.dst_queue)) {
+ queue = pkt_hdr->dst_queue;
+ buf_hdr = packet_to_buf_hdr(pkt);
+ if (queue_fn->enq_multi(queue, &buf_hdr, 1) < 0)
+ /* Queue full? */
+ odp_packet_free(pkt);
+ /* TODO update pktio dropped packets stats */
+ } else {
+ evt_tbl[num_rx++] = odp_packet_to_event(pkt);
+ }
+ }
+ return num_rx;
+}
+
int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
{
odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
@@ -70,6 +70,11 @@ static queue_entry_t *get_qentry(uint32_t queue_id)
return &queue_tbl->queue[queue_id];
}
+queue_entry_t *qentry_from_ext(odp_queue_t handle)
+{
+ return get_qentry(queue_to_id(handle));
+}
+
static int _odp_queue_disable_enq(sched_elem_t *q)
{
ringidx_t old_read, old_write, new_write;
@@ -170,9 +175,12 @@ static int queue_init(queue_entry_t *queue, const char *name,
goto rwin_create_failed;
}
}
+ sched_elem->sched_grp = param->sched.group;
+ sched_elem->sched_prio = param->sched.prio;
sched_elem->schedq =
- schedq_from_sched_group(param->sched.group,
- param->sched.prio);
+ sched_queue_add(param->sched.group, param->sched.prio);
+ ODP_ASSERT(sched_elem->schedq != NULL);
+
}
return 0;
@@ -433,19 +441,20 @@ static int queue_destroy(odp_queue_t handle)
doze();
}
- /* Adjust the spread factor for the queues in the schedule group */
- if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
- sched_group_xcount_dec(queue->s.param.sched.group,
- queue->s.param.sched.prio);
+ if (q->schedq != NULL) {
+ sched_queue_rem(q->sched_grp, q->sched_prio);
+ q->schedq = NULL;
+ }
_odp_ishm_pool_free(queue_shm_pool, q->prod_ring);
- if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+ if (q->rwin != NULL) {
if (rwin_free(queue_shm_pool, q->rwin) < 0) {
ODP_ERR("Failed to free reorder window\n");
UNLOCK(&queue->s.lock);
return -1;
}
+ q->rwin = NULL;
}
queue->s.status = QUEUE_STATUS_FREE;
UNLOCK(&queue->s.lock);
@@ -554,11 +563,11 @@ static inline int _odp_queue_enq(sched_elem_t *q,
return actual;
}
-#else
+#endif
-static inline int _odp_queue_enq_sp(sched_elem_t *q,
- odp_buffer_hdr_t *buf_hdr[],
- int num)
+int _odp_queue_enq_sp(sched_elem_t *q,
+ odp_buffer_hdr_t *buf_hdr[],
+ int num)
{
ringidx_t old_read;
ringidx_t old_write;
@@ -602,7 +611,6 @@ static inline int _odp_queue_enq_sp(sched_elem_t *q,
return actual;
}
-#endif
static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[],
int num)
@@ -592,7 +592,7 @@ static inline void free_pktio_cmd(pktio_cmd_t *cmd)
}
static void schedule_pktio_start(int pktio_index, int num_pktin,
- int pktin_idx[])
+ int pktin_idx[], odp_queue_t odpq[] ODP_UNUSED)
{
int i, idx;
pktio_cmd_t *cmd;
@@ -574,7 +574,10 @@ static inline void free_pktio_cmd(pktio_cmd_t *cmd)
odp_rwlock_write_unlock(&sched->pktio_poll.lock);
}
-static void schedule_pktio_start(int pktio, int count, int pktin[])
+static void schedule_pktio_start(int pktio,
+ int count,
+ int pktin[],
+ odp_queue_t odpq[] ODP_UNUSED)
{
int i, index;
pktio_cmd_t *cmd;
@@ -40,42 +40,17 @@
#define LOCK(a) _odp_ticketlock_lock((a))
#define UNLOCK(a) _odp_ticketlock_unlock((a))
-#define TAG_EMPTY 0U
-#define TAG_USED (1U << 15)
-#define TAG_BUSY (1U << 31)
-#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED)
-#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF)
-#define TAG_2_QUEUE(t) ((t) & 0x7FFF)
-#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED)
-#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES)
#define MAXTHREADS ATOM_BITSET_SIZE
+#define FLAG_PKTIN 0x80
+
static _odp_ishm_pool_t *sched_shm_pool;
-static uint32_t pktin_num;
-static uint32_t pktin_hi;
-static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES];
-static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE;
-
-#define __atomic_fetch_max(var, v, mo) do { \
- /* Evalulate 'v' once */ \
- __typeof__(v) tmp_v = (v); \
- __typeof__(*var) old_var = \
- __atomic_load_n((var), __ATOMIC_RELAXED); \
- while (tmp_v > old_var) { \
- /* Attempt to store 'v' in '*var' */ \
- if (__atomic_compare_exchange_n((var), &old_var, \
- tmp_v, true, (mo), \
- (mo))) \
- break; \
- } \
- /* v <= old_var, nothing to do */ \
- } while (0)
-
-ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1),
+
+ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 2),
"lowest_prio_does_not_match_with_num_prios");
ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
- (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 1)),
+ (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 2)),
"normal_prio_is_not_between_highest_and_lowest");
ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
@@ -99,9 +74,6 @@ static odp_spinlock_t sched_grp_lock;
static sched_scalable_thread_state_t thread_state[MAXTHREADS];
__thread sched_scalable_thread_state_t *sched_ts;
-/*
- * Forward declarations.
- */
static int thread_state_init(int tidx)
{
sched_scalable_thread_state_t *ts;
@@ -110,13 +82,12 @@ static int thread_state_init(int tidx)
ODP_ASSERT(tidx < MAXTHREADS);
ts = &thread_state[tidx];
ts->atomq = NULL;
+ ts->src_schedq = NULL;
ts->rctx = NULL;
ts->pause = false;
ts->out_of_order = false;
ts->tidx = tidx;
ts->dequeued = 0;
- ts->pktin_next = 0;
- ts->pktin_poll_cnts = 0;
ts->ticket = TICKET_INVALID;
ts->priv_rvec_free = 0;
ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
@@ -539,7 +510,25 @@ static inline void sched_update_popd(sched_elem_t *elem)
}
#endif
-sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio)
+static void signal_threads_add(sched_group_t *sg, uint32_t sgi, uint32_t prio)
+{
+ sched_group_mask_t thrds = sg->thr_wanted;
+ uint32_t thr;
+
+ while (!bitset_is_null(thrds)) {
+ thr = bitset_ffs(thrds) - 1;
+ thrds = bitset_clr(thrds, thr);
+ /* Notify the thread about membership in this
+ * group/priority.
+ */
+ atom_bitset_set(&thread_state[thr].sg_wanted[prio],
+ sgi, __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem, 1,
+ __ATOMIC_RELEASE);
+ }
+}
+
+sched_queue_t *sched_queue_add(odp_schedule_group_t grp, uint32_t prio)
{
uint32_t sgi;
sched_group_t *sg;
@@ -561,26 +550,46 @@ sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio)
* Notify all threads in sg->thr_wanted that they
* should join.
*/
- sched_group_mask_t thrds = sg->thr_wanted;
+ signal_threads_add(sg, sgi, prio);
+ }
+ return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
+}
- while (!bitset_is_null(thrds)) {
- uint32_t thr;
+static uint32_t sched_pktin_add(odp_schedule_group_t grp, uint32_t prio)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
- thr = bitset_ffs(thrds) - 1;
- thrds = bitset_clr(thrds, thr);
- /* Notify the thread about membership in this
- * group/priority.
- */
- atom_bitset_set(&thread_state[thr].sg_wanted[prio],
- sgi, __ATOMIC_RELEASE);
- __atomic_store_n(&thread_state[thr].sg_sem, 1,
- __ATOMIC_RELEASE);
- }
+ ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP);
+ ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
+ ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
+
+ sgi = grp;
+ sg = sg_vec[sgi];
+
+ (void)sched_queue_add(grp, ODP_SCHED_PRIO_PKTIN);
+ return (ODP_SCHED_PRIO_PKTIN - prio) * sg->xfactor;
+}
+
+static void signal_threads_rem(sched_group_t *sg, uint32_t sgi, uint32_t prio)
+{
+ sched_group_mask_t thrds = sg->thr_wanted;
+ uint32_t thr;
+
+ while (!bitset_is_null(thrds)) {
+ thr = bitset_ffs(thrds) - 1;
+ thrds = bitset_clr(thrds, thr);
+ /* Notify the thread about membership in this
+ * group/priority.
+ */
+ atom_bitset_clr(&thread_state[thr].sg_wanted[prio],
+ sgi, __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem, 1,
+ __ATOMIC_RELEASE);
}
- return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
}
-void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio)
+void sched_queue_rem(odp_schedule_group_t grp, uint32_t prio)
{
uint32_t sgi;
sched_group_t *sg;
@@ -592,77 +601,84 @@ void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio)
sgi = grp;
sg = sg_vec[sgi];
- x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
+ x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
if (x == 0) {
/* Last ODP queue for this priority
* Notify all threads in sg->thr_wanted that they
* should leave.
*/
- sched_group_mask_t thrds = sg->thr_wanted;
+ signal_threads_rem(sg, sgi, prio);
+ }
+}
- while (!bitset_is_null(thrds)) {
- uint32_t thr;
+static void sched_pktin_rem(odp_schedule_group_t grp)
+{
+ sched_queue_rem(grp, ODP_SCHED_PRIO_PKTIN);
+}
- thr = bitset_ffs(thrds) - 1;
- thrds = bitset_clr(thrds, thr);
- /* Notify the thread about membership in this
- * group/priority.
+static void update_sg_add(sched_scalable_thread_state_t *ts,
+ uint32_t p,
+ sched_group_mask_t sg_wanted)
+{
+ sched_group_mask_t added;
+ uint32_t sgi;
+ sched_group_t *sg;
+ uint32_t x;
+
+ added = bitset_andn(sg_wanted, ts->sg_actual[p]);
+ while (!bitset_is_null(added)) {
+ sgi = bitset_ffs(added) - 1;
+ sg = sg_vec[sgi];
+ for (x = 0; x < sg->xfactor; x++) {
+ /* Include our thread index to shift
+ * (rotate) the order of schedq's
*/
- atom_bitset_clr(&thread_state[thr].sg_wanted[prio],
- sgi, __ATOMIC_RELEASE);
- __atomic_store_n(&thread_state[thr].sg_sem, 1,
- __ATOMIC_RELEASE);
+ insert_schedq_in_list(ts,
+ &sg->schedq[p * sg->xfactor +
+ (x + ts->tidx) % sg->xfactor]);
}
+ atom_bitset_set(&sg->thr_actual[p], ts->tidx, __ATOMIC_RELAXED);
+ added = bitset_clr(added, sgi);
+
}
}
-static void update_sg_membership(sched_scalable_thread_state_t *ts)
+static void update_sg_rem(sched_scalable_thread_state_t *ts,
+ uint32_t p,
+ sched_group_mask_t sg_wanted)
{
- uint32_t p;
- sched_group_mask_t sg_wanted;
- sched_group_mask_t added;
sched_group_mask_t removed;
uint32_t sgi;
sched_group_t *sg;
uint32_t x;
+ removed = bitset_andn(ts->sg_actual[p], sg_wanted);
+ while (!bitset_is_null(removed)) {
+ sgi = bitset_ffs(removed) - 1;
+ sg = sg_vec[sgi];
+ for (x = 0; x < sg->xfactor; x++) {
+ remove_schedq_from_list(ts,
+ &sg->schedq[p *
+ sg->xfactor + x]);
+ }
+ atom_bitset_clr(&sg->thr_actual[p], ts->tidx, __ATOMIC_RELAXED);
+ removed = bitset_clr(removed, sgi);
+ }
+}
+
+static void update_sg_membership(sched_scalable_thread_state_t *ts)
+{
+ uint32_t p;
+ sched_group_mask_t sg_wanted;
+
for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
sg_wanted = atom_bitset_load(&ts->sg_wanted[p],
__ATOMIC_ACQUIRE);
if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) {
/* Our sched_group membership has changed */
- added = bitset_andn(sg_wanted, ts->sg_actual[p]);
- while (!bitset_is_null(added)) {
- sgi = bitset_ffs(added) - 1;
- sg = sg_vec[sgi];
- for (x = 0; x < sg->xfactor; x++) {
- /* Include our thread index to shift
- * (rotate) the order of schedq's
- */
- insert_schedq_in_list
- (ts,
- &sg->schedq[p * sg->xfactor +
- (x + ts->tidx) % sg->xfactor]);
- }
- atom_bitset_set(&sg->thr_actual[p], ts->tidx,
- __ATOMIC_RELAXED);
- added = bitset_clr(added, sgi);
- }
- removed = bitset_andn(ts->sg_actual[p], sg_wanted);
- while (!bitset_is_null(removed)) {
- sgi = bitset_ffs(removed) - 1;
- sg = sg_vec[sgi];
- for (x = 0; x < sg->xfactor; x++) {
- remove_schedq_from_list
- (ts,
- &sg->schedq[p *
- sg->xfactor + x]);
- }
- atom_bitset_clr(&sg->thr_actual[p], ts->tidx,
- __ATOMIC_RELAXED);
- removed = bitset_clr(removed, sgi);
- }
+ update_sg_add(ts, p, sg_wanted);
+ update_sg_rem(ts, p, sg_wanted);
ts->sg_actual[p] = sg_wanted;
}
}
@@ -693,61 +709,180 @@ static inline void _schedule_release_ordered(sched_scalable_thread_state_t *ts)
ts->rctx = NULL;
}
-static void pktin_poll(sched_scalable_thread_state_t *ts)
+static uint16_t poll_count[ODP_CONFIG_PKTIO_ENTRIES];
+
+static void pktio_start(int pktio_idx,
+ int num_in_queue,
+ int in_queue_idx[],
+ odp_queue_t odpq[])
{
- uint32_t i, tag, hi, npolls = 0;
- int pktio_index, queue_index;
+ int i, rxq;
+ queue_entry_t *qentry;
+ sched_elem_t *elem;
- hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED);
- if (hi == 0)
- return;
+ ODP_ASSERT(pktio_idx < ODP_CONFIG_PKTIO_ENTRIES);
+ for (i = 0; i < num_in_queue; i++) {
+ rxq = in_queue_idx[i];
+ ODP_ASSERT(rxq < PKTIO_MAX_QUEUES);
+ __atomic_fetch_add(&poll_count[pktio_idx], 1, __ATOMIC_RELAXED);
+ qentry = qentry_from_ext(odpq[i]);
+ elem = &qentry->s.sched_elem;
+ elem->cons_type |= FLAG_PKTIN; /* Set pktin queue flag */
+ elem->pktio_idx = pktio_idx;
+ elem->rx_queue = rxq;
+ elem->xoffset = sched_pktin_add(elem->sched_grp,
+ elem->sched_prio);
+ ODP_ASSERT(elem->schedq != NULL);
+ schedq_push(elem->schedq, elem);
+ }
+}
- for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) {
- tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED);
- if (!TAG_IS_READY(tag))
- continue;
- if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag,
- tag | TAG_BUSY,
- true,
- __ATOMIC_ACQUIRE,
- __ATOMIC_RELAXED))
- continue;
- /* Tag grabbed */
- pktio_index = TAG_2_PKTIO(tag);
- queue_index = TAG_2_QUEUE(tag);
- if (odp_unlikely(sched_cb_pktin_poll(pktio_index,
- 1, &queue_index))) {
- /* Pktio stopped or closed
- * Remove tag from pktin_tags
- */
- __atomic_store_n(&pktin_tags[i],
- TAG_EMPTY, __ATOMIC_RELAXED);
- __atomic_fetch_sub(&pktin_num,
- 1, __ATOMIC_RELEASE);
- /* Call stop_finalize when all queues
- * of the pktio have been removed
- */
- if (__atomic_sub_fetch(&pktin_count[pktio_index], 1,
- __ATOMIC_RELAXED) == 0)
- sched_cb_pktio_stop_finalize(pktio_index);
- } else {
- /* We don't know whether any packets were found and enqueued
- * Write back original tag value to release pktin queue
- */
- __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED);
- /* Do not iterate through all pktin queues every time */
- if ((ts->pktin_poll_cnts & 0xf) != 0)
- break;
+static void pktio_stop(sched_elem_t *elem)
+{
+ elem->cons_type &= ~FLAG_PKTIN; /* Clear pktin queue flag */
+ sched_pktin_rem(elem->sched_grp);
+ if (__atomic_sub_fetch(&poll_count[elem->pktio_idx],
+ 1, __ATOMIC_RELAXED) == 0) {
+ /* Call stop_finalize when all queues
+ * of the pktio have been removed */
+ sched_cb_pktio_stop_finalize(elem->pktio_idx);
+ }
+}
+
+static bool have_reorder_ctx(sched_scalable_thread_state_t *ts)
+{
+ if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
+ ts->priv_rvec_free = atom_bitset_xchg(&ts->rvec_free, 0,
+ __ATOMIC_RELAXED);
+ if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
+ /* No free reorder contexts for this thread */
+ return false;
+ }
+ }
+ return true;
+}
+
+static inline bool is_pktin(sched_elem_t *elem)
+{
+ return (elem->cons_type & FLAG_PKTIN) != 0;
+}
+
+static inline bool is_atomic(sched_elem_t *elem)
+{
+ return elem->cons_type == (ODP_SCHED_SYNC_ATOMIC | FLAG_PKTIN);
+}
+
+static inline bool is_ordered(sched_elem_t *elem)
+{
+ return elem->cons_type == (ODP_SCHED_SYNC_ORDERED | FLAG_PKTIN);
+}
+
+static int poll_pktin(sched_elem_t *elem, odp_event_t ev[], int num_evts)
+{
+ sched_scalable_thread_state_t *ts = sched_ts;
+ int num, i;
+ /* For ordered queues only */
+ reorder_context_t *rctx;
+ reorder_window_t *rwin = NULL;
+ uint32_t sn;
+ uint32_t idx;
+
+ if (is_ordered(elem)) {
+ /* Need reorder context and slot in reorder window */
+ rwin = queue_get_rwin((queue_entry_t *)elem);
+ ODP_ASSERT(rwin != NULL);
+ if (odp_unlikely(!have_reorder_ctx(ts) ||
+ !rwin_reserve_sc(rwin, &sn))) {
+ /* Put back queue on source schedq */
+ schedq_push(ts->src_schedq, elem);
+ return 0;
+ }
+ /* Slot in reorder window reserved! */
+ }
+
+ /* Try to dequeue events from the ingress queue itself */
+ num = _odp_queue_deq_sc(elem, ev, num_evts);
+ if (odp_likely(num > 0)) {
+events_dequeued:
+ if (is_atomic(elem)) {
+ ts->atomq = elem; /* Remember */
+ ts->dequeued += num;
+ /* Don't push atomic queue on schedq */
+ } else /* Parallel or ordered */ {
+ if (is_ordered(elem)) {
+ /* Find and initialise an unused reorder
+ * context. */
+ idx = bitset_ffs(ts->priv_rvec_free) - 1;
+ ts->priv_rvec_free =
+ bitset_clr(ts->priv_rvec_free, idx);
+ rctx = &ts->rvec[idx];
+ rctx_init(rctx, idx, rwin, sn);
+ /* Are we in-order or out-of-order? */
+ ts->out_of_order = sn != rwin->hc.head;
+ ts->rctx = rctx;
+ }
+ schedq_push(elem->schedq, elem);
+ }
+ return num;
+ }
+
+ /* Ingress queue empty => poll pktio RX queue */
+ odp_event_t rx_evts[QUEUE_MULTI_MAX];
+ int num_rx = sched_cb_pktin_poll_one(elem->pktio_idx,
+ elem->rx_queue,
+ rx_evts);
+ if (odp_likely(num_rx > 0)) {
+ num = num_rx < num_evts ? num_rx : num_evts;
+ for (i = 0; i < num; i++) {
+ /* Return events directly to caller */
+ ev[i] = rx_evts[i];
}
+ if (num_rx > num) {
+ /* Events remain, enqueue them */
+ odp_buffer_hdr_t *bufs[QUEUE_MULTI_MAX];
+
+ for (i = num; i < num_rx; i++)
+ bufs[i] =
+ (odp_buffer_hdr_t *)(void *)rx_evts[i];
+ i = _odp_queue_enq_sp(elem, &bufs[num], num_rx - num);
+ /* Enqueue must succeed as the queue was empty */
+ ODP_ASSERT(i == num_rx - num);
+ }
+ goto events_dequeued;
+ }
+ /* No packets received, reset state and undo side effects */
+ if (is_atomic(elem))
+ ts->atomq = NULL;
+ else if (is_ordered(elem))
+ rwin_unreserve_sc(rwin, sn);
+
+ if (odp_likely(num_rx == 0)) {
+ /* RX queue empty, push it to pktin priority schedq */
+ sched_queue_t *schedq = ts->src_schedq;
+ /* Check if queue came from the designated schedq */
+ if (schedq == elem->schedq) {
+ /* Yes, add offset to the pktin priority level
+ * in order to get alternate schedq */
+ schedq += elem->xoffset;
+ }
+ /* Else no, queue must have come from alternate schedq */
+ schedq_push(schedq, elem);
+ } else /* num_rx < 0 => pktio stopped or closed */ {
+ /* Remove queue */
+ pktio_stop(elem);
+ /* Don't push queue to schedq */
}
- ODP_ASSERT(i < hi);
- ts->pktin_poll_cnts++;
- ts->pktin_next = i;
+
+ ODP_ASSERT(ts->atomq == NULL);
+ ODP_ASSERT(!ts->out_of_order);
+ ODP_ASSERT(ts->rctx == NULL);
+ return 0;
}
static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
{
sched_scalable_thread_state_t *ts;
+ sched_elem_t *first;
sched_elem_t *atomq;
int num;
uint32_t i;
@@ -758,7 +893,23 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
/* Once an atomic queue has been scheduled to a thread, it will stay
* on that thread until empty or 'rotated' by WRR
*/
- if (atomq != NULL) {
+ if (atomq != NULL && is_pktin(atomq)) {
+ /* Atomic pktin queue */
+ if (ts->dequeued < atomq->qschst.wrr_budget) {
+ ODP_ASSERT(ts->src_schedq != NULL);
+ num = poll_pktin(atomq, ev, num_evts);
+ if (odp_likely(num != 0)) {
+ if (from)
+ *from = queue_get_handle(
+ (queue_entry_t *)atomq);
+ return num;
+ }
+ } else {
+ /* WRR budget exhausted, move queue to end of schedq */
+ schedq_push(atomq->schedq, atomq);
+ }
+ ts->atomq = NULL;
+ } else if (atomq != NULL) {
ODP_ASSERT(ts->ticket != TICKET_INVALID);
#ifdef CONFIG_QSCHST_LOCK
LOCK(&atomq->qschlock);
@@ -794,10 +945,6 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
#endif
}
- /* Release any previous reorder context. */
- if (ts->rctx != NULL)
- _schedule_release_ordered(ts);
-
/* Check for and perform any scheduler group updates. */
if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) {
(void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
@@ -806,7 +953,7 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
}
/* Scan our schedq list from beginning to end */
- for (i = 0; i < ts->num_schedq; i++) {
+ for (i = 0, first = NULL; i < ts->num_schedq; i++, first = NULL) {
sched_queue_t *schedq = ts->schedq_list[i];
sched_elem_t *elem;
restart_same:
@@ -815,8 +962,24 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
/* Schedq empty, look at next one. */
continue;
}
+ if (is_pktin(elem)) {
+ /* Pktio ingress queue */
+ if (first == NULL)
+ first = elem;
+ else if (elem == first) /* Wrapped around */
+ continue; /* Go to next schedq */
+
+ if (odp_unlikely(!schedq_cond_pop(schedq, elem)))
+ goto restart_same;
- if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) {
+ ts->src_schedq = schedq; /* Remember source schedq */
+ num = poll_pktin(elem, ev, num_evts);
+ if (odp_unlikely(num <= 0))
+ goto restart_same;
+ if (from)
+ *from = queue_get_handle((queue_entry_t *)elem);
+ return num;
+ } else if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) {
/* Dequeue element only if it is still at head
* of schedq.
*/
@@ -885,19 +1048,9 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
* collect all outgoing events. Ensure there is at least
* one available reorder context.
*/
- if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
- ts->priv_rvec_free = atom_bitset_xchg(
- &ts->rvec_free, 0,
- __ATOMIC_RELAXED);
- if (odp_unlikely(bitset_is_null(
- ts->priv_rvec_free))) {
- /* No free reorder contexts for
- * this thread. Look at next schedq,
- * hope we find non-ordered queue.
- */
- continue;
- }
- }
+ if (odp_unlikely(!have_reorder_ctx(ts)))
+ continue;
+
/* rwin_reserve and odp_queue_deq must be atomic or
* there will be a potential race condition.
* Allocate a slot in the reorder window.
@@ -984,7 +1137,6 @@ static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
}
}
- pktin_poll(ts);
return 0;
}
@@ -1066,6 +1218,10 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[],
odp_time_t deadline;
ts = sched_ts;
+ /* Release any previous reorder context. */
+ if (ts->rctx != NULL)
+ _schedule_release_ordered(ts);
+
if (odp_unlikely(ts->pause)) {
if (ts->atomq != NULL) {
#ifdef CONFIG_QSCHST_LOCK
@@ -1078,8 +1234,6 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[],
#ifdef CONFIG_QSCHST_LOCK
UNLOCK(&atomq->qschlock);
#endif
- } else if (ts->rctx != NULL) {
- _schedule_release_ordered(ts);
}
return 0;
}
@@ -1124,6 +1278,10 @@ static odp_event_t schedule(odp_queue_t *from, uint64_t wait)
odp_time_t deadline;
ts = sched_ts;
+ /* Release any previous reorder context. */
+ if (ts->rctx != NULL)
+ _schedule_release_ordered(ts);
+
if (odp_unlikely(ts->pause)) {
if (ts->atomq != NULL) {
#ifdef CONFIG_QSCHST_LOCK
@@ -1136,8 +1294,6 @@ static odp_event_t schedule(odp_queue_t *from, uint64_t wait)
#ifdef CONFIG_QSCHST_LOCK
UNLOCK(&atomq->qschlock);
#endif
- } else if (ts->rctx != NULL) {
- _schedule_release_ordered(ts);
}
return ev;
}
@@ -1190,7 +1346,7 @@ static uint64_t schedule_wait_time(uint64_t ns)
static int schedule_num_prio(void)
{
- return ODP_SCHED_PRIO_NUM;
+ return ODP_SCHED_PRIO_NUM - 1; /* Discount the pktin priority level */
}
static int schedule_group_update(sched_group_t *sg,
@@ -1387,7 +1543,7 @@ static int schedule_group_destroy(odp_schedule_group_t group)
ret = -1;
goto thrd_q_present_in_group;
}
- if (sg->xcount[p] != 0) {
+ if (p != ODP_SCHED_PRIO_PKTIN && sg->xcount[p] != 0) {
ODP_ERR("Group has queues\n");
ret = -1;
goto thrd_q_present_in_group;
@@ -1783,51 +1939,6 @@ static int schedule_term_local(void)
return rc;
}
-static void pktio_start(int pktio_index, int num_in_queue, int in_queue_idx[])
-{
- int i;
- uint32_t old, tag, j;
-
- for (i = 0; i < num_in_queue; i++) {
- /* Try to reserve a slot */
- if (__atomic_fetch_add(&pktin_num,
- 1, __ATOMIC_RELAXED) >= PKTIN_MAX) {
- __atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED);
- ODP_ABORT("Too many pktio in queues for scheduler\n");
- }
- /* A slot has been reserved, now we need to find an empty one */
- for (j = 0; ; j = (j + 1) % PKTIN_MAX) {
- if (__atomic_load_n(&pktin_tags[j],
- __ATOMIC_RELAXED) != TAG_EMPTY)
- /* Slot used, continue with next */
- continue;
- /* Empty slot found */
- old = TAG_EMPTY;
- tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]);
- if (__atomic_compare_exchange_n(&pktin_tags[j],
- &old,
- tag,
- true,
- __ATOMIC_RELEASE,
- __ATOMIC_RELAXED)) {
- /* Success grabbing slot,update high
- * watermark
- */
- __atomic_fetch_max(&pktin_hi,
- j + 1, __ATOMIC_RELAXED);
- /* One more tag (queue) for this pktio
- * instance
- */
- __atomic_fetch_add(&pktin_count[pktio_index],
- 1, __ATOMIC_RELAXED);
- /* Continue with next RX queue */
- break;
- }
- /* Failed to grab slot */
- }
- }
-}
-
static int num_grps(void)
{
return MAX_SCHED_GROUP;
@@ -72,6 +72,32 @@ bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn)
return true;
}
+bool rwin_reserve_sc(reorder_window_t *rwin, uint32_t *sn)
+{
+ uint32_t head;
+ uint32_t oldt;
+ uint32_t newt;
+ uint32_t winmask;
+
+ /* Read head and tail separately */
+ oldt = rwin->tail;
+ winmask = rwin->winmask;
+ head = rwin->hc.head;
+ if (odp_unlikely(oldt - head >= winmask))
+ return false;
+ newt = oldt + 1;
+ rwin->tail = newt;
+ *sn = oldt;
+
+ return true;
+}
+
+void rwin_unreserve_sc(reorder_window_t *rwin, uint32_t sn)
+{
+ ODP_ASSERT(rwin->tail == sn + 1);
+ rwin->tail = sn;
+}
+
static void rwin_insert(reorder_window_t *rwin,
reorder_context_t *rctx,
uint32_t sn,
@@ -424,7 +424,10 @@ static int ord_enq_multi(queue_t q_int, void *buf_hdr[], int num,
return 0;
}
-static void pktio_start(int pktio_index, int num, int pktin_idx[])
+static void pktio_start(int pktio_index,
+ int num,
+ int pktin_idx[],
+ odp_queue_t odpq[] ODP_UNUSED)
{
int i;
sched_cmd_t *cmd;