@@ -34,6 +34,9 @@
#define APPL_MODE_PING 1 /**< ping mode */
#define APPL_MODE_RCV 2 /**< receive mode */
+#define PING_THR_TX 0
+#define PING_THR_RX 1
+
/** print appl mode */
#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
@@ -82,18 +85,21 @@ typedef struct {
/**
* counters
*/
-static struct {
- odp_atomic_u64_t seq; /**< ip seq to be send */
- odp_atomic_u64_t ip; /**< ip packets */
- odp_atomic_u64_t udp; /**< udp packets */
- odp_atomic_u64_t icmp; /**< icmp packets */
- odp_atomic_u64_t cnt; /**< sent packets*/
- odp_atomic_u64_t tx_drops; /**< packets dropped in transmit */
-} counters;
+typedef struct {
+ uint64_t ctr_pkt_snd; /**< sent packets*/
+ uint64_t ctr_pkt_snd_drop; /**< packets dropped in transmit */
+
+ uint64_t ctr_pkt_rcv; /**< recv packets */
+ uint64_t ctr_seq; /**< ip seq to be send */
+ uint64_t ctr_udp_rcv; /**< udp packets */
+ uint64_t ctr_icmp_reply_rcv; /**< icmp reply packets */
+} counters_t;
/** * Thread specific arguments
*/
typedef struct {
+ counters_t counters; /**< Packet conters */
+ odp_bool_t stop; /**< Stop packet processing */
union {
struct {
odp_pktout_queue_t pktout; /**< Packet output queue */
@@ -120,6 +126,9 @@ typedef struct {
appl_args_t appl;
/** Thread specific arguments */
thread_args_t thread[MAX_WORKERS];
+ /** Global arguments */
+ int thread_cnt;
+ int tx_burst_size;
} args_t;
/** Global pointer to args */
@@ -131,7 +140,8 @@ static odp_barrier_t barrier;
/** Packet processing function types */
typedef odp_packet_t (*setup_pkt_ref_fn_t)(odp_pool_t,
odp_pktout_config_opt_t *);
-typedef int (*setup_pkt_fn_t)(odp_packet_t, odp_pktout_config_opt_t *);
+typedef int (*setup_pkt_fn_t)(odp_packet_t, odp_pktout_config_opt_t *,
+ counters_t *);
/* helper funcs */
static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
@@ -245,6 +255,7 @@ static int setup_pkt_ref_array(odp_pool_t pool,
* @return 0 success, -1 failed
*/
static int setup_pkt_array(odp_pktout_config_opt_t *pktout_cfg,
+ counters_t *counters,
odp_packet_t *pkt_ref_array,
odp_packet_t *pkt_array,
int pkt_array_size,
@@ -253,7 +264,7 @@ static int setup_pkt_array(odp_pktout_config_opt_t *pktout_cfg,
int i;
for (i = 0; i < pkt_array_size; i++) {
- if ((*setup_pkt)(pkt_ref_array[i], pktout_cfg))
+ if ((*setup_pkt)(pkt_ref_array[i], pktout_cfg, counters))
break;
pkt_array[i] = odp_packet_ref_static(pkt_ref_array[i]);
@@ -342,7 +353,8 @@ static odp_packet_t setup_udp_pkt_ref(odp_pool_t pool,
* @return Success/Failed
* @retval 0 on success, -1 on fail
*/
-static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg)
+static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg,
+ counters_t *counters)
{
char *buf;
odph_ipv4hdr_t *ip;
@@ -352,7 +364,8 @@ static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg)
/*Update IP ID and checksum*/
ip = (odph_ipv4hdr_t *)(buf + ODPH_ETHHDR_LEN);
- seq = odp_atomic_fetch_add_u64(&counters.seq, 1) % 0xFFFF;
+ seq = counters->ctr_seq % 0xFFFF;
+ counters->ctr_seq++;
ip->id = odp_cpu_to_be_16(seq);
if (!pktout_cfg->bit.ipv4_chksum) {
ip->chksum = 0;
@@ -437,7 +450,8 @@ static odp_packet_t setup_icmp_pkt_ref(odp_pool_t pool,
* @retval 0 on success, -1 on fail
*/
static int setup_icmp_pkt(odp_packet_t pkt,
- odp_pktout_config_opt_t *pktout_cfg)
+ odp_pktout_config_opt_t *pktout_cfg,
+ counters_t *counters)
{
char *buf;
odph_ipv4hdr_t *ip;
@@ -450,7 +464,8 @@ static int setup_icmp_pkt(odp_packet_t pkt,
/* ip */
ip = (odph_ipv4hdr_t *)(buf + ODPH_ETHHDR_LEN);
- seq = odp_atomic_fetch_add_u64(&counters.seq, 1) % 0xffff;
+ seq = counters->ctr_seq % 0xffff;
+ counters->ctr_seq++;
ip->id = odp_cpu_to_be_16(seq);
if (!pktout_cfg->bit.ipv4_chksum) {
ip->chksum = 0;
@@ -607,30 +622,39 @@ static int gen_send_thread(void *arg)
odp_pktout_config_opt_t *pktout_cfg;
odp_packet_t pkt_ref_array[MAX_UDP_TX_BURST];
odp_packet_t pkt_array[MAX_UDP_TX_BURST];
- int pkt_array_size;
+ int pkt_array_size, seq_step;
int burst_start, burst_size;
setup_pkt_ref_fn_t setup_pkt_ref = NULL;
setup_pkt_fn_t setup_pkt = NULL;
+ counters_t *counters;
+ uint64_t pkt_count_max = 0;
thr = odp_thread_id();
thr_args = arg;
pktout = thr_args->tx.pktout;
pktout_cfg = thr_args->tx.pktout_cfg;
+ counters = &thr_args->counters;
/* Create reference packets*/
if (args->appl.mode == APPL_MODE_UDP) {
- pkt_array_size = args->appl.udp_tx_burst;
setup_pkt_ref = setup_udp_pkt_ref;
setup_pkt = setup_udp_pkt;
+ seq_step = args->tx_burst_size * (args->thread_cnt - 1);
+ if (args->appl.number != -1)
+ pkt_count_max = args->appl.number / args->thread_cnt +
+ (args->appl.number % args->thread_cnt ? 1 : 0);
} else if (args->appl.mode == APPL_MODE_PING) {
- pkt_array_size = 1;
setup_pkt_ref = setup_icmp_pkt_ref;
setup_pkt = setup_icmp_pkt;
+ seq_step = 0;
+ if (args->appl.number != -1)
+ pkt_count_max = args->appl.number;
} else {
EXAMPLE_ERR(" [%02i] Error: invalid processing mode %d\n",
thr, args->appl.mode);
return -1;
}
+ pkt_array_size = args->tx_burst_size;
if (setup_pkt_ref_array(thr_args->pool, pktout_cfg,
pkt_ref_array, pkt_array_size,
@@ -645,13 +669,17 @@ static int gen_send_thread(void *arg)
odp_barrier_wait(&barrier);
for (;;) {
- if (args->appl.number != -1 &&
- odp_atomic_fetch_add_u64(&counters.cnt, pkt_array_size) >=
- (unsigned int)args->appl.number)
+ if (thr_args->stop)
break;
+ if (pkt_count_max && counters->ctr_pkt_snd > pkt_count_max) {
+ sleep(1); /* wait for stop command */
+ continue;
+ }
+
/* Setup TX burst*/
- if (setup_pkt_array(pktout_cfg, pkt_ref_array, pkt_array,
+ if (setup_pkt_array(pktout_cfg, counters,
+ pkt_ref_array, pkt_array,
pkt_array_size, setup_pkt)) {
EXAMPLE_ERR("[%02i] Error: failed to setup packets\n",
thr);
@@ -663,10 +691,12 @@ static int gen_send_thread(void *arg)
ret = odp_pktout_send(pktout, &pkt_array[burst_start],
burst_size);
if (ret == burst_size) {
+ burst_size = 0;
break;
} else if (ret >= 0 && ret < burst_size) {
- odp_atomic_add_u64(&counters.tx_drops,
- burst_size - ret);
+ thr_args->counters.ctr_pkt_snd_drop +=
+ burst_size - ret;
+
burst_start += ret;
burst_size -= ret;
continue;
@@ -677,32 +707,20 @@ static int gen_send_thread(void *arg)
break;
}
+ counters->ctr_pkt_snd += pkt_array_size - burst_size;
+
if (args->appl.interval != 0) {
printf(" [%02i] send pkt no:%ju seq %ju\n",
thr,
- odp_atomic_load_u64(&counters.seq),
- odp_atomic_load_u64(&counters.seq)%0xffff);
+ counters->ctr_seq,
+ counters->ctr_seq % 0xffff);
millisleep(args->appl.interval,
thr_args->tp,
thr_args->tim,
thr_args->tq,
thr_args->tmo_ev);
}
- }
-
- /* receive number of reply pks until timeout */
- if (args->appl.mode == APPL_MODE_PING && args->appl.number > 0) {
- while (args->appl.timeout >= 0) {
- if (odp_atomic_load_u64(&counters.icmp) >=
- (unsigned int)args->appl.number)
- break;
- millisleep(DEFAULT_PKT_INTERVAL,
- thr_args->tp,
- thr_args->tim,
- thr_args->tq,
- thr_args->tmo_ev);
- args->appl.timeout--;
- }
+ counters->ctr_seq += seq_step;
}
odp_packet_free_multi(pkt_ref_array, pkt_array_size);
@@ -717,7 +735,8 @@ static int gen_send_thread(void *arg)
* @param msg output buffer
*/
-static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
+static void process_icmp_pkt(thread_args_t *thr_args,
+ odph_icmphdr_t *icmp, char *msg)
{
uint64_t trecv;
uint64_t tsend;
@@ -726,7 +745,7 @@ static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
msg[0] = 0;
if (icmp->type == ICMP_ECHOREPLY) {
- odp_atomic_inc_u64(&counters.icmp);
+ thr_args->counters.ctr_icmp_reply_rcv++;
memcpy(&tsend, (uint8_t *)icmp + ODPH_ICMPHDR_LEN,
sizeof(uint64_t));
@@ -751,7 +770,8 @@ static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
* @param pkt_tbl packets to be print
* @param len packet number
*/
-static void print_pkts(int thr, odp_packet_t pkt_tbl[], unsigned len)
+static void print_pkts(int thr, thread_args_t *thr_args,
+ odp_packet_t pkt_tbl[], unsigned len)
{
odp_packet_t pkt;
char *buf;
@@ -768,21 +788,20 @@ static void print_pkts(int thr, odp_packet_t pkt_tbl[], unsigned len)
if (!odp_packet_has_ipv4(pkt))
continue;
- odp_atomic_inc_u64(&counters.ip);
+ thr_args->counters.ctr_pkt_rcv++;
buf = odp_packet_data(pkt);
ip = (odph_ipv4hdr_t *)(buf + odp_packet_l3_offset(pkt));
offset = odp_packet_l4_offset(pkt);
/* udp */
- if (ip->proto == ODPH_IPPROTO_UDP) {
- odp_atomic_inc_u64(&counters.udp);
- }
+ if (ip->proto == ODPH_IPPROTO_UDP)
+ thr_args->counters.ctr_udp_rcv++;
/* icmp */
if (ip->proto == ODPH_IPPROTO_ICMPv4) {
icmp = (odph_icmphdr_t *)(buf + offset);
- process_icmp_pkt(icmp, msg);
+ process_icmp_pkt(thr_args, icmp, msg);
printf(" [%02i] %s\n", thr, msg);
}
}
@@ -810,14 +829,11 @@ static int gen_recv_thread(void *arg)
odp_barrier_wait(&barrier);
for (;;) {
- if (args->appl.number != -1 &&
- odp_atomic_load_u64(&counters.icmp) >=
- (unsigned int)args->appl.number) {
+ if (thr_args->stop)
break;
- }
/* Use schedule to get buf from any input queue */
- ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_WAIT,
+ ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
events, MAX_RX_BURST);
if (ev_cnt == 0)
continue;
@@ -848,7 +864,7 @@ static int gen_recv_thread(void *arg)
}
if (pkt_cnt) {
- print_pkts(thr, pkts, pkt_cnt);
+ print_pkts(thr, thr_args, pkts, pkt_cnt);
odp_packet_free_multi(pkts, pkt_cnt);
}
@@ -857,6 +873,35 @@ static int gen_recv_thread(void *arg)
return 0;
}
+#define COUNTER_SUM(_c, _nw) \
+({ \
+ int _itr; \
+ uint64_t _result = 0; \
+ \
+ for (_itr = 0; _itr < _nw; _itr++) \
+ _result += args->thread[_itr].counters.ctr_ ## _c; \
+ \
+ _result; \
+})
+
+static void garceful_stop_ping(void)
+{
+ uint64_t snd, rcv;
+
+ if (args->appl.mode != APPL_MODE_PING)
+ return;
+
+ while (args->appl.timeout >= 0) {
+ snd = COUNTER_SUM(pkt_snd, 2);
+ rcv = COUNTER_SUM(icmp_reply_rcv, 2);
+ if (rcv >= snd)
+ break;
+
+ sleep(1);
+ args->appl.timeout--;
+ }
+}
+
/**
* printing verbose statistics
*
@@ -868,8 +913,8 @@ static void print_global_stats(int num_workers)
uint64_t pps_snd = 0, maximum_pps_snd = 0;
uint64_t pkts_rcv = 0, pkts_rcv_prev = 0;
uint64_t pps_rcv = 0, maximum_pps_rcv = 0;
- uint64_t stall;
- int verbose_interval = 20;
+ uint64_t stall, pkts_snd_drop;
+ int verbose_interval = 20, i;
odp_thrmask_t thrd_mask;
odp_barrier_wait(&barrier);
@@ -878,12 +923,15 @@ static void print_global_stats(int num_workers)
next = odp_time_sum(odp_time_local(), wait);
while (odp_thrmask_worker(&thrd_mask) == num_workers) {
- if (args->appl.number != -1 &&
- odp_atomic_load_u64(&counters.cnt) >=
- (unsigned int)args->appl.number) {
- break;
- }
+ if (args->appl.mode != APPL_MODE_RCV &&
+ args->appl.number != -1) {
+ uint64_t cnt = COUNTER_SUM(pkt_snd, num_workers);
+ if (cnt >= (unsigned int)args->appl.number) {
+ garceful_stop_ping();
+ break;
+ }
+ }
cur = odp_time_local();
if (odp_time_cmp(next, cur) > 0) {
left = odp_time_diff(next, cur);
@@ -894,18 +942,22 @@ static void print_global_stats(int num_workers)
usleep(stall / ODP_TIME_USEC_IN_NS);
continue;
}
-
next = odp_time_sum(cur, wait);
+
switch (args->appl.mode) {
case APPL_MODE_RCV:
- pkts_rcv = odp_atomic_load_u64(&counters.ip);
+ pkts_rcv = COUNTER_SUM(pkt_rcv, num_workers);
+ pkts_snd = 0;
+ pkts_snd_drop = 0;
break;
case APPL_MODE_PING:
- pkts_snd = odp_atomic_load_u64(&counters.seq);
- pkts_rcv = odp_atomic_load_u64(&counters.icmp);
+ pkts_snd = COUNTER_SUM(pkt_snd, num_workers);
+ pkts_snd_drop = COUNTER_SUM(pkt_snd_drop, num_workers);
+ pkts_rcv = COUNTER_SUM(icmp_reply_rcv, num_workers);
break;
case APPL_MODE_UDP:
- pkts_snd = odp_atomic_load_u64(&counters.seq);
+ pkts_snd = COUNTER_SUM(pkt_snd, num_workers);
+ pkts_snd_drop = COUNTER_SUM(pkt_snd_drop, num_workers);
break;
default:
continue;
@@ -927,11 +979,14 @@ static void print_global_stats(int num_workers)
"rcv: %" PRIu64 ", "
"recv rate: %" PRIu64 " pps, "
"max recv rate: %" PRIu64 " pps\n",
- pkts_snd, odp_atomic_load_u64(&counters.tx_drops),
+ pkts_snd, pkts_snd_drop,
pps_snd, maximum_pps_snd,
pkts_rcv, pps_rcv, maximum_pps_rcv);
fflush(NULL);
}
+
+ for (i = 0; i < num_workers; i++)
+ args->thread[i].stop = 1;
}
/**
@@ -968,14 +1023,6 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
- /* init counters */
- odp_atomic_init_u64(&counters.seq, 0);
- odp_atomic_init_u64(&counters.ip, 0);
- odp_atomic_init_u64(&counters.udp, 0);
- odp_atomic_init_u64(&counters.icmp, 0);
- odp_atomic_init_u64(&counters.cnt, 0);
- odp_atomic_init_u64(&counters.tx_drops, 0);
-
/* Reserve memory for args from shared mem */
shm = odp_shm_reserve("shm_args", sizeof(args_t),
ODP_CACHE_LINE_SIZE, 0);
@@ -1024,6 +1071,15 @@ int main(int argc, char *argv[])
num_workers = 2;
}
}
+ args->thread_cnt = num_workers;
+
+ /* Burst size */
+ if (args->appl.mode == APPL_MODE_PING)
+ args->tx_burst_size = 1;
+ else if (args->appl.mode == APPL_MODE_UDP)
+ args->tx_burst_size = args->appl.udp_tx_burst;
+ else
+ args->tx_burst_size = 0;
/* Create packet pool */
odp_pool_param_init(¶ms);
@@ -1104,6 +1160,7 @@ int main(int argc, char *argv[])
if (args->appl.mode == APPL_MODE_PING) {
odp_cpumask_t cpu_mask;
int cpu_first, cpu_next;
+ thread_args_t *thr_args;
odp_cpumask_zero(&cpu_mask);
cpu_first = odp_cpumask_first(&cpumask);
@@ -1114,60 +1171,64 @@ int main(int argc, char *argv[])
EXAMPLE_ERR("queue_create failed\n");
abort();
}
- args->thread[1].rx.ifs = ifs;
- args->thread[1].rx.ifs_count = args->appl.if_count;
- args->thread[1].pool = pool;
- args->thread[1].tp = tp;
- args->thread[1].tq = tq;
- args->thread[1].tim = odp_timer_alloc(tp, tq, NULL);
- if (args->thread[1].tim == ODP_TIMER_INVALID) {
+ thr_args = &args->thread[PING_THR_RX];
+ thr_args->rx.ifs = ifs;
+ thr_args->rx.ifs_count = args->appl.if_count;
+ thr_args->pool = pool;
+ thr_args->tp = tp;
+ thr_args->tq = tq;
+ thr_args->tim = odp_timer_alloc(tp, tq, NULL);
+ if (thr_args->tim == ODP_TIMER_INVALID) {
EXAMPLE_ERR("timer_alloc failed\n");
abort();
}
- args->thread[1].tmo_ev = odp_timeout_alloc(tmop);
- if (args->thread[1].tmo_ev == ODP_TIMEOUT_INVALID) {
+ thr_args->tmo_ev = odp_timeout_alloc(tmop);
+ if (thr_args->tmo_ev == ODP_TIMEOUT_INVALID) {
EXAMPLE_ERR("timeout_alloc failed\n");
abort();
}
- args->thread[1].mode = args->appl.mode;
+ thr_args->mode = args->appl.mode;
memset(&thr_params, 0, sizeof(thr_params));
thr_params.start = gen_recv_thread;
- thr_params.arg = &args->thread[1];
+ thr_params.arg = thr_args;
thr_params.thr_type = ODP_THREAD_WORKER;
thr_params.instance = instance;
- odph_odpthreads_create(&thread_tbl[1], &cpu_mask, &thr_params);
+ odph_odpthreads_create(&thread_tbl[PING_THR_RX],
+ &cpu_mask, &thr_params);
tq = odp_queue_create("", NULL);
if (tq == ODP_QUEUE_INVALID) {
EXAMPLE_ERR("queue_create failed\n");
abort();
}
- args->thread[0].tx.pktout = ifs[0].pktout[0];
- args->thread[0].tx.pktout_cfg = &ifs[0].config.pktout;
- args->thread[0].pool = pool;
- args->thread[0].tp = tp;
- args->thread[0].tq = tq;
- args->thread[0].tim = odp_timer_alloc(tp, tq, NULL);
- if (args->thread[0].tim == ODP_TIMER_INVALID) {
+ thr_args = &args->thread[PING_THR_TX];
+ thr_args->tx.pktout = ifs[0].pktout[0];
+ thr_args->tx.pktout_cfg = &ifs[0].config.pktout;
+ thr_args->pool = pool;
+ thr_args->tp = tp;
+ thr_args->tq = tq;
+ thr_args->tim = odp_timer_alloc(tp, tq, NULL);
+ if (thr_args->tim == ODP_TIMER_INVALID) {
EXAMPLE_ERR("timer_alloc failed\n");
abort();
}
- args->thread[0].tmo_ev = odp_timeout_alloc(tmop);
- if (args->thread[0].tmo_ev == ODP_TIMEOUT_INVALID) {
+ thr_args->tmo_ev = odp_timeout_alloc(tmop);
+ if (thr_args->tmo_ev == ODP_TIMEOUT_INVALID) {
EXAMPLE_ERR("timeout_alloc failed\n");
abort();
}
- args->thread[0].mode = args->appl.mode;
+ thr_args->mode = args->appl.mode;
cpu_next = odp_cpumask_next(&cpumask, cpu_first);
odp_cpumask_zero(&cpu_mask);
odp_cpumask_set(&cpu_mask, cpu_next);
thr_params.start = gen_send_thread;
- thr_params.arg = &args->thread[0];
+ thr_params.arg = thr_args;
- odph_odpthreads_create(&thread_tbl[0], &cpu_mask, &thr_params);
+ odph_odpthreads_create(&thread_tbl[PING_THR_TX],
+ &cpu_mask, &thr_params);
} else {
int cpu = odp_cpumask_first(&cpumask);
@@ -1176,6 +1237,7 @@ int main(int argc, char *argv[])
odp_cpumask_t thd_mask;
int (*thr_run_func)(void *);
int if_idx, pktout_idx;
+ uint64_t start_seq;
if (args->appl.mode == APPL_MODE_RCV) {
args->thread[i].rx.ifs = ifs;
@@ -1185,11 +1247,13 @@ int main(int argc, char *argv[])
if_idx = i % args->appl.if_count;
pktout_idx = (i / args->appl.if_count) %
ifs[if_idx].pktout_count;
+ start_seq = i * args->tx_burst_size;
args->thread[i].tx.pktout =
ifs[if_idx].pktout[pktout_idx];
args->thread[i].tx.pktout_cfg =
&ifs[if_idx].config.pktout;
+ args->thread[i].counters.ctr_seq = start_seq;
}
tq = odp_queue_create("", NULL);
if (tq == ODP_QUEUE_INVALID) {
@@ -1531,9 +1595,9 @@ static void usage(char *progname)
"OpenDataPlane example application.\n"
"\n"
" Work mode:\n"
- " 1.send udp packets\n"
+ " 1.send ipv4 udp packets\n"
" odp_generator -I eth0 --srcmac fe:0f:97:c9:e0:44 --dstmac 32:cb:9b:27:2f:1a --srcip 192.168.0.1 --dstip 192.168.0.2 -m u\n"
- " 2.receive udp packets\n"
+ " 2.receive ipv4 packets\n"
" odp_generator -I eth0 -m r\n"
" 3.work likes ping\n"
" odp_generator -I eth0 --srcmac fe:0f:97:c9:e0:44 --dstmac 32:cb:9b:27:2f:1a --srcip 192.168.0.1 --dstip 192.168.0.2 --cpumask 0xc -m p\n"