@@ -33,19 +33,31 @@ extern "C" {
#include <odp_schedule_if.h>
#include <stddef.h>
-#define BUFFER_BURST_SIZE CONFIG_BURST_SIZE
-
typedef struct seg_entry_t {
void *hdr;
uint8_t *data;
uint32_t len;
} seg_entry_t;
+typedef union buffer_index_t {
+ uint32_t u32;
+
+ struct {
+ uint32_t pool :8;
+ uint32_t buffer :24;
+ };
+} buffer_index_t;
+
+/* Check that pool index fit into bit field */
+ODP_STATIC_ASSERT(ODP_CONFIG_POOLS <= (0xFF + 1), "TOO_MANY_POOLS");
+
+/* Check that buffer index fit into bit field */
+ODP_STATIC_ASSERT(CONFIG_POOL_MAX_NUM <= (0xFFFFFF + 1), "TOO_LARGE_POOL");
+
/* Common buffer header */
struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
- /* Buffer index in the pool */
- uint32_t index;
+ /* Combined pool and buffer index */
+ buffer_index_t index;
/* Total segment count */
uint16_t segcount;
@@ -73,16 +85,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
/* Segments */
seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];
- /* Burst counts */
- uint8_t burst_num;
- uint8_t burst_first;
-
- /* Next buf in a list */
- struct odp_buffer_hdr_t *next;
-
- /* Burst table */
- struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
/* --- Mostly read only data --- */
const void *user_ptr;
@@ -109,8 +111,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
ODP_STATIC_ASSERT(CONFIG_PACKET_SEGS_PER_HDR < 256,
"CONFIG_PACKET_SEGS_PER_HDR_TOO_LARGE");
-ODP_STATIC_ASSERT(BUFFER_BURST_SIZE < 256, "BUFFER_BURST_SIZE_TOO_LARGE");
-
#ifdef __cplusplus
}
#endif
@@ -104,6 +104,34 @@ static inline odp_buffer_hdr_t *buf_hdl_to_hdr(odp_buffer_t buf)
return (odp_buffer_hdr_t *)(uintptr_t)buf;
}
+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+ uint32_t buffer_idx)
+{
+ uint64_t block_offset;
+ odp_buffer_hdr_t *buf_hdr;
+
+ block_offset = buffer_idx * (uint64_t)pool->block_size;
+
+ /* clang requires cast to uintptr_t */
+ buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+ return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+ buffer_index_t index;
+ uint32_t pool_idx, buffer_idx;
+ pool_t *pool;
+
+ index.u32 = u32;
+ pool_idx = index.pool;
+ buffer_idx = index.buffer;
+ pool = pool_entry(pool_idx);
+
+ return buf_hdr_from_index(pool, buffer_idx);
+}
+
int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
@@ -29,6 +29,7 @@ extern "C" {
#include <odp/api/hints.h>
#include <odp/api/ticketlock.h>
#include <odp_config_internal.h>
+#include <odp_ring_st_internal.h>
#define QUEUE_STATUS_FREE 0
#define QUEUE_STATUS_DESTROYED 1
@@ -38,9 +39,7 @@ extern "C" {
struct queue_entry_s {
odp_ticketlock_t ODP_ALIGNED_CACHE lock;
-
- odp_buffer_hdr_t *head;
- odp_buffer_hdr_t *tail;
+ ring_st_t ring_st;
int status;
queue_enq_fn_t ODP_ALIGNED_CACHE enqueue;
@@ -1819,7 +1819,8 @@ void odp_packet_print_data(odp_packet_t pkt, uint32_t offset,
len += snprintf(&str[len], n - len,
" pool index %" PRIu32 "\n", pool->pool_idx);
len += snprintf(&str[len], n - len,
- " buf index %" PRIu32 "\n", hdr->buf_hdr.index);
+ " buf index %" PRIu32 "\n",
+ hdr->buf_hdr.index.buffer);
len += snprintf(&str[len], n - len,
" segcount %" PRIu16 "\n", hdr->buf_hdr.segcount);
len += snprintf(&str[len], n - len,
@@ -80,20 +80,6 @@ static inline pool_t *pool_from_buf(odp_buffer_t buf)
return buf_hdr->pool_ptr;
}
-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
- uint32_t buffer_idx)
-{
- uint64_t block_offset;
- odp_buffer_hdr_t *buf_hdr;
-
- block_offset = buffer_idx * (uint64_t)pool->block_size;
-
- /* clang requires cast to uintptr_t */
- buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
- return buf_hdr;
-}
-
int odp_pool_init_global(void)
{
uint32_t i;
@@ -296,7 +282,9 @@ static void init_buffers(pool_t *pool)
memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);
/* Initialize buffer metadata */
- buf_hdr->index = i;
+ buf_hdr->index.u32 = 0;
+ buf_hdr->index.pool = pool->pool_idx;
+ buf_hdr->index.buffer = i;
buf_hdr->type = type;
buf_hdr->event_type = type;
buf_hdr->pool_ptr = pool;
@@ -785,7 +773,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
ring = &pool->ring->hdr;
mask = pool->ring_mask;
for (i = 0; i < num; i++)
- buf_index[i] = buf_hdr[i]->index;
+ buf_index[i] = buf_hdr[i]->index.buffer;
ring_enq_multi(ring, mask, buf_index, num);
@@ -825,7 +813,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
}
for (i = 0; i < num; i++)
- cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+ cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;
cache->num = cache_num + num;
}
@@ -40,9 +40,14 @@
static int queue_init(queue_entry_t *queue, const char *name,
const odp_queue_param_t *param);
+typedef struct ODP_ALIGNED_CACHE {
+ /* Storage space for ring data */
+ uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
typedef struct queue_global_t {
queue_entry_t queue[ODP_CONFIG_QUEUES];
-
+ ring_data_t ring_data[ODP_CONFIG_QUEUES];
uint32_t queue_lf_num;
uint32_t queue_lf_size;
queue_lf_func_t queue_lf_func;
@@ -154,9 +159,11 @@ static int queue_capability(odp_queue_capability_t *capa)
capa->max_sched_groups = sched_fn->num_grps();
capa->sched_prios = odp_schedule_num_prio();
capa->plain.max_num = capa->max_queues;
- capa->sched.max_num = capa->max_queues;
+ capa->plain.max_size = CONFIG_QUEUE_SIZE;
capa->plain.lockfree.max_num = queue_glb->queue_lf_num;
capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
+ capa->sched.max_num = capa->max_queues;
+ capa->sched.max_size = CONFIG_QUEUE_SIZE;
return 0;
}
@@ -204,6 +211,20 @@ static odp_queue_t queue_create(const char *name,
param = &default_param;
}
+ if (param->nonblocking == ODP_BLOCKING) {
+ if (param->size > CONFIG_QUEUE_SIZE)
+ return ODP_QUEUE_INVALID;
+ } else if (param->nonblocking == ODP_NONBLOCKING_LF) {
+ /* Only plain type lock-free queues supported */
+ if (param->type != ODP_QUEUE_TYPE_PLAIN)
+ return ODP_QUEUE_INVALID;
+ if (param->size > queue_glb->queue_lf_size)
+ return ODP_QUEUE_INVALID;
+ } else {
+ /* Wait-free queues not supported */
+ return ODP_QUEUE_INVALID;
+ }
+
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
queue = &queue_glb->queue[i];
@@ -297,7 +318,7 @@ static int queue_destroy(odp_queue_t handle)
ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
return -1;
}
- if (queue->s.head != NULL) {
+ if (ring_st_is_empty(&queue->s.ring_st) == 0) {
UNLOCK(&queue->s.lock);
ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
return -1;
@@ -364,81 +385,71 @@ static odp_queue_t queue_lookup(const char *name)
return ODP_QUEUE_INVALID;
}
+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+ odp_buffer_hdr_t *buf_hdr[], int num)
+{
+ int i;
+
+ for (i = 0; i < num; i++)
+ buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+ uint32_t buffer_index[], int num)
+{
+ int i;
+
+ for (i = 0; i < num; i++) {
+ buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+ odp_prefetch(buf_hdr[i]);
+ }
+}
+
static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
int num)
{
int sched = 0;
- int i, ret;
+ int ret;
queue_entry_t *queue;
- odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+ int num_enq;
+ ring_st_t *ring_st;
+ uint32_t buf_idx[num];
queue = qentry_from_int(q_int);
+ ring_st = &queue->s.ring_st;
+
if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
return ret;
- /* Optimize the common case of single enqueue */
- if (num == 1) {
- tail = buf_hdr[0];
- hdr = tail;
- hdr->burst_num = 0;
- hdr->next = NULL;
- } else {
- int next;
-
- /* Start from the last buffer header */
- tail = buf_hdr[num - 1];
- hdr = tail;
- hdr->next = NULL;
- next = num - 2;
-
- while (1) {
- /* Build a burst. The buffer header carrying
- * a burst is the last buffer of the burst. */
- for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
- i++, next--)
- hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
- buf_hdr[next];
-
- hdr->burst_num = i;
- hdr->burst_first = BUFFER_BURST_SIZE - i;
-
- if (odp_likely(next < 0))
- break;
-
- /* Get another header and link it */
- next_hdr = hdr;
- hdr = buf_hdr[next];
- hdr->next = next_hdr;
- next--;
- }
- }
+ buffer_index_from_buf(buf_idx, buf_hdr, num);
LOCK(&queue->s.lock);
+
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
- /* Empty queue */
- if (queue->s.head == NULL)
- queue->s.head = hdr;
- else
- queue->s.tail->next = hdr;
+ num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
- queue->s.tail = tail;
+ if (odp_unlikely(num_enq == 0)) {
+ UNLOCK(&queue->s.lock);
+ return 0;
+ }
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
- sched = 1; /* retval: schedule queue */
+ sched = 1;
}
+
UNLOCK(&queue->s.lock);
/* Add queue to scheduling */
if (sched && sched_fn->sched_queue(queue->s.index))
ODP_ABORT("schedule_queue failed\n");
- return num; /* All events enqueued */
+ return num_enq;
}
static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -484,12 +495,15 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num)
{
- odp_buffer_hdr_t *hdr, *next;
- int i, j;
- int updated = 0;
int status_sync = sched_fn->status_sync;
+ int num_deq;
+ ring_st_t *ring_st;
+ uint32_t buf_idx[num];
+
+ ring_st = &queue->s.ring_st;
LOCK(&queue->s.lock);
+
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
/* Bad queue, or queue has been destroyed.
* Scheduler finalizes queue destroy after this. */
@@ -497,9 +511,9 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
return -1;
}
- hdr = queue->s.head;
+ num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
- if (hdr == NULL) {
+ if (num_deq == 0) {
/* Already empty queue */
if (queue->s.status == QUEUE_STATUS_SCHED) {
queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -509,51 +523,18 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
}
UNLOCK(&queue->s.lock);
- return 0;
- }
-
- for (i = 0; i < num && hdr; ) {
- int burst_num = hdr->burst_num;
- int first = hdr->burst_first;
-
- /* First, get bursted buffers */
- for (j = 0; j < burst_num && i < num; j++, i++) {
- buf_hdr[i] = hdr->burst[first + j];
- odp_prefetch(buf_hdr[i]);
- }
-
- if (burst_num) {
- hdr->burst_num = burst_num - j;
- hdr->burst_first = first + j;
- }
- if (i == num)
- break;
-
- /* When burst is empty, consume the current buffer header and
- * move to the next header */
- buf_hdr[i] = hdr;
- next = hdr->next;
- hdr->next = NULL;
- hdr = next;
- updated++;
- i++;
+ return 0;
}
- /* Write head only if updated */
- if (updated)
- queue->s.head = hdr;
-
- /* Queue is empty */
- if (hdr == NULL)
- queue->s.tail = NULL;
-
if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
sched_fn->save_context(queue->s.index);
UNLOCK(&queue->s.lock);
- return i;
+ buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+ return num_deq;
}
static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -622,8 +603,9 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.pktin = PKTIN_INVALID;
queue->s.pktout = PKTOUT_INVALID;
- queue->s.head = NULL;
- queue->s.tail = NULL;
+ ring_st_init(&queue->s.ring_st,
+ queue_glb->ring_data[queue->s.index].data,
+ CONFIG_QUEUE_SIZE);
return 0;
}
@@ -699,7 +681,7 @@ int sched_cb_queue_empty(uint32_t queue_index)
return -1;
}
- if (queue->s.head == NULL) {
+ if (ring_st_is_empty(&queue->s.ring_st)) {
/* Already empty queue. Update status. */
if (queue->s.status == QUEUE_STATUS_SCHED)
queue->s.status = QUEUE_STATUS_NOTSCHED;