@@ -103,16 +103,23 @@ typedef union odp_buffer_bits_t {
/* forward declaration */
struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
/* Common buffer header */
typedef struct odp_buffer_hdr_t {
- struct odp_buffer_hdr_t *next; /* next buf in a list */
+ struct odp_buffer_hdr_t *next; /* next buf in a list--keep 1st */
+ union { /* Multi-use secondary link */
+ struct odp_buffer_hdr_t *prev;
+ struct odp_buffer_hdr_t *link;
+ };
odp_buffer_bits_t handle; /* handle */
union {
uint32_t all;
struct {
uint32_t zeroized:1; /* Zeroize buf data on free */
uint32_t hdrdata:1; /* Data is in buffer hdr */
+ uint32_t sustain:1; /* Sustain order */
};
} flags;
int16_t allocator; /* allocating thread id */
@@ -131,6 +138,12 @@ typedef struct odp_buffer_hdr_t {
uint32_t segcount; /* segment count */
uint32_t segsize; /* segment size */
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+ uint64_t order; /* sequence for ordered queues */
+ queue_entry_t *origin_qe; /* ordered queue origin */
+ union {
+ queue_entry_t *target_qe; /* ordered queue target */
+ uint64_t sync; /* for ordered synchronization */
+ };
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
@@ -23,6 +23,7 @@ extern "C" {
#include <odp_align_internal.h>
#include <odp/packet_io.h>
#include <odp/align.h>
+#include <odp/hints.h>
#define USE_TICKETLOCK
@@ -77,6 +78,12 @@ struct queue_entry_s {
odp_pktio_t pktin;
odp_pktio_t pktout;
char name[ODP_QUEUE_NAME_LEN];
+ uint64_t order_in;
+ uint64_t order_out;
+ odp_buffer_hdr_t *reorder_head;
+ odp_buffer_hdr_t *reorder_tail;
+ odp_atomic_u64_t sync_in;
+ odp_atomic_u64_t sync_out;
};
typedef union queue_entry_u {
@@ -93,6 +100,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+ odp_buffer_hdr_t *buf_hdr[], int num);
+
int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num);
@@ -137,6 +148,116 @@ static inline int queue_prio(queue_entry_t *qe)
return qe->s.param.sched.prio;
}
+static inline void reorder_enq(queue_entry_t *queue,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t *buf_hdr)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev =
+ (odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+ while (reorder_buf && buf_hdr->order >= reorder_buf->order) {
+ reorder_prev = reorder_buf;
+ reorder_buf = reorder_buf->next;
+ }
+
+ buf_hdr->next = reorder_buf;
+ reorder_prev->next = buf_hdr;
+
+ if (!reorder_buf)
+ origin_qe->s.reorder_tail = buf_hdr;
+
+ buf_hdr->target_qe = queue;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+ origin_qe->s.order_out += count;
+}
+
+static inline void reorder_deq(queue_entry_t *queue,
+ queue_entry_t *origin_qe,
+ odp_buffer_hdr_t **reorder_buf_return,
+ odp_buffer_hdr_t **reorder_prev_return,
+ odp_buffer_hdr_t **placeholder_buf_return,
+ uint32_t *release_count_return,
+ uint32_t *placeholder_count_return)
+{
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *reorder_prev = NULL;
+ odp_buffer_hdr_t *placeholder_buf = NULL;
+ odp_buffer_hdr_t *next_buf;
+ uint32_t release_count = 0;
+ uint32_t placeholder_count = 0;
+
+ while (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out +
+ release_count + placeholder_count) {
+ /*
+ * Elements on the reorder list fall into one of
+ * three categories:
+ *
+ * 1. Those destined for the same queue. These
+ * can be enq'd now if they were waiting to
+ * be unblocked by this enq.
+ *
+ * 2. Those representing placeholders for events
+ * whose ordering was released by a prior
+ * odp_schedule_release_ordered() call. These
+ * can now just be freed.
+ *
+ * 3. Those representing events destined for another
+ * queue. These cannot be consolidated with this
+ * enq since they have a different target.
+ *
+ * Detecting an element with an order sequence gap, an
+ * element in category 3, or running out of elements
+ * stops the scan.
+ */
+ next_buf = reorder_buf->next;
+
+ if (odp_likely(reorder_buf->target_qe == queue)) {
+ /* promote any chain */
+ odp_buffer_hdr_t *reorder_link =
+ reorder_buf->link;
+
+ if (reorder_link) {
+ reorder_buf->next = reorder_link;
+ reorder_buf->link = NULL;
+ while (reorder_link->next)
+ reorder_link = reorder_link->next;
+ reorder_link->next = next_buf;
+ reorder_prev = reorder_link;
+ } else {
+ reorder_prev = reorder_buf;
+ }
+
+ if (!reorder_buf->flags.sustain)
+ release_count++;
+ reorder_buf = next_buf;
+ } else if (!reorder_buf->target_qe) {
+ if (reorder_prev)
+ reorder_prev->next = next_buf;
+ else
+ origin_qe->s.reorder_head = next_buf;
+
+ reorder_buf->next = placeholder_buf;
+ placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ placeholder_count++;
+ } else {
+ break;
+ }
+ }
+
+ *reorder_buf_return = reorder_buf;
+ *reorder_prev_return = reorder_prev;
+ *placeholder_buf_return = placeholder_buf;
+ *release_count_return = release_count;
+ *placeholder_count_return = placeholder_count;
+}
+
void queue_destroy_finalize(queue_entry_t *qe);
#ifdef __cplusplus
@@ -15,6 +15,7 @@ extern "C" {
#include <odp/buffer.h>
+#include <odp_buffer_internal.h>
#include <odp/queue.h>
#include <odp/packet_io.h>
#include <odp_queue_internal.h>
@@ -28,7 +29,6 @@ static inline int schedule_queue(const queue_entry_t *qe)
return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
}
-
int schedule_pktio_start(odp_pktio_t pktio, int prio);
Add additional fields to internal buffer and queue structs to enable support for ordered queues. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> --- .../linux-generic/include/odp_buffer_internal.h | 15 ++- .../linux-generic/include/odp_queue_internal.h | 121 +++++++++++++++++++++ .../linux-generic/include/odp_schedule_internal.h | 2 +- 3 files changed, 136 insertions(+), 2 deletions(-)