@@ -40,6 +40,8 @@ typedef enum {
struct pktio_entry {
odp_spinlock_t lock; /**< entry spinlock */
int taken; /**< is entry taken(1) or free(0) */
+ int cls_ena; /**< is classifier enabled */
cls_ena is not very descriptive clsfy_enable is better but still ugly
struct pktio_entry {
odp_spinlock_t lock; /**< entry spinlock */
int taken; /**< is entry taken(1) or free(0) */
odp_queue_t inq_default; /**< default input queue, if set */
odp_queue_t outq_default; /**< default out queue */
odp_queue_t loopq; /**< loopback queue for "loop" device */
odp_pktio_type_t type; /**< pktio type */
pkt_sock_t pkt_sock; /**< using socket API for IO */
pkt_sock_mmap_t pkt_sock_mmap; /**< using socket mmap API for IO */
classifier_t cls; /**< classifier linked with this pktio*/
char name[IFNAMSIZ]; /**< name of pktio provided to
pktio_open() */
odp_bool_t promisc; /**< promiscuous mode state */
};
It does not show in the diff, but the classifier is referenced as “cls” in the struct already.
+ odp_pktio_t handle; /**< pktio handle */
odp_queue_t inq_default; /**< default input queue, if set */
odp_queue_t outq_default; /**< default out queue */
odp_queue_t loopq; /**< loopback queue for "loop" device */
@@ -64,15 +66,22 @@ typedef struct {
extern void *pktio_entry_ptr[];
+static inline int pktio_to_id(odp_pktio_t pktio)
+{
+ return _odp_typeval(pktio) - 1;
+}
-static inline pktio_entry_t *get_pktio_entry(odp_pktio_t id)
+static inline pktio_entry_t *get_pktio_entry(odp_pktio_t pktio)
{
- if (odp_unlikely(id == ODP_PKTIO_INVALID ||
- _odp_typeval(id) > ODP_CONFIG_PKTIO_ENTRIES))
+ if (odp_unlikely(pktio == ODP_PKTIO_INVALID ||
+ _odp_typeval(pktio) > ODP_CONFIG_PKTIO_ENTRIES))
return NULL;
- return pktio_entry_ptr[_odp_typeval(id) - 1];
+ return pktio_entry_ptr[pktio_to_id(pktio)];
}
+
+int pktin_poll(pktio_entry_t *entry);
+
#ifdef __cplusplus
}
#endif
@@ -36,10 +36,11 @@ extern "C" {
#define QUEUE_MULTI_MAX 8
#define QUEUE_STATUS_FREE 0
-#define QUEUE_STATUS_READY 1
-#define QUEUE_STATUS_NOTSCHED 2
-#define QUEUE_STATUS_SCHED 3
-#define QUEUE_STATUS_DESTROYED 4
+#define QUEUE_STATUS_DESTROYED 1
+#define QUEUE_STATUS_READY 2
+#define QUEUE_STATUS_NOTSCHED 3
+#define QUEUE_STATUS_SCHED 4
+
/* forward declaration */
union queue_entry_u;
@@ -69,7 +70,8 @@ struct queue_entry_s {
deq_multi_func_t dequeue_multi;
odp_queue_t handle;
- odp_buffer_t sched_buf;
+ odp_queue_t pri_queue;
+ odp_event_t cmd_ev;
odp_queue_type_t type;
odp_queue_param_t param;
odp_pktio_t pktin;
@@ -100,7 +102,6 @@ int queue_deq_multi_destroy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
void queue_lock(queue_entry_t *queue);
void queue_unlock(queue_entry_t *queue);
-odp_buffer_t queue_sched_buf(odp_queue_t queue);
int queue_sched_atomic(odp_queue_t handle);
static inline uint32_t queue_to_id(odp_queue_t handle)
@@ -121,24 +122,23 @@ static inline queue_entry_t *queue_to_qentry(odp_queue_t handle)
return get_qentry(queue_id);
}
-static inline int queue_is_free(odp_queue_t handle)
+static inline int queue_is_atomic(queue_entry_t *qe)
{
- queue_entry_t *queue;
-
- queue = queue_to_qentry(handle);
+ return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
+}
- return queue->s.status == QUEUE_STATUS_FREE;
+static inline odp_queue_t queue_handle(queue_entry_t *qe)
+{
+ return qe->s.handle;
}
-static inline int queue_is_sched(odp_queue_t handle)
+static inline int queue_prio(queue_entry_t *qe)
{
- queue_entry_t *queue;
+ return qe->s.param.sched.prio;
+}
- queue = queue_to_qentry(handle);
+void queue_destroy_finalize(queue_entry_t *qe);
- return ((queue->s.status == QUEUE_STATUS_SCHED) &&
- (queue->s.pktin != ODP_PKTIO_INVALID));
-}
#ifdef __cplusplus
}
#endif
@@ -16,12 +16,20 @@ extern "C" {
#include <odp/buffer.h>
#include <odp/queue.h>
+#include <odp/packet_io.h>
+#include <odp_queue_internal.h>
-void odp_schedule_mask_set(odp_queue_t queue, int prio);
-odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue);
+int schedule_queue_init(queue_entry_t *qe);
+void schedule_queue_destroy(queue_entry_t *qe);
-void odp_schedule_queue(odp_queue_t queue, int prio);
+static inline void schedule_queue(const queue_entry_t *qe)
+{
+ odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
+}
+
+
+int schedule_pktio_start(odp_pktio_t pktio, int prio);
#ifdef __cplusplus
@@ -142,6 +142,7 @@ static void unlock_entry_classifier(pktio_entry_t *entry)
static void init_pktio_entry(pktio_entry_t *entry)
{
set_taken(entry);
+ entry->s.cls_ena = 1; /* TODO: disable cls by default */
Needs to have a bug link, a todo that makes it into the repo is a known deficiency in the code for that published revision of the code.
This TODO highlight a point of new development. It’s a feature not a bug: classifier is now enabled by default, we should enable it only when actually used. Both ways work, but latter is cleaner. I’ll modify the comment.
entry->s.inq_default = ODP_QUEUE_INVALID;
memset(&entry->s.pkt_sock, 0, sizeof(entry->s.pkt_sock));
memset(&entry->s.pkt_sock_mmap, 0, sizeof(entry->s.pkt_sock_mmap));
@@ -273,6 +274,8 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool)
unlock_entry_classifier(pktio_entry);
}
+ pktio_entry->s.handle = id;
+
return id;
}
@@ -475,19 +478,27 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue)
qentry = queue_to_qentry(queue);
- if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN)
- return -1;
-
lock_entry(pktio_entry);
pktio_entry->s.inq_default = queue;
unlock_entry(pktio_entry);
- queue_lock(qentry);
- qentry->s.pktin = id;
- qentry->s.status = QUEUE_STATUS_SCHED;
- queue_unlock(qentry);
-
- odp_schedule_queue(queue, qentry->s.param.sched.prio);
+ switch (qentry->s.type) {
+ case ODP_QUEUE_TYPE_PKTIN:
+ /* User polls the input queue */
+ queue_lock(qentry);
+ qentry->s.pktin = id;
+ queue_unlock(qentry);
+ /*break; TODO: Uncomment and change _TYPE_PKTIN to _POLL*/
Needs to have a bug link, a todo that makes it into the repo is a known deficiency in the code
It’s not a bug. The new development documented here needs an API change: remove PKTIN type and use POLL/SCHED type instead. The API change will follow in another patch, this code structure prepares that change already. Anyway, I’ll modify the comment.
+ case ODP_QUEUE_TYPE_SCHED:
+ /* Packet input through the scheduler */
+ if (schedule_pktio_start(id, ODP_SCHED_PRIO_LOWEST)) {
+ ODP_ERR("Schedule pktio start failed\n");
+ return -1;
+ }
+ break;
+ default:
+ ODP_ABORT("Bad queue type\n");
If it permissible for an API to abort, I would say that is important enough to be described in the API docs as part of the expected and permissible behavior.
You would otherwise expect an error return code.
True. I’ll put the type check back on top.
+ }
return 0;
}
@@ -506,15 +517,6 @@ int odp_pktio_inq_remdef(odp_pktio_t id)
qentry = queue_to_qentry(queue);
queue_lock(qentry);
- if (qentry->s.status == QUEUE_STATUS_FREE) {
- queue_unlock(qentry);
- unlock_entry(pktio_entry);
- return -1;
- }
-
- qentry->s.enqueue = queue_enq_dummy;
- qentry->s.enqueue_multi = queue_enq_multi_dummy;
- qentry->s.status = QUEUE_STATUS_NOTSCHED;
qentry->s.pktin = ODP_PKTIO_INVALID;
queue_unlock(qentry);
@@ -665,6 +667,46 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
return nbr;
}
+int pktin_poll(pktio_entry_t *entry)
+{
+ odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
+ odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
+ int num, num_enq, i;
+
+ if (odp_unlikely(is_free(entry)))
+ return -1;
+
+ num = odp_pktio_recv(entry->s.handle, pkt_tbl, QUEUE_MULTI_MAX);
+
+ if (num < 0) {
+ ODP_ERR("Packet recv error\n");
+ return -1;
+ }
+
+ for (i = 0, num_enq = 0; i < num; ++i) {
+ odp_buffer_t buf;
+ odp_buffer_hdr_t *hdr;
+
+ buf = _odp_packet_to_buffer(pkt_tbl[i]);
+ hdr = odp_buf_to_hdr(buf);
+
+ if (entry->s.cls_ena) {
+ if (packet_classifier(entry->s.handle, pkt_tbl[i]) < 0)
+ hdr_tbl[num_enq++] = hdr;
+ } else {
+ hdr_tbl[num_enq++] = hdr;
+ }
+ }
+
+ if (num_enq) {
+ queue_entry_t *qentry;
+ qentry = queue_to_qentry(entry->s.inq_default);
+ queue_enq_multi(qentry, hdr_tbl, num_enq);
+ }
+
+ return 0;
+}
+
/** function should be called with locked entry */
static int sockfd_from_pktio_entry(pktio_entry_t *entry)
{
@@ -88,7 +88,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.head = NULL;
queue->s.tail = NULL;
- queue->s.sched_buf = ODP_BUFFER_INVALID;
+
+ queue->s.pri_queue = ODP_QUEUE_INVALID;
+ queue->s.cmd_ev = ODP_EVENT_INVALID;
}
@@ -222,22 +224,26 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
if (handle != ODP_QUEUE_INVALID &&
(type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN)) {
- odp_buffer_t buf;
-
- buf = odp_schedule_buffer_alloc(handle);
- if (buf == ODP_BUFFER_INVALID) {
- queue->s.status = QUEUE_STATUS_FREE;
- ODP_ERR("queue_init: sched buf alloc failed\n");
+ if (schedule_queue_init(queue)) {
+ ODP_ERR("schedule queue init failed\n");
return ODP_QUEUE_INVALID;
}
-
- queue->s.sched_buf = buf;
- odp_schedule_mask_set(handle, queue->s.param.sched.prio);
}
return handle;
}
+void queue_destroy_finalize(queue_entry_t *queue)
+{
+ LOCK(&queue->s.lock);
+
+ if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+ queue->s.status = QUEUE_STATUS_FREE;
+ schedule_queue_destroy(queue);
+ }
+ UNLOCK(&queue->s.lock);
+}
+
int odp_queue_destroy(odp_queue_t handle)
{
queue_entry_t *queue;
@@ -246,41 +252,31 @@ int odp_queue_destroy(odp_queue_t handle)
LOCK(&queue->s.lock);
if (queue->s.status == QUEUE_STATUS_FREE) {
UNLOCK(&queue->s.lock);
- ODP_ERR("queue_destroy: queue \"%s\" already free\n",
- queue->s.name<http://s.name>);
+ ODP_ERR("queue \"%s\" already free\n", queue->s.name<http://s.name>);
+ return -1;
+ }
+ if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name<http://s.name>);
return -1;
}
if (queue->s.head != NULL) {
UNLOCK(&queue->s.lock);
- ODP_ERR("queue_destroy: queue \"%s\" not empty\n",
- queue->s.name<http://s.name>);
+ ODP_ERR("queue \"%s\" not empty\n", queue->s.name<http://s.name>);
return -1;
}
- queue->s.enqueue = queue_enq_dummy;
- queue->s.enqueue_multi = queue_enq_multi_dummy;
-
switch (queue->s.status) {
case QUEUE_STATUS_READY:
queue->s.status = QUEUE_STATUS_FREE;
- queue->s.head = NULL;
- queue->s.tail = NULL;
+ break;
+ case QUEUE_STATUS_NOTSCHED:
+ queue->s.status = QUEUE_STATUS_FREE;
+ schedule_queue_destroy(queue);
break;
case QUEUE_STATUS_SCHED:
- /*
- * Override dequeue_multi to destroy queue when it will
- * be scheduled next time.
- */
+ /* Queue is still in scheduling */
queue->s.status = QUEUE_STATUS_DESTROYED;
- queue->s.dequeue_multi = queue_deq_multi_destroy;
- break;
- case QUEUE_STATUS_NOTSCHED:
- /* Queue won't be scheduled anymore */
- odp_buffer_free(queue->s.sched_buf);
- queue->s.sched_buf = ODP_BUFFER_INVALID;
- queue->s.status = QUEUE_STATUS_FREE;
- queue->s.head = NULL;
- queue->s.tail = NULL;
break;
default:
ODP_ABORT("Unexpected queue status\n");
If it permissible for an API to abort, I would say that is important enough to be described in the API docs as part of the expected and permissible behavior.
You would otherwise expect an error return code
This line was not changed and it’s doing the right thing. Unknown queue status == internal error => crash.
@@ -290,23 +286,6 @@ int odp_queue_destroy(odp_queue_t handle)
return 0;
}
-odp_buffer_t queue_sched_buf(odp_queue_t handle)
-{
- queue_entry_t *queue;
- queue = queue_to_qentry(handle);
-
- return queue->s.sched_buf;
-}
-
-
-int queue_sched_atomic(odp_queue_t handle)
-{
- queue_entry_t *queue;
- queue = queue_to_qentry(handle);
-
- return queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
int odp_queue_set_context(odp_queue_t handle, void *context)
{
queue_entry_t *queue;
@@ -352,6 +331,12 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
int sched = 0;
LOCK(&queue->s.lock);
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
+
if (queue->s.head == NULL) {
/* Empty queue */
queue->s.head = buf_hdr;
@@ -370,8 +355,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
UNLOCK(&queue->s.lock);
/* Add queue to scheduling */
- if (sched == 1)
- odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio);
+ if (sched)
+ schedule_queue(queue);
return 0;
}
@@ -389,6 +374,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
buf_hdr[num-1]->next = NULL;
LOCK(&queue->s.lock);
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
+
/* Empty queue */
if (queue->s.head == NULL)
queue->s.head = buf_hdr[0];
@@ -404,25 +395,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
UNLOCK(&queue->s.lock);
/* Add queue to scheduling */
- if (sched == 1)
- odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio);
+ if (sched)
+ schedule_queue(queue);
return num; /* All events enqueued */
}
-int queue_enq_dummy(queue_entry_t *queue ODP_UNUSED,
- odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
-{
- return -1;
-}
-
-int queue_enq_multi_dummy(queue_entry_t *queue ODP_UNUSED,
- odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
- int num ODP_UNUSED)
-{
- return -1;
-}
-
int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
{
odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
@@ -455,24 +433,26 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
{
- odp_buffer_hdr_t *buf_hdr = NULL;
+ odp_buffer_hdr_t *buf_hdr;
LOCK(&queue->s.lock);
if (queue->s.head == NULL) {
/* Already empty queue */
- if (queue->s.status == QUEUE_STATUS_SCHED &&
- queue->s.type != ODP_QUEUE_TYPE_PKTIN)
+ if (queue->s.status == QUEUE_STATUS_SCHED)
queue->s.status = QUEUE_STATUS_NOTSCHED;
- } else {
- buf_hdr = queue->s.head;
- queue->s.head = buf_hdr->next;
- buf_hdr->next = NULL;
- if (queue->s.head == NULL) {
- /* Queue is now empty */
- queue->s.tail = NULL;
- }
+ UNLOCK(&queue->s.lock);
+ return NULL;
+ }
+
+ buf_hdr = queue->s.head;
+ queue->s.head = buf_hdr->next;
+ buf_hdr->next = NULL;
+
+ if (queue->s.head == NULL) {
+ /* Queue is now empty */
+ queue->s.tail = NULL;
}
UNLOCK(&queue->s.lock);
@@ -483,31 +463,39 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
{
- int i = 0;
+ odp_buffer_hdr_t *hdr;
+ int i;
LOCK(&queue->s.lock);
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ /* Bad queue, or queue has been destroyed.
+ * Scheduler finalizes queue destroy after this. */
+ UNLOCK(&queue->s.lock);
+ return -1;
+ }
- if (queue->s.head == NULL) {
+ hdr = queue->s.head;
+
+ if (hdr == NULL) {
/* Already empty queue */
- if (queue->s.status == QUEUE_STATUS_SCHED &&
- queue->s.type != ODP_QUEUE_TYPE_PKTIN)
+ if (queue->s.status == QUEUE_STATUS_SCHED)
queue->s.status = QUEUE_STATUS_NOTSCHED;
- } else {
- odp_buffer_hdr_t *hdr = queue->s.head;
- for (; i < num && hdr; i++) {
- buf_hdr[i] = hdr;
- /* odp_prefetch(hdr->addr); */
- hdr = hdr->next;
- buf_hdr[i]->next = NULL;
- }
+ UNLOCK(&queue->s.lock);
+ return 0;
+ }
- queue->s.head = hdr;
+ for (i = 0; i < num && hdr; i++) {
+ buf_hdr[i] = hdr;
+ hdr = hdr->next;
+ buf_hdr[i]->next = NULL;
+ }
- if (hdr == NULL) {
- /* Queue is now empty */
- queue->s.tail = NULL;
- }
+ queue->s.head = hdr;
+
+ if (hdr == NULL) {
+ /* Queue is now empty */
+ queue->s.tail = NULL;
}
UNLOCK(&queue->s.lock);
@@ -515,23 +503,6 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
return i;
}
-int queue_deq_multi_destroy(queue_entry_t *queue,
- odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
- int num ODP_UNUSED)
-{
- LOCK(&queue->s.lock);
-
- odp_buffer_free(queue->s.sched_buf);
- queue->s.sched_buf = ODP_BUFFER_INVALID;
- queue->s.status = QUEUE_STATUS_FREE;
- queue->s.head = NULL;
- queue->s.tail = NULL;
-
- UNLOCK(&queue->s.lock);
-
- return 0;
-}
-
int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
{
queue_entry_t *queue;
@@ -21,17 +21,15 @@
#include <odp/hints.h>
#include <odp_queue_internal.h>
+#include <odp_packet_io_internal.h>
-
-/* Limits to number of scheduled queues */
-#define SCHED_POOL_SIZE (256*1024)
+/* Number of schedule commands.
+ * One per scheduled queue and packet interface */
+#define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES)
/* Scheduler sub queues */
#define QUEUES_PER_PRIO 4
-/* TODO: random or queue based selection */
Needs to have a bug link, a todo that makes it into the repo is a known deficiency in the code
This line was removed.