From patchwork Fri May 20 16:30:20 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Christophe Milard X-Patchwork-Id: 68255 Delivered-To: patch@linaro.org Received: by 10.140.92.199 with SMTP id b65csp3847989qge; Fri, 20 May 2016 08:36:52 -0700 (PDT) X-Received: by 10.140.169.193 with SMTP id p184mr4169851qhp.13.1463758611985; Fri, 20 May 2016 08:36:51 -0700 (PDT) Return-Path: Received: from lists.linaro.org (lists.linaro.org. [54.225.227.206]) by mx.google.com with ESMTP id r95si15830396qkr.150.2016.05.20.08.36.51; Fri, 20 May 2016 08:36:51 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) client-ip=54.225.227.206; Authentication-Results: mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.225.227.206 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org; dmarc=pass (p=NONE dis=NONE) header.from=linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id 9366761943; Fri, 20 May 2016 15:36:51 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-2.6 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_LOW, RCVD_IN_MSPIKE_H3, RCVD_IN_MSPIKE_WL, URIBL_BLOCKED autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id C064261817; Fri, 20 May 2016 15:32:33 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id DDAC7616EE; Fri, 20 May 2016 15:32:07 +0000 (UTC) Received: from mail-lb0-f169.google.com (mail-lb0-f169.google.com [209.85.217.169]) by lists.linaro.org (Postfix) with ESMTPS id 16D38615F0 for ; Fri, 20 May 2016 15:32:05 +0000 (UTC) Received: by mail-lb0-f169.google.com with SMTP id h1so36541918lbj.3 for ; Fri, 20 May 2016 08:32:05 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=GyjeS21gGPxXanxb4zHC00xNe43ciMM2fz9QVSre3iI=; b=IclaFhhmc9C6oXIe7QjexRj5ZtLr1ucc9Vj17cvbPgnAnREBI9YToRbDeOV2oWG/IT ZFllZJ357bx8Z2HhSDLkfDFLiOl3UKoXvCarUcy1ef4ywm6OjSrBmMw/WVMttQdxyIGR qKDlPuYXqpNNZ7SgCUakpEyFOf7qg+VuolX7+MJiSmYI9HDCnvKsMyrKgQE7iRn/iCte LSkcA32uitX7MmKdmAX55MZZd+UIAFPwRvHZijLa0zN58eTfag6XNxnBCxUTtFC/koUG QGm6mA3xElGkQmjKrrs2vFxM5iLHsWpj4Yj2w2bh7Q3dGrxh28zRAYyQDvonCjJutvC5 VD6g== X-Gm-Message-State: AOPr4FV4CgXKJisq+V1q/dPw7zumT+cwY7Y6KPeUswS/zb0QpKOTTkobaTO/dhvWAqCHRE9ufnk= X-Received: by 10.113.5.4 with SMTP id ci4mr1315145lbd.87.1463758323811; Fri, 20 May 2016 08:32:03 -0700 (PDT) Received: from erachmi-ericsson.ki.sw.ericsson.se (c-83-233-90-46.cust.bredband2.com. [83.233.90.46]) by smtp.gmail.com with ESMTPSA id tg1sm3348847lbb.7.2016.05.20.08.32.02 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Fri, 20 May 2016 08:32:02 -0700 (PDT) From: Christophe Milard To: maxim.uvarov@linaro.com, lng-odp@lists.linaro.org Date: Fri, 20 May 2016 18:30:20 +0200 Message-Id: <1463761851-37121-5-git-send-email-christophe.milard@linaro.org> X-Mailer: git-send-email 2.5.0 In-Reply-To: <1463761851-37121-1-git-send-email-christophe.milard@linaro.org> References: <1463761851-37121-1-git-send-email-christophe.milard@linaro.org> X-Topics: patch Subject: [lng-odp] [PATCHv9 04/35] helpers: linux: creating functions to handle odpthreads X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: "The OpenDataPlane \(ODP\) List" List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , MIME-Version: 1.0 Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" odph_linux_pthread_t and odph_linux_process_t are joined, creating odph_odpthread_t. Tests should be using the latter only so that the actual implementation of the ODP thread don't leak in the test itself. (odph_odpthread_t is opaque to the test, and is common to both thread and process omplementation of ODP threads) Two functions, odph_odpthreads_create and odph_odpthreads_join are then created to create and termindate odp threads. These functions create the odp threads either as linux processes or as linux threads, depending on the command line arguments (flag odph_proc for processes). Signed-off-by: Christophe Milard --- helper/include/odp/helper/linux.h | 67 +++++++++- helper/linux.c | 259 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 325 insertions(+), 1 deletion(-) diff --git a/helper/include/odp/helper/linux.h b/helper/include/odp/helper/linux.h index d47abef..01c348d 100644 --- a/helper/include/odp/helper/linux.h +++ b/helper/include/odp/helper/linux.h @@ -49,7 +49,6 @@ typedef struct { odph_linux_thr_params_t thr_params; } odph_linux_pthread_t; - /** Linux process state information */ typedef struct { pid_t pid; /**< Process ID */ @@ -57,6 +56,44 @@ typedef struct { int status; /**< Process state change status */ } odph_linux_process_t; +/** odpthread linux type: whether an ODP thread is a linux thread or process */ +typedef enum odph_odpthread_linuxtype_e { + ODPTHREAD_NOT_STARTED = 0, + ODPTHREAD_PROCESS, + ODPTHREAD_PTHREAD +} odph_odpthread_linuxtype_t; + +/** odpthread parameters for odp threads (pthreads and processes) */ +typedef struct { + int (*start)(void *); /**< Thread entry point function */ + void *arg; /**< Argument for the function */ + odp_thread_type_t thr_type; /**< ODP thread type */ + odp_instance_t instance; /**< ODP instance handle */ +} odph_odpthread_params_t; + +/** The odpthread starting arguments, used both in process or thread mode */ +typedef struct { + odph_odpthread_linuxtype_t linuxtype; + odph_odpthread_params_t thr_params; /*copy of thread start parameter*/ +} odph_odpthread_start_args_t; + +/** Linux odpthread state information, used both in process or thread mode */ +typedef struct { + odph_odpthread_start_args_t start_args; + int cpu; /**< CPU ID */ + int last; /**< true if last table entry */ + union { + struct { /* for thread implementation */ + pthread_t thread_id; /**< Pthread ID */ + pthread_attr_t attr; /**< Pthread attributes */ + } thread; + struct { /* for process implementation */ + pid_t pid; /**< Process ID */ + int status; /**< Process state chge status*/ + } proc; + }; +} odph_odpthread_t; + /** * Creates and launches pthreads * @@ -135,6 +172,34 @@ int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl, int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num); /** + * Creates and launches odpthreads (as linux threads or processes) + * + * Creates, pins and launches threads to separate CPU's based on the cpumask. + * + * @param thread_tbl Thread table + * @param mask CPU mask + * @param thr_params ODP thread parameters + * + * @return Number of threads created + */ +int odph_odpthreads_create(odph_odpthread_t *thread_tbl, + const odp_cpumask_t *mask, + const odph_odpthread_params_t *thr_params); + +/** + * Waits odpthreads (as linux threads or processes) to exit. + * + * Returns when all odpthreads have terminated. + * + * @param thread_tbl Thread table + * @return The number of joined threads or -1 on error. + * (error occurs if any of the start_routine return non-zero or if + * the thread join/process wait itself failed -e.g. as the result of a kill) + * + */ +int odph_odpthreads_join(odph_odpthread_t *thread_tbl); + +/** * Merge getopt options * * Given two sets of getopt options (each containing possibly both short diff --git a/helper/linux.c b/helper/linux.c index ef435bd..6366694 100644 --- a/helper/linux.c +++ b/helper/linux.c @@ -21,6 +21,10 @@ #include #include "odph_debug.h" +static struct { + int proc; /* true when process mode is required, false otherwise */ +} helper_options; + static void *odp_run_start_routine(void *arg) { odph_linux_thr_params_t *thr_params = arg; @@ -238,6 +242,257 @@ int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num) } /* + * wrapper for odpthreads, either implemented as linux threads or processes. + * (in process mode, if start_routine returns NULL, the process return FAILURE). + */ +static void *odpthread_run_start_routine(void *arg) +{ + int status; + int ret; + odph_odpthread_params_t *thr_params; + + odph_odpthread_start_args_t *start_args = arg; + + thr_params = &start_args->thr_params; + + /* ODP thread local init */ + if (odp_init_local(thr_params->instance, thr_params->thr_type)) { + ODPH_ERR("Local init failed\n"); + if (start_args->linuxtype == ODPTHREAD_PROCESS) + _exit(EXIT_FAILURE); + return NULL; + } + + ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n", + thr_params->thr_type == ODP_THREAD_WORKER ? + "worker" : "control", + (start_args->linuxtype == ODPTHREAD_PTHREAD) ? + "pthread" : "process", + (int)getpid()); + + status = thr_params->start(thr_params->arg); + ret = odp_term_local(); + + if (ret < 0) + ODPH_ERR("Local term failed\n"); + else if (ret == 0 && odp_term_global(thr_params->instance)) + ODPH_ERR("Global term failed\n"); + + /* for process implementation of odp threads, just return status... */ + if (start_args->linuxtype == ODPTHREAD_PROCESS) + _exit(status); + + /* threads implementation return void* pointers: cast status to that. */ + return (void *)(long)status; +} + +/* + * Create a single ODPthread as a linux process + */ +static int odph_linux_process_create(odph_odpthread_t *thread_tbl, + int cpu, + const odph_odpthread_params_t *thr_params) +{ + int ret; + cpu_set_t cpu_set; + pid_t pid; + + CPU_ZERO(&cpu_set); + CPU_SET(cpu, &cpu_set); + + thread_tbl->start_args.thr_params = *thr_params; /* copy */ + thread_tbl->start_args.linuxtype = ODPTHREAD_PROCESS; + thread_tbl->cpu = cpu; + + pid = fork(); + if (pid < 0) { + ODPH_ERR("fork() failed\n"); + thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + return -1; + } + + /* Parent continues to fork */ + if (pid > 0) { + thread_tbl->proc.pid = pid; + return 0; + } + + /* Child process */ + + /* Request SIGTERM if parent dies */ + prctl(PR_SET_PDEATHSIG, SIGTERM); + /* Parent died already? */ + if (getppid() == 1) + kill(getpid(), SIGTERM); + + if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set)) { + ODPH_ERR("sched_setaffinity() failed\n"); + return -2; + } + + odpthread_run_start_routine(&thread_tbl->start_args); + + return 0; /* never reached */ +} + +/* + * Create a single ODPthread as a linux thread + */ +static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, + int cpu, + const odph_odpthread_params_t *thr_params) +{ + int ret; + cpu_set_t cpu_set; + + CPU_ZERO(&cpu_set); + CPU_SET(cpu, &cpu_set); + + pthread_attr_init(&thread_tbl->thread.attr); + + thread_tbl->cpu = cpu; + + pthread_attr_setaffinity_np(&thread_tbl->thread.attr, + sizeof(cpu_set_t), &cpu_set); + + thread_tbl->start_args.thr_params = *thr_params; /* copy */ + thread_tbl->start_args.linuxtype = ODPTHREAD_PTHREAD; + + ret = pthread_create(&thread_tbl->thread.thread_id, + &thread_tbl->thread.attr, + odpthread_run_start_routine, + &thread_tbl->start_args); + if (ret != 0) { + ODPH_ERR("Failed to start thread on cpu #%d\n", cpu); + thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + return ret; + } + + return 0; +} + +/* + * create an odpthread set (as linux processes or linux threads or both) + */ +int odph_odpthreads_create(odph_odpthread_t *thread_tbl, + const odp_cpumask_t *mask, + const odph_odpthread_params_t *thr_params) +{ + int i; + int num; + int cpu_count; + int cpu; + + num = odp_cpumask_count(mask); + + memset(thread_tbl, 0, num * sizeof(odph_odpthread_t)); + + cpu_count = odp_cpu_count(); + + if (num < 1 || num > cpu_count) { + ODPH_ERR("Invalid number of odpthreads:%d" + " (%d cores available)\n", + num, cpu_count); + return -1; + } + + cpu = odp_cpumask_first(mask); + for (i = 0; i < num; i++) { + if (!helper_options.proc) { + if (odph_linux_thread_create(&thread_tbl[i], + cpu, + thr_params)) + break; + } else { + if (odph_linux_process_create(&thread_tbl[i], + cpu, + thr_params)) + break; + } + + cpu = odp_cpumask_next(mask, cpu); + } + thread_tbl[num - 1].last = 1; + + return i; +} + +/* + * wait for the odpthreads termination (linux processes and threads) + */ +int odph_odpthreads_join(odph_odpthread_t *thread_tbl) +{ + pid_t pid; + int i = 0; + int terminated = 0; + int status = 0; /* child process return code (!=0 is error) */ + void *thread_ret; /* "child" thread return code (NULL is error) */ + int ret; + int retval = 0; + + /* joins linux threads or wait for processes */ + do { + /* pthreads: */ + switch (thread_tbl[i].start_args.linuxtype) { + case ODPTHREAD_PTHREAD: + /* Wait thread to exit */ + ret = pthread_join(thread_tbl[i].thread.thread_id, + &thread_ret); + if (ret != 0) { + ODPH_ERR("Failed to join thread from cpu #%d\n", + thread_tbl[i].cpu); + retval = -1; + } else { + terminated++; + if (thread_ret != NULL) + retval = -1; + } + pthread_attr_destroy(&thread_tbl[i].thread.attr); + break; + + case ODPTHREAD_PROCESS: + + /* processes: */ + pid = waitpid(thread_tbl[i].proc.pid, &status, 0); + + if (pid < 0) { + ODPH_ERR("waitpid() failed\n"); + retval = -1; + break; + } + + terminated++; + + /* Examine the child process' termination status */ + if (WIFEXITED(status) && + WEXITSTATUS(status) != EXIT_SUCCESS) { + ODPH_ERR("Child exit status:%d (pid:%d)\n", + WEXITSTATUS(status), (int)pid); + retval = -1; + } + if (WIFSIGNALED(status)) { + int signo = WTERMSIG(status); + + ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", + signo, strsignal(signo), (int)pid); + retval = -1; + } + break; + + case ODPTHREAD_NOT_STARTED: + ODPH_DBG("No join done on not started ODPthread.\n"); + break; + default: + ODPH_DBG("Invalid case statement value!\n"); + break; + } + + } while (!thread_tbl[i++].last); + + return (retval < 0) ? retval : terminated; +} + +/* * return the number of elements in an array of getopt options, excluding the * terminating {0,0,0,0} */ @@ -325,11 +580,15 @@ int odph_parse_options(int argc, char *argv[], static struct option helper_long_options[] = { /* These options set a flag. */ + {"odph_proc", no_argument, &helper_options.proc, 1}, {0, 0, 0, 0} }; static char *helper_short_options = ""; + /* defaults: */ + helper_options.proc = false; + /* merge caller's command line options descriptions with helper's: */ if (odph_merge_getopt_options(caller_shortopts, helper_short_options, caller_longopts, helper_long_options,