From patchwork Wed May 11 16:41:49 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Christophe Milard X-Patchwork-Id: 67562 Delivered-To: patch@linaro.org Received: by 10.140.92.199 with SMTP id b65csp294991qge; Wed, 11 May 2016 08:48:50 -0700 (PDT) X-Received: by 10.140.104.48 with SMTP id z45mr4273163qge.49.1462981730035; Wed, 11 May 2016 08:48:50 -0700 (PDT) Return-Path: Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id c191si5618283qke.4.2016.05.11.08.48.49; Wed, 11 May 2016 08:48:50 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Authentication-Results: mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org; dmarc=pass (p=NONE dis=NONE) header.from=linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id A979161718; Wed, 11 May 2016 15:48:49 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-2.6 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_LOW, RCVD_IN_MSPIKE_H3, RCVD_IN_MSPIKE_WL, URIBL_BLOCKED autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id 2808B61649; Wed, 11 May 2016 15:45:11 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 5C8F7616AB; Wed, 11 May 2016 15:45:05 +0000 (UTC) Received: from mail-lf0-f47.google.com (mail-lf0-f47.google.com [209.85.215.47]) by lists.linaro.org (Postfix) with ESMTPS id 9277D61649 for ; Wed, 11 May 2016 15:44:12 +0000 (UTC) Received: by mail-lf0-f47.google.com with SMTP id j8so53740019lfd.2 for ; Wed, 11 May 2016 08:44:12 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=5sQSp0aE/8LUrfOGcfCu3woRECpYZSOqUroVtKJXxdg=; b=lc3F4tZcHA5EWoaC3u/Soq/U1/Z3L1rGxWyGaJSHByWNbG9j3UpkdvfZP4mwegl0Tn WTxdDaQ41h6UOCc4CgGMhxDv+l7fMe24PUwSB918fL4O6IgZ5n1rWMJUSFpzlMaMdwJd xnGKgfq5kwX922q01nji5rM9K5S4DyBvTdXS68mEOwbcRmFhUdx2X2ev55lJ+rWe9fcA xfUmrEqGKB3xuplyNZWjdPks+8cR4FyXYFNGiSgNNsOmwk0lfZw42bJ7JGWefGLWfig3 +6y3CNu4diquTj3VRmOHus0xs3BHqRaXGHkGV5WBy/tVwseoXzZIkFPprQFQI4fD3ZfW sW8Q== X-Gm-Message-State: AOPr4FW8mydKQv8Px5rZCQ9HnkHOoc1ueyuCWeIcU1Z191QOQMeD+FbKrKIQDdsYvTWy4UjxW9I= X-Received: by 10.112.95.20 with SMTP id dg20mr1899566lbb.58.1462981451470; Wed, 11 May 2016 08:44:11 -0700 (PDT) Received: from erachmi-ericsson.ki.sw.ericsson.se (c-83-233-90-46.cust.bredband2.com. [83.233.90.46]) by smtp.gmail.com with ESMTPSA id n37sm1421164lfg.46.2016.05.11.08.44.10 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Wed, 11 May 2016 08:44:10 -0700 (PDT) From: Christophe Milard To: brian.brooks@linaro.org, mike.holmes@linaro.org, lng-odp@lists.linaro.org Date: Wed, 11 May 2016 18:41:49 +0200 Message-Id: <1462984942-53326-6-git-send-email-christophe.milard@linaro.org> X-Mailer: git-send-email 2.5.0 In-Reply-To: <1462984942-53326-1-git-send-email-christophe.milard@linaro.org> References: <1462984942-53326-1-git-send-email-christophe.milard@linaro.org> X-Topics: patch Subject: [lng-odp] [PATCHv6 05/38] helpers: linux: creating functions to handle odpthreads X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: "The OpenDataPlane \(ODP\) List" List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , MIME-Version: 1.0 Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" Two functions, odph_odpthreads_create and odph_odpthreads_join are created to create and termindate odp threads. These function will create the odp threads either as linux processes or as linux threads, depending on the command line arguments (flag odph_proc or odph_thread). If both flags are given, every second odp thread will be process, (others are threads). default is thead only. Former functions (odph_linux_pthread_create, odph_linux_pthread_join, odph_linux_process_fork, odph_linux_process_fork_n and odph_linux_process_wait_n) are left for a compatibility, but are aimed to be removed. Signed-off-by: Christophe Milard --- helper/include/odp/helper/linux.h | 28 ++++ helper/linux.c | 267 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/helper/include/odp/helper/linux.h b/helper/include/odp/helper/linux.h index f99b88a..f67aa30 100644 --- a/helper/include/odp/helper/linux.h +++ b/helper/include/odp/helper/linux.h @@ -166,6 +166,34 @@ int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl, */ int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num); +/** + * Creates and launches odpthreads (as linux threads or processes) + * + * Creates, pins and launches threads to separate CPU's based on the cpumask. + * + * @param thread_tbl Thread table + * @param mask CPU mask + * @param thr_params ODP thread parameters + * + * @return Number of threads created + */ +int odph_odpthreads_create(odph_odpthread_t *thread_tbl, + const odp_cpumask_t *mask, + const odph_odpthread_params_t *thr_params); + +/** + * Waits odpthreads (as linux threads or processes) to exit. + * + * Returns when all odpthreads have terminated. + * + * @param thread_tbl Thread table + * @return The number of joined threads or -1 on error. + * (error occurs if any of the start_routine return non-zero or if + * the thread join/process wait itself failed -e.g. as the result of a kill) + * + */ +int odph_odpthreads_join(odph_odpthread_t *thread_tbl); + /** * Parse linux helper options diff --git a/helper/linux.c b/helper/linux.c index 5dbc2da..b8d4f49 100644 --- a/helper/linux.c +++ b/helper/linux.c @@ -22,6 +22,11 @@ #include #include "odph_debug.h" +static struct { + int proc; /* true when process mode is required */ + int thrd; /* true when thread mode is required */ +} helper_options; + static void *odp_run_start_routine(void *arg) { odph_linux_thr_params_t *thr_params = arg; @@ -239,6 +244,262 @@ int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num) } /* + * wrapper for odpthreads, either implemented as linux threads or processes. + * (in process mode, if start_routine returns NULL, the process return FAILURE). + */ +static void *odpthread_run_start_routine(void *arg) +{ + int status; + int ret; + odph_odpthread_params_t *thr_params; + + odph_odpthread_start_args_t *start_args = arg; + + thr_params = &start_args->thr_params; + + /* ODP thread local init */ + if (odp_init_local(thr_params->instance, thr_params->thr_type)) { + ODPH_ERR("Local init failed\n"); + if (start_args->linuxtype == ODPTHREAD_PROCESS) + _exit(EXIT_FAILURE); + return NULL; + } + + ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n", + thr_params->thr_type == ODP_THREAD_WORKER ? + "worker" : "control", + (start_args->linuxtype == ODPTHREAD_PTHREAD) ? + "pthread" : "process", + (int)getpid()); + + status = thr_params->start(thr_params->arg); + ret = odp_term_local(); + + if (ret < 0) + ODPH_ERR("Local term failed\n"); + else if (ret == 0 && odp_term_global(thr_params->instance)) + ODPH_ERR("Global term failed\n"); + + /* for process implementation of odp threads, just return status... */ + if (start_args->linuxtype == ODPTHREAD_PROCESS) + _exit(status); + + /* threads implementation return void* pointers: cast status to that. */ + return (void *)(long)status; +} + +/* + * Create a single ODPthread as a linux process + */ +static int odph_linux_process_create(odph_odpthread_t *thread_tbl, + int cpu, + const odph_odpthread_params_t *thr_params) +{ + int ret; + cpu_set_t cpu_set; + pid_t pid; + + CPU_ZERO(&cpu_set); + CPU_SET(cpu, &cpu_set); + + thread_tbl->start_args.thr_params = *thr_params; /* copy */ + thread_tbl->start_args.linuxtype = ODPTHREAD_PROCESS; + thread_tbl->cpu = cpu; + + pid = fork(); + if (pid < 0) { + ODPH_ERR("fork() failed\n"); + thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + return -1; + } + + /* Parent continues to fork */ + if (pid > 0) { + thread_tbl->proc.pid = pid; + return 0; + } + + /* Child process */ + + /* Request SIGTERM if parent dies */ + prctl(PR_SET_PDEATHSIG, SIGTERM); + /* Parent died already? */ + if (getppid() == 1) + kill(getpid(), SIGTERM); + + if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) { + ODPH_ERR("sched_setaffinity() failed\n"); + return -2; + } + + odpthread_run_start_routine(&thread_tbl->start_args); + + return 0; /* never reached */ +} + +/* + * Create a single ODPthread as a linux thread + */ +static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, + int cpu, + const odph_odpthread_params_t *thr_params) +{ + int ret; + cpu_set_t cpu_set; + + CPU_ZERO(&cpu_set); + CPU_SET(cpu, &cpu_set); + + pthread_attr_init(&thread_tbl->thread.attr); + + thread_tbl->cpu = cpu; + + pthread_attr_setaffinity_np(&thread_tbl->thread.attr, + sizeof(cpu_set_t), &cpu_set); + + thread_tbl->start_args.thr_params = *thr_params; /* copy */ + thread_tbl->start_args.linuxtype = ODPTHREAD_PTHREAD; + + ret = pthread_create(&thread_tbl->thread.thread_id, + &thread_tbl->thread.attr, + odpthread_run_start_routine, + &thread_tbl->start_args); + if (ret != 0) { + ODPH_ERR("Failed to start thread on cpu #%d\n", cpu); + thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + return ret; + } + + return 0; +} + +/* + * create an odpthread set (as linux processes or linux threads or both) + */ +int odph_odpthreads_create(odph_odpthread_t *thread_tbl, + const odp_cpumask_t *mask, + const odph_odpthread_params_t *thr_params) +{ + int i; + int num; + int cpu_count; + int cpu; + + num = odp_cpumask_count(mask); + + memset(thread_tbl, 0, num * sizeof(odph_odpthread_t)); + + cpu_count = odp_cpu_count(); + + if (num < 1 || num > cpu_count) { + ODPH_ERR("Invalid number of odpthreads:%d" + " (%d cores available)\n", + num, cpu_count); + return -1; + } + + cpu = odp_cpumask_first(mask); + for (i = 0; i < num; i++) { + /* + * Thread mode by default, or if both thread and proc mode + * are required each second odpthread is a linux thread. + */ + if ((!helper_options.proc) || + (helper_options.proc && helper_options.thrd && (i & 1))) { + if (odph_linux_thread_create(&thread_tbl[i], + cpu, + thr_params)) + break; + } else { + if (odph_linux_process_create(&thread_tbl[i], + cpu, + thr_params)) + break; + } + + cpu = odp_cpumask_next(mask, cpu); + } + thread_tbl[num - 1].last = 1; + + return i; +} + +/* + * wait for the odpthreads termination (linux processes and threads) + */ +int odph_odpthreads_join(odph_odpthread_t *thread_tbl) +{ + pid_t pid; + int i = 0; + int terminated = 0; + int status = 0; /* child process return code (!=0 is error) */ + void *thread_ret; /* "child" thread return code (NULL is error) */ + int ret; + int retval = 0; + + /* joins linux threads or wait for processes */ + do { + /* pthreads: */ + switch (thread_tbl[i].start_args.linuxtype) { + case ODPTHREAD_PTHREAD: + /* Wait thread to exit */ + ret = pthread_join(thread_tbl[i].thread.thread_id, + &thread_ret); + if (ret != 0) { + ODPH_ERR("Failed to join thread from cpu #%d\n", + thread_tbl[i].cpu); + retval = -1; + } else { + terminated++; + if (thread_ret != NULL) + retval = -1; + } + pthread_attr_destroy(&thread_tbl[i].thread.attr); + break; + + case ODPTHREAD_PROCESS: + + /* processes: */ + pid = waitpid(thread_tbl[i].proc.pid, &status, 0); + + if (pid < 0) { + ODPH_ERR("waitpid() failed\n"); + retval = -1; + break; + } + + terminated++; + + /* Examine the child process' termination status */ + if (WIFEXITED(status) && + WEXITSTATUS(status) != EXIT_SUCCESS) { + ODPH_ERR("Child exit status:%d (pid:%d)\n", + WEXITSTATUS(status), (int)pid); + retval = -1; + } + if (WIFSIGNALED(status)) { + int signo = WTERMSIG(status); + + ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", + signo, strsignal(signo), (int)pid); + retval = -1; + } + break; + + case ODPTHREAD_NOT_STARTED: + ODPH_DBG("No join done on not started ODPthread.\n"); + break; + default: + ODPH_DBG("Invalid case statement value!\n"); + break; + } + + } while (!thread_tbl[i++].last); + + return (retval < 0) ? retval : terminated; +} + +/* * Parse command line options to extract options affecting helpers. */ int odph_parse_options(int argc, char *argv[]) @@ -247,9 +508,15 @@ int odph_parse_options(int argc, char *argv[]) static struct option long_options[] = { /* These options set a flag. */ + {"odph_proc", no_argument, &helper_options.proc, 1}, + {"odph_thread", no_argument, &helper_options.thrd, 1}, {0, 0, 0, 0} }; + /* defaults: */ + helper_options.proc = false; + helper_options.thrd = false; + while (1) { /* getopt_long stores the option index here. */ int option_index = 0;