@@ -617,4 +617,17 @@ config DM_ZONED
If unsure, say N.
+config DM_USER
+ tristate "Block device in userspace"
+ depends on BLK_DEV_DM
+ help
+ This device-mapper target allows a userspace daemon to provide the
+ contents of a block device. See
+ <file:Documentation/block/dm-user.rst> for more information.
+
+ To compile this code as a module, choose M here: the module will be
+ called dm-user.
+
+ If unsure, say N.
+
endif # MD
@@ -51,6 +51,7 @@ obj-$(CONFIG_BLK_DEV_DM_BUILTIN) += dm-builtin.o
obj-$(CONFIG_DM_UNSTRIPED) += dm-unstripe.o
obj-$(CONFIG_DM_BUFIO) += dm-bufio.o
obj-$(CONFIG_DM_BIO_PRISON) += dm-bio-prison.o
+obj-$(CONFIG_DM_USER) += dm-user.o
obj-$(CONFIG_DM_CRYPT) += dm-crypt.o
obj-$(CONFIG_DM_DELAY) += dm-delay.o
obj-$(CONFIG_DM_DUST) += dm-dust.o
new file mode 100644
@@ -0,0 +1,1227 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2020 Palmer Dabbelt <palmerdabbelt@google.com>
+ */
+
+#include <linux/device-mapper.h>
+#include <uapi/linux/dm-user.h>
+
+#include <linux/bio.h>
+#include <linux/init.h>
+#include <linux/mempool.h>
+#include <linux/miscdevice.h>
+#include <linux/module.h>
+#include <linux/poll.h>
+#include <linux/uio.h>
+#include <linux/wait.h>
+
+#define DM_MSG_PREFIX "user"
+
+#define MAX_OUTSTANDING_MESSAGES 128
+
+/*
+ * dm-user uses four structures:
+ *
+ * - "struct target", the outermost structure, corresponds to a single device
+ * mapper target. This contains the set of outstanding BIOs that have been
+ * provided by DM and are not actively being processed by the user, along
+ * with a misc device that userspace can open to communicate with the
+ * kernel. Each time userspaces opens the misc device a new channel is
+ * created.
+ * - "struct channel", which represents a single active communication channel
+ * with userspace. Userspace may choose arbitrary read/write sizes to use
+ * when processing messages, channels form these into logical accesses.
+ * When userspace responds to a full message the channel completes the BIO
+ * and obtains a new message to process from the target.
+ * - "struct message", which wraps a BIO with the additional information
+ * required by the kernel to sort out what to do with BIOs when they return
+ * from userspace.
+ * - "struct dm_user_message", which is the exact message format that
+ * userspace sees.
+ *
+ * The hot path contains three distinct operations:
+ *
+ * - user_map(), which is provided a BIO from device mapper that is queued
+ * into the target. This allocates and enqueues a new message.
+ * - dev_read(), which dequeues a message, copies it to userspace.
+ * - dev_write(), which looks up a message (keyed by sequence number) and
+ * completes the corresponding BIO.
+ *
+ * Lock ordering (outer to inner)
+ *
+ * 1) miscdevice's global lock. This is held around dev_open, so it has to be
+ * the outermost lock.
+ * 2) target->lock
+ * 3) channel->lock
+ */
+
+struct message {
+ /*
+ * Messages themselves do not need a lock, they're protected by either
+ * the target or channel's lock, depending on which can reference them
+ * directly.
+ */
+ struct dm_user_message msg;
+ struct bio *bio;
+ size_t posn_to_user;
+ size_t total_to_user;
+ size_t posn_from_user;
+ size_t total_from_user;
+
+ struct list_head from_user;
+ struct list_head to_user;
+
+ /*
+ * These are written back from the user. They live in the same spot in
+ * the message, but we need to either keep the old values around or
+ * call a bunch more BIO helpers. These are only valid after write has
+ * adopted the message.
+ */
+ u64 return_type;
+ u64 return_flags;
+};
+
+struct target {
+ /*
+ * A target has a single lock, which protects everything in the target
+ * (but does not protect the channels associated with a target).
+ */
+ struct mutex lock;
+
+ /*
+ * There is only one point at which anything blocks: userspace blocks
+ * reading a new message, which is woken up by device mapper providing
+ * a new BIO to process (or tearing down the target). The
+ * corresponding write side doesn't block, instead we treat userspace's
+ * response containing a message that has yet to be mapped as an
+ * invalid operation.
+ */
+ struct wait_queue_head wq;
+
+ /*
+ * Messages are delivered to userspace in order, but may be returned
+ * out of order. This allows userspace to schedule IO if it wants to.
+ */
+ mempool_t message_pool;
+ u64 next_seq_to_map;
+ u64 next_seq_to_user;
+ struct list_head to_user;
+
+ /*
+ * There is a misc device per target. The name is selected by
+ * userspace (via a DM create ioctl argument), and each ends up in
+ * /dev/dm-user/. It looks like a better way to do this may be to have
+ * a filesystem to manage these, but this was more expedient. The
+ * current mechanism is functional, but does result in an arbitrary
+ * number of dynamically created misc devices.
+ */
+ struct miscdevice miscdev;
+
+ /*
+ * Device mapper's target destructor triggers tearing this all down,
+ * but we can't actually free until every channel associated with this
+ * target has been destroyed. Channels each have a reference to their
+ * target, and there is an additional single reference that corresponds
+ * to both DM and the misc device (both of which are destroyed by DM).
+ *
+ * In the common case userspace will be asleep waiting for a new
+ * message when device mapper decides to destroy the target, which
+ * means no new messages will appear. The destroyed flag triggers a
+ * wakeup, which will end up removing the reference.
+ */
+ struct kref references;
+ int dm_destroyed;
+};
+
+struct channel {
+ struct target *target;
+
+ /*
+ * A channel has a single lock, which prevents multiple reads (or
+ * multiple writes) from conflicting with each other.
+ */
+ struct mutex lock;
+
+ struct message *cur_to_user;
+ struct message *cur_from_user;
+ ssize_t to_user_error;
+ ssize_t from_user_error;
+
+ /*
+ * Once a message has been forwarded to userspace on a channel it must
+ * be responded to on the same channel. This allows us to error out
+ * the messages that have not yet been responded to by a channel when
+ * that channel closes, which makes handling errors more reasonable for
+ * fault-tolerant userspace daemons. It also happens to make avoiding
+ * shared locks between user_map() and dev_read() a lot easier.
+ *
+ * This does preclude a multi-threaded work stealing userspace
+ * implementation (or at least, force a degree of head-of-line blocking
+ * on the response path).
+ */
+ struct list_head from_user;
+
+ /*
+ * Responses from userspace can arrive in arbitrarily small chunks.
+ * We need some place to buffer one up until we can find the
+ * corresponding kernel-side message to continue processing, so instead
+ * of allocating them we just keep one off to the side here. This can
+ * only ever be pointed to by from_user_cur, and will never have a BIO.
+ */
+ struct message scratch_message_from_user;
+};
+
+static inline struct target *target_from_target(struct dm_target *target)
+{
+ WARN_ON(target->private == NULL);
+ return target->private;
+}
+
+static inline struct target *target_from_miscdev(struct miscdevice *miscdev)
+{
+ return container_of(miscdev, struct target, miscdev);
+}
+
+static inline struct channel *channel_from_file(struct file *file)
+{
+ WARN_ON(file->private_data == NULL);
+ return file->private_data;
+}
+
+static inline struct target *target_from_channel(struct channel *c)
+{
+ WARN_ON(c->target == NULL);
+ return c->target;
+}
+
+static inline size_t bio_size(struct bio *bio)
+{
+ struct bio_vec bvec;
+ struct bvec_iter iter;
+ size_t out = 0;
+
+ bio_for_each_segment(bvec, bio, iter)
+ out += bio_iter_len(bio, iter);
+ return out;
+}
+
+static inline size_t bio_bytes_needed_to_user(struct bio *bio)
+{
+ switch (bio_op(bio)) {
+ case REQ_OP_WRITE:
+ return sizeof(struct dm_user_message) + bio_size(bio);
+ case REQ_OP_READ:
+ case REQ_OP_FLUSH:
+ case REQ_OP_DISCARD:
+ case REQ_OP_SECURE_ERASE:
+ case REQ_OP_WRITE_SAME:
+ case REQ_OP_WRITE_ZEROES:
+ case REQ_OP_ZONE_OPEN:
+ case REQ_OP_ZONE_CLOSE:
+ case REQ_OP_ZONE_FINISH:
+ case REQ_OP_ZONE_APPEND:
+ case REQ_OP_ZONE_RESET:
+ return sizeof(struct dm_user_message);
+
+ /*
+ * These ops are not passed to userspace under the assumption that
+ * they're not going to be particularly useful in that context.
+ */
+ case REQ_OP_SCSI_IN:
+ case REQ_OP_SCSI_OUT:
+ case REQ_OP_DRV_IN:
+ case REQ_OP_DRV_OUT:
+ /* Anything new isn't supported,at least not yet. */
+ default:
+ return -EOPNOTSUPP;
+ }
+}
+
+static inline size_t bio_bytes_needed_from_user(struct bio *bio)
+{
+ switch (bio_op(bio)) {
+ case REQ_OP_READ:
+ return sizeof(struct dm_user_message) + bio_size(bio);
+ case REQ_OP_WRITE:
+ case REQ_OP_FLUSH:
+ case REQ_OP_DISCARD:
+ case REQ_OP_SECURE_ERASE:
+ case REQ_OP_WRITE_SAME:
+ case REQ_OP_WRITE_ZEROES:
+ case REQ_OP_ZONE_OPEN:
+ case REQ_OP_ZONE_CLOSE:
+ case REQ_OP_ZONE_FINISH:
+ case REQ_OP_ZONE_APPEND:
+ case REQ_OP_ZONE_RESET:
+ return sizeof(struct dm_user_message);
+
+ /*
+ * These ops are not passed to userspace under the assumption that
+ * they're not going to be particularly useful in that context.
+ */
+ case REQ_OP_SCSI_IN:
+ case REQ_OP_SCSI_OUT:
+ case REQ_OP_DRV_IN:
+ case REQ_OP_DRV_OUT:
+ /* Anything new isn't supported,at least not yet. */
+ default:
+ return -EOPNOTSUPP;
+ }
+}
+
+static inline long bio_type_to_user_type(struct bio *bio)
+{
+ switch (bio_op(bio)) {
+ case REQ_OP_READ:
+ return DM_USER_REQ_MAP_READ;
+ case REQ_OP_WRITE:
+ return DM_USER_REQ_MAP_WRITE;
+ case REQ_OP_FLUSH:
+ return DM_USER_REQ_MAP_FLUSH;
+ case REQ_OP_DISCARD:
+ return DM_USER_REQ_MAP_DISCARD;
+ case REQ_OP_SECURE_ERASE:
+ return DM_USER_REQ_MAP_SECURE_ERASE;
+ case REQ_OP_WRITE_SAME:
+ return DM_USER_REQ_MAP_WRITE_SAME;
+ case REQ_OP_WRITE_ZEROES:
+ return DM_USER_REQ_MAP_WRITE_ZEROES;
+ case REQ_OP_ZONE_OPEN:
+ return DM_USER_REQ_MAP_ZONE_OPEN;
+ case REQ_OP_ZONE_CLOSE:
+ return DM_USER_REQ_MAP_ZONE_CLOSE;
+ case REQ_OP_ZONE_FINISH:
+ return DM_USER_REQ_MAP_ZONE_FINISH;
+ case REQ_OP_ZONE_APPEND:
+ return DM_USER_REQ_MAP_ZONE_APPEND;
+ case REQ_OP_ZONE_RESET:
+ return DM_USER_REQ_MAP_ZONE_RESET;
+
+ /*
+ * These ops are not passed to userspace under the assumption that
+ * they're not going to be particularly useful in that context.
+ */
+ case REQ_OP_SCSI_IN:
+ case REQ_OP_SCSI_OUT:
+ case REQ_OP_DRV_IN:
+ case REQ_OP_DRV_OUT:
+ /* Anything new isn't supported,at least not yet. */
+ default:
+ return -EOPNOTSUPP;
+ }
+}
+
+static inline long bio_flags_to_user_flags(struct bio *bio)
+{
+ u64 out = 0;
+ typeof(bio->bi_opf) opf = bio->bi_opf & ~REQ_OP_MASK;
+
+ if (opf & REQ_FAILFAST_DEV) {
+ opf &= ~REQ_FAILFAST_DEV;
+ out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DEV;
+ }
+
+ if (opf & REQ_FAILFAST_TRANSPORT) {
+ opf &= ~REQ_FAILFAST_TRANSPORT;
+ out |= DM_USER_REQ_MAP_FLAG_FAILFAST_TRANSPORT;
+ }
+
+ if (opf & REQ_FAILFAST_DRIVER) {
+ opf &= ~REQ_FAILFAST_DRIVER;
+ out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DRIVER;
+ }
+
+ if (opf & REQ_SYNC) {
+ opf &= ~REQ_SYNC;
+ out |= DM_USER_REQ_MAP_FLAG_SYNC;
+ }
+
+ if (opf & REQ_META) {
+ opf &= ~REQ_META;
+ out |= DM_USER_REQ_MAP_FLAG_META;
+ }
+
+ if (opf & REQ_PRIO) {
+ opf &= ~REQ_PRIO;
+ out |= DM_USER_REQ_MAP_FLAG_PRIO;
+ }
+
+ if (opf & REQ_NOMERGE) {
+ opf &= ~REQ_NOMERGE;
+ out |= DM_USER_REQ_MAP_FLAG_NOMERGE;
+ }
+
+ if (opf & REQ_IDLE) {
+ opf &= ~REQ_IDLE;
+ out |= DM_USER_REQ_MAP_FLAG_IDLE;
+ }
+
+ if (opf & REQ_INTEGRITY) {
+ opf &= ~REQ_INTEGRITY;
+ out |= DM_USER_REQ_MAP_FLAG_INTEGRITY;
+ }
+
+ if (opf & REQ_FUA) {
+ opf &= ~REQ_FUA;
+ out |= DM_USER_REQ_MAP_FLAG_FUA;
+ }
+
+ if (opf & REQ_PREFLUSH) {
+ opf &= ~REQ_PREFLUSH;
+ out |= DM_USER_REQ_MAP_FLAG_PREFLUSH;
+ }
+
+ if (opf & REQ_PREFLUSH) {
+ opf &= ~REQ_PREFLUSH;
+ out |= DM_USER_REQ_MAP_FLAG_PREFLUSH;
+ }
+
+ if (opf & REQ_RAHEAD) {
+ opf &= ~REQ_RAHEAD;
+ out |= DM_USER_REQ_MAP_FLAG_RAHEAD;
+ }
+
+ if (opf & REQ_BACKGROUND) {
+ opf &= ~REQ_BACKGROUND;
+ out |= DM_USER_REQ_MAP_FLAG_BACKGROUND;
+ }
+
+ if (opf & REQ_BACKGROUND) {
+ opf &= ~REQ_BACKGROUND;
+ out |= DM_USER_REQ_MAP_FLAG_BACKGROUND;
+ }
+
+ if (opf & REQ_NOWAIT) {
+ opf &= ~REQ_NOWAIT;
+ out |= DM_USER_REQ_MAP_FLAG_NOWAIT;
+ }
+
+ if (opf & REQ_CGROUP_PUNT) {
+ opf &= ~REQ_CGROUP_PUNT;
+ out |= DM_USER_REQ_MAP_FLAG_CGROUP_PUNT;
+ }
+
+ if (opf & REQ_NOUNMAP) {
+ opf &= ~REQ_NOUNMAP;
+ out |= DM_USER_REQ_MAP_FLAG_NOUNMAP;
+ }
+
+ if (opf & REQ_HIPRI) {
+ opf &= ~REQ_HIPRI;
+ out |= DM_USER_REQ_MAP_FLAG_HIPRI;
+ }
+
+ if (unlikely(opf)) {
+ pr_warn("unsupported BIO type %x\n", opf);
+ return -EOPNOTSUPP;
+ }
+ WARN_ON(out < 0);
+ return out;
+}
+
+/*
+ * Not quite what's in blk-map.c, but instead what I thought the functions in
+ * blk-map did. This one seems more generally useful and I think we could
+ * write the blk-map version in terms of this one. The differences are that
+ * this has a return value that counts, and blk-map uses the BIO _all iters.
+ * Neither advance the BIO iter but don't advance the IOV iter, which is a bit
+ * odd here.
+ */
+static ssize_t bio_copy_from_iter(struct bio *bio, struct iov_iter *iter)
+{
+ struct bio_vec bvec;
+ struct bvec_iter biter;
+ ssize_t out = 0;
+
+ bio_for_each_segment(bvec, bio, biter) {
+ ssize_t ret;
+
+ ret = copy_page_from_iter(bvec.bv_page, bvec.bv_offset,
+ bvec.bv_len, iter);
+
+ /*
+ * FIXME: I thought that IOV copies had a mechanism for
+ * terminating early, if for example a signal came in while
+ * sleeping waiting for a page to be mapped, but I don't see
+ * where that would happen.
+ */
+ WARN_ON(ret < 0);
+ out += ret;
+
+ if (!iov_iter_count(iter))
+ break;
+
+ if (ret < bvec.bv_len)
+ return ret;
+ }
+
+ return out;
+}
+
+static ssize_t bio_copy_to_iter(struct bio *bio, struct iov_iter *iter)
+{
+ struct bio_vec bvec;
+ struct bvec_iter biter;
+ ssize_t out = 0;
+
+ bio_for_each_segment(bvec, bio, biter) {
+ ssize_t ret;
+
+ ret = copy_page_to_iter(bvec.bv_page, bvec.bv_offset,
+ bvec.bv_len, iter);
+
+ /* as above */
+ WARN_ON(ret < 0);
+ out += ret;
+
+ if (!iov_iter_count(iter))
+ break;
+
+ if (ret < bvec.bv_len)
+ return ret;
+ }
+
+ return out;
+}
+
+static ssize_t msg_copy_to_iov(struct message *msg, struct iov_iter *to)
+{
+ ssize_t copied = 0;
+
+ if (!iov_iter_count(to))
+ return 0;
+
+ if (msg->posn_to_user < sizeof(msg->msg)) {
+ copied = copy_to_iter((char *)(&msg->msg) + msg->posn_to_user,
+ sizeof(msg->msg) - msg->posn_to_user, to);
+ } else {
+ copied = bio_copy_to_iter(msg->bio, to);
+ if (copied > 0)
+ bio_advance(msg->bio, copied);
+ }
+
+ if (copied < 0)
+ return copied;
+
+ msg->posn_to_user += copied;
+ return copied;
+}
+
+static ssize_t msg_copy_from_iov(struct message *msg, struct iov_iter *from)
+{
+ ssize_t copied = 0;
+
+ if (!iov_iter_count(from))
+ return 0;
+
+ if (msg->posn_from_user < sizeof(msg->msg)) {
+ copied = copy_from_iter(
+ (char *)(&msg->msg) + msg->posn_from_user,
+ sizeof(msg->msg) - msg->posn_from_user, from);
+ } else {
+ copied = bio_copy_from_iter(msg->bio, from);
+ if (copied > 0)
+ bio_advance(msg->bio, copied);
+ }
+
+ if (copied < 0)
+ return copied;
+
+ msg->posn_from_user += copied;
+ return copied;
+}
+
+static struct message *msg_get_map(struct target *t)
+{
+ struct message *m;
+
+ lockdep_assert_held(&t->lock);
+
+ m = mempool_alloc(&t->message_pool, GFP_NOIO);
+ m->msg.seq = t->next_seq_to_map++;
+ INIT_LIST_HEAD(&m->to_user);
+ INIT_LIST_HEAD(&m->from_user);
+ return m;
+}
+
+static struct message *msg_get_to_user(struct target *t)
+{
+ struct message *m;
+
+ lockdep_assert_held(&t->lock);
+
+ if (list_empty(&t->to_user))
+ return NULL;
+
+ m = list_first_entry(&t->to_user, struct message, to_user);
+ list_del(&m->to_user);
+ return m;
+}
+
+static struct message *msg_get_from_user(struct channel *c, u64 seq)
+{
+ struct message *m;
+ struct list_head *cur;
+
+ lockdep_assert_held(&c->lock);
+
+ list_for_each(cur, &c->from_user) {
+ m = list_entry(cur, struct message, from_user);
+ if (m->msg.seq == seq) {
+ list_del(&m->from_user);
+ return m;
+ }
+ }
+
+ return NULL;
+}
+
+void message_kill(struct message *m, mempool_t *pool)
+{
+ m->bio->bi_status = BLK_STS_IOERR;
+ bio_endio(m->bio);
+ bio_put(m->bio);
+ mempool_free(m, pool);
+}
+
+/*
+ * Returns 0 when there is no work left to do. This must be callable without
+ * holding the target lock, as it is part of the waitqueue's check expression.
+ * When called without the lock it may spuriously indicate there is remaining
+ * work, but when called with the lock it must be accurate.
+ */
+int target_poll(struct target *t)
+{
+ return !list_empty(&t->to_user) || t->dm_destroyed;
+}
+
+void target_release(struct kref *ref)
+{
+ struct target *t = container_of(ref, struct target, references);
+ struct list_head *cur;
+
+ /*
+ * There may be outstanding BIOs that have not yet been given to
+ * userspace. At this point there's nothing we can do about them, as
+ * there are and will never be any channels.
+ */
+ list_for_each (cur, &t->to_user) {
+ message_kill(list_entry(cur, struct message, to_user),
+ &t->message_pool);
+ }
+
+ mempool_exit(&t->message_pool);
+ mutex_unlock(&t->lock);
+ mutex_destroy(&t->lock);
+ kfree(t);
+}
+
+void target_put(struct target *t)
+{
+ /*
+ * This both releases a reference to the target and the lock. We leave
+ * it up to the caller to hold the lock, as they probably needed it for
+ * something else.
+ */
+ lockdep_assert_held(&t->lock);
+
+ if (!kref_put(&t->references, target_release))
+ mutex_unlock(&t->lock);
+}
+
+struct channel *channel_alloc(struct target *t)
+{
+ struct channel *c;
+
+ lockdep_assert_held(&t->lock);
+
+ c = kzalloc(sizeof(*c), GFP_KERNEL);
+ if (c == NULL)
+ return NULL;
+
+ kref_get(&t->references);
+ c->target = t;
+ c->cur_from_user = &c->scratch_message_from_user;
+ mutex_init(&c->lock);
+ INIT_LIST_HEAD(&c->from_user);
+ return c;
+}
+
+void channel_free(struct channel *c)
+{
+ struct list_head *cur;
+
+ lockdep_assert_held(&c->lock);
+
+ /*
+ * There may be outstanding BIOs that have been given to userspace but
+ * have not yet been completed. The channel has been shut down so
+ * there's no way to process the rest of those messages, so we just go
+ * ahead and error out the BIOs. Hopefully whatever's on the other end
+ * can handle the errors. One could imagine splitting the BIOs and
+ * completing as much as we got, but that seems like overkill here.
+ *
+ * Our only other options would be to let the BIO hang around (which
+ * seems way worse) or to resubmit it to userspace in the hope there's
+ * another channel. I don't really like the idea of submitting a
+ * message twice.
+ */
+ if (c->cur_to_user != NULL)
+ message_kill(c->cur_to_user, &c->target->message_pool);
+ if (c->cur_from_user != &c->scratch_message_from_user)
+ message_kill(c->cur_from_user, &c->target->message_pool);
+ list_for_each(cur, &c->from_user)
+ message_kill(list_entry(cur, struct message, to_user),
+ &c->target->message_pool);
+
+ mutex_lock(&c->target->lock);
+ target_put(c->target);
+ mutex_unlock(&c->lock);
+ mutex_destroy(&c->lock);
+ kfree(c);
+}
+
+static int dev_open(struct inode *inode, struct file *file)
+{
+ struct channel *c;
+ struct target *t;
+
+ /*
+ * This is called by miscdev, which sets private_data to point to the
+ * struct miscdevice that was opened. The rest of our file operations
+ * want to refer to the channel that's been opened, so we swap that
+ * pointer out with a fresh channel.
+ *
+ * This is called with the miscdev lock held, which is also held while
+ * registering/unregistering the miscdev. The miscdev must be
+ * registered for this to get called, which means there must be an
+ * outstanding reference to the target, which means it cannot be freed
+ * out from under us despite us not holding a reference yet.
+ */
+ t = container_of(file->private_data, struct target, miscdev);
+ mutex_lock(&t->lock);
+ file->private_data = c = channel_alloc(t);
+
+ if (c == NULL) {
+ mutex_unlock(&t->lock);
+ return -ENOSPC;
+ }
+
+ mutex_unlock(&t->lock);
+ return 0;
+}
+
+static ssize_t dev_read(struct kiocb *iocb, struct iov_iter *to)
+{
+ struct channel *c = channel_from_file(iocb->ki_filp);
+ ssize_t total_processed = 0;
+ ssize_t processed;
+
+ mutex_lock(&c->lock);
+
+ if (unlikely(c->to_user_error)) {
+ total_processed = c->to_user_error;
+ goto cleanup_unlock;
+ }
+
+ if (c->cur_to_user == NULL) {
+ struct target *t = target_from_channel(c);
+
+ mutex_lock(&t->lock);
+
+ while (!target_poll(t)) {
+ int e;
+
+ mutex_unlock(&t->lock);
+ mutex_unlock(&c->lock);
+ e = wait_event_interruptible(t->wq, target_poll(t));
+ mutex_lock(&c->lock);
+ mutex_lock(&t->lock);
+
+ if (unlikely(e != 0)) {
+ /*
+ * We haven't processed any bytes in either the
+ * BIO or the IOV, so we can just terminate
+ * right now. Elsewhere in the kernel handles
+ * restarting the syscall when appropriate.
+ */
+ total_processed = e;
+ mutex_unlock(&t->lock);
+ goto cleanup_unlock;
+ }
+ }
+
+ if (unlikely(t->dm_destroyed)) {
+ /*
+ * DM has destroyed this target, so just lock
+ * the user out. There's really nothing else
+ * we can do here. Note that we don't actually
+ * tear any thing down until userspace has
+ * closed the FD, as there may still be
+ * outstanding BIOs.
+ *
+ * This is kind of a wacky error code to
+ * return. My goal was really just to try and
+ * find something that wasn't likely to be
+ * returned by anything else in the miscdev
+ * path. The message "block device required"
+ * seems like a somewhat reasonable thing to
+ * say when the target has disappeared out from
+ * under us, but "not block" isn't sensible.
+ */
+ c->to_user_error = total_processed = -ENOTBLK;
+ mutex_unlock(&t->lock);
+ goto cleanup_unlock;
+ }
+
+ /*
+ * Ensures that accesses to the message data are not ordered
+ * before the remote accesses that produce that message data.
+ *
+ * This pairs with the barrier in user_map(), via the
+ * conditional within the while loop above. Also see the lack
+ * of barrier in user_dtr(), which is why this can be after the
+ * destroyed check.
+ */
+ smp_rmb();
+
+ c->cur_to_user = msg_get_to_user(t);
+ WARN_ON(c->cur_to_user == NULL);
+ mutex_unlock(&t->lock);
+ }
+
+ processed = msg_copy_to_iov(c->cur_to_user, to);
+ total_processed += processed;
+
+ WARN_ON(c->cur_to_user->posn_to_user > c->cur_to_user->total_to_user);
+ if (c->cur_to_user->posn_to_user == c->cur_to_user->total_to_user) {
+ struct message *m = c->cur_to_user;
+
+ c->cur_to_user = NULL;
+ list_add_tail(&m->from_user, &c->from_user);
+ }
+
+cleanup_unlock:
+ mutex_unlock(&c->lock);
+ return total_processed;
+}
+
+static ssize_t dev_splice_read(struct file *in, loff_t *ppos,
+ struct pipe_inode_info *pipe, size_t len,
+ unsigned int flags)
+{
+ return -EOPNOTSUPP;
+}
+
+static ssize_t dev_write(struct kiocb *iocb, struct iov_iter *from)
+{
+ struct channel *c = channel_from_file(iocb->ki_filp);
+ ssize_t total_processed = 0;
+ ssize_t processed;
+
+ mutex_lock(&c->lock);
+
+ if (unlikely(c->from_user_error)) {
+ total_processed = c->from_user_error;
+ goto cleanup_unlock;
+ }
+
+ /*
+ * cur_from_user can never be NULL. If there's no real message it must
+ * point to the scratch space.
+ */
+ WARN_ON(c->cur_from_user == NULL);
+ if (c->cur_from_user->posn_from_user < sizeof(struct dm_user_message)) {
+ struct message *msg, *old;
+
+ processed = msg_copy_from_iov(c->cur_from_user, from);
+ if (processed <= 0) {
+ pr_warn("msg_copy_from_iov() returned %zu\n",
+ processed);
+ c->from_user_error = -EINVAL;
+ goto cleanup_unlock;
+ }
+ total_processed += processed;
+
+ /*
+ * In the unlikely event the user has provided us a very short
+ * write, not even big enough to fill a message, just succeed.
+ * We'll eventually build up enough bytes to do something.
+ */
+ if (unlikely(c->cur_from_user->posn_from_user <
+ sizeof(struct dm_user_message)))
+ goto cleanup_unlock;
+
+ old = c->cur_from_user;
+ mutex_lock(&c->target->lock);
+ msg = msg_get_from_user(c, c->cur_from_user->msg.seq);
+ if (msg == NULL) {
+ pr_info("user provided an invalid messag seq of %llx\n",
+ old->msg.seq);
+ mutex_unlock(&c->target->lock);
+ c->from_user_error = -EINVAL;
+ goto cleanup_unlock;
+ }
+ mutex_unlock(&c->target->lock);
+
+ WARN_ON(old->posn_from_user != sizeof(struct dm_user_message));
+ msg->posn_from_user = sizeof(struct dm_user_message);
+ msg->return_type = old->msg.type;
+ msg->return_flags = old->msg.flags;
+ WARN_ON(msg->posn_from_user > msg->total_from_user);
+ c->cur_from_user = msg;
+ WARN_ON(old != &c->scratch_message_from_user);
+ }
+
+ /*
+ * Userspace can signal an error for single requests by overwriting the
+ * seq field.
+ */
+ switch (c->cur_from_user->return_type) {
+ case DM_USER_RESP_SUCCESS:
+ c->cur_from_user->bio->bi_status = BLK_STS_OK;
+ break;
+ case DM_USER_RESP_ERROR:
+ case DM_USER_RESP_UNSUPPORTED:
+ default:
+ c->cur_from_user->bio->bi_status = BLK_STS_IOERR;
+ goto finish_bio;
+ }
+
+ /*
+ * The op was a success as far as userspace is concerned, so process
+ * whatever data may come along with it. The user may provide the BIO
+ * data in multiple chunks, in which case we don't need to finish the
+ * BIO.
+ */
+ processed = msg_copy_from_iov(c->cur_from_user, from);
+ total_processed += processed;
+
+ if (c->cur_from_user->posn_from_user <
+ c->cur_from_user->total_from_user)
+ goto cleanup_unlock;
+
+finish_bio:
+ /*
+ * When we set up this message the BIO's size matched the
+ * message size, if that's not still the case then something
+ * has gone off the rails.
+ */
+ WARN_ON(bio_size(c->cur_from_user->bio) != 0);
+ bio_endio(c->cur_from_user->bio);
+ bio_put(c->cur_from_user->bio);
+
+ /*
+ * We don't actually need to take the target lock here, as all
+ * we're doing is freeing the message and mempools have their
+ * own lock. Each channel has its ows scratch message.
+ */
+ WARN_ON(c->cur_from_user == &c->scratch_message_from_user);
+ mempool_free(c->cur_from_user, &c->target->message_pool);
+ c->scratch_message_from_user.posn_from_user = 0;
+ c->cur_from_user = &c->scratch_message_from_user;
+
+cleanup_unlock:
+ mutex_unlock(&c->lock);
+ return total_processed;
+}
+
+static ssize_t dev_splice_write(struct pipe_inode_info *pipe, struct file *out,
+ loff_t *ppos, size_t len, unsigned int flags)
+{
+ return -EOPNOTSUPP;
+}
+
+static __poll_t dev_poll(struct file *file, poll_table *wait)
+{
+ return -EOPNOTSUPP;
+}
+
+static int dev_release(struct inode *inode, struct file *file)
+{
+ struct channel *c;
+
+ c = channel_from_file(file);
+ mutex_lock(&c->lock);
+ channel_free(c);
+
+ return 0;
+}
+
+static int dev_fasync(int fd, struct file *file, int on)
+{
+ return -EOPNOTSUPP;
+}
+
+static long dev_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+ return -EOPNOTSUPP;
+}
+
+static const struct file_operations file_operations = {
+ .owner = THIS_MODULE,
+ .open = dev_open,
+ .llseek = no_llseek,
+ .read_iter = dev_read,
+ .splice_read = dev_splice_read,
+ .write_iter = dev_write,
+ .splice_write = dev_splice_write,
+ .poll = dev_poll,
+ .release = dev_release,
+ .fasync = dev_fasync,
+ .unlocked_ioctl = dev_ioctl,
+};
+
+static int user_ctr(struct dm_target *ti, unsigned int argc, char **argv)
+{
+ struct target *t;
+ int r;
+
+ if (argc != 3) {
+ ti->error = "Invalid argument count";
+ r = -EINVAL;
+ goto cleanup_none;
+ }
+
+ t = kzalloc(sizeof(*t), GFP_KERNEL);
+ if (t == NULL) {
+ r = -ENOSPC;
+ goto cleanup_none;
+ }
+ ti->private = t;
+
+ /*
+ * We begin with a single reference to the target, which is miscdev's
+ * reference. This ensures that the target won't be freed
+ * until after the miscdev has been unregistered and all extant
+ * channels have been closed.
+ */
+ kref_init(&t->references);
+ kref_get(&t->references);
+
+ mutex_init(&t->lock);
+ init_waitqueue_head(&t->wq);
+ INIT_LIST_HEAD(&t->to_user);
+ mempool_init_kmalloc_pool(&t->message_pool, MAX_OUTSTANDING_MESSAGES,
+ sizeof(struct message));
+
+ t->miscdev.minor = MISC_DYNAMIC_MINOR;
+ t->miscdev.fops = &file_operations;
+ t->miscdev.name = kasprintf(GFP_KERNEL, "dm-user/%s", argv[2]);
+ if (t->miscdev.name == NULL) {
+ r = -ENOSPC;
+ goto cleanup_message_pool;
+ }
+
+ /*
+ * Once the miscdev is registered it can be opened and therefor
+ * concurrent references to the channel can happen. Holding the target
+ * lock during misc_register() could deadlock. If registration
+ * succeeds then we will not access the target again so we just stick a
+ * barrier here, which pairs with taking the target lock everywhere
+ * else the target is accessed.
+ *
+ * I forgot where we ended up on the RCpc/RCsc locks. IIU RCsc locks
+ * would mean that we could take the target lock earlier and release it
+ * here instead of the memory barrier. I'm not sure that's any better,
+ * though, and this isn't on a hot path so it probably doesn't matter
+ * either way.
+ */
+ smp_mb();
+
+ r = misc_register(&t->miscdev);
+ if (r) {
+ DMERR("Unable to register miscdev %s for dm-user",
+ t->miscdev.name);
+ r = -ENOSPC;
+ goto cleanup_misc_name;
+ }
+
+ return 0;
+
+cleanup_misc_name:
+ kfree(t->miscdev.name);
+cleanup_message_pool:
+ mempool_exit(&t->message_pool);
+ kfree(t);
+cleanup_none:
+ return r;
+}
+
+static void user_dtr(struct dm_target *ti)
+{
+ struct target *t = target_from_target(ti);
+
+ /*
+ * Removes the miscdev. This must be called without the target lock
+ * held to avoid a possible deadlock because our open implementation is
+ * called holding the miscdev lock and must later take the target lock.
+ *
+ * There is no race here because only DM can register/unregister the
+ * miscdev, and DM ensures that doesn't happen twice. The internal
+ * miscdev lock is sufficient to ensure there are no races between
+ * deregistering the miscdev and open.
+ */
+ misc_deregister(&t->miscdev);
+
+ /*
+ * We are now free to take the target's lock and drop our reference to
+ * the target. There are almost certainly tasks sleeping in read on at
+ * least one of the channels associated with this target, this
+ * explicitly wakes them up and terminates the read.
+ */
+ mutex_lock(&t->lock);
+ /*
+ * No barrier here, as wait/wake ensures that the flag visibility is
+ * correct WRT the wake/sleep state of the target tasks.
+ */
+ t->dm_destroyed = true;
+ wake_up_all(&t->wq);
+ target_put(t);
+}
+
+/*
+ * Consumes a BIO from device mapper, queueing it up for userspace.
+ */
+static int user_map(struct dm_target *ti, struct bio *bio)
+{
+ struct target *t;
+ struct message *entry;
+
+ t = target_from_target(ti);
+ /*
+ * FIXME
+ *
+ * This seems like a bad idea. Specifically, here we're
+ * directly on the IO path when we take the target lock, which may also
+ * be taken from a user context. The user context doesn't actively
+ * trigger anything that may sleep while holding the lock, but this
+ * still seems like a bad idea.
+ *
+ * The obvious way to fix this would be to use a proper queue, which
+ * would result in no shared locks between the direct IO path and user
+ * tasks. I had a version that did this, but the head-of-line blocking
+ * from the circular buffer resulted in us needing a fairly large
+ * allocation in order to avoid situations in which the queue fills up
+ * and everything goes off the rails.
+ *
+ * I could jump through a some hoops to avoid a shared lock while still
+ * allowing for a large queue, but I'm not actually sure that allowing
+ * for very large queues is the right thing to do here. Intuitively it
+ * seems better to keep the queues small in here (essentially sized to
+ * the user latency for performance reasons only) and signal up the
+ * stack to start throttling IOs. I don't see a way to do that
+ * (returning DM_MAPIO_REQUEUE seems like it'd work, but doesn't do
+ * that).
+ *
+ * The best way I could come up with to fix this would be to use a
+ * two-lock concurrent queue that's of infinite size (ie, linked list
+ * based), which would get rid of the explicit shared lock. The
+ * mempool spinlock would still be shared, but I could just defer the
+ * free from dev_write to user_map (and probably a worker).
+ */
+ mutex_lock(&t->lock);
+
+ /*
+ * FIXME
+ *
+ * The assumption here is that there's no benefit to returning
+ * DM_MAPIO_KILL as opposed to just erroring out the BIO, but I'm not
+ * sure that's actually true -- for example, I could imagine users
+ * expecting that submitted BIOs are unlikely to fail and therefor
+ * relying on submission failure to indicate an unsupported type.
+ *
+ * There's two ways I can think of to fix this:
+ * - Add DM arguments that are parsed during the constructor that
+ * allow various dm_target flags to be set that indicate the op
+ * types supported by this target. This may make sense for things
+ * like discard, where DM can already transform the BIOs to a form
+ * that's likely to be supported.
+ * - Some sort of pre-filter that allows userspace to hook in here
+ * and kill BIOs before marking them as submitted. My guess would
+ * be that a userspace round trip is a bad idea here, but a BPF
+ * call seems resonable.
+ *
+ * My guess is that we'd likely want to do both. The first one is easy
+ * and gives DM the proper info, so it seems better. The BPF call
+ * seems overly complex for just this, but one could imagine wanting to
+ * sometimes return _MAPPED and a BPF filter would be the way to do
+ * that.
+ *
+ * For example, in Android we have an in-kernel DM device called
+ * "dm-bow" that takes advange of some portion of the space that has
+ * been discarded on a device to provide opportunistic block-level
+ * backups. While one could imagine just implementing this entirely in
+ * userspace, that would come with an appreciable performance penalty.
+ * Instead one could keep a BPF program that forwards most accesses
+ * directly to the backing block device while informing a userspace
+ * daemon of any discarded space and on writes to blocks that are to be
+ * backed up.
+ */
+ if (unlikely((bio_type_to_user_type(bio) < 0)
+ || (bio_flags_to_user_flags(bio) < 0))) {
+ mutex_unlock(&t->lock);
+ pr_warn("dm-user: unsupported bio_op() %d\n", bio_op(bio));
+ return DM_MAPIO_KILL;
+ }
+
+ entry = msg_get_map(t);
+ if (unlikely(entry == NULL)) {
+ mutex_unlock(&t->lock);
+ pr_warn("dm-user: unable to allocate message\n");
+ return DM_MAPIO_KILL;
+ }
+
+ bio_get(bio);
+ entry->msg.type = bio_type_to_user_type(bio);
+ entry->msg.flags = bio_flags_to_user_flags(bio);
+ entry->msg.sector = bio->bi_iter.bi_sector;
+ entry->msg.len = bio_size(bio);
+ entry->bio = bio;
+ entry->posn_to_user = 0;
+ entry->total_to_user = bio_bytes_needed_to_user(bio);
+ entry->posn_from_user = 0;
+ entry->total_from_user = bio_bytes_needed_from_user(bio);
+ /* Pairs with the barrier in dev_read() */
+ smp_wmb();
+ list_add_tail(&entry->to_user, &t->to_user);
+ wake_up_interruptible(&t->wq);
+ mutex_unlock(&t->lock);
+ return DM_MAPIO_SUBMITTED;
+}
+
+static struct target_type user_target = {
+ .name = "user",
+ .version = { 1, 0, 0 },
+ .module = THIS_MODULE,
+ .ctr = user_ctr,
+ .dtr = user_dtr,
+ .map = user_map,
+};
+
+static int __init dm_user_init(void)
+{
+ int r;
+
+ r = dm_register_target(&user_target);
+ if (r) {
+ DMERR("register failed %d", r);
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return r;
+}
+
+static void __exit dm_user_exit(void)
+{
+ dm_unregister_target(&user_target);
+}
+
+module_init(dm_user_init);
+module_exit(dm_user_exit);
+MODULE_AUTHOR("Palmer Dabbelt <palmerdabbelt@google.com>");
+MODULE_DESCRIPTION(DM_NAME " target returning blocks from userspace");
+MODULE_LICENSE("GPL");