@@ -29,7 +29,12 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
#define CEPH_HOMELESS_OSD -1
-/* a given osd we're communicating with */
+/*
+ * A given osd we're communicating with.
+ *
+ * Note that the o_requests tree can be searched while holding the "lock" mutex
+ * or the "o_requests_lock" spinlock. Insertion or removal requires both!
+ */
struct ceph_osd {
refcount_t o_ref;
struct ceph_osd_client *o_osdc;
@@ -37,6 +42,7 @@ struct ceph_osd {
int o_incarnation;
struct rb_node o_node;
struct ceph_connection o_con;
+ spinlock_t o_requests_lock;
struct rb_root o_requests;
struct rb_root o_linger_requests;
struct rb_root o_backoff_mappings;
@@ -1198,6 +1198,7 @@ static void osd_init(struct ceph_osd *osd)
{
refcount_set(&osd->o_ref, 1);
RB_CLEAR_NODE(&osd->o_node);
+ spin_lock_init(&osd->o_requests_lock);
osd->o_requests = RB_ROOT;
osd->o_linger_requests = RB_ROOT;
osd->o_backoff_mappings = RB_ROOT;
@@ -1427,7 +1428,9 @@ static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
atomic_inc(&osd->o_osdc->num_homeless);
get_osd(osd);
+ spin_lock(&osd->o_requests_lock);
insert_request(&osd->o_requests, req);
+ spin_unlock(&osd->o_requests_lock);
req->r_osd = osd;
}
@@ -1439,7 +1442,9 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
req, req->r_tid);
req->r_osd = NULL;
+ spin_lock(&osd->o_requests_lock);
erase_request(&osd->o_requests, req);
+ spin_unlock(&osd->o_requests_lock);
put_osd(osd);
if (!osd_homeless(osd))