@@ -161,6 +161,7 @@ noinst_HEADERS = \
include/odp_queue_scalable_internal.h \
include/odp_ring_internal.h \
include/odp_queue_if.h \
+ include/odp_queue_lf.h \
include/odp_schedule_if.h \
include/odp_schedule_scalable.h \
include/odp_schedule_scalable_config.h \
@@ -223,6 +224,7 @@ __LIB__libodp_linux_la_SOURCES = \
odp_pool.c \
odp_queue.c \
odp_queue_if.c \
+ odp_queue_lf.c \
odp_queue_scalable.c \
odp_rwlock.c \
odp_rwlock_recursive.c \
@@ -53,6 +53,7 @@ struct queue_entry_s {
odp_queue_param_t param;
odp_pktin_queue_t pktin;
odp_pktout_queue_t pktout;
+ void *queue_lf;
char name[ODP_QUEUE_NAME_LEN];
};
new file mode 100644
@@ -0,0 +1,36 @@
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_LF_H_
+#define ODP_QUEUE_LF_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_queue_if.h>
+#include <odp_queue_internal.h>
+
+/* Lock-free queue functions: implementation of the generic queue
+ * enq/deq interface, filled in by queue_lf_init_global() and installed
+ * into a queue entry when it is created with ODP_NONBLOCKING_LF. */
+typedef struct {
+	queue_enq_fn_t enq;
+	queue_enq_multi_fn_t enq_multi;
+	queue_deq_fn_t deq;
+	queue_deq_multi_fn_t deq_multi;
+
+} queue_lf_func_t;
+
+/* Initialize lock-free queue support. Outputs the max queue size and the
+ * enq/deq function table. Returns the number of lock-free queues
+ * available (0 when the platform lacks a lock-free 64 bit CAS). */
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func);
+/* Free all global lock-free queue resources */
+void queue_lf_term_global(void);
+/* Allocate a lock-free ring for a PLAIN queue. Returns NULL on failure. */
+void *queue_lf_create(queue_entry_t *queue);
+/* Release a ring previously returned by queue_lf_create() */
+void queue_lf_destroy(void *queue_lf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
@@ -8,6 +8,7 @@
#include <odp/api/queue.h>
#include <odp_queue_internal.h>
+#include <odp_queue_lf.h>
#include <odp_queue_if.h>
#include <odp/api/std_types.h>
#include <odp/api/align.h>
@@ -39,11 +40,16 @@
static int queue_init(queue_entry_t *queue, const char *name,
const odp_queue_param_t *param);
-typedef struct queue_table_t {
- queue_entry_t queue[ODP_CONFIG_QUEUES];
-} queue_table_t;
+typedef struct queue_global_t {
+ queue_entry_t queue[ODP_CONFIG_QUEUES];
-static queue_table_t *queue_tbl;
+ uint32_t queue_lf_num;
+ uint32_t queue_lf_size;
+ queue_lf_func_t queue_lf_func;
+
+} queue_global_t;
+
+static queue_global_t *queue_glb;
static
queue_entry_t *get_qentry(uint32_t queue_id);
@@ -64,26 +70,28 @@ static inline odp_queue_t queue_from_id(uint32_t queue_id)
static
queue_entry_t *get_qentry(uint32_t queue_id)
{
- return &queue_tbl->queue[queue_id];
+ return &queue_glb->queue[queue_id];
}
static int queue_init_global(void)
{
uint32_t i;
odp_shm_t shm;
+ uint32_t lf_size = 0;
+ queue_lf_func_t *lf_func;
ODP_DBG("Queue init ... ");
shm = odp_shm_reserve("odp_queues",
- sizeof(queue_table_t),
+ sizeof(queue_global_t),
sizeof(queue_entry_t), 0);
- queue_tbl = odp_shm_addr(shm);
+ queue_glb = odp_shm_addr(shm);
- if (queue_tbl == NULL)
+ if (queue_glb == NULL)
return -1;
- memset(queue_tbl, 0, sizeof(queue_table_t));
+ memset(queue_glb, 0, sizeof(queue_global_t));
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
/* init locks */
@@ -93,6 +101,10 @@ static int queue_init_global(void)
queue->s.handle = queue_from_id(i);
}
+ lf_func = &queue_glb->queue_lf_func;
+ queue_glb->queue_lf_num = queue_lf_init_global(&lf_size, lf_func);
+ queue_glb->queue_lf_size = lf_size;
+
ODP_DBG("done\n");
ODP_DBG("Queue init global\n");
ODP_DBG(" struct queue_entry_s size %zu\n",
@@ -122,7 +134,7 @@ static int queue_term_global(void)
int i;
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue = &queue_tbl->queue[i];
+ queue = &queue_glb->queue[i];
LOCK(&queue->s.lock);
if (queue->s.status != QUEUE_STATUS_FREE) {
ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
@@ -131,6 +143,8 @@ static int queue_term_global(void)
UNLOCK(&queue->s.lock);
}
+ queue_lf_term_global();
+
ret = odp_shm_free(odp_shm_lookup("odp_queues"));
if (ret < 0) {
ODP_ERR("shm free failed for odp_queues");
@@ -151,6 +165,8 @@ static int queue_capability(odp_queue_capability_t *capa)
capa->sched_prios = odp_schedule_num_prio();
capa->plain.max_num = capa->max_queues;
capa->sched.max_num = capa->max_queues;
+ capa->plain.lockfree.max_num = queue_glb->queue_lf_num;
+ capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
return 0;
}
@@ -188,6 +204,7 @@ static odp_queue_t queue_create(const char *name,
{
uint32_t i;
queue_entry_t *queue;
+ void *queue_lf;
odp_queue_t handle = ODP_QUEUE_INVALID;
odp_queue_type_t type = ODP_QUEUE_TYPE_PLAIN;
odp_queue_param_t default_param;
@@ -198,7 +215,7 @@ static odp_queue_t queue_create(const char *name,
}
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue = &queue_tbl->queue[i];
+ queue = &queue_glb->queue[i];
if (queue->s.status != QUEUE_STATUS_FREE)
continue;
@@ -207,7 +224,26 @@ static odp_queue_t queue_create(const char *name,
if (queue->s.status == QUEUE_STATUS_FREE) {
if (queue_init(queue, name, param)) {
UNLOCK(&queue->s.lock);
- return handle;
+ return ODP_QUEUE_INVALID;
+ }
+
+ if (param->nonblocking == ODP_NONBLOCKING_LF) {
+ queue_lf_func_t *lf_func;
+
+ lf_func = &queue_glb->queue_lf_func;
+
+ queue_lf = queue_lf_create(queue);
+
+ if (queue_lf == NULL) {
+ UNLOCK(&queue->s.lock);
+ return ODP_QUEUE_INVALID;
+ }
+ queue->s.queue_lf = queue_lf;
+
+ queue->s.enqueue = lf_func->enq;
+ queue->s.enqueue_multi = lf_func->enq_multi;
+ queue->s.dequeue = lf_func->deq;
+ queue->s.dequeue_multi = lf_func->deq_multi;
}
type = queue->s.type;
@@ -224,7 +260,10 @@ static odp_queue_t queue_create(const char *name,
UNLOCK(&queue->s.lock);
}
- if (handle != ODP_QUEUE_INVALID && type == ODP_QUEUE_TYPE_SCHED) {
+ if (handle == ODP_QUEUE_INVALID)
+ return ODP_QUEUE_INVALID;
+
+ if (type == ODP_QUEUE_TYPE_SCHED) {
if (sched_fn->init_queue(queue->s.index,
&queue->s.param.sched)) {
queue->s.status = QUEUE_STATUS_FREE;
@@ -289,6 +328,10 @@ static int queue_destroy(odp_queue_t handle)
default:
ODP_ABORT("Unexpected queue status\n");
}
+
+ if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF)
+ queue_lf_destroy(queue->s.queue_lf);
+
UNLOCK(&queue->s.lock);
return 0;
@@ -313,7 +356,7 @@ static odp_queue_t queue_lookup(const char *name)
uint32_t i;
for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- queue_entry_t *queue = &queue_tbl->queue[i];
+ queue_entry_t *queue = &queue_glb->queue[i];
if (queue->s.status == QUEUE_STATUS_FREE ||
queue->s.status == QUEUE_STATUS_DESTROYED)
new file mode 100644
@@ -0,0 +1,302 @@
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <odp/api/queue.h>
+#include <odp/api/atomic.h>
+#include <odp/api/shared_memory.h>
+#include <odp_queue_lf.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "config.h"
+#include <odp_debug_internal.h>
+
+/* Ring node marker values: a slot is empty, holds data, or is the
+ * mark separating the head from the tail of the ring. */
+#define LF_NODE_EMPTY 0
+#define LF_NODE_DATA 1
+#define LF_NODE_MARK 2
+/* Slots per lock-free ring; usable capacity is one less (see
+ * queue_lf_init_global()) */
+#define RING_LF_SIZE 64
+/* Number of lock-free queues supported */
+#define QUEUE_LF_NUM 128
+
+/* Node in lock-free ring. All views alias the same 64 bit word so that
+ * a node can be loaded and CASed atomically as a single u64. The low
+ * 2 bits always hold the marker; the remaining 62 bits hold either a
+ * compressed pointer (DATA) or an ABA counter (EMPTY/MARK). */
+typedef union {
+	odp_atomic_u64_t atomic_u64;
+
+	struct {
+		/* marker == DATA */
+		uint64_t marker: 2;
+
+		/* Max 62 bits stored from a pointer */
+		/* NOTE(review): pointer is stored shifted right by 2 (see
+		 * queue_lf_enq()), which assumes at least 4 byte alignment
+		 * of buffer headers — confirm. */
+		uint64_t ptr: 62;
+	} data;
+
+	struct {
+		/* marker == EMPTY or MARK */
+		uint64_t marker: 2;
+
+		/* Counter to avoid ABA issues */
+		uint64_t counter: 62;
+	};
+
+	uint64_t u64;
+
+} ring_lf_node_t;
+
+/* Lock-free ring: one cache line aligned ring per lock-free queue */
+typedef struct {
+	ring_lf_node_t node[RING_LF_SIZE];
+
+	/* Non-zero while this ring is allocated to a queue */
+	int used;
+	/* Source of unique counter values for EMPTY/MARK nodes */
+	odp_atomic_u64_t aba_counter;
+
+} queue_lf_t ODP_ALIGNED_CACHE;
+
+/* Lock-free queue globals */
+typedef struct {
+	queue_lf_t queue_lf[QUEUE_LF_NUM];
+
+	/* Reservation handle, kept for odp_shm_free() at termination */
+	odp_shm_t shm;
+
+} queue_lf_global_t ODP_ALIGNED_CACHE;
+
+static queue_lf_global_t *queue_lf_glb;
+
+/* Return the ring index following 'idx', wrapping at RING_LF_SIZE */
+static inline int next_idx(int idx)
+{
+	int following = idx + 1;
+
+	return (following == RING_LF_SIZE) ? 0 : following;
+}
+
+/* Tell whether 'next' is the current tail slot of the ring.
+ *
+ * It is the tail when a DATA or MARK node is followed by an EMPTY
+ * node, or when a DATA node is followed by two consecutive MARK nodes.
+ * Returns 1 for tail, 0 otherwise. */
+static inline int next_is_tail(ring_lf_node_t node, ring_lf_node_t next,
+			       ring_lf_node_t next_next)
+{
+	int node_has_data = (node.marker == LF_NODE_DATA);
+	int node_is_mark = (node.marker == LF_NODE_MARK);
+
+	if ((node_has_data || node_is_mark) &&
+	    next.marker == LF_NODE_EMPTY)
+		return 1;
+
+	if (node_has_data && next.marker == LF_NODE_MARK &&
+	    next_next.marker == LF_NODE_MARK)
+		return 1;
+
+	return 0;
+}
+
+/* Tell whether 'next' is the current head slot of the ring: a DATA
+ * node directly after the MARK node. Returns 1 for head, 0 otherwise. */
+static inline int next_is_head(ring_lf_node_t node, ring_lf_node_t next)
+{
+	return (node.marker == LF_NODE_MARK &&
+		next.marker == LF_NODE_DATA) ? 1 : 0;
+}
+
+/* Enqueue one event into a lock-free queue.
+ *
+ * Scans the ring for the tail slot and tries to claim it with a CAS,
+ * replacing it with a DATA node. Returns 0 on success, -1 when no tail
+ * slot could be claimed (ring full, or every CAS lost its race during
+ * this scan). */
+static int queue_lf_enq(queue_t q_int, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t cur_val;
+
+	queue = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+		i_next = next_idx(i);
+		i_nn = next_idx(i_next);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		nn = &queue_lf->node[i_nn];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+		nn_val.u64 = odp_atomic_load_u64(&nn->atomic_u64);
+
+		if (!next_is_tail(node_val, next_val, nn_val))
+			continue;
+
+		/* Next node is the tail. Replace it with data. */
+		/* Pointer compressed into 62 bits. NOTE(review): assumes
+		 * buf_hdr is at least 4 byte aligned — confirm. The matching
+		 * decompression (<< 2) is in queue_lf_deq(). */
+		new_val.data.marker = LF_NODE_DATA;
+		new_val.data.ptr = ((uintptr_t)buf_hdr) >> 2;
+
+		cur_val = next_val.u64;
+
+		/* Release order publishes the data node: pairs with the
+		 * acquire CAS in queue_lf_deq(). On CAS failure the scan
+		 * simply continues from the next index. */
+		if (odp_atomic_cas_rel_u64(&next->atomic_u64, &cur_val,
+					   new_val.u64))
+			return 0;
+	}
+
+	return -1;
+}
+
+/* Enqueue a burst into a lock-free queue.
+ *
+ * Only a single event is attempted per call (a single 64 bit CAS can
+ * publish only one node). Returns the number of events enqueued
+ * (0 or 1). */
+static int queue_lf_enq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	/* Guard against an empty burst: without this, buf_hdr[0] would be
+	 * read even when num == 0. */
+	if (num < 1)
+		return 0;
+
+	if (queue_lf_enq(q_int, buf_hdr[0]) == 0)
+		return 1;
+
+	return 0;
+}
+
+/* Dequeue one event from a lock-free queue.
+ *
+ * Scans the ring for the head slot (DATA node right after the MARK) and
+ * tries to replace it with a fresh MARK node via CAS. Returns the buffer
+ * header pointer on success, NULL when the queue appears empty or every
+ * CAS lost its race during this scan. */
+static odp_buffer_hdr_t *queue_lf_deq(queue_t q_int)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t counter;
+
+	queue = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+/* Re-examine the same index after a trailing marker has been emptied */
+retry:
+		i_next = next_idx(i);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+
+		if (!next_is_head(node_val, next_val))
+			continue;
+
+		/* Next node is the head.*/
+		i_nn = next_idx(i_next);
+		nn = &queue_lf->node[i_nn];
+		nn_val.u64 = odp_atomic_load_u64(&nn->atomic_u64);
+
+		/* Fresh counter value makes the replacement node unique,
+		 * protecting the CAS below against ABA. */
+		counter = odp_atomic_fetch_inc_u64(&queue_lf->aba_counter);
+		new_val.counter = counter;
+
+		/* Cannot replace next with marker, as long as next-next is
+		 * also a marker. Otherwise, the ring would be full of markers.
+		 * Empty it and retry. */
+		if (nn_val.marker == LF_NODE_MARK) {
+			new_val.marker = LF_NODE_EMPTY;
+			odp_atomic_cas_u64(&nn->atomic_u64, &nn_val.u64,
+					   new_val.u64);
+			goto retry;
+		}
+
+		new_val.marker = LF_NODE_MARK;
+
+		/* Acquire pairs with the release CAS in queue_lf_enq(), so
+		 * the enqueued data is visible before being read through the
+		 * returned pointer. */
+		if (odp_atomic_cas_acq_u64(&next->atomic_u64, &next_val.u64,
+					   new_val.u64)) {
+			/* Successfully replaced data with marker. */
+			/* Decompress pointer stored by queue_lf_enq() */
+			return (void *)(uintptr_t)(next_val.data.ptr << 2);
+		}
+	}
+
+	return NULL;
+}
+
+/* Dequeue a burst from a lock-free queue. Only a single event is
+ * attempted per call. Returns the number of events dequeued (0 or 1). */
+static int queue_lf_deq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	odp_buffer_hdr_t *hdr = queue_lf_deq(q_int);
+
+	(void)num;
+
+	if (hdr != NULL) {
+		buf_hdr[0] = hdr;
+		return 1;
+	}
+
+	return 0;
+}
+
+/* Initialize lock-free queue support.
+ *
+ * Outputs the max queue size through 'queue_lf_size' and fills in the
+ * enq/deq function table 'lf_func'. Returns the number of lock-free
+ * queues available; 0 when u64 CAS is not lock-free on this platform or
+ * when the shared memory reservation fails. */
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func)
+{
+	odp_shm_t shm;
+	odp_atomic_op_t lockfree;
+
+	/* Lock-free queues require a lock-free u64 CAS */
+	odp_atomic_lock_free_u64(&lockfree);
+
+	ODP_DBG("\nLock-free queue init\n");
+	ODP_DBG(" u64 lock-free: %i\n\n", lockfree.op.cas);
+
+	if (lockfree.op.cas == 0)
+		return 0;
+
+	shm = odp_shm_reserve("odp_queues_lf",
+			      sizeof(queue_lf_global_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	/* Fail gracefully instead of dereferencing NULL below when the
+	 * reservation (or address lookup) fails. */
+	if (shm == ODP_SHM_INVALID)
+		return 0;
+
+	queue_lf_glb = odp_shm_addr(shm);
+	if (queue_lf_glb == NULL) {
+		(void)odp_shm_free(shm);
+		return 0;
+	}
+
+	memset(queue_lf_glb, 0, sizeof(queue_lf_global_t));
+
+	queue_lf_glb->shm = shm;
+
+	memset(lf_func, 0, sizeof(queue_lf_func_t));
+	lf_func->enq = queue_lf_enq;
+	lf_func->enq_multi = queue_lf_enq_multi;
+	lf_func->deq = queue_lf_deq;
+	lf_func->deq_multi = queue_lf_deq_multi;
+
+	/* One ring slot is sacrificed for the head/tail marker */
+	*queue_lf_size = RING_LF_SIZE - 1;
+
+	return QUEUE_LF_NUM;
+}
+
+/* Free all global lock-free queue resources. Safe to call when init
+ * failed or was never run, and safe to call more than once. */
+void queue_lf_term_global(void)
+{
+	odp_shm_t shm;
+
+	if (queue_lf_glb == NULL)
+		return;
+
+	shm = queue_lf_glb->shm;
+
+	/* Clear the global pointer before freeing, so a repeated term call
+	 * becomes a no-op instead of touching freed shared memory. */
+	queue_lf_glb = NULL;
+
+	if (odp_shm_free(shm) < 0)
+		ODP_ERR("shm free failed");
+}
+
+/* Allocate a lock-free ring for a PLAIN queue.
+ *
+ * Returns a pointer to the ring (to be stored in queue->s.queue_lf), or
+ * NULL when the queue type is not PLAIN or all rings are in use. */
+void *queue_lf_create(queue_entry_t *queue)
+{
+	int i;
+	queue_lf_t *queue_lf = NULL;
+
+	/* Lock-free implementation supports plain queues only */
+	if (queue->s.type != ODP_QUEUE_TYPE_PLAIN)
+		return NULL;
+
+	/* NOTE(review): the 'used' flag is tested and set non-atomically;
+	 * concurrent queue_lf_create() calls holding different queue entry
+	 * locks could claim the same ring — confirm creation is serialized
+	 * by a global lock in the caller. */
+	for (i = 0; i < QUEUE_LF_NUM; i++) {
+		if (queue_lf_glb->queue_lf[i].used == 0) {
+			queue_lf = &queue_lf_glb->queue_lf[i];
+			memset(queue_lf, 0, sizeof(queue_lf_t));
+			/* Slot 0 starts out as the MARK (head) node */
+			queue_lf->node[0].marker = LF_NODE_MARK;
+			queue_lf->used = 1;
+			break;
+		}
+	}
+
+	return queue_lf;
+}
+
+/* Return a ring allocated by queue_lf_create() to the free pool */
+void queue_lf_destroy(void *queue_lf_ptr)
+{
+	queue_lf_t *ring = queue_lf_ptr;
+
+	ring->used = 0;
+}