Message ID | 1411558114-9885-1-git-send-email-maxim.uvarov@linaro.org |
---|---|
State | New |
Headers | show |
Hi, This code forks two processes from a common ancestor, which does common initialization for both. So, it runs single instance of ODP (no different from other examples which create pthreads after a common ODP init). To demonstrate IPC, you should communicate between two instances of ODP, or between an ODP app and a non-ODP app. There's no difference in communication inside an ODP instance regardless it's composed from bare metal cores/pthread/forked processes (or even separately started processes). Application would just use normal pools/queues, no special IPC types needed. Implementation would be different for initialization/memory mapping/etc, but application would not see the difference (when using ODP API). ODP IPC is just another type of messaging interface. An application could open an IPC interface (instead of e.g. a packet IO "loopback" interface) for optimized inter-processes messaging (within a SoC). > + > +static void *ring_thread(void *arg ODP_UNUSED) To demonstrate IPC this process should run separate instance of ODP (call odp_global_init, etc ...) > +{ > + int ret; > + odp_buffer_t buf; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_params_t pktio_ipc_params; > + odp_pktio_t pktio_ipc; > + odp_queue_t ipcq_def; > + > + printf("ODP RING THREAD PID %d\n", getpid()); > + > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); Cannot share pools between ODP instances. > + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { > + ODP_ERR("Error: pkt_pool not found\n"); > + return NULL; > + } > + > + /* create shared queue between processes*/ > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR("Error: pktio create failed\n"); > + return NULL; > + } > + > + > + while (1) { > + ipcq_def = odp_queue_lookup("shared-queue"); Cannot share queues between ODP instances. -Petri > + if (ipcq_def != ODP_QUEUE_INVALID) > + break; > + sleep(1); > + } > + printf("%s() shared-queue found\n", __func__); > + > + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); > + if (ret != 0) { > + ODP_ERR("Error: slave thread default ipc-Q setup\n"); > + return NULL; > + } > + > + /* In loop take packets from ipc queue and free this buffer */ > + while (1) { > + buf = odp_queue_deq(ipcq_def); > + if (odp_unlikely(!odp_buffer_is_valid(buf))) > + continue; > + > + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", > + __func__, > + (unsigned long)odp_packet_get_len(buf), > + (unsigned long)odp_buffer_size(buf)); > + odp_buffer_free(buf); > + } > + > + /* unreachable */ > + return NULL; > +} > + > +
On 09/25/2014 11:44 AM, Savolainen, Petri (NSN - FI/Espoo) wrote: > Hi, > > This code forks two processes from a common ancestor, which does common initialization for both. So, it runs single instance of ODP (no different from other examples which create pthreads after a common ODP init). Petri, the difference here is huge. I do queue creation and lookup after fork. And both creation and lookup is done vis shm_open. So after fork it looks like 2 separate processes. One reason why I used fork here is not implement pool_lookup over shared memory. Just because it's match complex and not sure if other platform, not linux generic need that. My solution is really very close to dpdk example for IPC. > To demonstrate IPC, you should communicate between two instances of ODP, or between an ODP app and a non-ODP app. There's no difference in communication inside an ODP instance regardless it's composed from bare metal cores/pthread/forked processes (or even separately started processes). Application would just use normal pools/queues, no special IPC types needed. Implementation would be different for initialization/memory mapping/etc, but application would not see the difference (when using ODP API). > > ODP IPC is just another type of messaging interface. An application could open an IPC interface (instead of e.g. a packet IO "loopback" interface) for optimized inter-processes messaging (within a SoC). > That is what is done. Only pool_lookup between 2 processes is not implemented. I think if pool can be done, then it will work as you describe. But here interesting question. How pool_lookup will fit other platforms? I assume single pool is used only for software IPC (linux-generic). In hardware case packet may change it's original pool. So it might be not big reason to implement pool_lookup and for linux-generic just stay with fork. What do you about that? > >> + >> +static void *ring_thread(void *arg ODP_UNUSED) > To demonstrate IPC this process should run separate instance of ODP (call odp_global_init, etc ...) It is what fork does. One process runs X ring_threads, to do lookup for IPC queue and once lookup done - it reads packets from IPC queue. The other process runs pktio_queue_thread or pktio_ifburst_thread. Creates IO queue and IPC queue. And simple transfers packets from IO queue to IPC queue. >> +{ >> + int ret; >> + odp_buffer_t buf; >> + odp_buffer_pool_t pkt_pool; >> + odp_pktio_params_t pktio_ipc_params; >> + odp_pktio_t pktio_ipc; >> + odp_queue_t ipcq_def; >> + >> + printf("ODP RING THREAD PID %d\n", getpid()); >> + >> + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > Cannot share pools between ODP instances. Yes, for linux-generic pool implementation is awful. To share pool with locks, pool entries and etc is really complicated. As I wrote about I did not implement that and there is might be no reason to do it at all. > >> + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { >> + ODP_ERR("Error: pkt_pool not found\n"); >> + return NULL; >> + } >> + >> + /* create shared queue between processes*/ >> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; >> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); >> + if (pktio_ipc == ODP_PKTIO_INVALID) { >> + ODP_ERR("Error: pktio create failed\n"); >> + return NULL; >> + } >> + >> + >> + while (1) { >> + ipcq_def = odp_queue_lookup("shared-queue"); > Cannot share queues between ODP instances. That is not true. In my patch I implement this feature. And the main idea of that patch is to make queues visible for other processes. You can run example and check that it works. Best regards, Maxim. > -Petri > >> + if (ipcq_def != ODP_QUEUE_INVALID) >> + break; >> + sleep(1); >> + } >> + printf("%s() shared-queue found\n", __func__); >> + >> + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); >> + if (ret != 0) { >> + ODP_ERR("Error: slave thread default ipc-Q setup\n"); >> + return NULL; >> + } >> + >> + /* In loop take packets from ipc queue and free this buffer */ >> + while (1) { >> + buf = odp_queue_deq(ipcq_def); >> + if (odp_unlikely(!odp_buffer_is_valid(buf))) >> + continue; >> + >> + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", >> + __func__, >> + (unsigned long)odp_packet_get_len(buf), >> + (unsigned long)odp_buffer_size(buf)); >> + odp_buffer_free(buf); >> + } >> + >> + /* unreachable */ >> + return NULL; >> +} >> + >> + >
Hi, My main point is that IPC is not needed when you fork _after_ odp_init_global(). The application is running single instance of ODP, and does not need any special IPC to communicate internally. IPC would be needed if you'd fork _before_ odp_init_global() and thus have two different ODP instances running (copies of ODP state data). -Petri > -----Original Message----- > From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] > Sent: Thursday, September 25, 2014 12:35 PM > To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org > Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation > > On 09/25/2014 11:44 AM, Savolainen, Petri (NSN - FI/Espoo) wrote: > > Hi, > > > > This code forks two processes from a common ancestor, which does common > initialization for both. So, it runs single instance of ODP (no different > from other examples which create pthreads after a common ODP init). > > Petri, the difference here is huge. I do queue creation and lookup after > fork. And both creation and lookup is done vis shm_open. So after fork > it looks like 2 separate processes. One reason why I used fork here is > not implement pool_lookup over shared memory. Just because it's match > complex and not sure if other platform, not linux generic need that. My > solution is really very close to dpdk example for IPC. > > > To demonstrate IPC, you should communicate between two instances of > ODP, or between an ODP app and a non-ODP app. There's no difference in > communication inside an ODP instance regardless it's composed from bare > metal cores/pthread/forked processes (or even separately started > processes). Application would just use normal pools/queues, no special IPC > types needed. Implementation would be different for initialization/memory > mapping/etc, but application would not see the difference (when using ODP > API). > > > > ODP IPC is just another type of messaging interface. An application > could open an IPC interface (instead of e.g. a packet IO "loopback" > interface) for optimized inter-processes messaging (within a SoC). > > > That is what is done. Only pool_lookup between 2 processes is not > implemented. I think if pool can be done, then it will work as you > describe. But here interesting question. How pool_lookup will fit other > platforms? I assume single pool is used only for software IPC > (linux-generic). In hardware case packet may change it's original pool. > So it might be not big reason to implement pool_lookup and for > linux-generic just stay with fork. What do you about that? > > > > >> + > >> +static void *ring_thread(void *arg ODP_UNUSED) > > To demonstrate IPC this process should run separate instance of ODP > (call odp_global_init, etc ...) > > It is what fork does. One process runs X ring_threads, to do lookup for > IPC queue and once lookup done - it reads packets from IPC queue. > The other process runs pktio_queue_thread or pktio_ifburst_thread. > Creates IO queue and IPC queue. And simple transfers packets from IO > queue to IPC queue. > > >> +{ > >> + int ret; > >> + odp_buffer_t buf; > >> + odp_buffer_pool_t pkt_pool; > >> + odp_pktio_params_t pktio_ipc_params; > >> + odp_pktio_t pktio_ipc; > >> + odp_queue_t ipcq_def; > >> + > >> + printf("ODP RING THREAD PID %d\n", getpid()); > >> + > >> + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > > Cannot share pools between ODP instances. > > Yes, for linux-generic pool implementation is awful. To share pool with > locks, pool entries and etc is really complicated. > As I wrote about I did not implement that and there is might be no > reason to do it at all. > > > >> + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { > >> + ODP_ERR("Error: pkt_pool not found\n"); > >> + return NULL; > >> + } > >> + > >> + /* create shared queue between processes*/ > >> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > >> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > >> + if (pktio_ipc == ODP_PKTIO_INVALID) { > >> + ODP_ERR("Error: pktio create failed\n"); > >> + return NULL; > >> + } > >> + > >> + > >> + while (1) { > >> + ipcq_def = odp_queue_lookup("shared-queue"); > > Cannot share queues between ODP instances. > That is not true. In my patch I implement this feature. And the main > idea of that patch is to make queues > visible for other processes. You can run example and check that it works. > > > Best regards, > Maxim. > > -Petri > > > >> + if (ipcq_def != ODP_QUEUE_INVALID) > >> + break; > >> + sleep(1); > >> + } > >> + printf("%s() shared-queue found\n", __func__); > >> + > >> + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); > >> + if (ret != 0) { > >> + ODP_ERR("Error: slave thread default ipc-Q setup\n"); > >> + return NULL; > >> + } > >> + > >> + /* In loop take packets from ipc queue and free this buffer */ > >> + while (1) { > >> + buf = odp_queue_deq(ipcq_def); > >> + if (odp_unlikely(!odp_buffer_is_valid(buf))) > >> + continue; > >> + > >> + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", > >> + __func__, > >> + (unsigned long)odp_packet_get_len(buf), > >> + (unsigned long)odp_buffer_size(buf)); > >> + odp_buffer_free(buf); > >> + } > >> + > >> + /* unreachable */ > >> + return NULL; > >> +} > >> + > >> + > >
On 09/25/2014 02:03 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > Hi, > > My main point is that IPC is not needed when you fork _after_ odp_init_global(). The application is running single instance of ODP, and does not need any special IPC to communicate internally. IPC would be needed if you'd fork _before_ odp_init_global() and thus have two different ODP instances running (copies of ODP state data). > > > -Petri > Ok understand your point. In that case I need to think how to make pool visible to 2 different processes. I think one it will be done fork before odp_init_global will work also. Maxim. > >> -----Original Message----- >> From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] >> Sent: Thursday, September 25, 2014 12:35 PM >> To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org >> Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation >> >> On 09/25/2014 11:44 AM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>> Hi, >>> >>> This code forks two processes from a common ancestor, which does common >> initialization for both. So, it runs single instance of ODP (no different >> from other examples which create pthreads after a common ODP init). >> >> Petri, the difference here is huge. I do queue creation and lookup after >> fork. And both creation and lookup is done vis shm_open. So after fork >> it looks like 2 separate processes. One reason why I used fork here is >> not implement pool_lookup over shared memory. Just because it's match >> complex and not sure if other platform, not linux generic need that. My >> solution is really very close to dpdk example for IPC. >> >>> To demonstrate IPC, you should communicate between two instances of >> ODP, or between an ODP app and a non-ODP app. There's no difference in >> communication inside an ODP instance regardless it's composed from bare >> metal cores/pthread/forked processes (or even separately started >> processes). Application would just use normal pools/queues, no special IPC >> types needed. Implementation would be different for initialization/memory >> mapping/etc, but application would not see the difference (when using ODP >> API). >>> ODP IPC is just another type of messaging interface. An application >> could open an IPC interface (instead of e.g. a packet IO "loopback" >> interface) for optimized inter-processes messaging (within a SoC). >> That is what is done. Only pool_lookup between 2 processes is not >> implemented. I think if pool can be done, then it will work as you >> describe. But here interesting question. How pool_lookup will fit other >> platforms? I assume single pool is used only for software IPC >> (linux-generic). In hardware case packet may change it's original pool. >> So it might be not big reason to implement pool_lookup and for >> linux-generic just stay with fork. What do you about that? >> >>>> + >>>> +static void *ring_thread(void *arg ODP_UNUSED) >>> To demonstrate IPC this process should run separate instance of ODP >> (call odp_global_init, etc ...) >> >> It is what fork does. One process runs X ring_threads, to do lookup for >> IPC queue and once lookup done - it reads packets from IPC queue. >> The other process runs pktio_queue_thread or pktio_ifburst_thread. >> Creates IO queue and IPC queue. And simple transfers packets from IO >> queue to IPC queue. >> >>>> +{ >>>> + int ret; >>>> + odp_buffer_t buf; >>>> + odp_buffer_pool_t pkt_pool; >>>> + odp_pktio_params_t pktio_ipc_params; >>>> + odp_pktio_t pktio_ipc; >>>> + odp_queue_t ipcq_def; >>>> + >>>> + printf("ODP RING THREAD PID %d\n", getpid()); >>>> + >>>> + pkt_pool = odp_buffer_pool_lookup("packet_pool"); >>> Cannot share pools between ODP instances. >> Yes, for linux-generic pool implementation is awful. To share pool with >> locks, pool entries and etc is really complicated. >> As I wrote about I did not implement that and there is might be no >> reason to do it at all. >>>> + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { >>>> + ODP_ERR("Error: pkt_pool not found\n"); >>>> + return NULL; >>>> + } >>>> + >>>> + /* create shared queue between processes*/ >>>> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; >>>> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); >>>> + if (pktio_ipc == ODP_PKTIO_INVALID) { >>>> + ODP_ERR("Error: pktio create failed\n"); >>>> + return NULL; >>>> + } >>>> + >>>> + >>>> + while (1) { >>>> + ipcq_def = odp_queue_lookup("shared-queue"); >>> Cannot share queues between ODP instances. >> That is not true. In my patch I implement this feature. And the main >> idea of that patch is to make queues >> visible for other processes. You can run example and check that it works. >> >> >> Best regards, >> Maxim. >>> -Petri >>> >>>> + if (ipcq_def != ODP_QUEUE_INVALID) >>>> + break; >>>> + sleep(1); >>>> + } >>>> + printf("%s() shared-queue found\n", __func__); >>>> + >>>> + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); >>>> + if (ret != 0) { >>>> + ODP_ERR("Error: slave thread default ipc-Q setup\n"); >>>> + return NULL; >>>> + } >>>> + >>>> + /* In loop take packets from ipc queue and free this buffer */ >>>> + while (1) { >>>> + buf = odp_queue_deq(ipcq_def); >>>> + if (odp_unlikely(!odp_buffer_is_valid(buf))) >>>> + continue; >>>> + >>>> + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", >>>> + __func__, >>>> + (unsigned long)odp_packet_get_len(buf), >>>> + (unsigned long)odp_buffer_size(buf)); >>>> + odp_buffer_free(buf); >>>> + } >>>> + >>>> + /* unreachable */ >>>> + return NULL; >>>> +} >>>> + >>>> +
> -----Original Message----- > From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] > Sent: Thursday, September 25, 2014 2:15 PM > To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org > Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation > > On 09/25/2014 02:03 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > > Hi, > > > > My main point is that IPC is not needed when you fork _after_ > odp_init_global(). The application is running single instance of ODP, and > does not need any special IPC to communicate internally. IPC would be > needed if you'd fork _before_ odp_init_global() and thus have two > different ODP instances running (copies of ODP state data). > > > > > > -Petri > > > Ok understand your point. In that case I need to think how to make pool > visible to 2 different processes. I think one it will be done fork > before odp_init_global will work also. > > Maxim. Pools or queues must not be shared - the "IPC interface" should be used instead to move buffers from one side to another. Two options for the interface: 1) application enqueues empty buffers to the interface, and dequeue/schedule filled buffers from the interface 2) let interface to allocate buffers (and copy data), and just dequeue/schedule filled buffers from the interface. This is how packet IO interface works. -Petri
Hi, A few nits to start with: On Wed, Sep 24, 2014 at 2:28 PM, Maxim Uvarov <maxim.uvarov@linaro.org> wrote: > Implement odp implementation for linux-generic using standard > odp queue API. > > Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org> > --- > v6: - update to the latest code (on top of Petris shm patches); > > v5: - malloc for queues_tbl. This fixes segfault in v4 if shm was > not created; > - removed not needed chunk from odp_shm_reserve() introduced in v4; > - implement linux-generic plat_odp_shm_reserve() with extendend flags; > > v4: - fixed Anderses comments. (did not use unlikely() for > init functions. Only for packet processing. > - checkpatch cleanup; > - update to the latest ODP head; > - remove allocation memory for r_p; > > > .gitignore | 1 + > configure.ac | 1 + > example/Makefile.am | 2 +- > example/ipc/Makefile.am | 6 + > example/ipc/README | 56 ++ > example/ipc/odp_ipc.c | 679 +++++++++++++++++++++ > helper/include/odph_ring.h | 2 + > platform/linux-generic/.dirstamp | 0 > .../linux-generic/include/api/odp_pktio_types.h | 1 + > platform/linux-generic/include/api/odp_queue.h | 2 + > .../linux-generic/include/api/odp_shared_memory.h | 1 + > .../linux-generic/include/odp_packet_io_internal.h | 1 + > .../linux-generic/include/odp_queue_internal.h | 14 +- > .../include/odp_shared_memory_internal.h | 35 ++ > platform/linux-generic/odp_packet_io.c | 27 +- > platform/linux-generic/odp_queue.c | 215 ++++++- > platform/linux-generic/odp_ring.c | 10 +- > platform/linux-generic/odp_shared_memory.c | 32 +- > 18 files changed, 1066 insertions(+), 19 deletions(-) > create mode 100644 example/ipc/Makefile.am > create mode 100644 example/ipc/README > create mode 100644 example/ipc/odp_ipc.c > create mode 100644 platform/linux-generic/.dirstamp > create mode 100644 platform/linux-generic/include/odp_shared_memory_internal.h > > diff --git a/.gitignore b/.gitignore > index 2b9e4f5..428f06b 100644 > --- a/.gitignore > +++ b/.gitignore > @@ -33,6 +33,7 @@ lib/ > obj/ > build/ > odp_example > +odp_ipc > odp_packet > odp_packet_netmap > odp_atomic > diff --git a/configure.ac b/configure.ac > index 102486d..3db5f59 100644 > --- a/configure.ac > +++ b/configure.ac > @@ -153,6 +153,7 @@ AC_CONFIG_FILES([Makefile > platform/linux-keystone2/Makefile > platform/linux-dpdk/Makefile > example/Makefile > + example/ipc/Makefile > example/generator/Makefile > example/ipsec/Makefile > example/l2fwd/Makefile > diff --git a/example/Makefile.am b/example/Makefile.am > index 72663b9..5bfd6d7 100644 > --- a/example/Makefile.am > +++ b/example/Makefile.am > @@ -1 +1 @@ > -SUBDIRS = generator ipsec l2fwd odp_example packet packet_netmap timer > +SUBDIRS = generator ipsec l2fwd odp_example packet packet_netmap timer ipc > diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am > new file mode 100644 > index 0000000..2fd48f7 > --- /dev/null > +++ b/example/ipc/Makefile.am > @@ -0,0 +1,6 @@ > +include $(top_srcdir)/example/Makefile.inc > + > +bin_PROGRAMS = odp_ipc > +odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static > + > +dist_odp_ipc_SOURCES = odp_ipc.c > diff --git a/example/ipc/README b/example/ipc/README > new file mode 100644 > index 0000000..57df942 > --- /dev/null > +++ b/example/ipc/README > @@ -0,0 +1,56 @@ > +/* Copyright (c) 2014, Linaro Limited > + * All rights reserved. > + * > + * SPDX-License-Identifier: BSD-3-Clause > + */ > + > + ODP IPC example > + > +This example shows how to use queues to exchange packets between different > +processes. > + > +Examples scheme: > + > + Ping (Machine 1) ----> odp_ipc app (Machine 2) > + > +Example burst mode: > +./odp_ipc -i eth0 -m 1 -c 1 > +On remote host run ping target that runs odp_ipc. > + > +[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4 > + > + > +Main PID/thread [11492/1] enqueues packets to IPC queue with odp_queue_enq_multi(), > +child process thread ring_thread() dequeues packets from ipc queue. > + > + > +Example queue mode: > + > +./odp_ipc -i eth0 -m 1 -c 1 > +waiting for packet... > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > + > +Thread 15917 moves packets from ingress queue to IPC queue. Other process > +in ring_thread() thread dequeues packets from IPC queue. > diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c > new file mode 100644 > index 0000000..0b5c636 > --- /dev/null > +++ b/example/ipc/odp_ipc.c > @@ -0,0 +1,679 @@ > +/* Copyright (c) 2014, Linaro Limited > + * All rights reserved. > + * > + * SPDX-License-Identifier: BSD-3-Clause > + */ > + > +/** > + * @file > + * > + * @example odp_ipc.c ODP IPC queues example application > + */ > + > +#include <stdlib.h> > +#include <string.h> > +#include <getopt.h> > +#include <unistd.h> > + > +#include <odp.h> > +#include <odph_linux.h> > +#include <odph_packet.h> > +#include <odph_eth.h> > +#include <odph_ip.h> > + > +#define MAX_WORKERS 32 > +#define SHM_PKT_POOL_SIZE (512*2048) > +#define SHM_PKT_POOL_BUF_SIZE 1856 > +#define MAX_PKT_BURST 16 > + > +#define APPL_MODE_PKT_BURST 0 > +#define APPL_MODE_PKT_QUEUE 1 > + > +#define RING_SIZE 4096 > +#define ODP_RING_NAMESIZE 32 > + > +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x)) > + > +/** Get rid of path in filename - only for unix-type paths using '/' */ > +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \ > + strrchr((file_name), '/') + 1 : (file_name)) > +/** > + * Parsed command line application arguments > + */ > +typedef struct { > + int core_count; > + int if_count; /**< Number of interfaces to be used */ > + char **if_names; /**< Array of pointers to interface names */ > + int mode; /**< Packet IO mode */ > + int type; /**< Packet IO type */ > + int fanout; /**< Packet IO fanout */ > + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ > +} appl_args_t; > + > +/** > + * Thread specific arguments > + */ > +typedef struct { > + char *pktio_dev; /**< Interface name to use */ > + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ > + int mode; /**< Thread mode */ > + int type; /**< Thread i/o type */ > + int fanout; /**< Thread i/o fanout */ > +} thread_args_t; > + > +/** > + * Grouping of both parsed CL args and thread specific args - alloc together > + */ > +typedef struct { > + /** Application (parsed) arguments */ > + appl_args_t appl; > + /** Thread specific arguments */ > + thread_args_t thread[MAX_WORKERS]; > +} args_t; > + > +/** Global pointer to args */ > +static args_t *args; > + > +/* helper funcs */ > +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len); > +static void parse_args(int argc, char *argv[], appl_args_t *appl_args); > +static void print_info(char *progname, appl_args_t *appl_args); > +static void usage(char *progname); > + > +static void *ring_thread(void *arg ODP_UNUSED) > +{ > + int ret; > + odp_buffer_t buf; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_params_t pktio_ipc_params; > + odp_pktio_t pktio_ipc; > + odp_queue_t ipcq_def; > + > + printf("ODP RING THREAD PID %d\n", getpid()); > + > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { > + ODP_ERR("Error: pkt_pool not found\n"); > + return NULL; > + } > + > + /* create shared queue between processes*/ > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR("Error: pktio create failed\n"); > + return NULL; > + } > + > + > + while (1) { > + ipcq_def = odp_queue_lookup("shared-queue"); > + if (ipcq_def != ODP_QUEUE_INVALID) > + break; > + sleep(1); > + } > + printf("%s() shared-queue found\n", __func__); > + > + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); After this pktio_ipc is not used at all, I don't think it's needed to begin with. > + if (ret != 0) { > + ODP_ERR("Error: slave thread default ipc-Q setup\n"); > + return NULL; > + } > + > + /* In loop take packets from ipc queue and free this buffer */ > + while (1) { > + buf = odp_queue_deq(ipcq_def); > + if (odp_unlikely(!odp_buffer_is_valid(buf))) > + continue; > + > + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", > + __func__, > + (unsigned long)odp_packet_get_len(buf), > + (unsigned long)odp_buffer_size(buf)); > + odp_buffer_free(buf); > + } > + > + /* unreachable */ > + return NULL; > +} > + > + > +/** > + * Packet IO loopback worker thread using ODP queues > + * > + * @param arg thread arguments of type 'thread_args_t *' > + */ > +static void *pktio_queue_thread(void *arg) > +{ > + int thr; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_t pktio; > + odp_pktio_t pktio_ipc; > + thread_args_t *thr_args; > + odp_queue_t inq_def; > + odp_queue_t ipcq_def; > + char inq_name[ODP_QUEUE_NAME_LEN]; > + odp_queue_param_t qparam; > + odp_buffer_t buf; > + int ret; > + odp_pktio_params_t params; > + odp_pktio_params_t pktio_ipc_params; > + socket_params_t *sock_params = ¶ms.sock_params; > + > + thr_args = arg; > + > + thr = odp_thread_id(); > + > + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, > + thr_args->pktio_dev); > + > + /* lookup ring from its name */ > + /* Lookup the packet pool */ > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { > + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); > + return NULL; > + } > + > + /* Open a packet IO instance for this thread */ > + sock_params->type = thr_args->type; > + sock_params->fanout = thr_args->fanout; > + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); > + if (pktio == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } > + > + /* > + * Create and set the default INPUT queue associated with the 'pktio' > + * resource > + */ > + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; > + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; > + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; > + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); > + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; > + > + inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam); > + if (inq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + ret = odp_pktio_inq_setdef(pktio, inq_def); > + if (ret != 0) { > + ODP_ERR(" [%02i] Error: default input-Q setup\n", thr); > + return NULL; > + } > + > + printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n" > + " default pktio%02i-INPUT queue:%u\n", > + thr, pktio, pktio, inq_def); > + > + /* create shared queue between processes*/ > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } Again, pktio_ipc not used. > + ipcq_def = odp_queue_create("shared-queue", > + ODP_QUEUE_TYPE_IPC, &qparam); > + if (ipcq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + /* In loop take packets from inq queue and put them to ipc queue */ > + for (;;) { > + /* Use schedule to get buf from any input queue */ > + printf("waiting for packet...\n"); > + buf = odp_schedule(NULL, ODP_SCHED_WAIT); > + > + printf("Enqueue the packet to ipc queue size %ld/%ld\n", > + (unsigned long)odp_packet_get_len(buf), > + (unsigned long)odp_buffer_size(buf)); > + > + odp_queue_enq(ipcq_def, buf); > + } > + > +/* unreachable */ > +} > + > +/** > + * Packet IO loopback worker thread using bursts from/to IO resources > + * > + * @param arg thread arguments of type 'thread_args_t *' > + */ > +static void *pktio_ifburst_thread(void *arg) > +{ > + int thr; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_t pktio; > + thread_args_t *thr_args; > + int pkts, pkts_ok; > + odp_packet_t pkt_tbl[MAX_PKT_BURST]; > + unsigned long pkt_cnt = 0; > + unsigned long err_cnt = 0; > + odp_pktio_params_t params; > + socket_params_t *sock_params = ¶ms.sock_params; > + int ret; > + > + odp_pktio_t pktio_ipc; > + odp_queue_t ipcq_def; > + char inq_name[ODP_QUEUE_NAME_LEN]; > + odp_queue_param_t qparam; > + odp_pktio_params_t pktio_ipc_params; > + > + thr = odp_thread_id(); > + thr_args = arg; > + > + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, > + thr_args->pktio_dev); > + > + /* Lookup the packet pool */ > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { > + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); > + return NULL; > + } > + > + /* Open a packet IO instance for this thread */ > + sock_params->type = thr_args->type; > + sock_params->fanout = thr_args->fanout; > + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); > + if (pktio == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed.\n", thr); > + return NULL; > + } > + > + printf(" [%02i] created pktio:%02i, burst mode\n", > + thr, pktio); > + > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } And again. > + > + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; > + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; > + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; > + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); > + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; > + > + ipcq_def = odp_queue_create("shared-queue", > + ODP_QUEUE_TYPE_IPC, &qparam); > + if (ipcq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + /* Loop packets */ > + for (;;) { > + pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST); > + if (pkts > 0) { > + /* Drop packets with errors */ > + pkts_ok = drop_err_pkts(pkt_tbl, pkts); > + if (pkts_ok > 0) { > + ret = odp_queue_enq_multi(ipcq_def, > + pkt_tbl, pkts_ok); > + pkt_cnt += pkts_ok; > + if (ret != 0) { > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + } else { > + printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n", > + getpid(), thr, pkts_ok, > + pkt_tbl[0], > + (unsigned long)odp_packet_get_len(pkt_tbl[0]), > + (unsigned long)odp_buffer_size(pkt_tbl[0]), > + pkt_cnt); > + } > + } > + > + if (odp_unlikely(pkts_ok != pkts)) > + ODP_ERR("Dropped frames:%u - err_cnt:%lu\n", > + pkts-pkts_ok, ++err_cnt); > + } > + } > + > +/* unreachable */ > +} > + > +/** > + * ODP packet example main function > + */ > +int main(int argc, char *argv[]) > +{ > + odph_linux_pthread_t thread_tbl[MAX_WORKERS]; > + odp_buffer_pool_t pool; > + int thr_id; > + int num_workers; > + void *pool_base; > + int i; > + int first_core; > + int core_count; > + > + /* Init ODP before calling anything else */ > + if (odp_init_global()) { > + ODP_ERR("Error: ODP global init failed.\n"); > + exit(EXIT_FAILURE); > + } > + > + args = malloc(sizeof(args_t)); > + if (args == NULL) { > + ODP_ERR("Error: shared mem alloc failed.\n"); > + exit(EXIT_FAILURE); > + } > + memset(args, 0, sizeof(*args)); > + > + /* Parse and store the application arguments */ > + parse_args(argc, argv, &args->appl); > + > + /* Print both system and application information */ > + print_info(NO_PATH(argv[0]), &args->appl); > + > + core_count = odp_sys_core_count(); > + num_workers = core_count; > + > + if (args->appl.core_count) > + num_workers = args->appl.core_count; > + > + if (num_workers > MAX_WORKERS) > + num_workers = MAX_WORKERS; > + > + printf("Num worker threads: %i\n", num_workers); > + > + /* > + * By default core #0 runs Linux kernel background tasks. > + * Start mapping thread from core #1 > + */ > + first_core = 1; > + > + if (core_count == 1) > + first_core = 0; > + > + printf("First core: %i\n\n", first_core); > + > + /* Init this thread */ > + thr_id = odp_thread_create(0); > + odp_init_local(thr_id); > + > + /* Create packet pool */ > + odp_shm_t shm = odp_shm_reserve("shm_packet_pool", > + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_PROC); > + > + pool_base = odp_shm_addr(shm); > + if (pool_base == NULL) { > + ODP_ERR("Error: packet pool mem alloc failed.\n"); > + exit(EXIT_FAILURE); > + } > + > + pool = odp_buffer_pool_create("packet_pool", pool_base, > + SHM_PKT_POOL_SIZE, > + SHM_PKT_POOL_BUF_SIZE, > + ODP_CACHE_LINE_SIZE, > + ODP_BUFFER_TYPE_PACKET); > + if (pool == ODP_BUFFER_POOL_INVALID) { > + ODP_ERR("Error: packet pool create failed.\n"); > + exit(EXIT_FAILURE); > + } > + odp_buffer_pool_print(pool); > + > + > + /* Create another process */ > + int f = fork(); > + > + /* Create and init worker threads */ > + memset(thread_tbl, 0, sizeof(thread_tbl)); > + for (i = 0; i < num_workers; ++i) { > + void *(*thr_run_func) (void *); > + int core; > + int if_idx; > + > + core = (first_core + i) % core_count; > + > + if_idx = i % args->appl.if_count; > + > + args->thread[i].pktio_dev = args->appl.if_names[if_idx]; > + args->thread[i].pool = pool; > + args->thread[i].mode = args->appl.mode; > + args->thread[i].type = args->appl.type; > + args->thread[i].fanout = args->appl.fanout; > + > + if (f) { > + thr_run_func = ring_thread; > + } else { > + if (args->appl.mode == APPL_MODE_PKT_BURST) > + thr_run_func = pktio_ifburst_thread; > + else /* APPL_MODE_PKT_QUEUE */ > + thr_run_func = pktio_queue_thread; > + } > + /* > + * Create threads one-by-one instead of all-at-once, > + * because each thread might get different arguments. > + * Calls odp_thread_create(cpu) for each thread > + */ > + odph_linux_pthread_create(thread_tbl, 1, core, thr_run_func, > + &args->thread[i]); > + } > + > + /* Master thread waits for other threads to exit */ > + odph_linux_pthread_join(thread_tbl, num_workers); > + > + printf("Exit\n\n"); > + > + return 0; > +} > + > +/** > + * Drop packets which input parsing marked as containing errors. > + * > + * Frees packets with error and modifies pkt_tbl[] to only contain packets with > + * no detected errors. > + * > + * @param pkt_tbl Array of packet > + * @param len Length of pkt_tbl[] > + * > + * @return Number of packets with no detected error > + */ > +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len) > +{ > + odp_packet_t pkt; > + unsigned pkt_cnt = len; > + unsigned i, j; > + > + for (i = 0, j = 0; i < len; ++i) { > + pkt = pkt_tbl[i]; > + > + if (odp_unlikely(odp_packet_error(pkt))) { > + odph_packet_free(pkt); /* Drop */ > + pkt_cnt--; > + } else if (odp_unlikely(i != j++)) { > + pkt_tbl[j-1] = pkt; > + } > + } > + > + return pkt_cnt; > +} > + > +/** > + * Parse and store the command line arguments > + * > + * @param argc argument count > + * @param argv[] argument vector > + * @param appl_args Store application arguments here > + */ > +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) > +{ > + int opt; > + int long_index; > + char *names, *str, *token, *save; > + int i; > + int len; > + static struct option longopts[] = { > + {"count", required_argument, NULL, 'c'}, > + {"interface", required_argument, NULL, 'i'}, /* return 'i' */ > + {"mode", required_argument, NULL, 'm'}, /* return 'm' */ > + {"help", no_argument, NULL, 'h'}, /* return 'h' */ > + {NULL, 0, NULL, 0} > + }; > + > + appl_args->mode = -1; /* Invalid, must be changed by parsing */ > + appl_args->type = 3; /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */ > + appl_args->fanout = 1; /* turn off fanout by default for mmap */ > + > + while (1) { > + opt = getopt_long(argc, argv, "+c:i:m:t:f:h", > + longopts, &long_index); > + > + if (opt == -1) > + break; /* No more options */ > + > + switch (opt) { > + case 'c': > + appl_args->core_count = atoi(optarg); > + break; > + /* parse packet-io interface names */ > + case 'i': > + len = strlen(optarg); > + if (len == 0) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + len += 1; /* add room for '\0' */ > + > + names = malloc(len); > + if (names == NULL) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + /* count the number of tokens separated by ',' */ > + strcpy(names, optarg); > + for (str = names, i = 0;; str = NULL, i++) { > + token = strtok_r(str, ",", &save); > + if (token == NULL) > + break; > + } > + appl_args->if_count = i; > + > + if (appl_args->if_count == 0) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + /* allocate storage for the if names */ > + appl_args->if_names = > + calloc(appl_args->if_count, sizeof(char *)); > + > + /* store the if names (reset names string) */ > + strcpy(names, optarg); > + for (str = names, i = 0;; str = NULL, i++) { > + token = strtok_r(str, ",", &save); > + if (token == NULL) > + break; > + appl_args->if_names[i] = token; > + } > + break; > + > + case 'm': > + i = atoi(optarg); > + if (i == 0) > + appl_args->mode = APPL_MODE_PKT_BURST; > + else > + appl_args->mode = APPL_MODE_PKT_QUEUE; > + break; > + > + case 't': > + appl_args->type = atoi(optarg); > + break; > + > + case 'f': > + appl_args->fanout = atoi(optarg); > + break; > + > + case 'h': > + usage(argv[0]); > + exit(EXIT_SUCCESS); > + break; > + > + default: > + break; > + } > + } > + > + if (appl_args->if_count == 0 || appl_args->mode == -1) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + optind = 1; /* reset 'extern optind' from the getopt lib */ > +} > + > +/** > + * Print system and application info > + */ > +static void print_info(char *progname, appl_args_t *appl_args) > +{ > + int i; > + > + printf("\n" > + "ODP system info\n" > + "---------------\n" > + "ODP API version: %s\n" > + "CPU model: %s\n" > + "CPU freq (hz): %"PRIu64"\n" > + "Cache line size: %i\n" > + "Core count: %i\n" > + "\n", > + odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(), > + odp_sys_cache_line_size(), odp_sys_core_count()); > + > + printf("Running ODP appl: \"%s\"\n" > + "-----------------\n" > + "IF-count: %i\n" > + "Using IFs: ", > + progname, appl_args->if_count); > + for (i = 0; i < appl_args->if_count; ++i) > + printf(" %s", appl_args->if_names[i]); > + printf("\n" > + "Mode: "); > + if (appl_args->mode == APPL_MODE_PKT_BURST) > + PRINT_APPL_MODE(APPL_MODE_PKT_BURST); > + else > + PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE); > + printf("\n\n"); > + fflush(NULL); > +} > + > +/** > + * Prinf usage information > + */ > +static void usage(char *progname) > +{ > + printf("\n" > + "Usage: %s OPTIONS\n" > + " E.g. %s -i eth1,eth2,eth3 -m 0\n" > + "\n" > + "OpenDataPlane example application.\n" > + "\n" > + "Mandatory OPTIONS:\n" > + " -i, --interface Eth interfaces (comma-separated, no spaces)\n" > + " -m, --mode 0: Burst send&receive packets (no queues)\n" > + " 1: Send&receive packets through ODP queues.\n" > + " -t, --type 1: ODP_PKTIO_TYPE_SOCKET_BASIC\n" > + " 2: ODP_PKTIO_TYPE_SOCKET_MMSG\n" > + " 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" > + " 4: ODP_PKTIO_TYPE_NETMAP\n" > + " Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" > + " -f, --fanout 0: off 1: on (Default 1: on)\n" > + "\n" > + "Optional OPTIONS\n" > + " -c, --count <number> Core count.\n" > + " -h, --help Display help and exit.\n" > + "\n", NO_PATH(progname), NO_PATH(progname) > + ); > +} > diff --git a/helper/include/odph_ring.h b/helper/include/odph_ring.h > index 76c1db8..1d81b5f 100644 > --- a/helper/include/odph_ring.h > +++ b/helper/include/odph_ring.h > @@ -158,6 +158,8 @@ typedef struct odph_ring { > > #define ODPH_RING_F_SP_ENQ 0x0001 /* The default enqueue is "single-producer".*/ > #define ODPH_RING_F_SC_DEQ 0x0002 /* The default dequeue is "single-consumer".*/ > +#define ODPH_RING_SHM_PROC 0x0004 /* If set - ring is visible from different > + processes. Default is thread visible. */ > #define ODPH_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */ > #define ODPH_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */ > > diff --git a/platform/linux-generic/.dirstamp b/platform/linux-generic/.dirstamp > new file mode 100644 > index 0000000..e69de29 > diff --git a/platform/linux-generic/include/api/odp_pktio_types.h b/platform/linux-generic/include/api/odp_pktio_types.h > index 54ce459..79c0f48 100644 > --- a/platform/linux-generic/include/api/odp_pktio_types.h > +++ b/platform/linux-generic/include/api/odp_pktio_types.h > @@ -30,6 +30,7 @@ typedef enum { > ODP_PKTIO_TYPE_SOCKET_MMSG, > ODP_PKTIO_TYPE_SOCKET_MMAP, > ODP_PKTIO_TYPE_NETMAP, > + ODP_PKTIO_TYPE_IPC, > } odp_pktio_type_t; > > #include <odp_pktio_socket.h> > diff --git a/platform/linux-generic/include/api/odp_queue.h b/platform/linux-generic/include/api/odp_queue.h > index 5e083f1..4700a62 100644 > --- a/platform/linux-generic/include/api/odp_queue.h > +++ b/platform/linux-generic/include/api/odp_queue.h > @@ -44,6 +44,8 @@ typedef int odp_queue_type_t; > #define ODP_QUEUE_TYPE_POLL 1 /**< Not scheduled queue */ > #define ODP_QUEUE_TYPE_PKTIN 2 /**< Packet input queue */ > #define ODP_QUEUE_TYPE_PKTOUT 3 /**< Packet output queue */ > +#define ODP_QUEUE_TYPE_IPC 4 /**< Packet ipc queue */ > +#define ODP_QUEUE_TYPE_IPC_LOOKUP 5 /**< Packet ipc queue */ > > /** > * ODP schedule priority > diff --git a/platform/linux-generic/include/api/odp_shared_memory.h b/platform/linux-generic/include/api/odp_shared_memory.h > index 7ad29c3..ce0e89c 100644 > --- a/platform/linux-generic/include/api/odp_shared_memory.h > +++ b/platform/linux-generic/include/api/odp_shared_memory.h > @@ -96,6 +96,7 @@ void *odp_shm_addr(odp_shm_t shm); > */ > int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info); > > +int odp_shm_lookup_ipc(const char *name); > > /** > * Print all shared memory blocks > diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h > index 881cc5f..77fff96 100644 > --- a/platform/linux-generic/include/odp_packet_io_internal.h > +++ b/platform/linux-generic/include/odp_packet_io_internal.h > @@ -35,6 +35,7 @@ struct pktio_entry { > #ifdef ODP_HAVE_NETMAP > pkt_netmap_t pkt_nm; /**< using netmap API for IO */ > #endif > + odp_buffer_pool_t pool; /**< reference to packet pool */ > }; > > typedef union { > diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h > index 8b6c517..077fafb 100644 > --- a/platform/linux-generic/include/odp_queue_internal.h > +++ b/platform/linux-generic/include/odp_queue_internal.h > @@ -23,6 +23,7 @@ extern "C" { > #include <odp_packet_io.h> > #include <odp_align.h> > > +#include <odph_ring.h> > > #define USE_TICKETLOCK > > @@ -39,6 +40,9 @@ extern "C" { > #define QUEUE_STATUS_NOTSCHED 2 > #define QUEUE_STATUS_SCHED 3 > > +#define QUEUE_IPC_ENTRIES 4096 /**< number of odp buffers in > + odp ring queue */ > + > /* forward declaration */ > union queue_entry_u; > > @@ -65,13 +69,13 @@ struct queue_entry_s { > deq_func_t dequeue; > enq_multi_func_t enqueue_multi; > deq_multi_func_t dequeue_multi; > - > odp_queue_t handle; > odp_buffer_t sched_buf; > odp_queue_type_t type; > odp_queue_param_t param; > odp_pktio_t pktin; > odp_pktio_t pktout; > + odph_ring_t *r; /* odph_ring ref for ipc queue */ > char name[ODP_QUEUE_NAME_LEN]; > }; > > @@ -84,10 +88,18 @@ typedef union queue_entry_u { > queue_entry_t *get_qentry(uint32_t queue_id); > > int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); > +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); > + > odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); > +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue); > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], > + int num); > + > int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], > + int num); > > void queue_lock(queue_entry_t *queue); > void queue_unlock(queue_entry_t *queue); > diff --git a/platform/linux-generic/include/odp_shared_memory_internal.h b/platform/linux-generic/include/odp_shared_memory_internal.h > new file mode 100644 > index 0000000..2bcf966 > --- /dev/null > +++ b/platform/linux-generic/include/odp_shared_memory_internal.h > @@ -0,0 +1,35 @@ > +/* Copyright (c) 2014, Linaro Limited > + * All rights reserved. > + * > + * SPDX-License-Identifier: BSD-3-Clause > + */ > + > + > +/** > + * @file > + * > + * ODP shared memory internal > + */ > + > +#ifndef ODP_SHARED_MEMORY_INTERNAL_H_ > +#define ODP_SHARED_MEMORY_INTERNAL_H_ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <odp_shared_memory.h> > + > +/*Extend odp_shm_e: > + * ODP_SHM_PROC_NOCREAT - Memory accessible by processes has to be created > + * before real usage. > + */ > +#define ODP_SHM_PROC_NOCREAT (ODP_SHM_PROC + 1) > +odp_shm_t plat_odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > + uint32_t flag); > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif > diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c > index 06d8935..65b1a42 100644 > --- a/platform/linux-generic/odp_packet_io.c > +++ b/platform/linux-generic/odp_packet_io.c > @@ -132,6 +132,8 @@ static void init_pktio_entry(pktio_entry_t *entry, odp_pktio_params_t *params) > memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm)); > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + break; > default: > ODP_ERR("Packet I/O type not supported. Please recompile\n"); > break; > @@ -197,6 +199,8 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, > ODP_DBG("Allocating netmap pktio\n"); > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + break; > default: > ODP_ERR("Invalid pktio type: %02x\n", params->type); > return ODP_PKTIO_INVALID; > @@ -242,6 +246,9 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, > } > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + pktio_entry->s.pool = pool; > + break; > default: > free_pktio_entry(id); > id = ODP_PKTIO_INVALID; > @@ -384,11 +391,22 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) > pktio_entry_t *pktio_entry = get_entry(id); > queue_entry_t *qentry = queue_to_qentry(queue); > > - if (pktio_entry == NULL || qentry == NULL) > + if (pktio_entry == NULL || qentry == NULL) { > + ODP_ERR("%s() return -q reason %p -- %p\n", > + __func__, > + pktio_entry, qentry); > return -1; > + } > > - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) > + switch (qentry->s.type) { > + case ODP_QUEUE_TYPE_PKTIN: > + case ODP_QUEUE_TYPE_IPC: > + case ODP_QUEUE_TYPE_IPC_LOOKUP: > + break; > + default: > + ODP_ERR("%s() type is %d\n", __func__, qentry->s.type); > return -1; > + } > > lock_entry(pktio_entry); > pktio_entry->s.inq_default = queue; > @@ -399,6 +417,11 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) > qentry->s.status = QUEUE_STATUS_SCHED; > queue_unlock(qentry); > > + if (qentry->s.type == ODP_QUEUE_TYPE_IPC) > + return 0; > + if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP) > + return 0; > + > odp_schedule_queue(queue, qentry->s.param.sched.prio); > > return 0; > diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c > index 1318bcd..e821e9f 100644 > --- a/platform/linux-generic/odp_queue.c > +++ b/platform/linux-generic/odp_queue.c > @@ -13,6 +13,7 @@ > #include <odp_buffer_pool_internal.h> > #include <odp_internal.h> > #include <odp_shared_memory.h> > +#include <odp_shared_memory_internal.h> > #include <odp_schedule_internal.h> > #include <odp_config.h> > #include <odp_packet_io_internal.h> > @@ -21,6 +22,11 @@ > #include <odp_hints.h> > #include <odp_sync.h> > > +#include <odph_ring.h> > + > +#include <sys/types.h> > +#include <unistd.h> > + > #ifdef USE_TICKETLOCK > #include <odp_ticketlock.h> > #define LOCK(a) odp_ticketlock_lock(a) > @@ -34,7 +40,7 @@ > #endif > > #include <string.h> > - > +#include <stdlib.h> > > typedef struct queue_table_t { > queue_entry_t queue[ODP_CONFIG_QUEUES]; > @@ -77,6 +83,37 @@ static void queue_init(queue_entry_t *queue, const char *name, > queue->s.enqueue_multi = pktout_enq_multi; > queue->s.dequeue_multi = pktout_deq_multi; > break; > + case ODP_QUEUE_TYPE_IPC: > + queue->s.r = odph_ring_lookup(name); > + if (!queue->s.r) { > + queue->s.r = odph_ring_create(name, > + QUEUE_IPC_ENTRIES, > + ODPH_RING_SHM_PROC); > + if (queue->s.r == NULL) > + ODP_ERR("ring create failed\n"); > + } > + queue->s.enqueue = queue_enq_ipc; > + queue->s.dequeue = queue_deq_ipc; > + queue->s.enqueue_multi = queue_enq_multi_ipc; > + queue->s.dequeue_multi = queue_deq_multi_ipc; > + break; > + case ODP_QUEUE_TYPE_IPC_LOOKUP: > + if (odp_shm_lookup_ipc(name) == 1) { > + size_t ring_size = QUEUE_IPC_ENTRIES * sizeof(void *) > + + sizeof(odph_ring_t); > + odp_shm_t shm = plat_odp_shm_reserve(name, ring_size, > + ODP_CACHE_LINE_SIZE, > + ODP_SHM_PROC_NOCREAT); > + > + queue->s.r = odp_shm_addr(shm); > + if (queue->s.r == NULL) > + ODP_ERR("LOOKUP ring create failed\n"); > + } > + queue->s.enqueue = queue_enq_ipc; > + queue->s.dequeue = queue_deq_ipc; > + queue->s.enqueue_multi = queue_enq_multi_ipc; > + queue->s.dequeue_multi = queue_deq_multi_ipc; > + break; > default: > queue->s.enqueue = queue_enq; > queue->s.dequeue = queue_deq; > @@ -94,16 +131,15 @@ static void queue_init(queue_entry_t *queue, const char *name, > int odp_queue_init_global(void) > { > uint32_t i; > - odp_shm_t shm; > > ODP_DBG("Queue init ... "); > > - shm = odp_shm_reserve("odp_queues", > - sizeof(queue_table_t), > - sizeof(queue_entry_t), 0); > - > - queue_tbl = odp_shm_addr(shm); > - > + /* Use malloc to allocate queues table instead of > + * odp_shm_reserve() because queues are implemented via > + * pointer lists which are differ in different VM space, > + * for example in other forked process. > + */ > + queue_tbl = malloc(sizeof(queue_table_t)); > if (queue_tbl == NULL) > return -1; > > @@ -116,6 +152,11 @@ int odp_queue_init_global(void) > queue->s.handle = queue_from_id(i); > } > > + /* for linux-generic IPC queue implemented totaly in > + * software using odp_ring. > + */ > + odph_ring_tailq_init(); > + > ODP_DBG("done\n"); > ODP_DBG("Queue init global\n"); > ODP_DBG(" struct queue_entry_s size %zu\n", > @@ -246,6 +287,27 @@ odp_queue_t odp_queue_lookup(const char *name) > UNLOCK(&queue->s.lock); > } > > + /* do look up for shared memory object if exist return that queue*/ > + odph_ring_t *r; > + > + r = odph_ring_lookup(name); > + if (r == NULL) { > + if (odp_shm_lookup_ipc(name) == 1) { > + /* Create local IPC queue connected to shm object */ > + odp_queue_t q = odp_queue_create(name, > + ODP_QUEUE_TYPE_IPC_LOOKUP, > + NULL); > + if (q != ODP_QUEUE_INVALID) > + return q; > + } > + } else { > + /* odp ring is in odp_ring_list. That means current process > + * already created link with such name. That might be ipc > + * queue or ring itself. For now print error here. > + */ > + ODP_ERR("odp ring with name: \"%s\" already initialized\n", name); > + } > + > return ODP_QUEUE_INVALID; > } > > @@ -279,6 +341,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > return 0; > } > > +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > +{ > + int ret; > + odph_ring_t *r = queue->s.r; > + odp_buffer_bits_t handle; > + uint32_t index = buf_hdr->handle.index; > + uint32_t pool_id = buf_hdr->handle.pool_id; > + odp_buffer_t buf; > + void **rbuf_p; > + > + /* get buffer from buf_hdr */ > + handle.index = index; > + handle.pool_id = pool_id; > + > + buf = handle.u32; > + > + rbuf_p = (void *)&buf; > + /* use odp_ring locks instead of per process queue lock > + * LOCK(&queue->s.lock); > + */ > + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr > + * here due to poiter will be referenced in different porocess > + */ > + ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1); > + if (ret != 0) > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + /* > + * UNLOCK(&queue->s.lock); > + */ > + return 0; > +} > + > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > { > @@ -314,6 +408,45 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > return 0; > } > > +int queue_enq_multi_ipc(queue_entry_t *queue, > + odp_buffer_hdr_t *buf_hdr[], int num) > +{ > + int i; > + int ret = 0; > + odph_ring_t *r = queue->s.r; > + odp_buffer_bits_t handle; > + odp_buffer_t buf; > + void **rbuf_p; > + > + /* use odp_ring locks instead of per process queue lock > + * LOCK(&queue->s.lock); > + */ > + > + /* odp_buffer_t buffers can be in not continius memory, > + * so queue them to IPC ring one by one. > + */ > + for (i = 0; i < num; i++) { > + handle.index = buf_hdr[i]->handle.index; > + handle.pool_id = buf_hdr[i]->handle.pool_id; > + > + buf = handle.u32; > + > + rbuf_p = (void *)&buf; > + > + /* queue buffer to the ring. Note: we can't use pointer > + * to buf_hdr here due to poiter will be referenced in > + * different porocess. > + */ > + ret += odph_ring_mp_enqueue_bulk(r, rbuf_p, 1); > + if (ret != 0) > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + } > + /* > + * UNLOCK(&queue->s.lock); > + */ > + > + return ret; > +} > > int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) > { > @@ -372,6 +505,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) > return buf_hdr; > } > > +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue) > +{ > + odp_buffer_hdr_t *buf_hdr = NULL; > + odph_ring_t *r = queue->s.r; > + int ret; > + odp_buffer_t buf; > + void **buf_p = (void *)&buf; > + > + /* using odp_ring lock > + * LOCK(&queue->s.lock); > + */ > + ret = odph_ring_mc_dequeue_bulk(r, buf_p, 1); > + if (ret == 0) > + buf_hdr = odp_buf_to_hdr(buf); > + /* > + * UNLOCK(&queue->s.lock); > + */ > + > + return buf_hdr; > +} > + > +int queue_deq_multi_ipc(queue_entry_t *queue, > + odp_buffer_hdr_t *buf_hdr[], int num) > +{ > + int i = 0; > + odph_ring_t *r = queue->s.r; > + int ret; > + odp_buffer_t buf; > + odp_buffer_t ipcbufs[QUEUE_IPC_ENTRIES]; > + void **ipcbufs_p = (void *)&ipcbufs; > + > + /* use odp ring lock > + * LOCK(&queue->s.lock); > + */ > + > + if (queue->s.head == NULL) { > + /* Already empty queue */ > + } else { > + odp_buffer_hdr_t *hdr = queue->s.head; > + > + ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, num); > + if (ret == 0) { > + for (; i < num && hdr; i++) { > + memcpy(&buf, (void *)ipcbufs_p[i], > + sizeof(odp_buffer_t)); > + > + buf_hdr[i] = odp_buf_to_hdr(buf); > + hdr = hdr->next; > + buf_hdr[i]->next = NULL; > + } > + } > + > + queue->s.head = hdr; > + > + if (hdr == NULL) { > + /* Queue is now empty */ > + queue->s.tail = NULL; > + } > + } > + > + /* use odp_ring lock > + * UNLOCK(&queue->s.lock); > + */ > + > + return i; > +} > > int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > { > diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c > index 632aa66..385ce01 100644 > --- a/platform/linux-generic/odp_ring.c > +++ b/platform/linux-generic/odp_ring.c > @@ -158,8 +158,14 @@ odph_ring_create(const char *name, unsigned count, unsigned flags) > char ring_name[ODPH_RING_NAMESIZE]; > odph_ring_t *r; > size_t ring_size; > + uint32_t shm_flag; > odp_shm_t shm; > > + if (flags & ODPH_RING_SHM_PROC) > + shm_flag = ODP_SHM_PROC; > + else > + shm_flag = 0; > + > /* count must be a power of 2 */ > if (!ODP_VAL_IS_POWER_2(count) || (count > ODPH_RING_SZ_MASK)) { > ODP_ERR("Requested size is invalid, must be power of 2, and do not exceed the size limit %u\n", > @@ -172,10 +178,10 @@ odph_ring_create(const char *name, unsigned count, unsigned flags) > > odp_rwlock_write_lock(&qlock); > /* reserve a memory zone for this ring.*/ > - shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, 0); > + shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, > + shm_flag); > > r = odp_shm_addr(shm); > - > if (r != NULL) { > /* init the ring structure */ > snprintf(r->name, sizeof(r->name), "%s", name); > diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c > index 1898a34..4631d0b 100644 > --- a/platform/linux-generic/odp_shared_memory.c > +++ b/platform/linux-generic/odp_shared_memory.c > @@ -10,15 +10,21 @@ > #include <odp_align.h> > #include <odp_system_info.h> > #include <odp_debug.h> > +#include <odp_shared_memory_internal.h> > > #include <unistd.h> > + > #include <sys/mman.h> > #include <asm/mman.h> > #include <fcntl.h> > +#include <unistd.h> > +#include <sys/types.h> > > #include <stdio.h> > #include <string.h> > > +#include <odph_ring.h> > +#include <stdlib.h> > > #define ODP_SHM_NUM_BLOCKS 32 > > @@ -74,9 +80,8 @@ int odp_shm_init_global(void) > ODP_DBG("NOTE: mmap does not support huge pages\n"); > #endif > > - addr = mmap(NULL, sizeof(odp_shm_table_t), > - PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); > - > + /* malloc instead of mmap to bind table to process. */ > + addr = malloc(sizeof(odp_shm_table_t)); > if (addr == MAP_FAILED) > return -1; > > @@ -113,7 +118,7 @@ static int find_block(const char *name, uint32_t *index) > } > > > -odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > +odp_shm_t plat_odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > uint32_t flags) > { > uint32_t i; > @@ -165,7 +170,6 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > } > > if (i > ODP_SHM_NUM_BLOCKS - 1) { > - /* Table full */ > odp_spinlock_unlock(&odp_shm_tbl->lock); > ODP_DBG("odp_shm_reserve: no more blocks\n"); > return ODP_SHM_INVALID; > @@ -198,6 +202,8 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > if (addr == MAP_FAILED) { > /* Alloc failed */ > odp_spinlock_unlock(&odp_shm_tbl->lock); > + if (!(flags & ODP_SHM_PROC_NOCREAT)) > + shm_unlink(name); > ODP_DBG("odp_shm_reserve: mmap failed\n"); > return ODP_SHM_INVALID; > } > @@ -219,6 +225,11 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > return block->hdl; > } > > +odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > + uint32_t flag) > +{ > + return plat_odp_shm_reserve(name, size, align, flag); > +} > > odp_shm_t odp_shm_lookup(const char *name) > { > @@ -273,6 +284,17 @@ int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info) > return 0; > } > > +int odp_shm_lookup_ipc(const char *name) > +{ > + int shm; > + > + shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); > + if (shm == -1) > + return 0; > + > + close(shm); > + return 1; > +} > > void odp_shm_print_all(void) > { > -- > 1.8.5.1.163.gd7aced9 I need a bit more time to go through all of this, I think there is more to discuss. /Ciprian > > > _______________________________________________ > lng-odp mailing list > lng-odp@lists.linaro.org > http://lists.linaro.org/mailman/listinfo/lng-odp
On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > Pools or queues must not be shared - the "IPC interface" should be used instead to move buffers from one side to another. Two options for the interface: > > 1) application enqueues empty buffers to the interface, and dequeue/schedule filled buffers from the interface > > 2) let interface to allocate buffers (and copy data), and just dequeue/schedule filled buffers from the interface. This is how packet IO interface works. > > -Petri Absolutely disagree with that. Queues and pools must be shared. Packets have not be copied between threads or processes. Only reference for packets should be transfered. I.e. it must be zero copy implementation to reach maximum performance and allow to run scalable applications. That is match more easy to allocate buffers, do packet copy. But practical usage for that is near the zero. IPC with coping has to be not part of ODP, it might be done on application level. With google proto buffers or pipe or what ever. From my point of view we should think about zero copy solution and forgot about other else. Thanks, Maxim.
> -----Original Message----- > From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] > Sent: Thursday, September 25, 2014 3:21 PM > To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org > Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation > > On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > > Pools or queues must not be shared - the "IPC interface" should be used > instead to move buffers from one side to another. Two options for the > interface: > > > > 1) application enqueues empty buffers to the interface, and > dequeue/schedule filled buffers from the interface > > > > 2) let interface to allocate buffers (and copy data), and just > dequeue/schedule filled buffers from the interface. This is how packet IO > interface works. > > > > -Petri > > Absolutely disagree with that. Queues and pools must be shared. Packets > have not be copied between threads or processes. Only reference > for packets should be transfered. I.e. it must be zero copy > implementation to reach maximum performance and allow to run scalable > applications. > > That is match more easy to allocate buffers, do packet copy. But > practical usage for that is near the zero. IPC with coping has to be not > part of ODP, > it might be done on application level. With google proto buffers or pipe > or what ever. From my point of view we should think about zero copy > solution > and forgot about other else. > > Thanks, > Maxim. Two instances cannot directly access each others pools/queues, but they can exchange buffers through "IPC interface" which may or may not copy the content depending on implementation (although CPU wouldn't notice much difference if copy is done in HW). The point is that buffers cannot be e.g. directly freed into a remote pool. User have to rely on "IPC interface" to either free it, or transmit it back to the remote side. Two instances need to operate on (logically) separate resources. Otherwise e.g. a bug in one ODP instance would crash/corrupt the other, and there would not be much point in having two instances and IPC in between them... -Petri
On 09/25/2014 05:38 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > >> -----Original Message----- >> From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] >> Sent: Thursday, September 25, 2014 3:21 PM >> To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org >> Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation >> >> On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>> Pools or queues must not be shared - the "IPC interface" should be used >> instead to move buffers from one side to another. Two options for the >> interface: >>> 1) application enqueues empty buffers to the interface, and >> dequeue/schedule filled buffers from the interface >>> 2) let interface to allocate buffers (and copy data), and just >> dequeue/schedule filled buffers from the interface. This is how packet IO >> interface works. >>> -Petri >> Absolutely disagree with that. Queues and pools must be shared. Packets >> have not be copied between threads or processes. Only reference >> for packets should be transfered. I.e. it must be zero copy >> implementation to reach maximum performance and allow to run scalable >> applications. >> >> That is match more easy to allocate buffers, do packet copy. But >> practical usage for that is near the zero. IPC with coping has to be not >> part of ODP, >> it might be done on application level. With google proto buffers or pipe >> or what ever. From my point of view we should think about zero copy >> solution >> and forgot about other else. >> >> Thanks, >> Maxim. > Two instances cannot directly access each others pools/queues, but they can exchange buffers through "IPC interface" which may or may not copy the content depending on implementation (although CPU wouldn't notice much difference if copy is done in HW). The point is that buffers cannot be e.g. directly freed into a remote pool. User have to rely on "IPC interface" to either free it, or transmit it back to the remote side. > > Two instances need to operate on (logically) separate resources. Otherwise e.g. a bug in one ODP instance would crash/corrupt the other, and there would not be much point in having two instances and IPC in between them... > > -Petri > Not understand what is bad in having the same shared pool? Also argument "a bug in one ODP instance would crash/corrupt the other" sound silly. Why it should crash? Everything depend on target which we are going to reach. I my case I more thing about scalable ODP application across different cpus. Where threads can not be used. Maxim. > > >
The discussion on this thread is interesting. But to me, it really illuminates the need to agree on and describe the use cases, derive the requirements and propose (not implement but some prototyping allowed) a design which would satisfy the requirements and support the use cases. Not randomly exploring the (infinite) design space until we find a point where we can agree. -- Ola On 25 September 2014 18:01, Maxim Uvarov <maxim.uvarov@linaro.org> wrote: > On 09/25/2014 05:38 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: > >> >> -----Original Message----- >>> From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] >>> Sent: Thursday, September 25, 2014 3:21 PM >>> To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org >>> Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation >>> >>> On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>> >>>> Pools or queues must not be shared - the "IPC interface" should be used >>>> >>> instead to move buffers from one side to another. Two options for the >>> interface: >>> >>>> 1) application enqueues empty buffers to the interface, and >>>> >>> dequeue/schedule filled buffers from the interface >>> >>>> 2) let interface to allocate buffers (and copy data), and just >>>> >>> dequeue/schedule filled buffers from the interface. This is how packet IO >>> interface works. >>> >>>> -Petri >>>> >>> Absolutely disagree with that. Queues and pools must be shared. Packets >>> have not be copied between threads or processes. Only reference >>> for packets should be transfered. I.e. it must be zero copy >>> implementation to reach maximum performance and allow to run scalable >>> applications. >>> >>> That is match more easy to allocate buffers, do packet copy. But >>> practical usage for that is near the zero. IPC with coping has to be not >>> part of ODP, >>> it might be done on application level. With google proto buffers or pipe >>> or what ever. From my point of view we should think about zero copy >>> solution >>> and forgot about other else. >>> >>> Thanks, >>> Maxim. >>> >> Two instances cannot directly access each others pools/queues, but they >> can exchange buffers through "IPC interface" which may or may not copy the >> content depending on implementation (although CPU wouldn't notice much >> difference if copy is done in HW). The point is that buffers cannot be e.g. >> directly freed into a remote pool. User have to rely on "IPC interface" to >> either free it, or transmit it back to the remote side. >> >> Two instances need to operate on (logically) separate resources. >> Otherwise e.g. a bug in one ODP instance would crash/corrupt the other, and >> there would not be much point in having two instances and IPC in between >> them... >> >> -Petri >> >> > Not understand what is bad in having the same shared pool? Also argument > "a bug in one ODP instance would crash/corrupt the other" sound silly. Why > it should crash? Everything depend on target which we are going to reach. I > my case I more thing about scalable ODP application across different cpus. > Where threads can not be used. > > Maxim. > > > >> >> >> > > _______________________________________________ > lng-odp mailing list > lng-odp@lists.linaro.org > http://lists.linaro.org/mailman/listinfo/lng-odp >
On 25 September 2014 09:01, Maxim Uvarov <maxim.uvarov@linaro.org> wrote: > On 09/25/2014 05:38 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >> >> >>> -----Original Message----- >>> From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] >>> Sent: Thursday, September 25, 2014 3:21 PM >>> To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org >>> Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation >>> >>> On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>>> >>>> Pools or queues must not be shared - the "IPC interface" should be used >>> >>> instead to move buffers from one side to another. Two options for the >>> interface: >>>> >>>> 1) application enqueues empty buffers to the interface, and >>> >>> dequeue/schedule filled buffers from the interface >>>> >>>> 2) let interface to allocate buffers (and copy data), and just >>> >>> dequeue/schedule filled buffers from the interface. This is how packet IO >>> interface works. >>>> >>>> -Petri >>> >>> Absolutely disagree with that. Queues and pools must be shared. Packets >>> have not be copied between threads or processes. Only reference >>> for packets should be transfered. I.e. it must be zero copy >>> implementation to reach maximum performance and allow to run scalable >>> applications. >>> >>> That is match more easy to allocate buffers, do packet copy. But >>> practical usage for that is near the zero. IPC with coping has to be not >>> part of ODP, >>> it might be done on application level. With google proto buffers or pipe >>> or what ever. From my point of view we should think about zero copy >>> solution >>> and forgot about other else. >>> >>> Thanks, >>> Maxim. >> >> Two instances cannot directly access each others pools/queues, but they >> can exchange buffers through "IPC interface" which may or may not copy the >> content depending on implementation (although CPU wouldn't notice much >> difference if copy is done in HW). The point is that buffers cannot be e.g. >> directly freed into a remote pool. User have to rely on "IPC interface" to >> either free it, or transmit it back to the remote side. >> >> Two instances need to operate on (logically) separate resources. Otherwise >> e.g. a bug in one ODP instance would crash/corrupt the other, and there >> would not be much point in having two instances and IPC in between them... >> >> -Petri >> > > Not understand what is bad in having the same shared pool? Also argument "a > bug in one ODP instance would crash/corrupt the other" sound silly. Why it > should crash? Everything depend on target which we are going to reach. Isolation is quite poor with designs that use shared memory. Here are few examples: o Supposed you have shared pool, and spin lock used to synchronize access to its internals Now consider case when one app crashes while holding such spin lock (and note crash could be triggered in another thread, that does not have anything to do with ODP). So your other application that shares pool with stuck forever. o In above example replace spin lock with mutex. Yes, linux supports robust mutex and someone can get notification when holder of mutex crashes but it does no help much, because normally crashed application was in the middle of some operation that need serialization so normally you have no idea "how to fix it" before recovering locked robust mutex and continue. o and so on ... One may try to avoid above issues with lockless algorithms but that drives implementation complexity to high. > I my > case I more thing about scalable ODP application across different cpus. > Where threads can not be used. Go back to Ola's point, what is real use case for it? Why threads cannot be used? Why you cannot do special shared memory pkt_io between to instances of ODP app as Petri suggested. That seems much simpler and clean, with possible shared memory issues encapsulated under shared mem pkt_io implementation. Do you have example where you can show that sharing ODP pools, queues brings significant difference? Thanks, Victor > Maxim. > > >> >> >> > > > _______________________________________________ > lng-odp mailing list > lng-odp@lists.linaro.org > http://lists.linaro.org/mailman/listinfo/lng-odp
On 09/26/2014 12:22 AM, Victor Kamensky wrote: > On 25 September 2014 09:01, Maxim Uvarov <maxim.uvarov@linaro.org> wrote: >> On 09/25/2014 05:38 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>> >>>> -----Original Message----- >>>> From: ext Maxim Uvarov [mailto:maxim.uvarov@linaro.org] >>>> Sent: Thursday, September 25, 2014 3:21 PM >>>> To: Savolainen, Petri (NSN - FI/Espoo); lng-odp@lists.linaro.org >>>> Subject: Re: [lng-odp] [PATCHv6] linux-generic: odp ipc implementation >>>> >>>> On 09/25/2014 03:50 PM, Savolainen, Petri (NSN - FI/Espoo) wrote: >>>>> Pools or queues must not be shared - the "IPC interface" should be used >>>> instead to move buffers from one side to another. Two options for the >>>> interface: >>>>> 1) application enqueues empty buffers to the interface, and >>>> dequeue/schedule filled buffers from the interface >>>>> 2) let interface to allocate buffers (and copy data), and just >>>> dequeue/schedule filled buffers from the interface. This is how packet IO >>>> interface works. >>>>> -Petri >>>> Absolutely disagree with that. Queues and pools must be shared. Packets >>>> have not be copied between threads or processes. Only reference >>>> for packets should be transfered. I.e. it must be zero copy >>>> implementation to reach maximum performance and allow to run scalable >>>> applications. >>>> >>>> That is match more easy to allocate buffers, do packet copy. But >>>> practical usage for that is near the zero. IPC with coping has to be not >>>> part of ODP, >>>> it might be done on application level. With google proto buffers or pipe >>>> or what ever. From my point of view we should think about zero copy >>>> solution >>>> and forgot about other else. >>>> >>>> Thanks, >>>> Maxim. >>> Two instances cannot directly access each others pools/queues, but they >>> can exchange buffers through "IPC interface" which may or may not copy the >>> content depending on implementation (although CPU wouldn't notice much >>> difference if copy is done in HW). The point is that buffers cannot be e.g. >>> directly freed into a remote pool. User have to rely on "IPC interface" to >>> either free it, or transmit it back to the remote side. >>> >>> Two instances need to operate on (logically) separate resources. Otherwise >>> e.g. a bug in one ODP instance would crash/corrupt the other, and there >>> would not be much point in having two instances and IPC in between them... >>> >>> -Petri >>> >> Not understand what is bad in having the same shared pool? Also argument "a >> bug in one ODP instance would crash/corrupt the other" sound silly. Why it >> should crash? Everything depend on target which we are going to reach. > Isolation is quite poor with designs that use shared memory. Here are few > examples: > > o Supposed you have shared pool, and spin lock used to synchronize > access to its internals Now consider case when one app crashes while > holding such spin lock (and note crash could be triggered in another thread, > that does not have anything to do with ODP). So your other application > that shares pool with stuck forever. > > o In above example replace spin lock with mutex. Yes, linux supports > robust mutex and someone can get notification when holder of mutex crashes > but it does no help much, because normally crashed application was > in the middle of some operation that need serialization so normally you > have no idea "how to fix it" before recovering locked robust mutex and > continue. > > o and so on ... That is all understandable and expected case. If some dead code has to be not deadlock-able as possible. > One may try to avoid above issues with lockless algorithms but that > drives implementation complexity to high. > >> I my >> case I more thing about scalable ODP application across different cpus. >> Where threads can not be used. > Go back to Ola's point, what is real use case for it? There is IPC doc which has multiply use cases: https://docs.google.com/a/linaro.org/document/d/1bC0XHNGQMMtZq6aYaUaWU-tYi_DgOl4ePQ65XRxDOx8/edit#heading=h.kskoz435lvo2 > Why threads cannot > be used? I think you missed discussion about that. Number of test cases are: load balancing, snort, odp in VMs, different instances of odp for different platforms and etc. For my particular case I need that to scale Snort, which is single threaded. > Why you cannot do special shared memory pkt_io between to > instances of ODP app as Petri suggested. I did it. Please take a look at example in patch: ./example/ipc/odp_ipc.c /* create shared queue between processes*/ pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); then IPC queue: ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC, &qparam); The difference is only that Petri wants to copy packets and I did shared pool with zero copy. > That seems much simpler and > clean, with possible shared memory issues encapsulated under shared > mem pkt_io implementation. Do you have example where you can show > that sharing ODP pools, queues brings significant difference? Snort or any other single threaded apps. VMs, and what I explained above. Also please refer to IPC ARCH doc. Thanks, Maxim. > > Thanks, > Victor > >> Maxim. >> >> >>> >>> >> >> _______________________________________________ >> lng-odp mailing list >> lng-odp@lists.linaro.org >> http://lists.linaro.org/mailman/listinfo/lng-odp
diff --git a/.gitignore b/.gitignore index 2b9e4f5..428f06b 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ lib/ obj/ build/ odp_example +odp_ipc odp_packet odp_packet_netmap odp_atomic diff --git a/configure.ac b/configure.ac index 102486d..3db5f59 100644 --- a/configure.ac +++ b/configure.ac @@ -153,6 +153,7 @@ AC_CONFIG_FILES([Makefile platform/linux-keystone2/Makefile platform/linux-dpdk/Makefile example/Makefile + example/ipc/Makefile example/generator/Makefile example/ipsec/Makefile example/l2fwd/Makefile diff --git a/example/Makefile.am b/example/Makefile.am index 72663b9..5bfd6d7 100644 --- a/example/Makefile.am +++ b/example/Makefile.am @@ -1 +1 @@ -SUBDIRS = generator ipsec l2fwd odp_example packet packet_netmap timer +SUBDIRS = generator ipsec l2fwd odp_example packet packet_netmap timer ipc diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am new file mode 100644 index 0000000..2fd48f7 --- /dev/null +++ b/example/ipc/Makefile.am @@ -0,0 +1,6 @@ +include $(top_srcdir)/example/Makefile.inc + +bin_PROGRAMS = odp_ipc +odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static + +dist_odp_ipc_SOURCES = odp_ipc.c diff --git a/example/ipc/README b/example/ipc/README new file mode 100644 index 0000000..57df942 --- /dev/null +++ b/example/ipc/README @@ -0,0 +1,56 @@ +/* Copyright (c) 2014, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + + ODP IPC example + +This example shows how to use queues to exchange packets between different +processes. + +Examples scheme: + + Ping (Machine 1) ----> odp_ipc app (Machine 2) + +Example burst mode: +./odp_ipc -i eth0 -m 1 -c 1 +On remote host run ping target that runs odp_ipc. + +[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4 + + +Main PID/thread [11492/1] enqueues packets to IPC queue with odp_queue_enq_multi(), +child process thread ring_thread() dequeues packets from ipc queue. + + +Example queue mode: + +./odp_ipc -i eth0 -m 1 -c 1 +waiting for packet... +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... + +Thread 15917 moves packets from ingress queue to IPC queue. Other process +in ring_thread() thread dequeues packets from IPC queue. diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c new file mode 100644 index 0000000..0b5c636 --- /dev/null +++ b/example/ipc/odp_ipc.c @@ -0,0 +1,679 @@ +/* Copyright (c) 2014, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +/** + * @file + * + * @example odp_ipc.c ODP IPC queues example application + */ + +#include <stdlib.h> +#include <string.h> +#include <getopt.h> +#include <unistd.h> + +#include <odp.h> +#include <odph_linux.h> +#include <odph_packet.h> +#include <odph_eth.h> +#include <odph_ip.h> + +#define MAX_WORKERS 32 +#define SHM_PKT_POOL_SIZE (512*2048) +#define SHM_PKT_POOL_BUF_SIZE 1856 +#define MAX_PKT_BURST 16 + +#define APPL_MODE_PKT_BURST 0 +#define APPL_MODE_PKT_QUEUE 1 + +#define RING_SIZE 4096 +#define ODP_RING_NAMESIZE 32 + +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x)) + +/** Get rid of path in filename - only for unix-type paths using '/' */ +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \ + strrchr((file_name), '/') + 1 : (file_name)) +/** + * Parsed command line application arguments + */ +typedef struct { + int core_count; + int if_count; /**< Number of interfaces to be used */ + char **if_names; /**< Array of pointers to interface names */ + int mode; /**< Packet IO mode */ + int type; /**< Packet IO type */ + int fanout; /**< Packet IO fanout */ + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ +} appl_args_t; + +/** + * Thread specific arguments + */ +typedef struct { + char *pktio_dev; /**< Interface name to use */ + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ + int mode; /**< Thread mode */ + int type; /**< Thread i/o type */ + int fanout; /**< Thread i/o fanout */ +} thread_args_t; + +/** + * Grouping of both parsed CL args and thread specific args - alloc together + */ +typedef struct { + /** Application (parsed) arguments */ + appl_args_t appl; + /** Thread specific arguments */ + thread_args_t thread[MAX_WORKERS]; +} args_t; + +/** Global pointer to args */ +static args_t *args; + +/* helper funcs */ +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len); +static void parse_args(int argc, char *argv[], appl_args_t *appl_args); +static void print_info(char *progname, appl_args_t *appl_args); +static void usage(char *progname); + +static void *ring_thread(void *arg ODP_UNUSED) +{ + int ret; + odp_buffer_t buf; + odp_buffer_pool_t pkt_pool; + odp_pktio_params_t pktio_ipc_params; + odp_pktio_t pktio_ipc; + odp_queue_t ipcq_def; + + printf("ODP RING THREAD PID %d\n", getpid()); + + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { + ODP_ERR("Error: pkt_pool not found\n"); + return NULL; + } + + /* create shared queue between processes*/ + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR("Error: pktio create failed\n"); + return NULL; + } + + + while (1) { + ipcq_def = odp_queue_lookup("shared-queue"); + if (ipcq_def != ODP_QUEUE_INVALID) + break; + sleep(1); + } + printf("%s() shared-queue found\n", __func__); + + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); + if (ret != 0) { + ODP_ERR("Error: slave thread default ipc-Q setup\n"); + return NULL; + } + + /* In loop take packets from ipc queue and free this buffer */ + while (1) { + buf = odp_queue_deq(ipcq_def); + if (odp_unlikely(!odp_buffer_is_valid(buf))) + continue; + + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", + __func__, + (unsigned long)odp_packet_get_len(buf), + (unsigned long)odp_buffer_size(buf)); + odp_buffer_free(buf); + } + + /* unreachable */ + return NULL; +} + + +/** + * Packet IO loopback worker thread using ODP queues + * + * @param arg thread arguments of type 'thread_args_t *' + */ +static void *pktio_queue_thread(void *arg) +{ + int thr; + odp_buffer_pool_t pkt_pool; + odp_pktio_t pktio; + odp_pktio_t pktio_ipc; + thread_args_t *thr_args; + odp_queue_t inq_def; + odp_queue_t ipcq_def; + char inq_name[ODP_QUEUE_NAME_LEN]; + odp_queue_param_t qparam; + odp_buffer_t buf; + int ret; + odp_pktio_params_t params; + odp_pktio_params_t pktio_ipc_params; + socket_params_t *sock_params = ¶ms.sock_params; + + thr_args = arg; + + thr = odp_thread_id(); + + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, + thr_args->pktio_dev); + + /* lookup ring from its name */ + /* Lookup the packet pool */ + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); + return NULL; + } + + /* Open a packet IO instance for this thread */ + sock_params->type = thr_args->type; + sock_params->fanout = thr_args->fanout; + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); + if (pktio == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + + /* + * Create and set the default INPUT queue associated with the 'pktio' + * resource + */ + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; + + inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam); + if (inq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + ret = odp_pktio_inq_setdef(pktio, inq_def); + if (ret != 0) { + ODP_ERR(" [%02i] Error: default input-Q setup\n", thr); + return NULL; + } + + printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n" + " default pktio%02i-INPUT queue:%u\n", + thr, pktio, pktio, inq_def); + + /* create shared queue between processes*/ + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + ipcq_def = odp_queue_create("shared-queue", + ODP_QUEUE_TYPE_IPC, &qparam); + if (ipcq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + /* In loop take packets from inq queue and put them to ipc queue */ + for (;;) { + /* Use schedule to get buf from any input queue */ + printf("waiting for packet...\n"); + buf = odp_schedule(NULL, ODP_SCHED_WAIT); + + printf("Enqueue the packet to ipc queue size %ld/%ld\n", + (unsigned long)odp_packet_get_len(buf), + (unsigned long)odp_buffer_size(buf)); + + odp_queue_enq(ipcq_def, buf); + } + +/* unreachable */ +} + +/** + * Packet IO loopback worker thread using bursts from/to IO resources + * + * @param arg thread arguments of type 'thread_args_t *' + */ +static void *pktio_ifburst_thread(void *arg) +{ + int thr; + odp_buffer_pool_t pkt_pool; + odp_pktio_t pktio; + thread_args_t *thr_args; + int pkts, pkts_ok; + odp_packet_t pkt_tbl[MAX_PKT_BURST]; + unsigned long pkt_cnt = 0; + unsigned long err_cnt = 0; + odp_pktio_params_t params; + socket_params_t *sock_params = ¶ms.sock_params; + int ret; + + odp_pktio_t pktio_ipc; + odp_queue_t ipcq_def; + char inq_name[ODP_QUEUE_NAME_LEN]; + odp_queue_param_t qparam; + odp_pktio_params_t pktio_ipc_params; + + thr = odp_thread_id(); + thr_args = arg; + + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, + thr_args->pktio_dev); + + /* Lookup the packet pool */ + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); + return NULL; + } + + /* Open a packet IO instance for this thread */ + sock_params->type = thr_args->type; + sock_params->fanout = thr_args->fanout; + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); + if (pktio == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed.\n", thr); + return NULL; + } + + printf(" [%02i] created pktio:%02i, burst mode\n", + thr, pktio); + + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; + + ipcq_def = odp_queue_create("shared-queue", + ODP_QUEUE_TYPE_IPC, &qparam); + if (ipcq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + /* Loop packets */ + for (;;) { + pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST); + if (pkts > 0) { + /* Drop packets with errors */ + pkts_ok = drop_err_pkts(pkt_tbl, pkts); + if (pkts_ok > 0) { + ret = odp_queue_enq_multi(ipcq_def, + pkt_tbl, pkts_ok); + pkt_cnt += pkts_ok; + if (ret != 0) { + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + } else { + printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n", + getpid(), thr, pkts_ok, + pkt_tbl[0], + (unsigned long)odp_packet_get_len(pkt_tbl[0]), + (unsigned long)odp_buffer_size(pkt_tbl[0]), + pkt_cnt); + } + } + + if (odp_unlikely(pkts_ok != pkts)) + ODP_ERR("Dropped frames:%u - err_cnt:%lu\n", + pkts-pkts_ok, ++err_cnt); + } + } + +/* unreachable */ +} + +/** + * ODP packet example main function + */ +int main(int argc, char *argv[]) +{ + odph_linux_pthread_t thread_tbl[MAX_WORKERS]; + odp_buffer_pool_t pool; + int thr_id; + int num_workers; + void *pool_base; + int i; + int first_core; + int core_count; + + /* Init ODP before calling anything else */ + if (odp_init_global()) { + ODP_ERR("Error: ODP global init failed.\n"); + exit(EXIT_FAILURE); + } + + args = malloc(sizeof(args_t)); + if (args == NULL) { + ODP_ERR("Error: shared mem alloc failed.\n"); + exit(EXIT_FAILURE); + } + memset(args, 0, sizeof(*args)); + + /* Parse and store the application arguments */ + parse_args(argc, argv, &args->appl); + + /* Print both system and application information */ + print_info(NO_PATH(argv[0]), &args->appl); + + core_count = odp_sys_core_count(); + num_workers = core_count; + + if (args->appl.core_count) + num_workers = args->appl.core_count; + + if (num_workers > MAX_WORKERS) + num_workers = MAX_WORKERS; + + printf("Num worker threads: %i\n", num_workers); + + /* + * By default core #0 runs Linux kernel background tasks. + * Start mapping thread from core #1 + */ + first_core = 1; + + if (core_count == 1) + first_core = 0; + + printf("First core: %i\n\n", first_core); + + /* Init this thread */ + thr_id = odp_thread_create(0); + odp_init_local(thr_id); + + /* Create packet pool */ + odp_shm_t shm = odp_shm_reserve("shm_packet_pool", + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_PROC); + + pool_base = odp_shm_addr(shm); + if (pool_base == NULL) { + ODP_ERR("Error: packet pool mem alloc failed.\n"); + exit(EXIT_FAILURE); + } + + pool = odp_buffer_pool_create("packet_pool", pool_base, + SHM_PKT_POOL_SIZE, + SHM_PKT_POOL_BUF_SIZE, + ODP_CACHE_LINE_SIZE, + ODP_BUFFER_TYPE_PACKET); + if (pool == ODP_BUFFER_POOL_INVALID) { + ODP_ERR("Error: packet pool create failed.\n"); + exit(EXIT_FAILURE); + } + odp_buffer_pool_print(pool); + + + /* Create another process */ + int f = fork(); + + /* Create and init worker threads */ + memset(thread_tbl, 0, sizeof(thread_tbl)); + for (i = 0; i < num_workers; ++i) { + void *(*thr_run_func) (void *); + int core; + int if_idx; + + core = (first_core + i) % core_count; + + if_idx = i % args->appl.if_count; + + args->thread[i].pktio_dev = args->appl.if_names[if_idx]; + args->thread[i].pool = pool; + args->thread[i].mode = args->appl.mode; + args->thread[i].type = args->appl.type; + args->thread[i].fanout = args->appl.fanout; + + if (f) { + thr_run_func = ring_thread; + } else { + if (args->appl.mode == APPL_MODE_PKT_BURST) + thr_run_func = pktio_ifburst_thread; + else /* APPL_MODE_PKT_QUEUE */ + thr_run_func = pktio_queue_thread; + } + /* + * Create threads one-by-one instead of all-at-once, + * because each thread might get different arguments. + * Calls odp_thread_create(cpu) for each thread + */ + odph_linux_pthread_create(thread_tbl, 1, core, thr_run_func, + &args->thread[i]); + } + + /* Master thread waits for other threads to exit */ + odph_linux_pthread_join(thread_tbl, num_workers); + + printf("Exit\n\n"); + + return 0; +} + +/** + * Drop packets which input parsing marked as containing errors. + * + * Frees packets with error and modifies pkt_tbl[] to only contain packets with + * no detected errors. + * + * @param pkt_tbl Array of packet + * @param len Length of pkt_tbl[] + * + * @return Number of packets with no detected error + */ +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len) +{ + odp_packet_t pkt; + unsigned pkt_cnt = len; + unsigned i, j; + + for (i = 0, j = 0; i < len; ++i) { + pkt = pkt_tbl[i]; + + if (odp_unlikely(odp_packet_error(pkt))) { + odph_packet_free(pkt); /* Drop */ + pkt_cnt--; + } else if (odp_unlikely(i != j++)) { + pkt_tbl[j-1] = pkt; + } + } + + return pkt_cnt; +} + +/** + * Parse and store the command line arguments + * + * @param argc argument count + * @param argv[] argument vector + * @param appl_args Store application arguments here + */ +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) +{ + int opt; + int long_index; + char *names, *str, *token, *save; + int i; + int len; + static struct option longopts[] = { + {"count", required_argument, NULL, 'c'}, + {"interface", required_argument, NULL, 'i'}, /* return 'i' */ + {"mode", required_argument, NULL, 'm'}, /* return 'm' */ + {"help", no_argument, NULL, 'h'}, /* return 'h' */ + {NULL, 0, NULL, 0} + }; + + appl_args->mode = -1; /* Invalid, must be changed by parsing */ + appl_args->type = 3; /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */ + appl_args->fanout = 1; /* turn off fanout by default for mmap */ + + while (1) { + opt = getopt_long(argc, argv, "+c:i:m:t:f:h", + longopts, &long_index); + + if (opt == -1) + break; /* No more options */ + + switch (opt) { + case 'c': + appl_args->core_count = atoi(optarg); + break; + /* parse packet-io interface names */ + case 'i': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + names = malloc(len); + if (names == NULL) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* count the number of tokens separated by ',' */ + strcpy(names, optarg); + for (str = names, i = 0;; str = NULL, i++) { + token = strtok_r(str, ",", &save); + if (token == NULL) + break; + } + appl_args->if_count = i; + + if (appl_args->if_count == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* allocate storage for the if names */ + appl_args->if_names = + calloc(appl_args->if_count, sizeof(char *)); + + /* store the if names (reset names string) */ + strcpy(names, optarg); + for (str = names, i = 0;; str = NULL, i++) { + token = strtok_r(str, ",", &save); + if (token == NULL) + break; + appl_args->if_names[i] = token; + } + break; + + case 'm': + i = atoi(optarg); + if (i == 0) + appl_args->mode = APPL_MODE_PKT_BURST; + else + appl_args->mode = APPL_MODE_PKT_QUEUE; + break; + + case 't': + appl_args->type = atoi(optarg); + break; + + case 'f': + appl_args->fanout = atoi(optarg); + break; + + case 'h': + usage(argv[0]); + exit(EXIT_SUCCESS); + break; + + default: + break; + } + } + + if (appl_args->if_count == 0 || appl_args->mode == -1) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + optind = 1; /* reset 'extern optind' from the getopt lib */ +} + +/** + * Print system and application info + */ +static void print_info(char *progname, appl_args_t *appl_args) +{ + int i; + + printf("\n" + "ODP system info\n" + "---------------\n" + "ODP API version: %s\n" + "CPU model: %s\n" + "CPU freq (hz): %"PRIu64"\n" + "Cache line size: %i\n" + "Core count: %i\n" + "\n", + odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(), + odp_sys_cache_line_size(), odp_sys_core_count()); + + printf("Running ODP appl: \"%s\"\n" + "-----------------\n" + "IF-count: %i\n" + "Using IFs: ", + progname, appl_args->if_count); + for (i = 0; i < appl_args->if_count; ++i) + printf(" %s", appl_args->if_names[i]); + printf("\n" + "Mode: "); + if (appl_args->mode == APPL_MODE_PKT_BURST) + PRINT_APPL_MODE(APPL_MODE_PKT_BURST); + else + PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE); + printf("\n\n"); + fflush(NULL); +} + +/** + * Prinf usage information + */ +static void usage(char *progname) +{ + printf("\n" + "Usage: %s OPTIONS\n" + " E.g. %s -i eth1,eth2,eth3 -m 0\n" + "\n" + "OpenDataPlane example application.\n" + "\n" + "Mandatory OPTIONS:\n" + " -i, --interface Eth interfaces (comma-separated, no spaces)\n" + " -m, --mode 0: Burst send&receive packets (no queues)\n" + " 1: Send&receive packets through ODP queues.\n" + " -t, --type 1: ODP_PKTIO_TYPE_SOCKET_BASIC\n" + " 2: ODP_PKTIO_TYPE_SOCKET_MMSG\n" + " 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" + " 4: ODP_PKTIO_TYPE_NETMAP\n" + " Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" + " -f, --fanout 0: off 1: on (Default 1: on)\n" + "\n" + "Optional OPTIONS\n" + " -c, --count <number> Core count.\n" + " -h, --help Display help and exit.\n" + "\n", NO_PATH(progname), NO_PATH(progname) + ); +} diff --git a/helper/include/odph_ring.h b/helper/include/odph_ring.h index 76c1db8..1d81b5f 100644 --- a/helper/include/odph_ring.h +++ b/helper/include/odph_ring.h @@ -158,6 +158,8 @@ typedef struct odph_ring { #define ODPH_RING_F_SP_ENQ 0x0001 /* The default enqueue is "single-producer".*/ #define ODPH_RING_F_SC_DEQ 0x0002 /* The default dequeue is "single-consumer".*/ +#define ODPH_RING_SHM_PROC 0x0004 /* If set - ring is visible from different + processes. Default is thread visible. */ #define ODPH_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */ #define ODPH_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */ diff --git a/platform/linux-generic/.dirstamp b/platform/linux-generic/.dirstamp new file mode 100644 index 0000000..e69de29 diff --git a/platform/linux-generic/include/api/odp_pktio_types.h b/platform/linux-generic/include/api/odp_pktio_types.h index 54ce459..79c0f48 100644 --- a/platform/linux-generic/include/api/odp_pktio_types.h +++ b/platform/linux-generic/include/api/odp_pktio_types.h @@ -30,6 +30,7 @@ typedef enum { ODP_PKTIO_TYPE_SOCKET_MMSG, ODP_PKTIO_TYPE_SOCKET_MMAP, ODP_PKTIO_TYPE_NETMAP, + ODP_PKTIO_TYPE_IPC, } odp_pktio_type_t; #include <odp_pktio_socket.h> diff --git a/platform/linux-generic/include/api/odp_queue.h b/platform/linux-generic/include/api/odp_queue.h index 5e083f1..4700a62 100644 --- a/platform/linux-generic/include/api/odp_queue.h +++ b/platform/linux-generic/include/api/odp_queue.h @@ -44,6 +44,8 @@ typedef int odp_queue_type_t; #define ODP_QUEUE_TYPE_POLL 1 /**< Not scheduled queue */ #define ODP_QUEUE_TYPE_PKTIN 2 /**< Packet input queue */ #define ODP_QUEUE_TYPE_PKTOUT 3 /**< Packet output queue */ +#define ODP_QUEUE_TYPE_IPC 4 /**< Packet ipc queue */ +#define ODP_QUEUE_TYPE_IPC_LOOKUP 5 /**< Packet ipc queue */ /** * ODP schedule priority diff --git a/platform/linux-generic/include/api/odp_shared_memory.h b/platform/linux-generic/include/api/odp_shared_memory.h index 7ad29c3..ce0e89c 100644 --- a/platform/linux-generic/include/api/odp_shared_memory.h +++ b/platform/linux-generic/include/api/odp_shared_memory.h @@ -96,6 +96,7 @@ void *odp_shm_addr(odp_shm_t shm); */ int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info); +int odp_shm_lookup_ipc(const char *name); /** * Print all shared memory blocks diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h index 881cc5f..77fff96 100644 --- a/platform/linux-generic/include/odp_packet_io_internal.h +++ b/platform/linux-generic/include/odp_packet_io_internal.h @@ -35,6 +35,7 @@ struct pktio_entry { #ifdef ODP_HAVE_NETMAP pkt_netmap_t pkt_nm; /**< using netmap API for IO */ #endif + odp_buffer_pool_t pool; /**< reference to packet pool */ }; typedef union { diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 8b6c517..077fafb 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -23,6 +23,7 @@ extern "C" { #include <odp_packet_io.h> #include <odp_align.h> +#include <odph_ring.h> #define USE_TICKETLOCK @@ -39,6 +40,9 @@ extern "C" { #define QUEUE_STATUS_NOTSCHED 2 #define QUEUE_STATUS_SCHED 3 +#define QUEUE_IPC_ENTRIES 4096 /**< number of odp buffers in + odp ring queue */ + /* forward declaration */ union queue_entry_u; @@ -65,13 +69,13 @@ struct queue_entry_s { deq_func_t dequeue; enq_multi_func_t enqueue_multi; deq_multi_func_t dequeue_multi; - odp_queue_t handle; odp_buffer_t sched_buf; odp_queue_type_t type; odp_queue_param_t param; odp_pktio_t pktin; odp_pktio_t pktout; + odph_ring_t *r; /* odph_ring ref for ipc queue */ char name[ODP_QUEUE_NAME_LEN]; }; @@ -84,10 +88,18 @@ typedef union queue_entry_u { queue_entry_t *get_qentry(uint32_t queue_id); int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); + odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue); int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num); + int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num); void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue); diff --git a/platform/linux-generic/include/odp_shared_memory_internal.h b/platform/linux-generic/include/odp_shared_memory_internal.h new file mode 100644 index 0000000..2bcf966 --- /dev/null +++ b/platform/linux-generic/include/odp_shared_memory_internal.h @@ -0,0 +1,35 @@ +/* Copyright (c) 2014, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + + +/** + * @file + * + * ODP shared memory internal + */ + +#ifndef ODP_SHARED_MEMORY_INTERNAL_H_ +#define ODP_SHARED_MEMORY_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_shared_memory.h> + +/*Extend odp_shm_e: + * ODP_SHM_PROC_NOCREAT - Memory accessible by processes has to be created + * before real usage. + */ +#define ODP_SHM_PROC_NOCREAT (ODP_SHM_PROC + 1) +odp_shm_t plat_odp_shm_reserve(const char *name, uint64_t size, uint64_t align, + uint32_t flag); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 06d8935..65b1a42 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -132,6 +132,8 @@ static void init_pktio_entry(pktio_entry_t *entry, odp_pktio_params_t *params) memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm)); break; #endif + case ODP_PKTIO_TYPE_IPC: + break; default: ODP_ERR("Packet I/O type not supported. Please recompile\n"); break; @@ -197,6 +199,8 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, ODP_DBG("Allocating netmap pktio\n"); break; #endif + case ODP_PKTIO_TYPE_IPC: + break; default: ODP_ERR("Invalid pktio type: %02x\n", params->type); return ODP_PKTIO_INVALID; @@ -242,6 +246,9 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, } break; #endif + case ODP_PKTIO_TYPE_IPC: + pktio_entry->s.pool = pool; + break; default: free_pktio_entry(id); id = ODP_PKTIO_INVALID; @@ -384,11 +391,22 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) pktio_entry_t *pktio_entry = get_entry(id); queue_entry_t *qentry = queue_to_qentry(queue); - if (pktio_entry == NULL || qentry == NULL) + if (pktio_entry == NULL || qentry == NULL) { + ODP_ERR("%s() return -q reason %p -- %p\n", + __func__, + pktio_entry, qentry); return -1; + } - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) + switch (qentry->s.type) { + case ODP_QUEUE_TYPE_PKTIN: + case ODP_QUEUE_TYPE_IPC: + case ODP_QUEUE_TYPE_IPC_LOOKUP: + break; + default: + ODP_ERR("%s() type is %d\n", __func__, qentry->s.type); return -1; + } lock_entry(pktio_entry); pktio_entry->s.inq_default = queue; @@ -399,6 +417,11 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) qentry->s.status = QUEUE_STATUS_SCHED; queue_unlock(qentry); + if (qentry->s.type == ODP_QUEUE_TYPE_IPC) + return 0; + if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP) + return 0; + odp_schedule_queue(queue, qentry->s.param.sched.prio); return 0; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 1318bcd..e821e9f 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -13,6 +13,7 @@ #include <odp_buffer_pool_internal.h> #include <odp_internal.h> #include <odp_shared_memory.h> +#include <odp_shared_memory_internal.h> #include <odp_schedule_internal.h> #include <odp_config.h> #include <odp_packet_io_internal.h> @@ -21,6 +22,11 @@ #include <odp_hints.h> #include <odp_sync.h> +#include <odph_ring.h> + +#include <sys/types.h> +#include <unistd.h> + #ifdef USE_TICKETLOCK #include <odp_ticketlock.h> #define LOCK(a) odp_ticketlock_lock(a) @@ -34,7 +40,7 @@ #endif #include <string.h> - +#include <stdlib.h> typedef struct queue_table_t { queue_entry_t queue[ODP_CONFIG_QUEUES]; @@ -77,6 +83,37 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.enqueue_multi = pktout_enq_multi; queue->s.dequeue_multi = pktout_deq_multi; break; + case ODP_QUEUE_TYPE_IPC: + queue->s.r = odph_ring_lookup(name); + if (!queue->s.r) { + queue->s.r = odph_ring_create(name, + QUEUE_IPC_ENTRIES, + ODPH_RING_SHM_PROC); + if (queue->s.r == NULL) + ODP_ERR("ring create failed\n"); + } + queue->s.enqueue = queue_enq_ipc; + queue->s.dequeue = queue_deq_ipc; + queue->s.enqueue_multi = queue_enq_multi_ipc; + queue->s.dequeue_multi = queue_deq_multi_ipc; + break; + case ODP_QUEUE_TYPE_IPC_LOOKUP: + if (odp_shm_lookup_ipc(name) == 1) { + size_t ring_size = QUEUE_IPC_ENTRIES * sizeof(void *) + + sizeof(odph_ring_t); + odp_shm_t shm = plat_odp_shm_reserve(name, ring_size, + ODP_CACHE_LINE_SIZE, + ODP_SHM_PROC_NOCREAT); + + queue->s.r = odp_shm_addr(shm); + if (queue->s.r == NULL) + ODP_ERR("LOOKUP ring create failed\n"); + } + queue->s.enqueue = queue_enq_ipc; + queue->s.dequeue = queue_deq_ipc; + queue->s.enqueue_multi = queue_enq_multi_ipc; + queue->s.dequeue_multi = queue_deq_multi_ipc; + break; default: queue->s.enqueue = queue_enq; queue->s.dequeue = queue_deq; @@ -94,16 +131,15 @@ static void queue_init(queue_entry_t *queue, const char *name, int odp_queue_init_global(void) { uint32_t i; - odp_shm_t shm; ODP_DBG("Queue init ... "); - shm = odp_shm_reserve("odp_queues", - sizeof(queue_table_t), - sizeof(queue_entry_t), 0); - - queue_tbl = odp_shm_addr(shm); - + /* Use malloc to allocate queues table instead of + * odp_shm_reserve() because queues are implemented via + * pointer lists which are differ in different VM space, + * for example in other forked process. + */ + queue_tbl = malloc(sizeof(queue_table_t)); if (queue_tbl == NULL) return -1; @@ -116,6 +152,11 @@ int odp_queue_init_global(void) queue->s.handle = queue_from_id(i); } + /* for linux-generic IPC queue implemented totaly in + * software using odp_ring. + */ + odph_ring_tailq_init(); + ODP_DBG("done\n"); ODP_DBG("Queue init global\n"); ODP_DBG(" struct queue_entry_s size %zu\n", @@ -246,6 +287,27 @@ odp_queue_t odp_queue_lookup(const char *name) UNLOCK(&queue->s.lock); } + /* do look up for shared memory object if exist return that queue*/ + odph_ring_t *r; + + r = odph_ring_lookup(name); + if (r == NULL) { + if (odp_shm_lookup_ipc(name) == 1) { + /* Create local IPC queue connected to shm object */ + odp_queue_t q = odp_queue_create(name, + ODP_QUEUE_TYPE_IPC_LOOKUP, + NULL); + if (q != ODP_QUEUE_INVALID) + return q; + } + } else { + /* odp ring is in odp_ring_list. That means current process + * already created link with such name. That might be ipc + * queue or ring itself. For now print error here. + */ + ODP_ERR("odp ring with name: \"%s\" already initialized\n", name); + } + return ODP_QUEUE_INVALID; } @@ -279,6 +341,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) return 0; } +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +{ + int ret; + odph_ring_t *r = queue->s.r; + odp_buffer_bits_t handle; + uint32_t index = buf_hdr->handle.index; + uint32_t pool_id = buf_hdr->handle.pool_id; + odp_buffer_t buf; + void **rbuf_p; + + /* get buffer from buf_hdr */ + handle.index = index; + handle.pool_id = pool_id; + + buf = handle.u32; + + rbuf_p = (void *)&buf; + /* use odp_ring locks instead of per process queue lock + * LOCK(&queue->s.lock); + */ + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr + * here due to poiter will be referenced in different porocess + */ + ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1); + if (ret != 0) + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + /* + * UNLOCK(&queue->s.lock); + */ + return 0; +} + int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { @@ -314,6 +408,45 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) return 0; } +int queue_enq_multi_ipc(queue_entry_t *queue, + odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i; + int ret = 0; + odph_ring_t *r = queue->s.r; + odp_buffer_bits_t handle; + odp_buffer_t buf; + void **rbuf_p; + + /* use odp_ring locks instead of per process queue lock + * LOCK(&queue->s.lock); + */ + + /* odp_buffer_t buffers can be in not continius memory, + * so queue them to IPC ring one by one. + */ + for (i = 0; i < num; i++) { + handle.index = buf_hdr[i]->handle.index; + handle.pool_id = buf_hdr[i]->handle.pool_id; + + buf = handle.u32; + + rbuf_p = (void *)&buf; + + /* queue buffer to the ring. Note: we can't use pointer + * to buf_hdr here due to poiter will be referenced in + * different porocess. + */ + ret += odph_ring_mp_enqueue_bulk(r, rbuf_p, 1); + if (ret != 0) + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + } + /* + * UNLOCK(&queue->s.lock); + */ + + return ret; +} int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) { @@ -372,6 +505,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) return buf_hdr; } +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue) +{ + odp_buffer_hdr_t *buf_hdr = NULL; + odph_ring_t *r = queue->s.r; + int ret; + odp_buffer_t buf; + void **buf_p = (void *)&buf; + + /* using odp_ring lock + * LOCK(&queue->s.lock); + */ + ret = odph_ring_mc_dequeue_bulk(r, buf_p, 1); + if (ret == 0) + buf_hdr = odp_buf_to_hdr(buf); + /* + * UNLOCK(&queue->s.lock); + */ + + return buf_hdr; +} + +int queue_deq_multi_ipc(queue_entry_t *queue, + odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i = 0; + odph_ring_t *r = queue->s.r; + int ret; + odp_buffer_t buf; + odp_buffer_t ipcbufs[QUEUE_IPC_ENTRIES]; + void **ipcbufs_p = (void *)&ipcbufs; + + /* use odp ring lock + * LOCK(&queue->s.lock); + */ + + if (queue->s.head == NULL) { + /* Already empty queue */ + } else { + odp_buffer_hdr_t *hdr = queue->s.head; + + ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, num); + if (ret == 0) { + for (; i < num && hdr; i++) { + memcpy(&buf, (void *)ipcbufs_p[i], + sizeof(odp_buffer_t)); + + buf_hdr[i] = odp_buf_to_hdr(buf); + hdr = hdr->next; + buf_hdr[i]->next = NULL; + } + } + + queue->s.head = hdr; + + if (hdr == NULL) { + /* Queue is now empty */ + queue->s.tail = NULL; + } + } + + /* use odp_ring lock + * UNLOCK(&queue->s.lock); + */ + + return i; +} int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c index 632aa66..385ce01 100644 --- a/platform/linux-generic/odp_ring.c +++ b/platform/linux-generic/odp_ring.c @@ -158,8 +158,14 @@ odph_ring_create(const char *name, unsigned count, unsigned flags) char ring_name[ODPH_RING_NAMESIZE]; odph_ring_t *r; size_t ring_size; + uint32_t shm_flag; odp_shm_t shm; + if (flags & ODPH_RING_SHM_PROC) + shm_flag = ODP_SHM_PROC; + else + shm_flag = 0; + /* count must be a power of 2 */ if (!ODP_VAL_IS_POWER_2(count) || (count > ODPH_RING_SZ_MASK)) { ODP_ERR("Requested size is invalid, must be power of 2, and do not exceed the size limit %u\n", @@ -172,10 +178,10 @@ odph_ring_create(const char *name, unsigned count, unsigned flags) odp_rwlock_write_lock(&qlock); /* reserve a memory zone for this ring.*/ - shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, 0); + shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, + shm_flag); r = odp_shm_addr(shm); - if (r != NULL) { /* init the ring structure */ snprintf(r->name, sizeof(r->name), "%s", name); diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c index 1898a34..4631d0b 100644 --- a/platform/linux-generic/odp_shared_memory.c +++ b/platform/linux-generic/odp_shared_memory.c @@ -10,15 +10,21 @@ #include <odp_align.h> #include <odp_system_info.h> #include <odp_debug.h> +#include <odp_shared_memory_internal.h> #include <unistd.h> + #include <sys/mman.h> #include <asm/mman.h> #include <fcntl.h> +#include <unistd.h> +#include <sys/types.h> #include <stdio.h> #include <string.h> +#include <odph_ring.h> +#include <stdlib.h> #define ODP_SHM_NUM_BLOCKS 32 @@ -74,9 +80,8 @@ int odp_shm_init_global(void) ODP_DBG("NOTE: mmap does not support huge pages\n"); #endif - addr = mmap(NULL, sizeof(odp_shm_table_t), - PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - + /* malloc instead of mmap to bind table to process. */ + addr = malloc(sizeof(odp_shm_table_t)); if (addr == MAP_FAILED) return -1; @@ -113,7 +118,7 @@ static int find_block(const char *name, uint32_t *index) } -odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, +odp_shm_t plat_odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags) { uint32_t i; @@ -165,7 +170,6 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, } if (i > ODP_SHM_NUM_BLOCKS - 1) { - /* Table full */ odp_spinlock_unlock(&odp_shm_tbl->lock); ODP_DBG("odp_shm_reserve: no more blocks\n"); return ODP_SHM_INVALID; @@ -198,6 +202,8 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, if (addr == MAP_FAILED) { /* Alloc failed */ odp_spinlock_unlock(&odp_shm_tbl->lock); + if (!(flags & ODP_SHM_PROC_NOCREAT)) + shm_unlink(name); ODP_DBG("odp_shm_reserve: mmap failed\n"); return ODP_SHM_INVALID; } @@ -219,6 +225,11 @@ odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, return block->hdl; } +odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, + uint32_t flag) +{ + return plat_odp_shm_reserve(name, size, align, flag); +} odp_shm_t odp_shm_lookup(const char *name) { @@ -273,6 +284,17 @@ int odp_shm_info(odp_shm_t shm, odp_shm_info_t *info) return 0; } +int odp_shm_lookup_ipc(const char *name) +{ + int shm; + + shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); + if (shm == -1) + return 0; + + close(shm); + return 1; +} void odp_shm_print_all(void) {
Implement odp implementation for linux-generic using standard odp queue API. Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org> --- v6: - update to the latest code (on top of Petris shm patches); v5: - malloc for queues_tbl. This fixes segfault in v4 if shm was not created; - removed not needed chunk from odp_shm_reserve() introduced in v4; - implement linux-generic plat_odp_shm_reserve() with extendend flags; v4: - fixed Anderses comments. (did not use unlikely() for init functions. Only for packet processing. - checkpatch cleanup; - update to the latest ODP head; - remove allocation memory for r_p; .gitignore | 1 + configure.ac | 1 + example/Makefile.am | 2 +- example/ipc/Makefile.am | 6 + example/ipc/README | 56 ++ example/ipc/odp_ipc.c | 679 +++++++++++++++++++++ helper/include/odph_ring.h | 2 + platform/linux-generic/.dirstamp | 0 .../linux-generic/include/api/odp_pktio_types.h | 1 + platform/linux-generic/include/api/odp_queue.h | 2 + .../linux-generic/include/api/odp_shared_memory.h | 1 + .../linux-generic/include/odp_packet_io_internal.h | 1 + .../linux-generic/include/odp_queue_internal.h | 14 +- .../include/odp_shared_memory_internal.h | 35 ++ platform/linux-generic/odp_packet_io.c | 27 +- platform/linux-generic/odp_queue.c | 215 ++++++- platform/linux-generic/odp_ring.c | 10 +- platform/linux-generic/odp_shared_memory.c | 32 +- 18 files changed, 1066 insertions(+), 19 deletions(-) create mode 100644 example/ipc/Makefile.am create mode 100644 example/ipc/README create mode 100644 example/ipc/odp_ipc.c create mode 100644 platform/linux-generic/.dirstamp create mode 100644 platform/linux-generic/include/odp_shared_memory_internal.h