@@ -14,6 +14,22 @@
#define MAX_NUM_EVENT (1 * 1024)
#define MAX_ITERATION (100)
#define MAX_QUEUES (64 * 1024)
+#define GLOBALS_NAME "queue_test_globals"
+#define DEQ_RETRIES 100
+#define ENQ_RETRIES 100
+
+typedef struct {
+ pthrd_arg cu_thr;
+ int num_workers;
+ odp_barrier_t barrier;
+ odp_queue_t queue;
+ odp_atomic_u32_t num_event;
+
+ struct {
+ uint32_t num_event;
+ } thread[ODP_THREAD_COUNT_MAX];
+
+} test_globals_t;
static int queue_context = 0xff;
static odp_pool_t pool;
@@ -31,7 +47,30 @@ static void generate_name(char *name, uint32_t index)
int queue_suite_init(void)
{
+ odp_shm_t shm;
+ test_globals_t *globals;
odp_pool_param_t params;
+ int num_workers;
+ odp_cpumask_t mask;
+
+ shm = odp_shm_reserve(GLOBALS_NAME, sizeof(test_globals_t),
+ ODP_CACHE_LINE_SIZE, 0);
+
+ if (shm == ODP_SHM_INVALID) {
+ printf("Shared memory reserve failed\n");
+ return -1;
+ }
+
+ globals = odp_shm_addr(shm);
+ memset(globals, 0, sizeof(test_globals_t));
+
+ num_workers = odp_cpumask_default_worker(&mask, 0);
+
+ if (num_workers > MAX_WORKERS)
+ num_workers = MAX_WORKERS;
+
+ globals->num_workers = num_workers;
+ odp_barrier_init(&globals->barrier, num_workers);
odp_pool_param_init(¶ms);
@@ -51,7 +90,25 @@ int queue_suite_init(void)
int queue_suite_term(void)
{
- return odp_pool_destroy(pool);
+ odp_shm_t shm;
+
+ shm = odp_shm_lookup(GLOBALS_NAME);
+ if (shm == ODP_SHM_INVALID) {
+ printf("SHM lookup failed.\n");
+ return -1;
+ }
+
+ if (odp_shm_free(shm)) {
+ printf("SHM free failed.\n");
+ return -1;
+ }
+
+ if (odp_pool_destroy(pool)) {
+ printf("Pool destroy failed.\n");
+ return -1;
+ }
+
+ return 0;
}
void queue_test_capa(void)
@@ -411,12 +468,211 @@ void queue_test_info(void)
CU_ASSERT(odp_queue_destroy(q_order) == 0);
}
+static uint32_t alloc_and_enqueue(odp_queue_t queue, odp_pool_t pool,
+ uint32_t num)
+{
+ uint32_t i, ret;
+ odp_buffer_t buf;
+ odp_event_t ev;
+
+ for (i = 0; i < num; i++) {
+ buf = odp_buffer_alloc(pool);
+
+ CU_ASSERT(buf != ODP_BUFFER_INVALID);
+
+ ev = odp_buffer_to_event(buf);
+
+ ret = odp_queue_enq(queue, ev);
+
+ CU_ASSERT(ret == 0);
+
+ if (ret)
+ break;
+ }
+
+ return i;
+}
+
+static uint32_t dequeue_and_free_all(odp_queue_t queue)
+{
+ odp_event_t ev;
+ uint32_t num, retries;
+
+ num = 0;
+ retries = 0;
+
+ while (1) {
+ ev = odp_queue_deq(queue);
+
+ if (ev == ODP_EVENT_INVALID) {
+ if (retries >= DEQ_RETRIES)
+ return num;
+
+ retries++;
+ continue;
+ }
+
+ retries = 0;
+ num++;
+
+ odp_event_free(ev);
+ }
+
+ return num;
+}
+
+static int enqueue_with_retry(odp_queue_t queue, odp_event_t ev)
+{
+ int i;
+
+ for (i = 0; i < ENQ_RETRIES; i++)
+ if (odp_queue_enq(queue, ev) == 0)
+ return 0;
+
+ return -1;
+}
+
+static int queue_test_worker(void *arg)
+{
+ uint32_t num, retries, num_workers;
+ int thr_id, ret;
+ odp_event_t ev;
+ odp_queue_t queue;
+ test_globals_t *globals = arg;
+
+ thr_id = odp_thread_id();
+ queue = globals->queue;
+ num_workers = globals->num_workers;
+
+ if (num_workers > 1)
+ odp_barrier_wait(&globals->barrier);
+
+ retries = 0;
+ num = odp_atomic_fetch_inc_u32(&globals->num_event);
+
+ /* On average, each worker deq-enq each event once */
+ while (num < (num_workers * MAX_NUM_EVENT)) {
+ ev = odp_queue_deq(queue);
+
+ if (ev == ODP_EVENT_INVALID) {
+ if (retries < DEQ_RETRIES) {
+ retries++;
+ continue;
+ }
+
+ /* Prevent thread to starve */
+ num = odp_atomic_fetch_inc_u32(&globals->num_event);
+ retries = 0;
+ continue;
+ }
+
+ globals->thread[thr_id].num_event++;
+
+ ret = enqueue_with_retry(queue, ev);
+
+ CU_ASSERT(ret == 0);
+
+ num = odp_atomic_fetch_inc_u32(&globals->num_event);
+ }
+
+ return 0;
+}
+
+static void reset_thread_stat(test_globals_t *globals)
+{
+ int i;
+
+ odp_atomic_init_u32(&globals->num_event, 0);
+
+ for (i = 0; i < ODP_THREAD_COUNT_MAX; i++)
+ globals->thread[i].num_event = 0;
+}
+
+static void multithread_test(odp_nonblocking_t nonblocking)
+{
+ odp_shm_t shm;
+ test_globals_t *globals;
+ odp_queue_t queue;
+ odp_queue_param_t qparams;
+ odp_queue_capability_t capa;
+ uint32_t queue_size, max_size;
+ uint32_t num, sum, num_free, i;
+
+ CU_ASSERT(odp_queue_capability(&capa) == 0);
+
+ queue_size = 2 * MAX_NUM_EVENT;
+
+ max_size = capa.plain.max_size;
+
+ if (nonblocking == ODP_NONBLOCKING_LF) {
+ if (capa.plain.lockfree.max_num == 0)
+ return;
+
+ max_size = capa.plain.lockfree.max_size;
+ }
+
+ if (max_size && queue_size > max_size)
+ queue_size = max_size;
+
+ num = MAX_NUM_EVENT;
+
+ if (num > queue_size)
+ num = queue_size / 2;
+
+ shm = odp_shm_lookup(GLOBALS_NAME);
+ CU_ASSERT_FATAL(shm != ODP_SHM_INVALID);
+
+ globals = odp_shm_addr(shm);
+ globals->cu_thr.numthrds = globals->num_workers;
+
+ odp_queue_param_init(&qparams);
+ qparams.type = ODP_QUEUE_TYPE_PLAIN;
+ qparams.size = queue_size;
+ qparams.nonblocking = nonblocking;
+
+ queue = odp_queue_create("queue_test_mt", &qparams);
+ CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
+
+ globals->queue = queue;
+ reset_thread_stat(globals);
+
+ CU_ASSERT(alloc_and_enqueue(queue, pool, num) == num);
+
+ odp_cunit_thread_create(queue_test_worker, (pthrd_arg *)globals);
+
+ /* Wait for worker threads to terminate */
+ odp_cunit_thread_exit((pthrd_arg *)globals);
+
+ sum = 0;
+ for (i = 0; i < ODP_THREAD_COUNT_MAX; i++)
+ sum += globals->thread[i].num_event;
+
+ CU_ASSERT(sum != 0);
+
+ num_free = dequeue_and_free_all(queue);
+
+ CU_ASSERT(num_free == num);
+ CU_ASSERT(odp_queue_destroy(queue) == 0);
+}
+
+static void queue_test_mt_plain_block(void)
+{
+ multithread_test(ODP_BLOCKING);
+}
+
+static void queue_test_mt_plain_nonblock_lf(void)
+{
+ multithread_test(ODP_NONBLOCKING_LF);
+}
+
odp_testinfo_t queue_suite[] = {
ODP_TEST_INFO(queue_test_capa),
ODP_TEST_INFO(queue_test_mode),
ODP_TEST_INFO(queue_test_lockfree),
ODP_TEST_INFO(queue_test_param),
ODP_TEST_INFO(queue_test_info),
+ ODP_TEST_INFO(queue_test_mt_plain_block),
+ ODP_TEST_INFO(queue_test_mt_plain_nonblock_lf),
ODP_TEST_INFO_NULL,
};