@@ -445,6 +445,39 @@ int odp_queue_info(odp_queue_t queue, odp_queue_info_t *info);
int odp_queue_depth(odp_queue_t queue);
/**
+ *
+ * Sets the queue depth threshold. When the threshold is reached, a
+ * notification event (if one has been armed) will be sent to the notification
+ * queue when the threshold is exceeded.
+ *
+ * @param queue Queue whose notification threshold will be set.
+ * @param threshold Number of the events in the queue on which to trigger.
+ * @param notifq Notification queue to be triggered on reaching the number
+ * of events. This queue must be of type ODP_QUEUE_TYPE_NOTIF.
+ *
+ */
+int odp_queue_threshold_set(odp_queue_t queue, int threshold, odp_queue_t notifq);
+
+/**
+ *
+ * Reset a previously set queue threshold.
+ * @param queue Queue whose threshold is to be reset.
+ *
+ */
+int odp_queue_threshold_reset(odp_queue_t queue);
+
+/**
+ * Resets a notification queue. A notification queue must be "armed" with an event
+ * before it can be triggered. It is the responsibility of the notification listener to
+ * reset the notification queue
+ *
+ * @param notif Notification queue to be reset This queue must be of
+ * type ODP_QUEUE_TYPE_NOTIF.
+ * @param event An event to reset the queue with.
+ */
+int odp_queue_threshold_arm(odp_queue_t queue, odp_event_t event);
+
+/**
* @}
*/
@@ -79,8 +79,12 @@ struct queue_entry_s {
odp_pktout_queue_t pktout;
char name[ODP_QUEUE_NAME_LEN];
- int depth;
+ int depth;
+ int depth_threshold;
+ odp_queue_t threshold_queue;
+
sem_t notif_sem;
+ odp_event_t notif_event;
};
union queue_entry_u {
@@ -100,6 +104,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
void queue_lock(queue_entry_t *queue);
void queue_unlock(queue_entry_t *queue);
+int queue_trigger(odp_queue_t handle);
+
static inline uint32_t queue_to_id(odp_queue_t handle)
{
return _odp_typeval(handle) - 1;
@@ -93,6 +93,8 @@ static int queue_init(queue_entry_t *queue, const char *name,
if (param->type == ODP_QUEUE_TYPE_NOTIF)
sem_init(&queue->s.notif_sem, 0, 0);
+ queue->s.notif_event = ODP_EVENT_INVALID;
+
queue->s.type = queue->s.param.type;
queue->s.enqueue = queue_enq;
@@ -389,6 +391,32 @@ odp_queue_t odp_queue_lookup(const char *name)
return ODP_QUEUE_INVALID;
}
+int queue_trigger(odp_queue_t handle)
+{
+ queue_entry_t *queue = queue_to_qentry(handle);
+
+ ODP_ASSERT(queue->s.type == ODP_QUEUE_TYPE_NOTIF);
+
+ LOCK(&queue->s.lock);
+
+ if (queue->s.notif_event == ODP_EVENT_INVALID) {
+ UNLOCK(&queue->s.lock);
+ return 0;
+ }
+
+ odp_event_t event = queue->s.notif_event;
+ queue->s.notif_event = ODP_EVENT_INVALID;
+
+ UNLOCK(&queue->s.lock);
+
+ int ret = odp_queue_enq(handle, event);
+
+ if (ret != 0)
+ free(event);
+
+ return ret;
+}
+
static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num)
{
@@ -453,6 +481,11 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
queue->s.tail = tail;
queue->s.depth += num;
+ if (queue->s.threshold_queue != ODP_QUEUE_INVALID &&
+ queue->s.depth >= queue->s.depth_threshold) {
+ queue_trigger(queue->s.threshold_queue);
+ }
+
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
@@ -832,3 +865,87 @@ int odp_queue_depth(odp_queue_t handle)
return depth;
}
+
+int odp_queue_threshold_set(odp_queue_t handle, int threshold, odp_queue_t destq)
+{
+ uint32_t queue_id;
+ queue_entry_t *queue;
+
+ queue_id = queue_to_id(handle);
+
+ if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
+ ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
+ odp_queue_to_u64(handle));
+ return 0;
+ }
+
+ queue = get_qentry(queue_id);
+
+ LOCK(&queue->s.lock);
+
+ queue->s.depth_threshold = threshold;
+ queue->s.threshold_queue = destq;
+
+ UNLOCK(&queue->s.lock);
+
+ return 0;
+}
+
+int odp_queue_threshold_reset(odp_queue_t handle)
+{
+ uint32_t queue_id;
+ queue_entry_t *queue;
+
+ queue_id = queue_to_id(handle);
+
+ if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
+ ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
+ odp_queue_to_u64(handle));
+ return -1;
+ }
+
+ queue = get_qentry(queue_id);
+
+ LOCK(&queue->s.lock);
+
+ queue->s.depth_threshold = 0;
+ queue->s.threshold_queue = ODP_QUEUE_INVALID;
+
+ if (queue->s.notif_event != ODP_EVENT_INVALID) {
+ odp_event_free(queue->s.notif_event);
+ queue->s.notif_event = ODP_EVENT_INVALID;
+ }
+
+ UNLOCK(&queue->s.lock);
+
+ return 0;
+}
+
+int odp_queue_threshold_arm(odp_queue_t handle, odp_event_t event)
+{
+ uint32_t queue_id;
+ queue_entry_t *queue;
+
+ queue_id = queue_to_id(handle);
+
+ if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
+ ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
+ odp_queue_to_u64(handle));
+ return -1;
+ }
+
+ queue = get_qentry(queue_id);
+
+ if (odp_unlikely(queue->s.type != ODP_QUEUE_TYPE_NOTIF))
+ return -1;
+
+ LOCK(&queue->s.lock);
+
+ if (queue->s.notif_event != ODP_EVENT_INVALID) {
+ odp_event_free(queue->s.notif_event);
+ }
+
+ queue->s.notif_event = event;
+ UNLOCK(&queue->s.lock);
+ return 0;
+}
This adds an ability to set a depth threshold for a queue. When the threshold is exceeded, an event will be enqueued on another queue as notification of the fact that the threshold has been exceeded. Signed-off-by: Sergei Trofimov <sergei.trofimov@arm.com> --- include/odp/api/spec/queue.h | 33 ++++++ .../linux-generic/include/odp_queue_internal.h | 8 +- platform/linux-generic/odp_queue.c | 117 +++++++++++++++++++++ 3 files changed, 157 insertions(+), 1 deletion(-) -- 1.9.1