@@ -25,6 +25,10 @@ struct dlm_ckv_lock {
char name[DLM_RESNAME_MAXLEN];
};
+struct dlm_ckv_kv {
+ struct dlm_ckv_lock lock;
+};
+
struct dlm_ckv_bucket {
dlm_lockspace_t *ls;
struct kref refcount;
@@ -240,6 +244,130 @@ dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock)
}
EXPORT_SYMBOL(dlm_ckv_lock_release);
+struct dlm_ckv_kv *
+dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key)
+{
+ struct dlm_ckv_lock *ckv_lock;
+ void *ptr;
+ int res;
+
+ ptr = kzalloc(DLM_CKV_LVB_SIZE, GFP_KERNEL);
+ if (!ptr)
+ return NULL;
+
+ ckv_lock = dlm_ckv_create_lock(bucket, key);
+ if (!ckv_lock)
+ goto create_fail;
+
+ ckv_lock->lksb.lksb.sb_lvbptr = ptr;
+
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+ &ckv_lock->lksb, 0, ckv_lock->name, NULL);
+ if (res)
+ goto lock_failed;
+
+ return (struct dlm_ckv_kv *)ckv_lock;
+
+lock_failed:
+ dlm_ckv_free_lock(ckv_lock);
+create_fail:
+ kfree(ptr);
+ return NULL;
+}
+EXPORT_SYMBOL(dlm_ckv_create_kv);
+
+void
+dlm_ckv_free_kv(struct dlm_ckv_kv *kv)
+{
+ struct dlm_ckv_bucket *bucket = kv->lock.bucket;
+ struct dlm_ckv_lock *ckv_lock = &kv->lock;
+
+ dlm_ckv_unlock_wait(bucket->ls, &ckv_lock->lksb);
+
+ kfree(ckv_lock->lksb.lksb.sb_lvbptr);
+ kfree(ckv_lock);
+
+ kref_put(&bucket->refcount, bucket_release);
+
+}
+EXPORT_SYMBOL(dlm_ckv_free_kv);
+
+int
+dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len)
+{
+ struct dlm_ckv_lock *ckv_lock = &kv->lock;
+ struct dlm_ckv_bucket *bucket;
+ int res;
+
+ BUG_ON(!ckv_lock);
+ bucket = ckv_lock->bucket;
+
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_CR,
+ &ckv_lock->lksb,
+ DLM_LKF_VALBLK | DLM_LKF_CONVERT,
+ ckv_lock->name, NULL);
+ if (res) {
+ pr_info("Can not get lock %s, rc=%d\n",
+ ckv_lock->name, res);
+ goto fail;
+ }
+
+ if (ckv_lock->lksb.lksb.sb_flags & DLM_SBF_VALNOTVALID) {
+ pr_info(" %s LVB was invalid\n", ckv_lock->name);
+ memset(value, 0, len);
+ } else
+ memcpy(value, ckv_lock->lksb.lksb.sb_lvbptr, len);
+
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+ &ckv_lock->lksb,
+ DLM_LKF_CONVERT,
+ ckv_lock->name, NULL);
+ if (res) {
+ pr_info("Can not release lock %s\n", ckv_lock->name);
+ goto fail;
+ }
+
+ return res;
+fail:
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_get);
+
+int
+dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
+{
+ struct dlm_ckv_lock *ckv_lock = &kv->lock;
+ struct dlm_ckv_bucket *bucket;
+ int res;
+
+ BUG_ON(!ckv_lock);
+ bucket = ckv_lock->bucket;
+
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_EX,
+ &ckv_lock->lksb,
+ DLM_LKF_CONVERT,
+ ckv_lock->name, NULL);
+ if (res) {
+ pr_info("Can not get lock %s\n", ckv_lock->name);
+ goto fail;
+ }
+
+ memcpy(ckv_lock->lksb.lksb.sb_lvbptr, value, len);
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+ &ckv_lock->lksb,
+ DLM_LKF_VALBLK | DLM_LKF_CONVERT,
+ ckv_lock->name, NULL);
+ if (res) {
+ pr_info("Can not release lock %s\n", ckv_lock->name);
+ goto fail;
+ }
+
+ return res;
+fail:
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_set);
+
static void dlm_cvk_pre_n_bast(void *astarg, int mode)
{
struct dlm_ckv_lksb *lksb = astarg;
@@ -4,6 +4,9 @@
struct dlm_ckv_bucket;
struct dlm_ckv_lock;
+struct dlm_ckv_kv;
+
+#define DLM_CKV_VALUE_MAX_SIZE 255
typedef void (*dlm_ckv_notify_cb)(void *userarg);
@@ -18,6 +21,12 @@ void dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock);
int dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock);
int dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock);
+struct dlm_ckv_kv *
+dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key);
+void dlm_ckv_free_kv(struct dlm_ckv_kv *kv);
+int dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len);
+int dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len);
+
struct dlm_ckv_notify *
dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,
dlm_ckv_notify_cb cb);
Simple key-value cluster storage. Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com> --- drivers/target/dlm_ckv.c | 128 +++++++++++++++++++++++++++++++++++++++ drivers/target/dlm_ckv.h | 9 +++ 2 files changed, 137 insertions(+)