From patchwork Wed Feb 21 12:00:03 2018
X-Patchwork-Submitter: Github ODP bot
X-Patchwork-Id: 129046
From: Github ODP bot
To: lng-odp@lists.linaro.org
Date: Wed, 21 Feb 2018 15:00:03 +0300
Message-Id: <1519214407-28047-7-git-send-email-odpbot@yandex.ru>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1519214407-28047-1-git-send-email-odpbot@yandex.ru>
References: <1519214407-28047-1-git-send-email-odpbot@yandex.ru>
Github-pr-num: 492
Subject: [lng-odp] [PATCH v1 6/10] linux-gen: queue: ring based queue implementation

From: Petri Savolainen

Change the queue implementation from a linked list of bursts to a ring.
Queues now have a maximum size, but the code is simpler and performance
is a bit better. This is also a step toward a possible future
implementation of queues on a lockless ring.
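For readers who do not have the ring_st patches of this series at hand: the
ring used below is a plain single-thread ring of uint32_t values, protected
externally by the queue's ticketlock. The sketch that follows is illustrative
only; the names mirror the ring_st_* calls in the patch, but the bodies are
not the actual ODP implementation, and it assumes the ring size
(CONFIG_QUEUE_SIZE) is a power of two.

#include <stdint.h>

/* Minimal single-thread ring sketch. Storage is caller-provided
 * (cf. ring_data_t in the patch). head/tail are free-running
 * uint32_t indexes; unsigned wraparound keeps (tail - head) equal
 * to the fill level. */
typedef struct {
	uint32_t head;  /* dequeue index */
	uint32_t tail;  /* enqueue index */
	uint32_t mask;  /* size - 1, size must be a power of two */
	uint32_t *data; /* caller-provided storage */
} ring_sketch_t;

static void ring_sketch_init(ring_sketch_t *r, uint32_t *data, uint32_t size)
{
	r->head = 0;
	r->tail = 0;
	r->mask = size - 1;
	r->data = data;
}

static int ring_sketch_is_empty(const ring_sketch_t *r)
{
	return r->head == r->tail;
}

/* Enqueue up to 'num' values; returns the number actually stored.
 * (The real ring_st_enq_multi may instead be all-or-nothing; the
 * patch only checks for a return value of zero.) */
static uint32_t ring_sketch_enq_multi(ring_sketch_t *r, const uint32_t val[],
				      uint32_t num)
{
	uint32_t i, space = r->mask + 1 - (r->tail - r->head);

	if (num > space)
		num = space;

	for (i = 0; i < num; i++)
		r->data[(r->tail + i) & r->mask] = val[i];

	r->tail += num;
	return num;
}

/* Dequeue up to 'num' values; returns the number actually removed. */
static uint32_t ring_sketch_deq_multi(ring_sketch_t *r, uint32_t val[],
				      uint32_t num)
{
	uint32_t i, avail = r->tail - r->head;

	if (num > avail)
		num = avail;

	for (i = 0; i < num; i++)
		val[i] = r->data[(r->head + i) & r->mask];

	r->head += num;
	return num;
}

With the burst list gone, enqueue reduces to "pack buffer pointers into u32
indexes, lock, ring write, unlock" and dequeue is the mirror image, which is
where both the simplification and the fixed maximum queue size come from.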
Signed-off-by: Petri Savolainen
---
/** Email created from pull request 492 (psavol:master-sched-optim)
 ** https://github.com/Linaro/odp/pull/492
 ** Patch: https://github.com/Linaro/odp/pull/492.patch
 ** Base sha: 5a58bbf2bb331fd7dde2ebbc0430634ace6900fb
 ** Merge commit sha: 82a6bfe942419330a430b63149220e6b472f419c
 **/
 .../linux-generic/include/odp_buffer_internal.h    |  24 ++--
 platform/linux-generic/include/odp_pool_internal.h |  28 ++++
 .../linux-generic/include/odp_queue_internal.h     |   5 +-
 platform/linux-generic/odp_packet.c                |   3 +-
 platform/linux-generic/odp_pool.c                  |  22 +--
 platform/linux-generic/odp_queue.c                 | 159 +++++++++------------
 6 files changed, 114 insertions(+), 127 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index b52669387..7d649e1c4 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -41,11 +41,19 @@ typedef struct seg_entry_t {
 	uint32_t len;
 } seg_entry_t;
 
+typedef union buffer_index_t {
+	uint32_t u32;
+
+	struct {
+		uint32_t pool   :8;
+		uint32_t buffer :24;
+	};
+} buffer_index_t;
+
 /* Common buffer header */
 struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
-	/* Buffer index in the pool */
-	uint32_t index;
+	/* Combined pool and buffer index */
+	buffer_index_t index;
 
 	/* Total segment count */
 	uint16_t segcount;
@@ -73,16 +81,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 	/* Segments */
 	seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];
 
-	/* Burst counts */
-	uint8_t burst_num;
-	uint8_t burst_first;
-
-	/* Next buf in a list */
-	struct odp_buffer_hdr_t *next;
-
-	/* Burst table */
-	struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
 	/* --- Mostly read only data --- */
 
 	/* User context pointer or u64 */
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h
index 8284bcd7d..276032e7b 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -104,6 +104,34 @@ static inline odp_buffer_hdr_t *buf_hdl_to_hdr(odp_buffer_t buf)
 	return (odp_buffer_hdr_t *)(uintptr_t)buf;
 }
 
+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+						   uint32_t buffer_idx)
+{
+	uint32_t block_offset;
+	odp_buffer_hdr_t *buf_hdr;
+
+	block_offset = buffer_idx * pool->block_size;
+
+	/* clang requires cast to uintptr_t */
+	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+	return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+	buffer_index_t index;
+	uint32_t pool_idx, buffer_idx;
+	pool_t *pool;
+
+	index.u32  = u32;
+	pool_idx   = index.pool;
+	buffer_idx = index.buffer;
+	pool       = pool_entry(pool_idx);
+
+	return buf_hdr_from_index(pool, buffer_idx);
+}
+
 int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
 void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 6fe3fb462..68dddbb0b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -29,6 +29,7 @@ extern "C" {
 #include
 #include
 #include
+#include
 
 #define QUEUE_STATUS_FREE         0
 #define QUEUE_STATUS_DESTROYED    1
@@ -38,9 +39,7 @@ extern "C" {
 
 struct queue_entry_s {
 	odp_ticketlock_t ODP_ALIGNED_CACHE lock;
-
-	odp_buffer_hdr_t *head;
-	odp_buffer_hdr_t *tail;
+	ring_st_t ring_st;
 	int status;
 
 	queue_enq_fn_t ODP_ALIGNED_CACHE enqueue;
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 1c1b45ff1..6f8f2d039 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1814,7 +1814,8 @@ void odp_packet_print_data(odp_packet_t pkt, uint32_t offset,
 	len += snprintf(&str[len], n - len,
 			"  pool index    %" PRIu32 "\n", pool->pool_idx);
 	len += snprintf(&str[len], n - len,
-			"  buf index     %" PRIu32 "\n", hdr->buf_hdr.index);
+			"  buf index     %" PRIu32 "\n",
+			hdr->buf_hdr.index.buffer);
 	len += snprintf(&str[len], n - len,
 			"  segcount      %" PRIu16 "\n", hdr->buf_hdr.segcount);
 	len += snprintf(&str[len], n - len,
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index e5ba8982a..a9c57f4cb 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -80,20 +80,6 @@ static inline pool_t *pool_from_buf(odp_buffer_t buf)
 	return buf_hdr->pool_ptr;
 }
 
-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
-						   uint32_t buffer_idx)
-{
-	uint32_t block_offset;
-	odp_buffer_hdr_t *buf_hdr;
-
-	block_offset = buffer_idx * pool->block_size;
-
-	/* clang requires cast to uintptr_t */
-	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
-	return buf_hdr;
-}
-
 int odp_pool_init_global(void)
 {
 	uint32_t i;
@@ -296,7 +282,9 @@ static void init_buffers(pool_t *pool)
 		memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);
 
 		/* Initialize buffer metadata */
-		buf_hdr->index = i;
+		buf_hdr->index.u32 = 0;
+		buf_hdr->index.pool = pool->pool_idx;
+		buf_hdr->index.buffer = i;
 		buf_hdr->type = type;
 		buf_hdr->event_type = type;
 		buf_hdr->pool_ptr = pool;
@@ -782,7 +770,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
 		ring = &pool->ring->hdr;
 		mask = pool->ring_mask;
 		for (i = 0; i < num; i++)
-			buf_index[i] = buf_hdr[i]->index;
+			buf_index[i] = buf_hdr[i]->index.buffer;
 
 		ring_enq_multi(ring, mask, buf_index, num);
 
@@ -822,7 +810,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
 	}
 
 	for (i = 0; i < num; i++)
-		cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+		cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;
 
 	cache->num = cache_num + num;
 }
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 21135df63..eda872321 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -39,8 +39,15 @@ static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);
 
+typedef struct ODP_ALIGNED_CACHE {
+	/* Storage space for ring data */
+	uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
 typedef struct queue_table_t {
-	queue_entry_t queue[ODP_CONFIG_QUEUES];
+	queue_entry_t queue[ODP_CONFIG_QUEUES];
+	ring_data_t   ring_data[ODP_CONFIG_QUEUES];
+
 } queue_table_t;
 
 static queue_table_t *queue_tbl;
@@ -143,8 +150,10 @@ static int queue_capability(odp_queue_capability_t *capa)
 	capa->max_sched_groups  = sched_fn->num_grps();
 	capa->sched_prios       = odp_schedule_num_prio();
 	capa->plain.max_num     = capa->max_queues;
+	capa->plain.max_size    = CONFIG_QUEUE_SIZE;
 	capa->plain.nonblocking = ODP_BLOCKING;
 	capa->sched.max_num     = capa->max_queues;
+	capa->sched.max_size    = CONFIG_QUEUE_SIZE;
 	capa->sched.nonblocking = ODP_BLOCKING;
 
 	return 0;
@@ -192,6 +201,9 @@ static odp_queue_t queue_create(const char *name,
 		param = &default_param;
 	}
 
+	if (param->size > CONFIG_QUEUE_SIZE)
+		return ODP_QUEUE_INVALID;
+
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
 		queue = &queue_tbl->queue[i];
@@ -263,7 +275,7 @@ static int queue_destroy(odp_queue_t handle)
 		ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
 		return -1;
 	}
-	if (queue->s.head != NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st) == 0) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
 		return -1;
@@ -326,81 +338,71 @@ static odp_queue_t queue_lookup(const char *name)
 	return ODP_QUEUE_INVALID;
 }
 
+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+					 odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++)
+		buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+				       uint32_t buffer_index[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++) {
+		buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+		odp_prefetch(buf_hdr[i]);
+	}
+}
+
 static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
 	int sched = 0;
-	int i, ret;
+	int ret;
 	queue_entry_t *queue;
-	odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+	int num_enq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
 
 	queue = qentry_from_int(q_int);
+	ring_st = &queue->s.ring_st;
+
 	if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
 		return ret;
 
-	/* Optimize the common case of single enqueue */
-	if (num == 1) {
-		tail = buf_hdr[0];
-		hdr  = tail;
-		hdr->burst_num = 0;
-		hdr->next = NULL;
-	} else {
-		int next;
-
-		/* Start from the last buffer header */
-		tail = buf_hdr[num - 1];
-		hdr  = tail;
-		hdr->next = NULL;
-		next = num - 2;
-
-		while (1) {
-			/* Build a burst. The buffer header carrying
-			 * a burst is the last buffer of the burst. */
-			for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
-			     i++, next--)
-				hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
-					buf_hdr[next];
-
-			hdr->burst_num   = i;
-			hdr->burst_first = BUFFER_BURST_SIZE - i;
-
-			if (odp_likely(next < 0))
-				break;
-
-			/* Get another header and link it */
-			next_hdr  = hdr;
-			hdr       = buf_hdr[next];
-			hdr->next = next_hdr;
-			next--;
-		}
-	}
+	buffer_index_from_buf(buf_idx, buf_hdr, num);
 
 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = hdr;
-	else
-		queue->s.tail->next = hdr;
+	num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
 
-	queue->s.tail = tail;
+	if (odp_unlikely(num_enq == 0)) {
+		UNLOCK(&queue->s.lock);
		return 0;
+	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
+		sched = 1;
 	}
+
 	UNLOCK(&queue->s.lock);
 
 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");
 
-	return num; /* All events enqueued */
+	return num_enq;
 }
 
 static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -446,12 +448,15 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
-	odp_buffer_hdr_t *hdr, *next;
-	int i, j;
-	int updated = 0;
 	int status_sync = sched_fn->status_sync;
+	int num_deq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
+
+	ring_st = &queue->s.ring_st;
 
 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		/* Bad queue, or queue has been destroyed.
 		 * Scheduler finalizes queue destroy after this. */
@@ -459,9 +464,9 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		return -1;
 	}
 
-	hdr = queue->s.head;
+	num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
 
-	if (hdr == NULL) {
+	if (num_deq == 0) {
 		/* Already empty queue */
 		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -471,51 +476,18 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		}
 
 		UNLOCK(&queue->s.lock);
-		return 0;
-	}
-
-	for (i = 0; i < num && hdr; ) {
-		int burst_num = hdr->burst_num;
-		int first     = hdr->burst_first;
-
-		/* First, get bursted buffers */
-		for (j = 0; j < burst_num && i < num; j++, i++) {
-			buf_hdr[i] = hdr->burst[first + j];
-			odp_prefetch(buf_hdr[i]);
-		}
-
-		if (burst_num) {
-			hdr->burst_num   = burst_num - j;
-			hdr->burst_first = first + j;
-		}
-
-		if (i == num)
-			break;
-
-		/* When burst is empty, consume the current buffer header and
-		 * move to the next header */
-		buf_hdr[i] = hdr;
-		next       = hdr->next;
-		hdr->next  = NULL;
-		hdr        = next;
-		updated++;
-		i++;
+		return 0;
 	}
 
-	/* Write head only if updated */
-	if (updated)
-		queue->s.head = hdr;
-
-	/* Queue is empty */
-	if (hdr == NULL)
-		queue->s.tail = NULL;
-
 	if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
 		sched_fn->save_context(queue->s.index);
 
 	UNLOCK(&queue->s.lock);
 
-	return i;
+	buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+	return num_deq;
 }
 
 static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -584,8 +556,9 @@ static int queue_init(queue_entry_t *queue, const char *name,
 	queue->s.pktin = PKTIN_INVALID;
 	queue->s.pktout = PKTOUT_INVALID;
 
-	queue->s.head = NULL;
-	queue->s.tail = NULL;
+	ring_st_init(&queue->s.ring_st,
+		     queue_tbl->ring_data[queue->s.index].data,
+		     CONFIG_QUEUE_SIZE);
 
 	return 0;
 }
@@ -661,7 +634,7 @@ int sched_cb_queue_empty(uint32_t queue_index)
 		return -1;
 	}
 
-	if (queue->s.head == NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st)) {
 		/* Already empty queue. Update status. */
 		if (queue->s.status == QUEUE_STATUS_SCHED)
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
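A note on the buffer_index_t union introduced above: since the ring stores
bare uint32_t values, each entry has to identify both the pool and the buffer
within the pool, which the union packs as an 8-bit pool index plus a 24-bit
buffer index. This also explains the "index.u32 = 0" in init_buffers(): it
clears the whole word before the bitfields are assigned. A shift-and-mask
equivalent is sketched below for illustration only; the helpers are
hypothetical, and the assumption that the pool index sits in the low bits
is mine, since C bitfield layout is implementation-defined and the real code
relies on the union on a known ABI.

#include <assert.h>
#include <stdint.h>

/* Hypothetical shift/mask equivalent of buffer_index_t: pool index in
 * the low 8 bits, buffer index in the upper 24 bits (bit placement is
 * an assumption, not taken from the patch). */
static inline uint32_t index_pack(uint32_t pool_idx, uint32_t buffer_idx)
{
	assert(pool_idx < (1u << 8));    /* 8-bit field: max 256 pools */
	assert(buffer_idx < (1u << 24)); /* 24-bit field: max 16M buffers */

	return (buffer_idx << 8) | pool_idx;
}

static inline void index_unpack(uint32_t u32, uint32_t *pool_idx,
				uint32_t *buffer_idx)
{
	*pool_idx = u32 & 0xff;
	*buffer_idx = u32 >> 8;
}

The 8/24 split caps the implementation at 256 pools and 16M buffers per pool;
that is the trade-off accepted for making a queue ring entry a single word,
as seen in buffer_index_from_buf() passing index.u32 straight into the ring.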