@@ -62,7 +62,15 @@ typedef enum odp_queue_type_t {
* Scheduled queues are connected to the scheduler. Application must
* not dequeue events directly from these queues but use the scheduler
* instead. */
- ODP_QUEUE_TYPE_SCHED
+ ODP_QUEUE_TYPE_SCHED,
+
+ /** Notification queue
+ *
+ * Behave similar to plain queues, however, threads can execute a waiting
+ * dequeue, causing them to block if the queue is empty.
+ *
+ */
+ ODP_QUEUE_TYPE_NOTIF
} odp_queue_type_t;
/**
@@ -309,6 +317,19 @@ odp_event_t odp_queue_deq(odp_queue_t queue);
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num);
/**
+ * Queue dequeue
+ *
+ * Dequeues next event from head of the queue, blocking if the queue
+ * is empty. Must be used only with ODP_QUEUE_TYPE_NOTIF type queues.
+ *
+ * @param queue Queue handle
+ *
+ * @return Event handle
+ * @retval ODP_EVENT_INVALID on failure (e.g. not a notification queue)
+ */
+odp_event_t odp_queue_deq_wait(odp_queue_t handle);
+
+/**
* Queue type
*
* @param queue Queue handle
@@ -29,14 +29,14 @@ ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX,
"ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue, int wait);
int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
int pktout_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue, int wait);
int pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num);
@@ -26,9 +26,12 @@ extern "C" {
#include <odp/api/packet_io.h>
#include <odp/api/align.h>
#include <odp/api/hints.h>
+
#include <odp/api/ticketlock.h>
#include <odp_config_internal.h>
+#include <semaphore.h>
+
#define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
#define QUEUE_STATUS_FREE 0
@@ -42,7 +45,7 @@ extern "C" {
union queue_entry_u;
typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
-typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
+typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *, int);
typedef int (*enq_multi_func_t)(union queue_entry_u *,
odp_buffer_hdr_t **, int);
@@ -77,6 +80,7 @@ struct queue_entry_s {
char name[ODP_QUEUE_NAME_LEN];
int depth;
+ sem_t notif_sem;
};
union queue_entry_u {
@@ -88,7 +92,7 @@ union queue_entry_u {
queue_entry_t *get_qentry(uint32_t queue_id);
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait);
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
@@ -590,7 +590,7 @@ int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
return (nbr == len ? 0 : -1);
}
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED)
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED, int wait ODP_UNUSED)
{
ODP_ABORT("attempted dequeue from a pktout queue");
return NULL;
@@ -625,13 +625,13 @@ int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
return -1;
}
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry, int wait ODP_UNUSED)
{
odp_buffer_hdr_t *buf_hdr;
odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
int pkts;
- buf_hdr = queue_deq(qentry);
+ buf_hdr = queue_deq(qentry, 0);
if (buf_hdr != NULL)
return buf_hdr;
@@ -34,6 +34,7 @@
#include <string.h>
#include <inttypes.h>
#include <unistd.h>
+#include <semaphore.h>
typedef struct queue_table_t {
queue_entry_t queue[ODP_CONFIG_QUEUES];
@@ -88,6 +89,10 @@ static int queue_init(queue_entry_t *queue, const char *name,
0);
}
}
+
+ if (param->type == ODP_QUEUE_TYPE_NOTIF)
+ sem_init(&queue->s.notif_sem, 0, 0);
+
queue->s.type = queue->s.param.type;
queue->s.enqueue = queue_enq;
@@ -452,8 +457,14 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
}
+
UNLOCK(&queue->s.lock);
+ if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+ for (i = 0; i < num; i++)
+ sem_post(&queue->s.notif_sem);
+ }
+
/* Add queue to scheduling */
if (sched && sched_fn->sched_queue(queue->s.index))
ODP_ABORT("schedule_queue failed\n");
@@ -571,6 +582,12 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
queue->s.depth -= i;
}
+ if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+ for (j = 0; j < i; j++) {
+ sem_trywait(&queue->s.notif_sem);
+ }
+ }
+
/* Queue is empty */
if (hdr == NULL)
queue->s.tail = NULL;
@@ -588,11 +605,20 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
return deq_multi(queue, buf_hdr, num);
}
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait)
{
odp_buffer_hdr_t *buf_hdr = NULL;
int ret;
+ if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+ if (wait) {
+ sem_wait(&queue->s.notif_sem);
+ } else {
+ if (sem_trywait(&queue->s.notif_sem))
+ return NULL;
+ }
+ }
+
ret = deq_multi(queue, &buf_hdr, 1);
if (ret == 1)
@@ -620,14 +646,31 @@ int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
return ret;
}
-
odp_event_t odp_queue_deq(odp_queue_t handle)
{
queue_entry_t *queue;
odp_buffer_hdr_t *buf_hdr;
queue = queue_to_qentry(handle);
- buf_hdr = queue->s.dequeue(queue);
+ buf_hdr = queue->s.dequeue(queue, 0);
+
+ if (buf_hdr)
+ return odp_buffer_to_event(buf_hdr->handle.handle);
+
+ return ODP_EVENT_INVALID;
+}
+
+odp_event_t odp_queue_deq_wait(odp_queue_t handle)
+{
+ queue_entry_t *queue;
+ odp_buffer_hdr_t *buf_hdr;
+
+ queue = queue_to_qentry(handle);
+
+ if (queue->s.type != ODP_QUEUE_TYPE_NOTIF)
+ return ODP_EVENT_INVALID;
+
+ buf_hdr = queue->s.dequeue(queue, 1);
if (buf_hdr)
return odp_buffer_to_event(buf_hdr->handle.handle);
This adds ODP_QUEUE_TYPE_NOTIF along side the existing plain and sched queues. Notification queues are similar to plain queues, in that they are dequeued manually. However it is also possible to wait on dequeue (via the new odp_queue_deq_wait) method, which will block on an empty queue in such a way that (if supported by the platform) the execution will yield allowing the CPU to either run another thread or idle. Signed-off-by: Sergei Trofimov <sergei.trofimov@arm.com> --- include/odp/api/spec/queue.h | 23 +++++++++- .../linux-generic/include/odp_packet_io_queue.h | 4 +- .../linux-generic/include/odp_queue_internal.h | 8 +++- platform/linux-generic/odp_packet_io.c | 6 +-- platform/linux-generic/odp_queue.c | 49 ++++++++++++++++++++-- 5 files changed, 79 insertions(+), 11 deletions(-) -- 1.9.1