new file mode 100644
@@ -0,0 +1,28 @@
+Copyright (c) 2016, ARM Limited. All rights reserved.
+
+SPDX-License-Identifier: BSD-3-Clause
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice, this
+list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the name of ARM Limited nor the names of its contributors may be
+used to endorse or promote products derived from this software without specific
+prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
new file mode 100644
@@ -0,0 +1,164 @@
+###############################################################################
+# Copyright (c) 2016, ARM Limited. All rights reserved.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+################################################################################
+
+###############################################################################
+# Project specific definitions
+################################################################################
+
+#Name of directory and also Dropbox source tar file
+DIRNAME = scheduler
+#List of executable files to build
+TARGETS = scheduler
+#List object files for each target
+OBJECTS_scheduler = scheduler.o
+
+#Customizable compiler and linker flags
+GCCTARGET =
+CCFLAGS += -mcx16#Required for CMPXCHG16B on x86-64
+#GCCTARGET = aarch64-linux-gnu
+#CCFLAGS += -mcpu=cortex-a53
+DEFINE += -DNDEBUG#disable assertions
+CCFLAGS += -std=c99
+CCFLAGS += -g -ggdb -Wall
+CCFLAGS += -O2 -fno-stack-check -fno-stack-protector
+LDFLAGS += -g -ggdb -pthread
+LIBS = -lrt
+
+#Where to find the source files
+VPATH += .
+
+#Default to non-verbose mode (commands are not echoed; set VERB empty to echo them)
+VERB = @
+
+#Location of object and other derived/temporary files
+OBJDIR = obj#Must not be .
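+
+#Example invocations (illustrative):
+# make all                               native gcc build
+# make all CLANG=yes                     native clang build
+# make all GCCTARGET=aarch64-linux-gnu   cross build using the toolchain in GCCROOT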
+
+###############################################################################
+# Make actions (phony targets)
+################################################################################
+
+.PHONY : default all clean tags etags
+
+default:
+ @echo "Make targets:"
+ @echo "all build all targets ($(TARGETS))"
+ @echo "clean remove derived files"
+ @echo "tags generate vi tags file"
+ @echo "etags generate emacs tags file"
+
+all : $(TARGETS)
+
+#Make sure we don't remove current directory with all source files
+ifeq ($(OBJDIR),.)
+$(error invalid OBJDIR=$(OBJDIR))
+endif
+ifeq ($(TARGETS),.)
+$(error invalid TARGETS=$(TARGETS))
+endif
+clean:
+ @echo "--- Removing derived files"
+ $(VERB)-rm -rf $(OBJDIR) $(TARGETS) tags TAGS perf.data perf.data.old
+
+tags :
+ $(VERB)ctags -R .
+
+etags :
+ $(VERB)ctags -e -R .
+
+################################################################################
+# Setup tool commands and flags
+################################################################################
+
+#Defaults to be overridden by compiler makefragment
+CCOUT = -o $@
+ASOUT = -o $@
+LDOUT = -o $@
+
+ifneq ($(GCCTARGET),)
+#Some experimental cross compiling support
+#GCCLIB = $(GCCROOT)/lib/gcc/$(GCCTARGET)/4.7.3
+GCCROOT = /opt/gcc-linaro-5.3-2016.02-x86_64_aarch64-linux-gnu
+GCCSETUP = PATH=$(GCCROOT)/bin:$(GCCROOT)/$(GCCTARGET)/bin:/bin:/usr/bin
+CC = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-gcc
+CXX = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++
+LD = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++
+else
+#Native compilation
+ifeq ($(CLANG),yes)
+CC = clang
+CXX = clang++
+AS = as
+LD = clang++
+else
+CC = gcc
+CXX = g++
+AS = as
+LD = g++
+endif
+endif
+#GROUPSTART = -Wl,--start-group
+#GROUPEND = -Wl,--end-group
+BIN2C = bin2c
+
+#Important compilation flags
+CCFLAGS += -c -MMD -MP
+
+################################################################################
+# Post-process some variables and definitions, generate dependencies
+################################################################################
+
+CCFLAGS += $(DEFINE) $(INCLUDE)
+#Generate list of all object files (for all targets)
+override OBJECTS := $(addprefix $(OBJDIR)/,$(foreach var,$(TARGETS),$(OBJECTS_$(var))))
+#Generate target:objects dependencies for all targets
+$(foreach target,$(TARGETS),$(eval $(target) : $$(addprefix $$(OBJDIR)/,$$(OBJECTS_$(target)))))
+#Special dependency for object files on object directory
+$(OBJECTS) : | $(OBJDIR)
+
+################################################################################
+# Build recipes
+################################################################################
+
+$(OBJDIR) :
+ $(VERB)mkdir -p $(OBJDIR)
+
+#Keep intermediate pcap C-files
+.PRECIOUS : $(OBJDIR)/%_pcap.c
+
+$(OBJDIR)/%_pcap.o : $(OBJDIR)/%_pcap.c
+ @echo "--- Compiling $<"
+ $(VERB)$(CC) $(CCFLAGS) $(CCOUT) $<
+
+$(OBJDIR)/%_pcap.c : %.pcap
+ @echo "--- Generating $@"
+ $(VERB)$(BIN2C) -n $(notdir $(basename $@)) -o $@ $<
+
+$(OBJDIR)/%.o : %.cc
+ @echo "--- Compiling $<"
+ $(VERB)$(CXX) $(CXXFLAGS) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $<
+
+$(OBJDIR)/%.o : %.c
+ @echo "--- Compiling $<"
+ $(VERB)$(CC) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $<
+
+$(OBJDIR)/%.o : %.s
+ @echo "--- Compiling $<"
+ $(VERB)$(AS) $(ASFLAGS) $(ASONLYFLAGS) $(ASOUT) $<
+
+$(OBJDIR)/%.o : %.S
+ @echo "--- Compiling $<"
+ $(VERB)$(CC) $(CCFLAGS) $(addprefix $(ASPREFIX),$(ASFLAGS)) $(CCOUT) $<
+
+$(TARGETS) :
+ @echo "--- Linking $@ from $(OBJECTS_$@) $(LIBS)"
+ $(VERB)$(LD) $(LDFLAGS) $(LDOUT) $(addprefix $(OBJDIR)/,$(OBJECTS_$@)) $(GROUPSTART) $(LIBS) $(GROUPEND) $(LDMAP)
+
+################################################################################
+# Include generated dependencies
+################################################################################
+
+-include $(patsubst %.o,%.d,$(OBJECTS))
+# DO NOT DELETE
new file mode 100644
@@ -0,0 +1,363 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier: BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#undef likely
+#undef unlikely
+#if defined __GNUC__
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#else
+#define likely(x) (x)
+#define unlikely(x) (x)
+#endif
+
+/******************************************************************************
+ * Linked list queues
+ *****************************************************************************/
+
+struct llnode
+{
+ struct llnode *next;
+ uint32_t tag;//For consistency checks
+};
+
+union llht
+{
+ struct
+ {
+ struct llnode *head, *tail;
+ } st;
+ dintptr_t ui;
+};
+
+struct llqueue
+{
+ union llht u;
+ //x86-64 seems faster using a spin lock instead of CMPXCHG16B
+ pthread_spinlock_t lock;
+};
+
+#define SENTINEL ((void *)~(uintptr_t)0)
+
+//static void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline));
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed)
+{
+ union llht old;
+ assert(node->next == NULL);
+ node->next = SENTINEL;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+ old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+ __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+ (void)0;//Need statement after label
+#endif
+ union llht neu;
+ neu.st.head = old.st.head == NULL ? node : old.st.head;
+ neu.st.tail = node;
+#ifdef USE_LLSC
+ if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)))
+#else
+ if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+ /*weak=*/false,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED)))
+#endif
+ {
+ //Failed
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ if (old.st.tail != NULL)
+ {
+ //List was not empty
+ assert(old.st.tail->next == SENTINEL);
+ old.st.tail->next = node;
+ }
+}
+
+//static void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline));
+static inline void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed)
+{
+ assert(node->next == NULL);
+ node->next = SENTINEL;
+ pthread_spin_lock(&llq->lock);
+ if(llq->u.st.head == NULL)
+ {
+ llq->u.st.head = llq->u.st.tail = node;
+ }
+ else
+ {
+ llq->u.st.tail->next = node;
+ llq->u.st.tail = node;
+ }
+ pthread_spin_unlock(&llq->lock);
+}
+
+//static struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline));
+static inline struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed)
+{
+ struct llnode *head;
+
+ //llq_dequeue() may be used in a busy-waiting fashion
+ //Read head using plain load to avoid disturbing remote LL/SC
+ if ((head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED)) == NULL)
+ {
+ return NULL;
+ }
+ //Read head->next before LL to minimize cache miss latency in LL/SC below
+ (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);
+
+ union llht old;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+ old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+ __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+#endif
+ if (unlikely(old.st.head == NULL)) //Empty list
+ {
+ clrex();
+ return NULL;
+ }
+ else if (unlikely(old.st.head == old.st.tail))//Single element in list
+ {
+ union llht neu;
+ neu.st.head = NULL;
+ neu.st.tail = NULL;
+#ifdef USE_LLSC
+ if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+ if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+ /*weak=*/false,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED)))
+#endif
+ {
+ //Failed
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ assert(old.st.head->next == SENTINEL);
+ }
+ else//Multi-element list, dequeue head
+ {
+ struct llnode *next = __atomic_load_n(&old.st.head->next,
+ __ATOMIC_RELAXED);
+ //Check whether llq_enqueue() has written the true next pointer yet
+ if (unlikely(next == SENTINEL))
+ {
+ //Sorry, can't continue
+ clrex();
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ union llht neu;
+ neu.st.head = next;
+ neu.st.tail = old.st.tail;
+#ifdef USE_LLSC
+ if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+ if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+ /*weak=*/false,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED)))
+#endif
+ {
+ //Failed
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ assert(old.st.head->next != SENTINEL);
+ }
+ old.st.head->next = NULL;
+ return old.st.head;
+}
+
+//static struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline));
+static inline struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed)
+{
+ struct llnode *head;
+ if ((head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED)) == NULL)
+ {
+ return NULL;
+ }
+
+ struct llnode *node = NULL;
+ pthread_spin_lock(&llq->lock);
+ if (llq->u.st.head != NULL)
+ {
+ node = llq->u.st.head;
+ if (llq->u.st.head == llq->u.st.tail)
+ {
+ assert(node->next == SENTINEL);
+ llq->u.st.head = llq->u.st.tail = NULL;
+ }
+ else
+ {
+ assert(node->next != SENTINEL);
+ llq->u.st.head = node->next;
+ }
+ node->next = NULL;
+ }
+ pthread_spin_unlock(&llq->lock);
+ return node;
+}
+
+static struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((always_inline));
+static inline struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed)
+{
+ union llht old;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+ old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+ __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+#endif
+ if (unlikely(old.st.head == NULL || old.st.head != exp)) //Empty list or wrong head
+ {
+ clrex();
+ return NULL;
+ }
+ else if (unlikely(old.st.head == old.st.tail))//Single element in list
+ {
+ union llht neu;
+ neu.st.head = NULL;
+ neu.st.tail = NULL;
+#ifdef USE_LLSC
+ if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+ if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+ /*weak=*/false,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED)))
+#endif
+ {
+ //Failed
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ assert(old.st.head->next == SENTINEL);
+ }
+ else//Multi-element list, dequeue head
+ {
+ struct llnode *next = __atomic_load_n(&old.st.head->next,
+ __ATOMIC_RELAXED);
+ //Check whether llq_enqueue() has written the true next pointer yet
+ if (unlikely(next == SENTINEL))
+ {
+ //Sorry, can't continue
+ clrex();
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ union llht neu;
+ neu.st.head = next;
+ neu.st.tail = old.st.tail;
+#ifdef USE_LLSC
+ if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+ if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+ /*weak=*/false,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED)))
+#endif
+ {
+ //Failed
+ doze();
+ if (numfailed != NULL)
+ (*numfailed)++;
+ goto retry;
+ }
+ assert(old.st.head->next != SENTINEL);
+ }
+ old.st.head->next = NULL;
+ return old.st.head;
+}
+
+//static struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((noinline));
+static inline struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed)
+{
+ struct llnode *node = NULL;
+ pthread_spin_lock(&llq->lock);
+ if (likely(llq->u.st.head != NULL && llq->u.st.head == exp))
+ {
+ node = llq->u.st.head;
+ if (llq->u.st.head == llq->u.st.tail)
+ {
+ assert(node->next == SENTINEL);
+ llq->u.st.head = llq->u.st.tail = NULL;
+ }
+ else
+ {
+ assert(node->next != SENTINEL);
+ llq->u.st.head = node->next;
+ }
+ node->next = NULL;
+ }
+ pthread_spin_unlock(&llq->lock);
+ return node;
+}
+
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+ return llq->u.st.head;
+}
+
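+//Consistency check (not thread-safe): walks the list from head, uses the tag
+//field to verify that the list is acyclic and that tail points to the last
+//node, and returns the number of elements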
+static inline uint32_t llq_assert(struct llqueue *llq)
+{
+ uint32_t nelems = 0;
+ struct llnode *node = llq->u.st.head;
+ if (node != NULL)
+ {
+ uint32_t tag = node->tag + 1;
+ node->tag = tag;
+ nelems++;
+ //Find last element in list
+ while (node->next != SENTINEL)
+ {
+ node = node->next;
+ assert(node->tag != tag);
+ node->tag = tag;
+ nelems++;
+ }
+ //Tail must point to last element
+ assert(llq->u.st.tail == node);
+ }
+ else//No elements in list
+ {
+ assert(llq->u.st.tail == NULL);
+ }
+ return nelems;
+}
+
+static void llqueue_init(struct llqueue *llq)
+{
+ llq->u.st.head = NULL;
+ llq->u.st.tail = NULL;
+ pthread_spin_init(&llq->lock, PTHREAD_PROCESS_PRIVATE);
+}
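+
+//Illustrative usage sketch of the queue API above; not compiled into the
+//build and kept single-threaded for brevity (concurrent producers and
+//consumers use the same calls). llq_enqueue()/llq_dequeue() are the lock-free
+//variants, llq_enqueue_l()/llq_dequeue_l() the spin lock variants.
+#if 0
+static void llqueue_example(void)
+{
+    static struct llqueue q;
+    struct llnode a = { .next = NULL, .tag = 0 };
+    struct llnode b = { .next = NULL, .tag = 0 };
+    uint32_t numfailed = 0;
+    struct llnode *n;
+    llqueue_init(&q);
+    llq_enqueue(&q, &a, &numfailed);//List: a
+    llq_enqueue(&q, &b, &numfailed);//List: a, b
+    n = llq_dequeue(&q, &numfailed);//Returns &a (FIFO order)
+    n = llq_dequeue_cond(&q, &b, &numfailed);//Returns &b since &b is at the head
+    n = llq_dequeue(&q, &numfailed);//Returns NULL, list is now empty
+    (void)n;
+}
+#endif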
new file mode 100644
@@ -0,0 +1,254 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier: BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#undef likely
+#undef unlikely
+#if defined __GNUC__
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#else
+#define likely(x) (x)
+#define unlikely(x) (x)
+#endif
+
+#define ALIGNED(x) __attribute__((__aligned__(x)))
+#define CACHE_LINE 64
+
+/******************************************************************************
+ * LL/SC primitives
+ *****************************************************************************/
+
+#if defined __ARM_ARCH && __ARM_ARCH == 7
+static inline void dmb()
+{
+ __asm __volatile("dmb" : : : "memory");
+}
+
+static inline uint32_t ll(uint32_t *var, int mm)
+{
+ uint32_t old;
+ __asm __volatile("ldrex %0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ //Barrier after an acquiring load
+ if (mm == __ATOMIC_ACQUIRE)
+ dmb();
+ return old;
+}
+#define ll32(a, b) ll((a), (b))
+
+//Return 0 on success, 1 on failure
+static inline uint32_t sc(uint32_t *var, uint32_t neu, int mm)
+{
+ uint32_t ret;
+ //Barrier before a releasing store
+ if (mm == __ATOMIC_RELEASE)
+ dmb();
+ __asm __volatile("strex %0, %1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ return ret;
+}
+#define sc32(a, b, c) sc((a), (b), (c))
+
+static inline uint64_t lld(uint64_t *var, int mm)
+{
+ uint64_t old;
+ __asm __volatile("ldrexd %0, %H0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ //Barrier after an acquiring load
+ if (mm == __ATOMIC_ACQUIRE)
+ dmb();
+ return old;
+}
+#define ll64(a, b) lld((a), (b))
+
+//Return 0 on success, 1 on failure
+static inline uint32_t scd(uint64_t *var, uint64_t neu, int mm)
+{
+ uint32_t ret;
+ //Barrier before a releasing store
+ if (mm == __ATOMIC_RELEASE)
+ dmb();
+ __asm __volatile("strexd %0, %1, %H1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ return ret;
+}
+#define sc64(a, b, c) scd((a), (b), (c))
+
+#endif
+
+#if defined __ARM_ARCH && __ARM_ARCH == 8
+static inline uint32_t ll32(uint32_t *var, int mm)
+{
+ uint32_t old;
+ if (mm == __ATOMIC_ACQUIRE)
+ __asm __volatile("ldaxr %w0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("ldxr %w0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ else
+ abort();
+ return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t sc32(uint32_t *var, uint32_t neu, int mm)
+{
+ uint32_t ret;
+ if (mm == __ATOMIC_RELEASE)
+ __asm __volatile("stlxr %w0, %w1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("stxr %w0, %w1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ else
+ abort();
+ return ret;
+}
+
+static inline uint64_t ll(uint64_t *var, int mm)
+{
+ uint64_t old;
+ if (mm == __ATOMIC_ACQUIRE)
+ __asm __volatile("ldaxr %0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("ldxr %0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ else
+ abort();
+ return old;
+}
+#define ll64(a, b) ll((a), (b))
+
+//Return 0 on success, 1 on failure
+static inline uint32_t sc(uint64_t *var, uint64_t neu, int mm)
+{
+ uint32_t ret;
+ if (mm == __ATOMIC_RELEASE)
+ __asm __volatile("stlxr %w0, %1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("stxr %w0, %1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ else
+ abort();
+ return ret;
+}
+#define sc64(a, b, c) sc((a), (b), (c))
+
+static inline __int128 lld(__int128 *var, int mm)
+{
+ __int128 old;
+ if (mm == __ATOMIC_ACQUIRE)
+ __asm __volatile("ldaxp %0, %H0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("ldxp %0, %H0, [%1]"
+ : "=&r" (old)
+ : "r" (var)
+ : );
+ else
+ abort();
+ return old;
+}
+
+//Return 0 on success, 1 on failure
+static inline uint32_t scd(__int128 *var, __int128 neu, int mm)
+{
+ uint32_t ret;
+ if (mm == __ATOMIC_RELEASE)
+ __asm __volatile("stlxp %w0, %1, %H1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : "memory");
+ else if (mm == __ATOMIC_RELAXED)
+ __asm __volatile("stxp %w0, %1, %H1, [%2]"
+ : "=&r" (ret)
+ : "r" (neu), "r" (var)
+ : );
+ else
+ abort();
+ return ret;
+}
+#endif
+
+//Clear exclusive monitor, used when LL is not followed by SC
+static inline void clrex(void)
+{
+#if defined __ARM_ARCH
+ __asm __volatile("clrex" : : : );
+#endif
+}
+
+static inline void sevl(void)
+{
+#if defined __ARM_ARCH
+ __asm __volatile("sevl" : : : );
+#endif
+}
+
+static inline void wfe(void)
+{
+#if defined __ARM_ARCH
+ __asm __volatile("wfe" : : : );
+#endif
+}
+
+static inline void doze(void)
+{
+#if defined __ARM_ARCH
+ //YIELD hints the CPU to switch to another thread if available
+ //but otherwise executes as a NOP
+// __asm __volatile("yield" : : : "memory");
+ //ISB flushes the pipeline, then restarts. This is guaranteed to stall
+ //the CPU a number of cycles
+ __asm __volatile("isb" : : : "memory");
+#else
+ //Assume x86
+ __asm __volatile("pause" : : : "memory");
+#endif
+}
+
+//A scalar type the size of two pointers, used for double-width atomic operations
+#if __SIZEOF_PTRDIFF_T__ == 4
+typedef uint64_t dintptr_t;
+#endif
+#if __SIZEOF_PTRDIFF_T__ == 8
+typedef __int128 dintptr_t;
+#endif
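+
+//Illustrative sketch (not compiled into the build) of the LL/SC retry pattern
+//used by the queue and scheduler code: load-linked, compute a new value,
+//attempt a store-conditional, retry on failure. Non-ARM builds use
+//__atomic_compare_exchange instead (scheduler.c defines USE_LLSC only when
+//__ARM_ARCH is defined).
+#if 0
+static inline void example_atomic_inc(uint32_t *ctr)
+{
+    uint32_t old;
+    do
+    {
+        old = ll32(ctr, __ATOMIC_RELAXED);//Load-linked, sets exclusive monitor
+    }
+    while (sc32(ctr, old + 1, __ATOMIC_RELAXED));//Store-conditional, returns 0 on success
+}
+#endif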
new file mode 100644
@@ -0,0 +1,2042 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier: BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+//#define LOG
+
+#ifdef __ARM_ARCH
+#define USE_LLSC
+#endif
+
+#if defined __GNUC__
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#else
+#define likely(x) (x)
+#define unlikely(x) (x)
+#endif
+
+//Function to set breakpoint on
+void bp(void) __attribute__((noinline));
+void bp(void)
+{
+}
+
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+//Enable for Cortex-A57!
+#if 0
+//Implement store-release (STLR) using DMB; STR (store-relaxed).
+//This alternative is interesting to test since it has proven more
+//performant in some cases on A57.
+//We implement this using a macro since it is used with different types of
+//parameters.
+#define far_atomic_store(_ptr, _val, _mo) \
+do \
+{ \
+ if ((_mo) == __ATOMIC_RELEASE) \
+ { \
+ __asm __volatile("dmb ishst" ::: "memory"); \
+ __atomic_store_n((_ptr), (_val), __ATOMIC_RELAXED); \
+ } \
+ else \
+ __atomic_store_n((_ptr), (_val), (_mo)); \
+} \
+while (0)
+#else
+#define far_atomic_store(_ptr, _val, _mo) \
+ __atomic_store_n((_ptr), (_val), (_mo))
+#endif
+
+//Possibly, store-release a ticket after CAS can use store-relaxed
+//Possibly, this has less overhead for the issuing thread
+#define __ATOMIC_RELEASE_AFTER_CAS __ATOMIC_RELEASE
+
+#define CAS_WEAK false
+
+static bool VERBOSE = false;
+
+static inline bool is_power_of_two(uint32_t n)
+{
+ return n != 0 && (n & (n - 1)) == 0;
+}
+
+//Thread priority and scheduling
+#define PRIO 1
+#define SCHED SCHED_FIFO
+//#define SCHED SCHED_OTHER
+
+#define ALIGNED(x) __attribute__((__aligned__(x)))
+#define CACHE_LINE 64
+
+/******************************************************************************
+ * Linked list queue and its LL/SC support
+ *****************************************************************************/
+
+#include "llsc.c"
+#include "llqueue.c"
+
+/******************************************************************************
+ * Type and forward declarations
+ *****************************************************************************/
+
+//Max 64 threads
+typedef uint64_t odp_thrmask_t;
+#define ODP_THRMASK_ALL ((uint64_t)~0ULL)
+
+typedef union
+{
+ struct
+ {
+ struct llqueue llq;
+ uint32_t prio;
+ };//Anonymous struct, access members directly
+ char dummy[CACHE_LINE];//Required so that sizeof(sched_queue) equals its alignment (one cache line)
+} sched_queue ALIGNED(CACHE_LINE);
+
+struct odp_event_s;
+typedef struct odp_event_s *odp_event_t;
+#define ODP_EVENT_INVALID ((odp_event_t)NULL)
+
+struct sched_obj;//Scheduler objects are the elements of the scheduler queues
+typedef struct sched_obj *odp_queue_t;//ODP queues are scheduler objects
+#define ODP_QUEUE_INVALID ((odp_queue_t)NULL)
+
+struct sched_group;
+typedef uint64_t sched_group_mask_t;
+#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT) //E.g. 64
+typedef uint32_t odp_schedule_group_t;//1..MAX_SCHED_GROUP
+#define ODP_SCHED_GROUP_INVALID 0
+
+static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp,
+ uint32_t prio);
+
+#define NUM_PRIO 4 //High, medium, low and below priorities
+#define PRIO_MED (NUM_PRIO / 2)
+
+static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num);
+
+/*******************************************************************************
+ * Per thread state
+ ******************************************************************************/
+
+struct reorder_context;
+struct reorder_window;
+struct odp_event_s;
+
+static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn);
+static void rwin_insert(struct reorder_window *rwin,
+ struct reorder_context *rctx,
+ uint32_t sn,
+ void (*callback)(const struct reorder_context *));
+static struct odp_event_s *event_next_get(struct odp_event_s *evt);
+static odp_queue_t event_queue_get(struct odp_event_s *evt);
+static uint32_t event_number_get(struct odp_event_s *evt);
+
+struct reorder_context
+{
+ struct odp_event_s *head, *tail;//Linked list of deferred events
+ struct reorder_window *rwin;//Reorder window for source queue (or whatever)
+ uint32_t *rvec_free;//Pointer to TS->rvec_free
+ uint32_t sn;//Our slot in the reorder window
+ uint16_t idx;//Our index in thread_state rvec array
+ uint16_t olock_flags;
+} ALIGNED(CACHE_LINE);
+
+static inline void rctx_init(struct reorder_context *rctx, uint32_t *rvec_free, uint16_t idx, struct reorder_window *rwin)
+{
+ rctx->head = rctx->tail = NULL;
+ rctx->rwin = rwin;
+ rctx->rvec_free = rvec_free;
+ rctx->sn = 0;
+ rctx->idx = idx;
+ rctx->olock_flags = 0;
+ //Clear free bit
+ assert((*rctx->rvec_free & (1U << rctx->idx)) != 0);
+ __atomic_fetch_and(rctx->rvec_free, ~(1U << rctx->idx), __ATOMIC_RELAXED);
+}
+
+static inline void rctx_free(const struct reorder_context *rctx)
+{
+ assert(rctx->rwin != NULL);
+ //Set free bit
+ assert((*rctx->rvec_free & (1U << rctx->idx)) == 0);
+ //Relaxed order is OK since we haven't written to the reorder_context
+ __atomic_fetch_or(rctx->rvec_free, 1U << rctx->idx, __ATOMIC_RELAXED);
+}
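+
+//rvec_free is a per-thread bitset with one bit per reorder_context in
+//thread_state.rvec (TS_RVEC_SIZE bits, bit set == context free). The scheduler
+//allocates a context by picking the first set bit (__builtin_ffs), rctx_init()
+//clears that bit and rctx_free() sets it again, which is why a context can be
+//retired by any thread using a single atomic fetch-and-or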
+
+static void olock_release(const struct reorder_context *rctx);
+
+//rctx_retire may be called by any thread
+static void rctx_retire(const struct reorder_context *rctx)
+{
+ struct odp_event_s *evt = rctx->head;
+ while (likely(evt != NULL))
+ {
+ struct odp_event_s *next = event_next_get(evt);
+ //Prefetch next event
+ __builtin_prefetch(next, 0, 0);
+ int rc = odp_queue_enq(event_queue_get(evt), &evt, 1);
+ if (unlikely(rc != 1))
+ {
+ fprintf(stderr, "rctx_retire: failed to enqueue event %p/%u on queue %p\n", evt, event_number_get(evt), event_queue_get(evt));
+ fflush(NULL); abort();
+ }
+ evt = next;
+ }
+ olock_release(rctx);
+ rctx_free(rctx);
+}
+
+static inline void rctx_release(struct reorder_context *rctx)
+{
+ assert((*rctx->rvec_free & (1U << rctx->idx)) == 0);
+ //Insert reorder context into reorder window, potentially calling the
+ //rctx_retire function for all pending reorder_contexts
+ rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire);
+}
+
+#define TS_RVEC_SIZE 16
+
+struct thread_state
+{
+ struct sched_obj *atomq;//Atomic queue currently being processed or NULL
+ struct reorder_context *rctx;//Current reorder context or NULL
+ bool pause;
+ bool out_of_order;
+ uint32_t tidx;//Thread index
+ uint32_t ticket;//Ticket for atomic queue or TICKET_INVALID
+ uint32_t rvec_free;//Bitset of free entries in rvec
+ uint16_t num_schedq;
+ uint16_t sg_sem;//Set when sg_wanted is modified by other thread
+ sched_group_mask_t sg_actual[NUM_PRIO];//Current sched_group membership
+ sched_group_mask_t sg_wanted[NUM_PRIO];//Future sched_group membership
+#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * NUM_PRIO)
+ sched_queue *schedq_list[SCHEDQ_PER_THREAD];
+ struct reorder_context rvec[TS_RVEC_SIZE];
+} ALIGNED(CACHE_LINE);
+
+#define MAXTHREADS 32
+
+static struct thread_state thread_state[MAXTHREADS];
+static uint32_t NUMTHREADS = 2;
+static __thread struct thread_state *TS;
+
+static void thread_state_init(int tidx)
+{
+ struct thread_state *ts = &thread_state[tidx];
+ ts->atomq = ODP_QUEUE_INVALID;
+ ts->rctx = NULL;
+ ts->pause = false;
+ ts->out_of_order = false;
+ ts->tidx = tidx;
+ ts->rvec_free = 0;
+ assert(TS_RVEC_SIZE <= sizeof(ts->rvec_free) * CHAR_BIT);
+ ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
+ ts->num_schedq = 0;
+ ts->sg_sem = 1;//Start with sched group semaphore changed
+ memset(ts->sg_actual, 0, sizeof ts->sg_actual);
+ //Do not clear ts->sg_wanted, it might already have been set by another thread
+ TS = ts;
+}
+
+static void insert_schedq_in_list(struct thread_state *ts,
+ sched_queue *schedq)
+{
+ //Find slot for schedq
+ for (uint32_t i = 0; i < ts->num_schedq; i++)
+ {
+ //Higher value is higher priority and closer to start of list
+ if (schedq->prio >= ts->schedq_list[i]->prio)
+ {
+ //This is the slot!
+ sched_queue *tmp = ts->schedq_list[i];
+ ts->schedq_list[i] = schedq;
+ schedq = tmp;
+ //Continue the insertion procedure with the new schedq
+ }
+ }
+ //Insert schedq at end of list
+ if (ts->num_schedq == SCHEDQ_PER_THREAD)
+ {
+ fprintf(stderr, "insert_schedq_in_list: too many schedq's\n");
+ abort();
+ }
+ ts->schedq_list[ts->num_schedq++] = schedq;
+}
+
+static void remove_schedq_from_list(struct thread_state *ts,
+ sched_queue *schedq)
+{
+ //Find schedq
+ for (uint32_t i = 0; i < ts->num_schedq; i++)
+ {
+ if (ts->schedq_list[i] == schedq)
+ {
+ //Move remaining schedq's
+ for (uint32_t j = i + 1; j < ts->num_schedq; j++)
+ {
+ ts->schedq_list[j - 1] = ts->schedq_list[j];
+ }
+ ts->num_schedq--;
+ return;
+ }
+ }
+ //schedq not found, internal error
+ fprintf(stderr, "remove_schedq_from_list: schedq not found\n");
+ abort();
+}
+
+/******************************************************************************
+ * Scheduler queues
+ *****************************************************************************/
+
+typedef enum
+{
+ pktio, parallel_q, ordered_q, atomic_q
+} sched_obj_type;
+
+static inline void schedq_init(sched_queue *schedq, uint32_t prio)
+{
+ llqueue_init(&schedq->llq);
+ schedq->prio = prio;
+}
+
+static inline struct sched_obj *schedq_peek(sched_queue *schedq)
+{
+ return (struct sched_obj *)llq_head(&schedq->llq);
+}
+
+static bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj) __attribute__((always_inline));
+static inline bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj)
+{
+ return llq_dequeue_cond(&schedq->llq, (struct llnode *)obj, NULL) ==
+ (struct llnode *)obj;
+}
+
+static void schedq_push(sched_queue *schedq,
+ struct sched_obj *obj)
+{
+ llq_enqueue(&schedq->llq, (struct llnode *)obj, NULL);
+}
+
+/******************************************************************************
+ * ODP event
+ *****************************************************************************/
+
+struct odp_event_s
+{
+ struct odp_event_s *next;//Next pointer for linked list
+ odp_queue_t queue;//Queue this event is destined for
+ //Below are fields used by the application
+ unsigned fromqidx;
+ unsigned number;
+};
+
+static odp_event_t odp_event_alloc(void)
+{
+ struct odp_event_s *evt = aligned_alloc(CACHE_LINE,
+ sizeof(struct odp_event_s));
+ if (unlikely(evt == NULL))
+ return ODP_EVENT_INVALID;
+ return evt;
+}
+
+static inline struct odp_event_s *event_next_get(struct odp_event_s *evt)
+{
+ return evt->next;
+}
+
+static inline void event_next_set(struct odp_event_s *evt, struct odp_event_s *nxt)
+{
+ evt->next = nxt;
+}
+
+static inline odp_queue_t event_queue_get(struct odp_event_s *evt)
+{
+ return evt->queue;
+}
+
+static inline void event_queue_set(struct odp_event_s *evt, odp_queue_t q)
+{
+ evt->queue = q;
+}
+
+static inline uint32_t event_number_get(struct odp_event_s *evt)
+{
+ return evt->number;
+}
+
+/******************************************************************************
+ * Reorder window
+ *****************************************************************************/
+
+struct hc
+{
+ uint32_t head;//First missing context
+ uint32_t chgi;//Change indicator
+} ALIGNED(sizeof(uint64_t));
+
+#define RWIN_SIZE 32 //Should be at least one per CPU
+
+#define NUM_OLOCKS 2
+
+struct reorder_window
+{
+ struct hc hc;//head and chgi
+ uint32_t winmask;
+ uint32_t tail;
+ uint32_t turn;
+ uint16_t lock_count ALIGNED(CACHE_LINE);//Force new cache line
+ uint32_t olock[NUM_OLOCKS];
+ struct reorder_context *ring[RWIN_SIZE] ALIGNED(CACHE_LINE);//Force new cache line
+};
+
+static inline void olock_unlock(struct thread_state *ts, const struct reorder_context *rctx, struct reorder_window *rwin, unsigned lock_index)
+{
+ if ((rctx->olock_flags & (1U << lock_index)) == 0)
+ {
+ //Lock not used by this context, advance it past our sn on its behalf
+#ifdef LOG
+if (VERBOSE) printf("%u: release %p->olock[%u]=%u\n", TS->tidx, rwin, lock_index, rctx->sn + 1);
+#endif
+ //Use relaxed ordering, we are not releasing any updates
+ far_atomic_store(&rwin->olock[lock_index],
+ rctx->sn + 1,
+ __ATOMIC_RELAXED);
+ }
+}
+
+static void olock_release(const struct reorder_context *rctx)
+{
+ struct reorder_window *rwin = rctx->rwin;
+#ifdef LOG
+if (VERBOSE) printf("%u: release sn=%u %p->olock[0]=%u olock_flags=%x\n", TS->tidx, rctx->sn, rwin, rwin->olock[0], rctx->olock_flags);
+#endif
+ if (unlikely(rwin->lock_count != 0))
+ {
+ olock_unlock(TS, rctx, rwin, 0);
+ if (rwin->lock_count != 1)
+ {
+ olock_unlock(TS, rctx, rwin, 1);
+ }
+ }
+ assert(NUM_OLOCKS == 2);
+}
+
+static struct reorder_window *rwin_alloc(unsigned lock_count)
+{
+ assert(is_power_of_two(RWIN_SIZE));
+ struct reorder_window *rwin = aligned_alloc(CACHE_LINE, sizeof(struct reorder_window));
+ if (rwin != NULL)
+ {
+ assert(offsetof(struct reorder_window, hc) == 0);
+ assert(offsetof(struct reorder_window, lock_count) == CACHE_LINE);
+ assert(offsetof(struct reorder_window, ring) == 2 * CACHE_LINE);
+ rwin->hc.head = 0;
+ rwin->hc.chgi = 0;
+ rwin->winmask = RWIN_SIZE - 1;
+ rwin->tail = 0;
+ rwin->turn = 0;
+ rwin->lock_count = (uint16_t)lock_count;
+ memset(rwin->olock, 0, sizeof rwin->olock);
+ for (uint32_t i = 0; i < RWIN_SIZE; i++)
+ rwin->ring[i] = NULL;
+ }
+ return rwin;
+}
+
+static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn)
+{
+ uint32_t head, oldt, newt;
+ //Read head and tail separately
+#ifndef USE_LLSC
+ oldt = __atomic_load_n(&rwin->tail, __ATOMIC_RELAXED);
+#endif
+ do
+ {
+ head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED);
+#ifdef USE_LLSC
+ oldt = ll32(&rwin->tail, __ATOMIC_RELAXED);
+#endif
+ if (unlikely(oldt - head >= rwin->winmask))
+ {
+ return false;
+ }
+ newt = oldt + 1;
+ }
+#ifdef USE_LLSC
+ while (unlikely(sc32(&rwin->tail, newt, __ATOMIC_RELAXED)));
+#else
+ while (!__atomic_compare_exchange(&rwin->tail,
+ &oldt,
+ &newt,
+ CAS_WEAK,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+#endif
+ *sn = oldt;
+ return true;
+}
+
+static void rwin_insert(struct reorder_window *rwin,
+ struct reorder_context *rctx,
+ uint32_t sn,
+ void (*callback)(const struct reorder_context *))
+{
+ struct hc old;
+ __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE);
+ uint32_t winmask = rwin->winmask;
+ if (old.head != sn)
+ {
+ //We are out-of-order
+ //Store context in reorder window, releasing its content
+ assert(rwin->ring[sn & winmask] == NULL);
+ __atomic_store_n(&rwin->ring[sn & winmask], rctx, __ATOMIC_RELEASE);
+ rctx = NULL;
+
+ do
+ {
+ struct hc new;
+ new.head = old.head;
+ new.chgi = old.chgi + 1;//Changed value
+ //Update head&chgi, fail if any has changed
+ if (__atomic_compare_exchange(&rwin->hc,
+ &old,//Updated on failure
+ &new,
+ CAS_WEAK,
+ __ATOMIC_RELEASE,//Release our ring update
+ __ATOMIC_ACQUIRE))
+ {
+ //CAS succeeded => head same (we are not in-order), chgi updated
+ return;
+ }
+ //CAS failed => head and/or chgi changed
+ //We might not be out-of-order anymore
+ }
+ while (old.head != sn);
+ //old.head == sn => we are now in-order!
+ }
+
+ assert(old.head == sn);
+ //We are in-order so our responsibility to retire contexts
+ struct hc new;
+ new.head = old.head;
+ new.chgi = old.chgi + 1;//Changed value
+
+ //Retire our in-order context (if we still have it)
+ if (rctx != NULL)
+ {
+ callback(rctx);
+ new.head++;
+ }
+
+ //Retire in-order contexts in the ring
+ //The first context might actually be ours (if we were originally
+ //out-of-order)
+ do
+ {
+ for (;;)
+ {
+ rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
+ __ATOMIC_ACQUIRE);
+ if (rctx == NULL)
+ break;
+ //We are the only thread that is in-order (until head is updated)
+ //so we don't have to use an atomic load-and-clear (exchange)
+ rwin->ring[new.head & winmask] = NULL;
+ callback(rctx);
+ new.head++;
+ }
+ }
+ //Update head&chgi, fail if chgi has changed (head cannot change)
+ while (!__atomic_compare_exchange(&rwin->hc,
+ &old,//Updated on failure
+ &new,
+ /*weak=*/false,
+ __ATOMIC_RELEASE,//Release our ring updates
+ __ATOMIC_ACQUIRE));
+}
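+
+//How the reorder window is used by the scheduler (see odp_schedule_multi()):
+//1) rwin_reserve() allocates slot 'sn' in the window; comparing sn with
+//   hc.head tells the thread whether it is in-order or out-of-order
+//2) while out-of-order, the thread defers its enqueues into its
+//   reorder_context instead of writing them to the destination queues
+//3) rwin_insert() either parks the context in ring[sn] or, if the thread is
+//   (or has become) in-order, retires it and all consecutive parked contexts
+//   through the callback (rctx_retire), which performs the deferred enqueues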
+
+/******************************************************************************
+ * sched_obj aka ODP queue
+ *****************************************************************************/
+
+//Number of events that can be stored in a queue
+#define RING_SIZE 2048
+
+typedef uint32_t ringidx_t;
+struct ringstate
+{
+ ringidx_t read;
+ ringidx_t write;
+} ALIGNED(8);
+#define RINGSIZE_MAX (1U << 31)
+
+struct sharedstate
+{
+ uint32_t numevts;
+ uint16_t cur_ticket;
+ uint16_t nxt_ticket;
+} ALIGNED(sizeof(uint32_t) * 2);
+#define TICKET_INVALID (uint32_t)(~0U)
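+//cur_ticket/nxt_ticket form a ticket lock that serialises the schedq push/pop
+//performed on empty<->non-empty transitions (and hands out exclusive ownership
+//of atomic queues): a thread takes a ticket by incrementing nxt_ticket, waits
+//until cur_ticket equals its ticket, performs the schedq update, and releases
+//by storing ticket + 1 to cur_ticket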
+
+struct ring
+{
+ struct ringstate prod;
+ struct ringstate cons;
+ struct sharedstate shared;
+ uint32_t mask;
+ odp_event_t ring[RING_SIZE] ALIGNED(CACHE_LINE);
+};
+
+struct sched_obj//May actually be an ODP queue
+{
+ struct llnode node;
+ sched_queue *schedq;//Which schedq we belong to
+ sched_obj_type type;
+ void *user_ctx;
+ struct reorder_window *rwin;
+ struct ring queue;
+} ALIGNED(CACHE_LINE);
+
+static inline struct reorder_window *queue_rwin_get(const odp_queue_t q)
+{
+ return q->rwin;
+}
+
+static inline bool queue_is_empty(const odp_queue_t q)
+{
+ return q->queue.cons.read == q->queue.cons.write;
+}
+
+static inline ringidx_t ringstate_num_used(struct ringstate rs)
+{
+ return (ringidx_t)(rs.write - rs.read);
+}
+
+static inline ringidx_t ringstate_num_free(struct ringstate rs)
+{
+ return RING_SIZE - (ringidx_t)(rs.write - rs.read);
+}
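+
+//Example: with read == 3 and write == 5 the ring holds 2 events and
+//RING_SIZE - 2 == 2046 slots are free. The indices increase monotonically and
+//wrap as 32-bit unsigned values, so the subtractions above remain correct
+//across wraparound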
+
+static odp_queue_t _odp_queue_create(uint32_t prio,
+ sched_obj_type sync,
+ odp_schedule_group_t group,
+ unsigned lock_count,
+ void *user_ctx)
+{
+ if (lock_count > (sync == ordered_q ? NUM_OLOCKS : 0))
+ return NULL;
+ odp_queue_t q = aligned_alloc(CACHE_LINE, sizeof(struct sched_obj));
+ if (q == NULL)
+ perror("aligned_alloc"), exit(EXIT_FAILURE);
+ q->schedq = schedq_from_sched_group(group, prio);
+ q->type = sync;
+ q->user_ctx = user_ctx;
+ assert(is_power_of_two(RING_SIZE));
+ q->queue.prod.read = 0;
+ q->queue.prod.write = 0;
+ q->queue.cons.read = 0;
+ q->queue.cons.write = 0;
+ q->queue.shared.numevts = 0;
+ q->queue.shared.cur_ticket = 0;
+ q->queue.shared.nxt_ticket = 0;
+ q->queue.mask = RING_SIZE - 1;
+ for (uint32_t i = 0; i < RING_SIZE; i++)
+ {
+ q->queue.ring[i] = ODP_EVENT_INVALID;
+ }
+ q->rwin = NULL;
+ if (sync == ordered_q)
+ {
+ q->rwin = rwin_alloc(lock_count);
+ if (q->rwin == NULL)
+ perror("rwin_alloc"), exit(EXIT_FAILURE);
+ }
+ assert(queue_is_empty(q));
+ return q;
+}
+
+static const char *qtype2str(odp_queue_t q)
+{
+ switch (q->type)
+ {
+ case pktio :
+ return "pktio";
+ case parallel_q :
+ return "parallel";
+ case ordered_q :
+ return "ordered";
+ case atomic_q :
+ return "atomic";
+ }
+ return "?";
+}
+
+static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num)
+{
+ struct thread_state *ts = TS;
+ if (unlikely(ts->out_of_order))//unlikely() improves performance for atomic and parallel queues but degrades it for ordered queues
+ {
+ int i = 0;
+ struct reorder_context *rctx = ts->rctx;
+ assert(ts->rctx != NULL);
+ while (i < num)
+ {
+#ifdef LOG
+if (VERBOSE) printf("%u: Deferring enqueue event %p/%u on queue %p\n", TS->tidx, ev[i], ev[i]->number, q);
+#endif
+ event_queue_set(ev[i], q);
+ if (rctx->head == NULL)
+ {
+ rctx->head = ev[i];
+ rctx->tail = ev[i];
+ }
+ else
+ {
+ event_next_set(rctx->tail, ev[i]);
+ rctx->tail = ev[i];
+ }
+ i++;
+ }
+ event_next_set(ev[i - 1], NULL);
+ rctx->tail = ev[i - 1];
+ return i;
+ }
+
+ struct ringstate old;
+ ringidx_t new_write;
+ uint32_t actual;
+
+ //Load producer ring state (read & write index)
+#ifdef NDEBUG
+ //No debug => no assert => relaxed ordering OK
+#define ATOMIC_READ_ON_ASSERT __ATOMIC_RELAXED
+#else
+ //Debug => assert reads from the ring => needs acquire ordering
+#define ATOMIC_READ_ON_ASSERT __ATOMIC_ACQUIRE
+#endif
+
+#ifndef USE_LLSC
+ old.write = __atomic_load_n(&q->queue.prod.write, __ATOMIC_RELAXED);
+#endif
+ do
+ {
+#ifdef USE_LLSC
+ old.write = ll32(&q->queue.prod.write, ATOMIC_READ_ON_ASSERT);
+#endif
+ old.read = __atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED);
+
+ actual = MIN(num, ringstate_num_free(old));
+ if (unlikely(actual == 0))
+ {
+ return 0;
+ }
+
+ new_write = old.write + actual;
+ }
+#ifdef USE_LLSC
+ while (unlikely(sc32(&q->queue.prod.write, new_write, __ATOMIC_RELAXED)));
+#else
+ while (!__atomic_compare_exchange_n(&q->queue.prod.write,
+ &old.write,//Updated on failure
+ new_write,
+ CAS_WEAK,
+ ATOMIC_READ_ON_ASSERT,
+ __ATOMIC_RELAXED));
+#endif
+
+ //Store our event(s) in the ring
+ uint32_t index = old.write & q->queue.mask;
+ for (uint32_t i = 0; i < actual; i++)
+ {
+ //The following assert reads from the ring, needs acquire ordering above
+ assert(ev[i] != ODP_EVENT_INVALID);
+ assert(q->queue.ring[index] == ODP_EVENT_INVALID);
+#ifdef LOG
+if (VERBOSE) printf("%u: Enqueue event %p/%u on queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(new_write - old.read));
+#endif
+ q->queue.ring[index] = ev[i];
+ index = (index + 1) & q->queue.mask;
+ }
+
+ //Wait for our turn to signal consumers
+ while (__atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED) != old.write)
+ {
+ doze();
+ }
+
+ //Update the event counter, optionally take a ticket
+ union
+ {
+ struct sharedstate ss;
+ uint64_t ui;
+ } oss, nss;
+ uint32_t ticket;
+#ifndef USE_LLSC
+ __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED);
+#endif
+ do
+ {
+ ticket = TICKET_INVALID;
+#ifdef USE_LLSC
+ oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED);
+#endif
+ nss = oss;
+ nss.ss.numevts += actual;
+ if (oss.ss.numevts == 0)//Empty -> non-empty transition
+ {
+ //Parallel or ordered queue: always take a ticket
+ //Atomic queue: only take a ticket if one is immediately available,
+ //otherwise the ticket is already taken => queue is being processed
+ //by some thread
+ if (q->type != atomic_q || oss.ss.cur_ticket == oss.ss.nxt_ticket)
+ {
+ ticket = nss.ss.nxt_ticket++;
+ }
+ }
+ //Else queue already was non-empty
+ }
+ //Attempt to update numevts counter and optionally take ticket
+#ifdef USE_LLSC
+ while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED));
+#else
+ while (!__atomic_compare_exchange(&q->queue.shared,
+ &oss,
+ &nss,
+ CAS_WEAK,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+#endif
+
+ //Signal consumers that events are available (release events)
+ //Enable other producers to continue
+ far_atomic_store(&q->queue.cons.write, new_write, __ATOMIC_RELEASE);
+
+ if (ticket != TICKET_INVALID)
+ {
+ assert(oss.ss.numevts == 0);
+ //Wait for our turn to update schedq
+ while (__atomic_load_n(&q->queue.shared.cur_ticket, __ATOMIC_ACQUIRE) !=
+ ticket)
+ {
+ doze();
+ }
+
+ //Enqueue at end of scheduler queue
+ schedq_push(q->schedq, q);
+#ifdef LOG
+if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq);
+#endif
+ far_atomic_store(&q->queue.shared.cur_ticket,
+ ticket + 1,
+ __ATOMIC_RELEASE_AFTER_CAS);
+ }
+ //Else queue was not empty or atomic queue already busy
+
+ return actual;
+}
+
+//We want _odp_queue_deq() to be inlined so that unexecuted paths caused by
+//threadsafe and atomic parameters are removed
+static int _odp_queue_deq(odp_queue_t q, odp_event_t ev[], int num, bool threadsafe, bool atomic) __attribute__((always_inline));
+static int _odp_queue_deq(odp_queue_t q,
+ odp_event_t ev[],
+ int num,
+ bool threadsafe,
+ bool atomic)
+{
+ uint32_t actual;
+ struct ringstate old;
+ ringidx_t new_read;
+
+ //Load consumer ring state (read & write index)
+ if (!threadsafe)
+ {
+ old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_ACQUIRE);
+ old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED);
+ actual = MIN(num, ringstate_num_used(old));
+ if (unlikely(actual == 0))
+ {
+ return 0;
+ }
+ new_read = old.read + actual;
+ q->queue.cons.read = new_read;
+ }
+ else
+ {
+#ifndef USE_LLSC
+ old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_RELAXED);
+#endif
+ do
+ {
+#ifdef USE_LLSC
+ old.read = ll32(&q->queue.cons.read, __ATOMIC_ACQUIRE);
+#endif
+ old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED);
+
+ actual = MIN(num, ringstate_num_used(old));
+ if (unlikely(actual == 0))
+ {
+ return 0;
+ }
+
+ //Prefetch queue context for use by application
+ //__builtin_prefetch(q->user_ctx, 0, 0);
+
+ //Attempt to free ring slot(s)
+ new_read = old.read + actual;
+ }
+#ifdef USE_LLSC
+ while (unlikely(sc32(&q->queue.cons.read, new_read, __ATOMIC_RELAXED)));
+#else
+ while (!__atomic_compare_exchange_n(&q->queue.cons.read,
+ &old.read,//Updated on failure
+ new_read,
+ CAS_WEAK,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE));
+#endif
+ }
+
+ uint32_t index = old.read & q->queue.mask;
+ uint32_t i;
+ for (i = 0; i < actual; i++)
+ {
+ //TODO Prefetch event data
+ ev[i] = q->queue.ring[index];
+ assert(ev[i] != ODP_EVENT_INVALID);
+#ifndef NDEBUG
+ q->queue.ring[index] = ODP_EVENT_INVALID;
+#endif
+ index = (index + 1) & q->queue.mask;
+#ifdef LOG
+if (VERBOSE) printf("%u: Dequeue event %p/%u from queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(old.write - new_read));
+#endif
+ }
+
+ if (!threadsafe)
+ {
+ //Wait for our turn to signal producers
+ while (__atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED) !=
+ old.read)
+ {
+ doze();
+ }
+ }
+
+ if (atomic)
+ {
+ (void)__atomic_fetch_sub(&q->queue.shared.numevts,
+ actual,
+ __ATOMIC_RELAXED);
+
+ //Signal producers that empty slots are available (release ring slots)
+ //Enable other consumers to continue
+ far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE);
+ }
+ else
+ {
+ union
+ {
+ struct sharedstate ss;
+ uint64_t ui;
+ } oss, nss;
+ uint32_t ticket = TICKET_INVALID;
+#ifndef USE_LLSC
+ __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED);
+#endif
+ do
+ {
+#ifdef USE_LLSC
+ oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED);
+#endif
+ nss = oss;
+ nss.ss.numevts -= actual;
+ if (nss.ss.numevts == 0)
+ {
+ //If we emptied parallel/ordered queue, we need a ticket for a
+ //later pop
+ ticket = nss.ss.nxt_ticket++;
+ }
+ }
+ //Attempt to update numevts and optionally take a ticket
+#ifdef USE_LLSC
+ while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED));
+#else
+ while (!__atomic_compare_exchange(&q->queue.shared,
+ &oss,//Updated on failure
+ &nss,
+ CAS_WEAK,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+#endif
+
+ //Signal producers that empty slots are available (release ring slots)
+ //Enable other consumers to continue
+ far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE);
+
+ if (nss.ss.numevts == 0)
+ {
+ assert(q->type != atomic_q);
+ //Wait for our turn to update schedq
+ while (__atomic_load_n(&q->queue.shared.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)
+ {
+ doze();
+ }
+
+ bool b = schedq_cond_pop(q->schedq, q);
+ (void)b;
+#ifdef LOG
+ if (VERBOSE) printf("%u: Pop queue %p from schedq %p %s\n", TS->tidx, q, q->schedq, b ? "success" : "failure");
+#endif
+ far_atomic_store(&q->queue.shared.cur_ticket,
+ ticket + 1,
+ __ATOMIC_RELEASE_AFTER_CAS);
+ }
+ }
+
+ return actual;
+}
+
+/******************************************************************************
+ * Behold, the scheduler!
+ *****************************************************************************/
+
+static inline void _odp_schedule_release_ordered(struct thread_state *ts)
+{
+ if (ts->rctx != NULL)
+ {
+#ifdef LOG
+if (VERBOSE) printf("%u: Release rctx %p\n", ts->tidx, ts->rctx);
+#endif
+ ts->out_of_order = false;
+ rctx_release(ts->rctx);
+ ts->rctx = NULL;
+ }
+}
+
+void odp_schedule_release_ordered(void)
+{
+ struct thread_state *ts = TS;
+ if (unlikely(ts->rctx == NULL))
+ {
+ fprintf(stderr, "odp_schedule_release_ordered: unexpected call\n");
+ fflush(NULL); abort();
+ }
+ _odp_schedule_release_ordered(ts);
+}
+
+static inline void _odp_schedule_release_atomic(struct thread_state *ts)
+{
+ struct sched_obj *q = ts->atomq;
+ bool pushed = false;
+ struct sharedstate oss, nss;
+ assert(ts->atomq != ODP_QUEUE_INVALID);
+ assert(ts->ticket != TICKET_INVALID);
+ //Only we have this queue, only we can dequeue but others can enqueue so
+ //numevts can increase but not decrease
+ __atomic_load(&q->queue.shared, &oss, __ATOMIC_ACQUIRE);
+ do
+ {
+ assert(oss.cur_ticket == ts->ticket);
+ if (oss.numevts != 0 && !pushed)
+ {
+ schedq_push(q->schedq, q);
+#ifdef LOG
+if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq);
+#endif
+ pushed = true;//Only push once
+ }
+ nss = oss;
+ //Release ticket
+ nss.cur_ticket = ts->ticket + 1;
+ }
+ //Attempt to release ticket expecting our view of numevts to be correct
+ while (!__atomic_compare_exchange(&q->queue.shared,
+ &oss,
+ &nss,
+ CAS_WEAK,
+ __ATOMIC_RELEASE,
+ __ATOMIC_ACQUIRE));
+ //CAS succeeded => if (numevts != 0) then queue was pushed to schedq
+ ts->atomq = ODP_QUEUE_INVALID;
+ ts->ticket = TICKET_INVALID;
+}
+
+void odp_schedule_release_atomic(void)
+{
+ struct thread_state *ts = TS;
+ if (unlikely(ts->atomq == ODP_QUEUE_INVALID ||
+ ts->ticket == TICKET_INVALID))
+ {
+ fprintf(stderr, "odp_schedule_release_atomic: unexpected call\n");
+ fflush(NULL); abort();
+ }
+ _odp_schedule_release_atomic(ts);
+}
+
+static void update_sg_membership(struct thread_state *ts);
+
+static int odp_schedule_multi(odp_queue_t *from, uint64_t wait,
+ odp_event_t ev[], int num) __attribute__((noinline));
+static int odp_schedule_multi(odp_queue_t *from, uint64_t wait,
+ odp_event_t ev[], int num)
+{
+ (void)wait;//TODO implement timeout
+ //Get pointer to our per-thread state
+ struct thread_state *ts = TS;
+ if (unlikely(ts->pause))
+ {
+ return 0;
+ }
+ odp_queue_t atomq = ts->atomq;
+ //Check if we are currently processing an atomic queue
+ if (atomq != ODP_QUEUE_INVALID)
+ {
+ //Yes, continue to process this queue (optimise for throughput)
+ int ret;
+ assert(ts->ticket != TICKET_INVALID);
+dequeue_atomic: //No side effects before this label!
+ //Atomic queues can be dequeued without lock since this thread has the
+ //only reference to the atomic queue being processed
+ //We are the only thread that can dequeue but other threads can enqueue
+ if (likely((ret = _odp_queue_deq(atomq,
+ ev,
+ num,
+ /*threadsafe=*/false,
+ /*atomic=*/true)) != 0))
+ {
+ *from = atomq;
+ //This thread must continue to "own" this atomic queue until all
+ //events processed and the thread re-invokes the scheduler
+ return ret;
+ }
+ //Atomic queue was empty, release it
+ _odp_schedule_release_atomic(ts);
+ }
+ //Not (or no longer) processing an atomic queue
+ {
+ //Release any previous reorder context
+ _odp_schedule_release_ordered(ts);
+ }
+
+ if (unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0))
+ {
+ (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
+ __atomic_store_n(&ts->sg_sem, 0, __ATOMIC_RELAXED);//FIXME?
+ update_sg_membership(ts);
+ }
+
+ //Iterate through our list of scheduler queues which are sorted with
+ //higher priority first
+ for (uint32_t i = 0; i < ts->num_schedq; i++)
+ {
+ //__builtin_prefetch(ts->schedq_list[i + 1], 0, 0);
+ sched_queue *schedq = ts->schedq_list[i];
+ struct sched_obj *obj;
+restart_same:
+ //Peek at the head of the scheduler queue
+ obj = schedq_peek(schedq);
+ if (likely(obj == NULL))
+ {
+ //schedq empty
+ continue;//Look at next schedq
+ }
+ if (obj->type == atomic_q)
+ {
+ //Dequeue object only if it is still at head of schedq
+ bool b = schedq_cond_pop(schedq, obj);
+#ifdef LOG
+if (VERBOSE) printf("%u: Pop atomic queue %p from schedq %p %s\n", ts->tidx, obj, obj->schedq, b ? "success" : "failure");
+#endif
+ if (unlikely(!b))
+ {
+ //atomq not at head of schedq anymore, some other thread
+ //stole it
+ goto restart_same;//Restart at the same schedq
+ }
+ ts->atomq = atomq = obj;
+ //Dequeued atomic queue from the schedq, only we can process it
+ ts->ticket = __atomic_fetch_add(&atomq->queue.shared.nxt_ticket, 1, __ATOMIC_RELAXED);
+ while (__atomic_load_n(&atomq->queue.shared.cur_ticket, __ATOMIC_RELAXED) != ts->ticket)
+ {
+ doze();
+ }
+ goto dequeue_atomic;
+ }
+ else if (obj->type == ordered_q)
+ {
+ odp_queue_t ordq = obj;
+ assert(queue_rwin_get(ordq) != NULL);
+ //The scheduler object (probably an ordered queue) has a
+ //reorder window so requires order restoration
+ //We must use a reorder context to collect all outgoing events
+ //Find and initialise an unused reorder context
+ uint32_t idx = __atomic_load_n(&ts->rvec_free, __ATOMIC_RELAXED);
+ if (unlikely(idx == 0))
+ {
+ //No free reorder contexts for this thread
+#ifdef LOG
+if (VERBOSE) printf("%u: Out of reorder contexts, queue ignored\n", ts->tidx);
+#endif
+ continue;//Look at next schedq, hope we find non-ordered queue
+ }
+ //Get first bit set (starting from 0)
+ idx = __builtin_ffs(idx) - 1;
+ struct reorder_context *rctx = ts->rctx = &ts->rvec[idx];
+ rctx_init(rctx, &ts->rvec_free, i, queue_rwin_get(ordq));
+#ifdef LOG
+if (VERBOSE) printf("%u: Using rctx %p\n", ts->tidx, rctx);
+#endif
+ //rwin_reserve and odp_queue_deq must be atomic or we will
+ //have a potential race condition
+ //Allocate a slot in the reorder window
+ if (unlikely(!rwin_reserve(rctx->rwin, &rctx->sn)))
+ {
+ //Reorder window full
+#ifdef LOG
+if (VERBOSE) printf("%u: Reorder window full, queue ignored\n", ts->tidx);
+#endif
+//bp();
+ rctx_free(rctx);
+ ts->rctx = NULL;
+ ts->out_of_order = false;
+ continue;//Look at next schedq, find other queue
+ }
+ //Are we in-order or out-of-order?
+ ts->out_of_order = rctx->sn != rctx->rwin->hc.head;
+#ifdef LOG
+if (VERBOSE) printf("%u: Reserved pos %u in rwin %p\n", ts->tidx, rctx->sn, rctx->rwin);
+#endif
+ //Wait for our turn to dequeue
+ while (__atomic_load_n(&rctx->rwin->turn, __ATOMIC_RELAXED) != rctx->sn)
+ {
+ doze();
+ }
+ int ret = _odp_queue_deq(ordq,
+ ev,
+ num,
+ /*threadsafe=*/false,
+ /*atomic=*/false);
+ //Someone else's turn
+ far_atomic_store(&rctx->rwin->turn,
+ rctx->sn + 1,
+ __ATOMIC_RELEASE_AFTER_CAS);
+ if (likely(ret != 0))
+ {
+ *from = ordq;
+ return ret;
+ }
+#ifdef LOG
+if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, ordq);
+//Queue will either become non-empty or will be removed by thread which made it empty
+if (VERBOSE) printf("%u: Release unused rctx %p\n", ts->tidx, ts->rctx);
+#endif
+ ts->out_of_order = false;
+ rctx_release(ts->rctx);
+#ifdef LOG
+if (VERBOSE) printf("%u: Release unused rctx %p rwin %p\n", ts->tidx, ts->rctx, ts->rctx->rwin);
+#endif
+ ts->rctx = NULL;
+ continue;//Look at next schedq
+ }
+ else if (obj->type == parallel_q)
+ {
+ odp_queue_t pq = obj;
+ int ret = _odp_queue_deq(pq,
+ ev,
+ num,
+ /*threadsafe=*/true,
+ /*atomic=*/false);
+ if (likely(ret != 0))
+ {
+ *from = pq;
+ return ret;
+ }
+#ifdef LOG
+if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, pq);
+//Queue will either become non-empty or will be removed by thread which made it empty
+#endif
+ continue;//Look at next schedq
+ }
+ else if (obj->type == pktio)
+ {
+ }
+ }
+ return 0;
+}
+
+static odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait)
+{
+ odp_event_t evt;
+ if (likely(odp_schedule_multi(from, wait, &evt, 1) == 1))
+ {
+ return evt;
+ }
+ return ODP_EVENT_INVALID;
+}
+
+int odp_schedule_num_prio(void)
+{
+ return NUM_PRIO;
+}
+
+void odp_schedule_pause(void)
+{
+ struct thread_state *ts = TS;
+ ts->pause = true;
+}
+
+void odp_schedule_resume(void)
+{
+ struct thread_state *ts = TS;
+ ts->pause = false;
+}
+
+void odp_schedule_prefetch(int num)
+{
+ (void)num;
+ //No-op for the SW scheduler which is only driven by the application
+ //threads themselves
+}
+
+/******************************************************************************
+ * Scheduler groups
+ *****************************************************************************/
+
+struct sched_group
+{
+ odp_thrmask_t thr_actual[NUM_PRIO];//Threads currently associated with the sched group
+ odp_thrmask_t thr_wanted;
+ uint32_t xcount[NUM_PRIO];//Used to spread queues over schedq's
+ uint32_t xfactor;//Number of schedq's per prio
+ sched_queue schedq[1];//NUM_PRIO * xfactor entries
+};
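+//schedq[] is a variable-length tail: odp_schedule_group_create() allocates
+//room for NUM_PRIO * xfactor elements, addressed as schedq[prio * xfactor + x]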
+
+static sched_group_mask_t sg_used;
+static struct sched_group *sg_vec[MAX_SCHED_GROUP];
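+//Bit sgi in sg_used marks sg_vec[sgi] as allocated; the public group handle
+//is sgi + 1 (presumably so that 0 stays free as an invalid value)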
+
+static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp,
+ uint32_t prio)
+{
+ assert(grp > 0 && grp <= MAX_SCHED_GROUP);
+ assert((sg_used & (1ULL << (grp - 1))) != 0);
+ assert(prio >= 0 && prio < NUM_PRIO);
+ uint32_t sgi = grp - 1;
+ struct sched_group *sg = sg_vec[sgi];
+ //Use xcount to spread queues over the xfactor schedq's per priority
+ uint32_t x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
+ if (x == 0)
+ {
+ //First ODP queue for this priority
+ //Notify all threads in sg->thr_wanted that they should join
+ sched_group_mask_t thrds = sg->thr_wanted;
+ while (thrds != 0)
+ {
+ uint32_t thr = __builtin_ffsl(thrds) - 1;
+ thrds &= ~(1ULL << thr);
+ //Notify the thread about membership in this group/priority
+ (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[prio],
+ 1ULL << sgi,
+ __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem, 1, __ATOMIC_RELEASE);
+ }
+ }
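+ //Illustrative example: with xfactor == 4, successive queues at the same
+ //priority get x = 0, 1, 2, 3, 4, ... and map to schedq[prio * 4 + (x % 4)],
+ //i.e. they are spread round-robin over the four schedq's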
+ return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
+}
+
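+//Reconcile this thread's schedq list with its wanted scheduler group
+//membership; presumably invoked from the scheduling loop when sg_sem has
+//been raised by one of the notifications below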
+static void update_sg_membership(struct thread_state *ts)
+{
+ for (uint32_t p = 0; p < NUM_PRIO; p++)
+ {
+ sched_group_mask_t sg_wanted = __atomic_load_n(&ts->sg_wanted[p],
+ __ATOMIC_ACQUIRE);
+ if (ts->sg_actual[p] != sg_wanted)
+ {
+ //Our sched_group membership has changed
+ sched_group_mask_t added = sg_wanted & ~ts->sg_actual[p];
+ while (added != 0)
+ {
+ uint32_t sgi = __builtin_ffsl(added) - 1;
+ struct sched_group *sg = sg_vec[sgi];
+ for (uint32_t x = 0; x < sg->xfactor; x++)
+ {
+ //Include our thread index to shift (rotate) the order of
+ //schedq's
+ insert_schedq_in_list(ts,
+ &sg->schedq[p * sg->xfactor +
+ (x + ts->tidx) % sg->xfactor]);
+ }
+ (void)__atomic_fetch_or(&sg->thr_actual[p],
+ 1ULL << ts->tidx,
+ __ATOMIC_RELAXED);
+ added &= ~(1ULL << sgi);
+ }
+ sched_group_mask_t removed = ~sg_wanted & ts->sg_actual[p];
+ while (removed != 0)
+ {
+ uint32_t sgi = __builtin_ffsl(removed) - 1;
+ struct sched_group *sg = sg_vec[sgi];
+ for (uint32_t x = 0; x < sg->xfactor; x++)
+ {
+ remove_schedq_from_list(ts,
+ &sg->schedq[p * sg->xfactor + x]);
+ }
+ (void)__atomic_fetch_and(&sg->thr_actual[p],
+ ~(1ULL << ts->tidx),
+ __ATOMIC_RELAXED);
+ removed &= ~(1ULL << sgi);
+ }
+ ts->sg_actual[p] = sg_wanted;
+ }
+ }
+}
+
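+//Forward declaration: odp_schedule_group_create() below uses it to join the
+//initial thread mask to the newly created group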
+int odp_schedule_group_join(odp_schedule_group_t group,
+ const odp_thrmask_t *mask);
+
+odp_schedule_group_t odp_schedule_group_create(const char *name,
+ const odp_thrmask_t *mask)
+{
+ uint32_t sgi;
+ sched_group_mask_t used = __atomic_load_n(&sg_used, __ATOMIC_ACQUIRE);
+ do
+ {
+ if (~used == 0)
+ return -1;//All sched_groups in use
+ sgi = __builtin_ffsl(~used) - 1;
+ if (sgi >= MAX_SCHED_GROUP)
+ return -1;//All sched_groups in use
+ } while (!__atomic_compare_exchange_n(&sg_used,
+ &used,
+ used | (1ULL << sgi),
+ CAS_WEAK,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE));
+ //Compute xfactor (spread factor) from the number of threads present in the
+ //thread mask
+ //Preferably this would be an explicit parameter
+ uint32_t xfactor = __builtin_popcountll(*mask);
+ if (xfactor < 1)
+ {
+ xfactor = 1;
+ }
+ //aligned_alloc requires the size to be a multiple of the alignment
+ size_t sg_size = sizeof(struct sched_group) +
+ (NUM_PRIO * xfactor - 1) * sizeof(sched_queue);
+ sg_size = (sg_size + CACHE_LINE - 1) & ~(size_t)(CACHE_LINE - 1);
+ struct sched_group *sg = aligned_alloc(CACHE_LINE, sg_size);
+ if (sg == NULL)
+ {
+ //Undo the reservation of this sched_group index
+ (void)__atomic_fetch_and(&sg_used, ~(1ULL << sgi), __ATOMIC_RELEASE);
+ return -1;
+ }
+ sg_vec[sgi] = sg;
+ memset(sg->thr_actual, 0, sizeof sg->thr_actual);
+ sg->thr_wanted = 0;
+ sg->xfactor = xfactor;
+ for (uint32_t p = 0; p < NUM_PRIO; p++)
+ {
+ sg->xcount[p] = 0;
+ for (uint32_t x = 0; x < xfactor; x++)
+ {
+ schedq_init(&sg->schedq[p * xfactor + x], p);
+ }
+ }
+ if (__builtin_popcountll(*mask) != 0)
+ {
+ odp_schedule_group_join(sgi + 1, mask);
+ }
+ return sgi + 1;
+}
+
+int odp_schedule_group_join(odp_schedule_group_t group,
+ const odp_thrmask_t *mask)
+{
+ if (group < 1 || group > MAX_SCHED_GROUP)
+ return -1;
+ uint32_t sgi = group - 1;
+ if ((sg_used & (1ULL << sgi)) == 0)
+ return -1;
+ struct sched_group *sg = sg_vec[sgi];
+ odp_thrmask_t toadd = *mask;
+ //Add threads to scheduler group wanted thread mask
+ (void)__atomic_fetch_or(&sg->thr_wanted, toadd, __ATOMIC_RELAXED);
+ //Notify relevant threads about the change
+ while (toadd != 0)
+ {
+ uint32_t thr = __builtin_ffsl(toadd) - 1;
+ toadd &= ~(1ULL << thr);
+ for (uint32_t p = 0; p < NUM_PRIO; p++)
+ {
+ if (sg->xcount[p] != 0)
+ {
+ //This priority level has ODP queues
+ //Notify the thread about membership in this group/priority
+ (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[p],
+ 1ULL << sgi,
+ __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem,
+ 1,
+ __ATOMIC_RELEASE);
+ }
+ }
+ }
+ return 0;
+}
+
+int odp_schedule_group_leave(odp_schedule_group_t group,
+ const odp_thrmask_t *mask)
+{
+ if (group < 1 || group > MAX_SCHED_GROUP)
+ return -1;
+ uint32_t sgi = group - 1;
+ if ((sg_used & (1ULL << sgi)) == 0)
+ return -1;
+ struct sched_group *sg = sg_vec[sgi];
+ odp_thrmask_t torem = *mask;
+ //Remove threads from scheduler group wanted thread mask
+ (void)__atomic_fetch_and(&sg->thr_wanted, ~torem, __ATOMIC_RELAXED);
+ //Notify relevant threads about the change
+ while (torem != 0)
+ {
+ uint32_t thr = __builtin_ffsl(torem) - 1;
+ torem &= ~(1ULL << thr);
+ for (uint32_t p = 0; p < NUM_PRIO; p++)
+ {
+ if (sg->xcount[p] != 0)
+ {
+ //Clear bit which specifies membership in this sched_group/prio
+ (void)__atomic_fetch_and(&thread_state[thr].sg_wanted[p],
+ ~(1ULL << sgi),
+ __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem,
+ 1,
+ __ATOMIC_RELEASE);
+ }
+ }
+ }
+ return 0;
+}
+
+int odp_schedule_group_thrmask(odp_schedule_group_t group,
+ odp_thrmask_t *thrmask)
+{
+ if (group < 1 || group > MAX_SCHED_GROUP)
+ return -1;
+ uint32_t sgi = group - 1;
+ if ((sg_used & (1ULL << sgi)) == 0)
+ return -1;
+ struct sched_group *sg = sg_vec[sgi];
+ *thrmask = sg->thr_wanted;
+ return 0;
+}
+
+/******************************************************************************
+ * Ordered locks
+ *****************************************************************************/
+
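+//Ordered locks are granted in reorder-window order: a thread spins until
+//olock[lock_index] equals its own sequence number, and the matching unlock
+//stores sn + 1 to pass the lock to the thread with the next sequence number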
+void odp_schedule_order_lock(unsigned lock_index)
+{
+ struct thread_state *ts = TS;
+#ifndef NDEBUG
+ if (unlikely(ts->rctx == NULL))
+ {
+ fprintf(stderr, "odp_schedule_order_lock: unexpected call\n");
+ abort();
+ }
+#endif
+ struct reorder_context *rctx = ts->rctx;
+ struct reorder_window *rwin = rctx->rwin;
+#ifndef NDEBUG
+ if (unlikely(lock_index >= rwin->lock_count))
+ {
+ fprintf(stderr, "odp_schedule_order_lock: invalid lock index %u\n",
+ lock_index);
+ abort();
+ }
+#endif
+#ifdef LOG
+if (VERBOSE) printf("%u: lock acquire sn=%u %p->olock[0]=%u\n", TS->tidx, rctx->sn, rwin, rwin->olock[0]);
+#endif
+ while (__atomic_load_n(&rwin->olock[lock_index], __ATOMIC_ACQUIRE) !=
+ rctx->sn)
+ {
+ doze();
+ }
+#ifdef LOG
+if (VERBOSE) printf("%u: lock taken sn=%u %p->olock[0]=%u\n", TS->tidx, rctx->sn, rwin, rwin->olock[0]);
+#endif
+}
+
+void odp_schedule_order_unlock(unsigned lock_index)
+{
+ struct thread_state *ts = TS;
+#ifndef NDEBUG
+ if (unlikely(ts->rctx == NULL))
+ {
+ fprintf(stderr, "odp_schedule_order_unlock: unexpected call\n");
+ abort();
+ }
+#endif
+ struct reorder_context *rctx = ts->rctx;
+ struct reorder_window *rwin = rctx->rwin;
+#ifndef NDEBUG
+ if (unlikely(lock_index >= rwin->lock_count))
+ {
+ fprintf(stderr, "odp_schedule_order_unlock: invalid lock index %u\n",
+ lock_index);
+ abort();
+ }
+ if (unlikely(rwin->olock[lock_index] != rctx->sn))
+ {
+ fprintf(stderr, "odp_schedule_order_unlock: mismatched call\n");
+ }
+#endif
+#ifdef LOG
+if (VERBOSE) printf("%u: lock released %p->olock[0]=%u\n", TS->tidx, rwin, rctx->sn + 1);
+#endif
+ __atomic_store_n(&rwin->olock[lock_index], rctx->sn + 1, __ATOMIC_RELEASE);
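+ //Remember that the owner released this lock itself; presumably used when
+ //the reorder context is retired so that locks the owner never took can
+ //still be advanced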
+ rctx->olock_flags |= 1U << lock_index;
+}
+
+/******************************************************************************
+ *
+ *****************************************************************************/
+
+static pthread_t tid[MAXTHREADS];
+static unsigned long CPUFREQ;
+static bool AFFINITY = false;
+static bool PARALLEL = false;
+static bool ORDERED = false;
+static pthread_barrier_t BAR;
+#define MAXQUEUES 256
+static uint32_t NUMQUEUES = 20;
+static odp_queue_t ODPQ[MAXQUEUES];
+#define MAXEVENTS 100000
+static uint32_t NUMEVENTS = 2048;
+static odp_event_t EVENTS[MAXEVENTS];
+static uint32_t NUMCOMPLETED ALIGNED(CACHE_LINE);
+
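+//Test harness: thread 0 seeds all events into queue 0, then every thread
+//schedules events and forwards them from queue N to queue N+1; an event is
+//counted in NUMCOMPLETED once it has passed through all NUMQUEUES queues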
+static void *entrypoint(void *arg)
+{
+ unsigned tidx = (unsigned)(long)arg;
+ thread_state_init(tidx);
+
+ int bret = pthread_barrier_wait(&BAR);
+ if (bret != 0 && bret != PTHREAD_BARRIER_SERIAL_THREAD)
+ {
+ errno = bret;
+ perror("pthread_barrier_wait"), abort();
+ }
+
+ if (tidx == 0)
+ {
+ //Enqueue events from events array into queue 0
+ for (unsigned i = 0; i < NUMEVENTS; i++)
+ {
+ odp_event_t evt = EVENTS[i];
+ evt->fromqidx = 0;
+ unsigned j;
+ for (j = 0; j < 100000; j++)
+ {
+ int rc = odp_queue_enq(ODPQ[0], &evt, 1);
+ if (rc == 1)
+ break;
+ doze();
+ }
+ if (j == 100000)
+ {
+ fprintf(stderr, "Failed initial enqueue: i=%u, read=%u, write=%u\n",
+ i, ODPQ[0]->queue.prod.read, ODPQ[0]->queue.prod.write);
+ fflush(NULL), abort();
+ }
+ }
+ }
+
+ //Move events from queue N to queue N+1
+ uint32_t fails = 0;
+ while (__atomic_load_n(&NUMCOMPLETED, __ATOMIC_RELAXED) != NUMEVENTS)
+ {
+ odp_queue_t q;
+ odp_event_t evt = odp_schedule(&q, 0);
+ if (evt != ODP_EVENT_INVALID)
+ {
+ evt->fromqidx++;
+ if (evt->fromqidx < NUMQUEUES)
+ {
+ int rc = odp_queue_enq(ODPQ[evt->fromqidx], &evt, 1);
+ if (rc != 1)
+ {
+ fprintf(stderr, "Queue full\n");
+ fflush(NULL); abort();
+ }
+ }
+ else//Event has passed through all queues
+ {
+ if (ORDERED)
+ {
+ odp_schedule_order_lock(0);
+ }
+
+ uint32_t expected = __atomic_fetch_add(&NUMCOMPLETED,
+ 1, __ATOMIC_RELAXED);
+#ifdef LOG
+ if (VERBOSE) printf("%u: Event %u completed\n", TS->tidx, evt->number);
+#endif
+ if (!PARALLEL && evt->number != expected)
+ {
+ //Ordered or atomic queues
+ fprintf(stderr, "%u: Event %u wrong order, expected %u\n",
+ TS->tidx, evt->number, expected);
+ }
+ //Else parallel queues, order not preserved
+ if (ORDERED)
+ {
+ odp_schedule_order_unlock(0);
+ }
+ }
+ fails = 0;
+ }
+ else
+ {
+ doze(); doze(); doze(); doze();
+ doze(); doze(); doze(); doze();
+ doze(); doze(); doze(); doze();
+ doze(); doze(); doze(); doze();
+ doze(); doze(); doze(); doze();
+ doze(); doze(); doze(); doze();
+ if (++fails == 10000000)
+ {
+ fprintf(stderr, "%u: Deadlock suspected\n", TS->tidx);
+ fflush(NULL);
+ bp();//break;
+ }
+ }
+ }
+
+#ifdef LOG
+ if (VERBOSE)
+ {
+ printf("NUMCOMPLETED %u\n", NUMCOMPLETED);
+ static int THREADEXIT = 0;
+ if (__atomic_fetch_add(&THREADEXIT, 1, __ATOMIC_ACQUIRE) == 0)
+ {
+ for (int i = 0; i < NUMQUEUES; i++)
+ {
+ printf("queue %p: numevts %u, cur_ticket %u, nxt_ticket %u\n", ODPQ[i], ODPQ[i]->queue.shared.numevts, ODPQ[i]->queue.shared.cur_ticket, ODPQ[i]->queue.shared.nxt_ticket);
+ struct ringstate rs;
+ rs = ODPQ[i]->queue.cons;
+ if (ringstate_num_used(rs) != 0)
+ {
+ printf("queue %p.cons has %u elements\n", ODPQ[i], ringstate_num_used(rs));
+ }
+ rs = ODPQ[i]->queue.prod;
+ if (ringstate_num_used(rs) != 0)
+ {
+ printf("queue %p.prod has %u elements\n", ODPQ[i], ringstate_num_used(rs));
+ }
+ }
+ }
+ }
+#endif
+
+ return NULL;
+}
+
+static void
+initialize_attr(pthread_attr_t *attr, int sched, int prio, unsigned cpu, const char *name)
+{
+ int err;
+ if (pthread_attr_init(attr) != 0)
+ {
+ perror("pthread_attr_init"), abort();
+ }
+ if (AFFINITY)
+ {
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(cpu + 1, &cpuset);
+ if (pthread_attr_setaffinity_np(attr, sizeof cpuset, &cpuset))
+ {
+ perror("pthread_attr_setaffinity_np"), abort();
+ }
+ }
+ if (pthread_attr_setschedpolicy(attr, sched))
+ {
+ perror("pthread_attr_setschedpolicy"), abort();
+ }
+ //Take the scheduling attributes from attr instead of inheriting them from the creating thread
+ if (pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED))
+ {
+ perror("pthread_attr_setinheritsched"), abort();
+ }
+ struct sched_param schedp;
+ if (sched == SCHED_FIFO || sched == SCHED_RR)
+ {
+ memset(&schedp, 0, sizeof schedp);
+ schedp.sched_priority = prio;
+ if ((err = pthread_attr_setschedparam(attr, &schedp)) != 0)
+ {
+ errno = err;
+ perror("pthread_attr_setschedparam"), abort();
+ }
+ }
+}
+
+static void create_threads(void)
+{
+ unsigned thr;
+ void *(*ep)(void *) = entrypoint;
+ for (thr = 0; thr < NUMTHREADS; thr++)
+ {
+ int err;
+ pthread_attr_t pt_attr;
+ initialize_attr(&pt_attr, SCHED, PRIO, /*cpu=*/thr, "task");
+ if ((err = pthread_create(&tid[thr], &pt_attr, ep, /*arg=*/(void*)(long)thr)) != 0)
+ {
+ if (err == EPERM)
+ {
+ //Work-around for some platforms that do not support/allow
+ //SCHED_FIFO/SCHED_RR
+ initialize_attr(&pt_attr, SCHED_OTHER, PRIO, /*cpu=*/thr, "task");
+ err = pthread_create(&tid[thr], &pt_attr, ep, /*arg=*/(void*)(long)thr);
+ }
+ if (err != 0)
+ {
+ errno = err;
+ perror("pthread_create");
+ exit(20);
+ }
+ }
+ }
+}
+
+#if 0
+static unsigned permille(uint32_t rel, uint32_t tot)
+{
+ return (unsigned)(1000ULL * rel / tot);
+}
+#endif
+
+int main(int argc, char *argv[])
+{
+ unsigned thr;
+ int c;
+
+ while ((c = getopt(argc, argv, "ae:f:opq:t:v")) != -1)
+ {
+ switch (c)
+ {
+ case 'a' :
+ AFFINITY = true;
+ break;
+ case 'e' :
+ {
+ int numevents = atoi(optarg);
+ if (numevents < 1 || numevents > MAXEVENTS)
+ {
+ fprintf(stderr, "Invalid number of events %d\n", numevents);
+ exit(EXIT_FAILURE);
+ }
+ NUMEVENTS = (unsigned)numevents;
+ break;
+ }
+ case 'f' :
+ {
+ CPUFREQ = atol(optarg);
+ break;
+ }
+ case 'o' :
+ ORDERED = true;
+ break;
+ case 'p' :
+ PARALLEL = true;
+ break;
+ case 'q' :
+ {
+ int numqueues = atoi(optarg);
+ if (numqueues < 1 || numqueues > MAXQUEUES)
+ {
+ fprintf(stderr, "Invalid number of queues %d\n", numqueues);
+ exit(EXIT_FAILURE);
+ }
+ NUMQUEUES = (unsigned)numqueues;
+ break;
+ }
+ case 't' :
+ {
+ int numthreads = atoi(optarg);
+ if (numthreads < 1 || numthreads > MAXTHREADS)
+ {
+ fprintf(stderr, "Invalid number of threads %d\n", numthreads);
+ exit(EXIT_FAILURE);
+ }
+ NUMTHREADS = (unsigned)numthreads;
+ break;
+ }
+ case 'v' :
+ VERBOSE = true;
+ break;
+ default :
+usage :
+ fprintf(stderr, "Usage: scheduler <options>\n"
+ "-a Make threads CPU affine\n"
+ "-e <numevents> Number of events\n"
+ "-f <cpufreq> CPU frequency in kHz\n"
+ "-o Use ordered queues\n"
+ "-p Use parallel queues\n"
+ "-q <numqueues> Number of queues\n"
+ "-t <numthr> Number of threads\n"
+ "-v Verbose\n"
+ );
+ exit(EXIT_FAILURE);
+ }
+ }
+ //Reject stray non-option arguments and the contradictory -p -o combination
+ if (optind < argc || (PARALLEL && ORDERED))
+ {
+ goto usage;
+ }
+
+ printf("%u events, %u %s queue%s, %u thread%s\n",
+ NUMEVENTS,
+ NUMQUEUES,
+ PARALLEL ? "parallel" : ORDERED ? "ordered" : "atomic",
+ NUMQUEUES != 1 ? "s" : "",
+ NUMTHREADS,
+ NUMTHREADS != 1 ? "s" : "");
+
+ if (pthread_barrier_init(&BAR, NULL, NUMTHREADS + 1) != 0)
+ {
+ perror("pthread_barrier_init"), abort();
+ }
+
+ //Create scheduler group with thread mask = all threads (0..NUMTHREADS-1)
+ //so the scheduler knows how many schedq's are needed for best spread
+ odp_thrmask_t all = (1ULL << NUMTHREADS) - 1;
+ odp_schedule_group_t grp_all = odp_schedule_group_create("ALL", &all);
+
+ //Create all our ODP queues
+ for (unsigned i = 0; i < NUMQUEUES; i++)
+ {
+ //When ordered queues are used, give the last queue an ordered lock so
+ //that the completion order of events can be checked safely
+ odp_queue_t q = _odp_queue_create(/*prio=*/PRIO_MED,
+ /*sync=*/PARALLEL ? parallel_q :
+ ORDERED ? ordered_q :
+ atomic_q,
+ /*group=*/grp_all,
+ /*lock_count=*/ORDERED && (i == NUMQUEUES - 1),
+ /*user_ctx=*/NULL);
+ if (q == ODP_QUEUE_INVALID)
+ perror("_odp_queue_create"), abort();
+if (VERBOSE) printf("ODPQ[%u]=%p, type=%s, schedq %p\n", i, q, qtype2str(q), q->schedq);
+ ODPQ[i] = q;
+ }
+
+ for (unsigned i = 0; i < NUMEVENTS; i++)
+ {
+ odp_event_t evt = odp_event_alloc();
+ if (evt == ODP_EVENT_INVALID)
+ abort();
+ evt->number = i;
+ EVENTS[i] = evt;
+ }
+ NUMCOMPLETED = 0;
+
+ //Create threads
+ create_threads();
+
+ struct timespec ts;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ uint64_t start = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
+
+ //Release threads by joining the barrier
+ pthread_barrier_wait(&BAR);
+
+ //Wait for threads to terminate
+ for (thr = 0; thr < NUMTHREADS; thr++)
+ {
+ pthread_join(tid[thr], NULL);
+ }
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ if (AFFINITY && CPUFREQ == 0)
+ {
+ unsigned long cpufreq[MAXTHREADS];
+ for (thr = 0; thr < NUMTHREADS; thr++)
+ {
+ char s[200];
+ cpufreq[thr] = 0;
+ sprintf(s, "/sys/devices/system/cpu/cpu%u/cpufreq/cpuinfo_cur_freq", thr + 1);
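+ //Threads were pinned to CPU thr + 1 (see initialize_attr);
+ //cpuinfo_cur_freq reports the frequency in kHz, matching the -f option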
+ int fd = open(s, O_RDONLY);
+ if (fd != -1)
+ {
+ char buf[40];
+ int l = read(fd, buf, sizeof buf - 1);
+ if (l > 0)
+ {
+ buf[l] = '\0';//NUL-terminate before atol
+ cpufreq[thr] = atol(buf);
+ }
+ close(fd);
+ }
+ }
+ CPUFREQ = 0;
+ for (thr = 0; thr < NUMTHREADS; thr++)
+ {
+ printf("Thread %u current CPU frequency %lukHz\n", thr, cpufreq[thr]);
+ CPUFREQ += cpufreq[thr] / NUMTHREADS;
+ }
+ printf("Average CPU frequency %lukHz\n", CPUFREQ);
+ }
+ uint64_t numops = NUMEVENTS * NUMQUEUES;
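+ //Each event is scheduled once per queue, so e.g. the default 2048 events
+ //through 20 queues amount to 40960 operations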
+ uint64_t elapsed = ts.tv_sec * 1000000000ULL + ts.tv_nsec - start;
+ printf("%llu.%03llu seconds, ", elapsed / 1000000000LLU, (elapsed % 1000000000LLU) / 1000000LLU);
+ if (elapsed / 1000000 != 0)
+ {
+ printf("%"PRIu32" ops/second", (uint32_t)((numops / (elapsed / 1000000)) * 1000));
+ }
+ printf("\n");
+ printf("%"PRIu32" nanoseconds/update\n", (uint32_t)(elapsed / numops));
+ if (CPUFREQ != 0)
+ {
+ uint64_t cycles = NUMTHREADS * elapsed * CPUFREQ / 1000000ULL;
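+ //elapsed is in ns and CPUFREQ in kHz, so elapsed * CPUFREQ / 1e6 is the
+ //cycle count per thread; scaled by NUMTHREADS it approximates total CPU
+ //time, and dividing by numops gives cycles per scheduled event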
+ printf("%"PRIu32" cycles/update\n", (uint32_t)(cycles / numops));
+ }
+
+if (VERBOSE)
+{
+ for (uint32_t thr = 0; thr < NUMTHREADS; thr++)
+ {
+ for (uint32_t j = 0; j < thread_state[thr].num_schedq; j++)
+ {
+ sched_queue *schedq = thread_state[thr].schedq_list[j];
+ printf("%u: schedq[%u]=%p (prio=%u)\n", thr, j, schedq, schedq->prio);
+ }
+ }
+ uint32_t numpushpop = 0;
+ for (uint32_t i = 0; i < NUMQUEUES; i++)
+ {
+ numpushpop += ODPQ[i]->queue.shared.nxt_ticket;
+ }
+ printf("%u push/pop operations\n", numpushpop);
+}
+
+ return 0;
+}