@@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t {
/* forward declaration */
struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
/* Common buffer header */
typedef struct odp_buffer_hdr_t {
@@ -131,6 +133,9 @@ typedef struct odp_buffer_hdr_t {
uint32_t segcount; /* segment count */
uint32_t segsize; /* segment size */
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+ uint64_t order; /* sequence for ordered queues */
+ queue_entry_t *origin_qe; /* ordered queue origin */
+ queue_entry_t *target_qe; /* ordered queue target */
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
@@ -77,6 +77,10 @@ struct queue_entry_s {
odp_pktio_t pktin;
odp_pktio_t pktout;
char name[ODP_QUEUE_NAME_LEN];
+ uint64_t order_in; /* next sequence number to assign at deq */
+ uint64_t order_out; /* next sequence number allowed to enq */
+ odp_buffer_hdr_t *reorder_head; /* out-of-order bufs, sorted */
+ odp_buffer_hdr_t *reorder_tail; /* tail of reorder list */
};
typedef union queue_entry_u {
@@ -28,6 +28,13 @@ static inline int schedule_queue(const queue_entry_t *qe)
return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
}
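+/* Retry the deferred enq of the buffer at the head of an ordered
+ * queue's reorder list. Callers drop both queue locks first, since
+ * this re-enters the enq path via the target queue's handle.
+ */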
+static inline int schedule_enq(const queue_entry_t *target_qe,
+ const queue_entry_t *origin_qe)
+{
+ return odp_queue_enq(target_qe->s.handle,
+ (odp_event_t)origin_qe->s.reorder_head->handle.handle);
+}
int schedule_pktio_start(odp_pktio_t pktio, int prio);
@@ -514,6 +514,9 @@ odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
/* By default, buffers inherit their pool's zeroization setting */
buf->buf.flags.zeroized = pool->s.flags.zeroized;
+ /* By default, buffers are not associated with an ordered queue */
+ buf->buf.origin_qe = NULL;
+
if (buf->buf.type == ODP_EVENT_PACKET)
packet_init(pool, &buf->pkt, size);
@@ -27,11 +27,13 @@
#define LOCK(a) odp_ticketlock_lock(a)
#define UNLOCK(a) odp_ticketlock_unlock(a)
#define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a) odp_ticketlock_trylock(a)
#else
#include <odp/spinlock.h>
#define LOCK(a) odp_spinlock_lock(a)
#define UNLOCK(a) odp_spinlock_unlock(a)
#define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a) odp_spinlock_trylock(a)
#endif
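+/* Both trylock variants return nonzero if the lock was acquired */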
#include <string.h>
@@ -89,6 +91,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
queue->s.head = NULL;
queue->s.tail = NULL;
+ queue->s.order_in = 0;
+ queue->s.order_out = 0;
+ queue->s.reorder_head = NULL;
+ queue->s.reorder_tail = NULL;
+
queue->s.pri_queue = ODP_QUEUE_INVALID;
queue->s.cmd_ev = ODP_EVENT_INVALID;
}
@@ -329,14 +334,76 @@ odp_queue_t odp_queue_lookup(const char *name)
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
{
int sched = 0;
+ queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+ /* Enq operations from ordered queues need both the origin
+ * and target queue locks. Take the origin lock first, then
+ * trylock the target; on failure, release and reacquire the
+ * origin lock so that a peer locking the same pair in the
+ * opposite order cannot deadlock against us. If the target
+ * is the origin itself, its lock is already held.
+ */
+ if (origin_qe) {
+ LOCK(&origin_qe->s.lock);
+ while (queue != origin_qe && !LOCK_TRY(&queue->s.lock)) {
+ UNLOCK(&origin_qe->s.lock);
+ LOCK(&origin_qe->s.lock);
+ }
+ if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+ if (queue != origin_qe)
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ ODP_ERR("Bad origin queue status\n");
+ return -1;
+ }
+ } else {
+ LOCK(&queue->s.lock);
+ }
- LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
+ if (origin_qe && origin_qe != queue)
+ UNLOCK(&origin_qe->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
+ /* We can only complete the enq if we're in order */
+ if (origin_qe) {
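+ /* order_out is the sequence number of the next buffer
+ * allowed to complete an enq from this origin queue
+ */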
+ if (buf_hdr->order > origin_qe->s.order_out) {
+ odp_buffer_hdr_t *reorder_buf =
+ origin_qe->s.reorder_head;
+
+ if (!reorder_buf) {
+ buf_hdr->next = NULL;
+ origin_qe->s.reorder_head = buf_hdr;
+ origin_qe->s.reorder_tail = buf_hdr;
+ } else {
+ odp_buffer_hdr_t *reorder_prev = NULL;
+
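+ /* Scan for the insertion point; the reorder
+ * list is kept sorted by sequence number
+ */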
+ while (buf_hdr->order > reorder_buf->order) {
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ if (!reorder_buf)
+ break;
+ }
+
+ buf_hdr->next = reorder_buf;
+ if (reorder_prev)
+ reorder_prev->next = buf_hdr;
+ else
+ origin_qe->s.reorder_head = buf_hdr;
+
+ if (!reorder_buf)
+ origin_qe->s.reorder_tail = buf_hdr;
+ }
+
+ buf_hdr->target_qe = queue;
+
+ /* This enq can't complete until order is restored, so
+ * we're done here.
+ */
+ if (queue != origin_qe)
+ UNLOCK(&queue->s.lock);
+ UNLOCK(&origin_qe->s.lock);
+ return 0;
+ }
+
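+ /* We're in order, so account for this enq and allow
+ * our successor to proceed
+ */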
+ origin_qe->s.order_out++;
+ }
+
if (queue->s.head == NULL) {
/* Empty queue */
queue->s.head = buf_hdr;
@@ -352,7 +419,48 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
}
- UNLOCK(&queue->s.lock);
+
+ /*
+ * If we came from an ordered queue, check to see if our successful
+ * enq has unblocked other buffers in the origin's reorder queue.
+ */
+ if (origin_qe) {
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev = NULL;
+ uint32_t release_count = 0;
+
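+ /* Release, in sequence, any stashed buffers that are
+ * bound for this same target queue
+ */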
+ while (reorder_buf &&
+ reorder_buf->target_qe == queue &&
+ reorder_buf->order <=
+ origin_qe->s.order_out + release_count) {
+ release_count++;
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ }
+
+ /* Add released buffers to the queue as well */
+ if (release_count > 0) {
+ queue->s.tail->next = origin_qe->s.reorder_head;
+ queue->s.tail = reorder_prev;
+ origin_qe->s.reorder_head = reorder_prev->next;
+ reorder_prev->next = NULL;
+ origin_qe->s.order_out += release_count;
+ }
+
+ /* Now handle unblocked buffers destined for other queues */
+ if (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out) {
+ UNLOCK(&origin_qe->s.lock);
+ if (queue != origin_qe)
+ UNLOCK(&queue->s.lock);
+ if (schedule_enq(reorder_buf->target_qe, origin_qe))
+ ODP_ABORT("schedule_enq failed\n");
+ } else {
+ UNLOCK(&origin_qe->s.lock);
+ if (queue != origin_qe)
+ UNLOCK(&queue->s.lock);
+ }
+ } else {
+ UNLOCK(&queue->s.lock);
+ }
/* Add queue to scheduling */
if (sched && schedule_queue(queue))
@@ -364,14 +472,26 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
{
int sched = 0;
- int i;
+ int i, j;
odp_buffer_hdr_t *tail;
- for (i = 0; i < num - 1; i++)
- buf_hdr[i]->next = buf_hdr[i+1];
+ for (i = 0; i < num; i++) {
+ /* If any buffer comes from an ordered queue, enqueue all
+ * of them individually, since in the general case each
+ * might originate from a different ordered queue. If any
+ * enq fails, the return code tells the caller how many
+ * succeeded.
+ */
+ if (buf_hdr[i]->origin_qe) {
+ for (j = 0; j < num; j++) {
+ if (queue_enq(queue, buf_hdr[j]))
+ return j;
+ }
+ return num;
+ }
+ buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+ }
tail = buf_hdr[num-1];
- buf_hdr[num-1]->next = NULL;
LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -449,6 +569,12 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
buf_hdr = queue->s.head;
queue->s.head = buf_hdr->next;
buf_hdr->next = NULL;
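+ /* For ordered queues, stamp the buffer with its origin
+ * and sequence number so a later enq can restore order
+ */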
+ if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+ buf_hdr->origin_qe = queue;
+ buf_hdr->order = queue->s.order_in++;
+ } else {
+ buf_hdr->origin_qe = NULL;
+ }
if (queue->s.head == NULL) {
/* Queue is now empty */
@@ -489,6 +615,12 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
buf_hdr[i] = hdr;
hdr = hdr->next;
buf_hdr[i]->next = NULL;
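+ /* Stamp origin and sequence as in queue_deq() */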
+ if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+ buf_hdr[i]->origin_qe = queue;
+ buf_hdr[i]->order = queue->s.order_in++;
+ } else {
+ buf_hdr[i]->origin_qe = NULL;
+ }
}
queue->s.head = hdr;
@@ -411,8 +411,6 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
/*
* Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
*/
static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
unsigned int max_num, unsigned int max_deq)
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   5 +
 .../linux-generic/include/odp_queue_internal.h     |   4 +
 .../linux-generic/include/odp_schedule_internal.h  |   7 +
 platform/linux-generic/odp_pool.c                  |   3 +
 platform/linux-generic/odp_queue.c                 | 144 ++++++++++++++++++++-
 platform/linux-generic/odp_schedule.c              |   2 -
 6 files changed, 157 insertions(+), 8 deletions(-)