aboutsummaryrefslogtreecommitdiffstats
path: root/src/cynagora.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cynagora.c')
-rw-r--r--src/cynagora.c224
1 files changed, 152 insertions, 72 deletions
diff --git a/src/cynagora.c b/src/cynagora.c
index 2928779..aee6a4c 100644
--- a/src/cynagora.c
+++ b/src/cynagora.c
@@ -43,6 +43,7 @@
#include "cache.h"
#include "socket.h"
#include "expire.h"
+#include "idgen.h"
#define MIN_CACHE_SIZE 400
#define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0)
@@ -62,6 +63,9 @@ struct asreq
/** closure of the callback */
void *closure;
+
+ /** id of the request */
+ idgen_t id;
};
/** structure to handle agents */
@@ -91,6 +95,12 @@ struct cynagora
/** count of pending requests */
int pending;
+ /** synchronous lock */
+ bool synclock;
+
+ /** entered in critical section */
+ bool entered;
+
/** type of link */
cynagora_type_t type;
@@ -127,6 +137,9 @@ struct cynagora
/** the pending queries */
query_t *queries;
+ /** id generator */
+ idgen_t idgen;
+
/** spec of the socket */
char socketspec[];
};
@@ -436,17 +449,17 @@ status_check(
rc = 1;
else if (!strcmp(cynagora->reply.fields[0], _no_))
rc = 0;
- else if (!strcmp(cynagora->reply.fields[0], _done_))
+ else if (!strcmp(cynagora->reply.fields[0], _ack_))
rc = -EEXIST;
else
rc = -EPROTO;
- if (cynagora->reply.count < 2)
+ if (cynagora->reply.count < 3)
*expire = 0;
- else if (cynagora->reply.fields[1][0] == '-')
+ else if (cynagora->reply.fields[2][0] == '-')
*expire = -1;
else
- txt2exp(cynagora->reply.fields[1], expire, true);
+ txt2exp(cynagora->reply.fields[2], expire, true);
return rc;
}
@@ -570,7 +583,7 @@ connection(
if (rc >= 0) {
rc = -EPROTO;
if (cynagora->reply.count >= 2
- && 0 == strcmp(cynagora->reply.fields[0], _yes_)
+ && 0 == strcmp(cynagora->reply.fields[0], _done_)
&& 0 == strcmp(cynagora->reply.fields[1], "1")) {
cache_clear(cynagora->cache,
cynagora->reply.count > 2 ? (uint32_t)atol(cynagora->reply.fields[2]) : 0);
@@ -631,14 +644,14 @@ check_or_test(
int rc;
time_t expire;
- /* forbids 2 queries interleaved */
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
+ if (cynagora->synclock)
+ return -EBUSY;
+ cynagora->synclock = true;
/* ensure opened */
rc = ensure_opened(cynagora);
if (rc < 0)
- return rc;
+ goto end;
/* ensure there is no clear cache pending */
flushr(cynagora);
@@ -647,11 +660,11 @@ check_or_test(
if (!force) {
rc = cache_search(cynagora->cache, key);
if (rc >= 0)
- return rc;
+ goto end;
}
/* send the request */
- rc = putxkv(cynagora, action, 0, key, 0);
+ rc = putxkv(cynagora, action, "{sync}", key, 0);
if (rc >= 0) {
/* get the response */
rc = wait_pending_reply(cynagora);
@@ -661,9 +674,37 @@ check_or_test(
cache_put(cynagora->cache, key, rc, expire, true);
}
}
+end:
+ cynagora->synclock = false;
return rc;
}
+/**
+ * get the pending asynchrounous request
+ *
+ * @param cynagora the cynagora client
+ * @param id id of the request to find
+ * @param unlink if true, remove the request from the
+ * list of requests
+ * @return the found request of NULL
+ */
+static
+asreq_t *
+search_async_request(
+ cynagora_t *cynagora,
+ const char *id,
+ bool unlink
+) {
+ asreq_t *ar, **par;
+
+ par = &cynagora->async.requests;
+ while((ar = *par) && strcmp(ar->id, id))
+ par = &ar->next;
+ if (ar && unlink)
+ *par = ar->next;
+ return ar;
+}
+
/******************************************************************************/
/*** PUBLIC COMMON METHODS ***/
/******************************************************************************/
@@ -712,11 +753,14 @@ cynagora_create(
/* record type and weakly create cache */
cache_create(&cynagora->cache, CACHESIZE(cache_size)); /* ignore errors */
+ cynagora->entered = false;
+ cynagora->synclock = false;
cynagora->type = type;
cynagora->async.controlcb = NULL;
cynagora->async.closure = 0;
cynagora->async.requests = NULL;
cynagora->agents = NULL;
+ idgen_init(cynagora->idgen);
/* lazy connection */
cynagora->fd = -1;
@@ -785,6 +829,7 @@ cynagora_async_process(
) {
int rc;
const char *first;
+ const char *id;
asreq_t *ar;
time_t expire;
cynagora_key_t key;
@@ -805,13 +850,15 @@ cynagora_async_process(
|| !strcmp(first, _error_))
continue;
+ /* search the request */
+ id = cynagora->reply.count < 2 ? "" : cynagora->reply.fields[1];
+ ar = search_async_request(cynagora, id, true);
+
/* ignore unexpected answers */
- ar = cynagora->async.requests;
if (ar == NULL)
continue;
/* emit the asynchronous answer */
- cynagora->async.requests = ar->next;
rc = status_check(cynagora, &expire);
if (rc >= 0) {
key.client = (const char*)(ar + 1);
@@ -882,7 +929,7 @@ cynagora_async_check(
void *closure
) {
int rc;
- asreq_t **pr, *ar;
+ asreq_t *ar;
/* ensure connection */
rc = ensure_opened(cynagora);
@@ -907,13 +954,16 @@ cynagora_async_check(
return -ENOMEM;
/* init */
- ar->next = NULL;
+ do {
+ idgen_next(cynagora->idgen);
+ } while (search_async_request(cynagora, cynagora->idgen, false));
+ strcpy(ar->id, cynagora->idgen);
ar->callback = callback;
ar->closure = closure;
stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
/* send the request */
- rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0);
+ rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0);
if (rc >= 0)
rc = flushw(cynagora);
if (rc < 0) {
@@ -922,10 +972,8 @@ cynagora_async_check(
}
/* record the request */
- pr = &cynagora->async.requests;
- while(*pr != NULL)
- pr = &(*pr)->next;
- *pr = ar;
+ ar->next = cynagora->async.requests;
+ cynagora->async.requests = ar;
return 0;
}
@@ -947,30 +995,32 @@ cynagora_get(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
+ if (cynagora->synclock)
+ return -EBUSY;
- rc = putxkv(cynagora, _get_, 0, key, 0);
+ cynagora->synclock = true;
+ rc = ensure_opened(cynagora);
if (rc >= 0) {
- rc = wait_reply(cynagora, true);
- while ((rc == 6 || rc == 7) && !strcmp(cynagora->reply.fields[0], _item_)) {
- k.client = cynagora->reply.fields[1];
- k.session = cynagora->reply.fields[2];
- k.user = cynagora->reply.fields[3];
- k.permission = cynagora->reply.fields[4];
- v.value = cynagora->reply.fields[5];
- if (rc == 6)
- v.expire = 0;
- else if (!txt2exp(cynagora->reply.fields[6], &v.expire, true))
- v.expire = -1;
- callback(closure, &k, &v);
+ rc = putxkv(cynagora, _get_, 0, key, 0);
+ if (rc >= 0) {
rc = wait_reply(cynagora, true);
+ while ((rc == 6 || rc == 7) && !strcmp(cynagora->reply.fields[0], _item_)) {
+ k.client = cynagora->reply.fields[1];
+ k.session = cynagora->reply.fields[2];
+ k.user = cynagora->reply.fields[3];
+ k.permission = cynagora->reply.fields[4];
+ v.value = cynagora->reply.fields[5];
+ if (rc == 6)
+ v.expire = 0;
+ else if (!txt2exp(cynagora->reply.fields[6], &v.expire, true))
+ v.expire = -1;
+ callback(closure, &k, &v);
+ rc = wait_reply(cynagora, true);
+ }
+ rc = status_done(cynagora);
}
- rc = status_done(cynagora);
}
+ cynagora->synclock = false;
return rc;
}
@@ -985,16 +1035,20 @@ cynagora_log(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
+ if (cynagora->synclock)
+ return -EBUSY;
+ cynagora->synclock = true;
rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- rc = putxkv(cynagora, _log_, off ? _off_ : on ? _on_ : 0, 0, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
+ if (rc >= 0) {
+ rc = putxkv(cynagora, _log_, off ? _off_ : on ? _on_ : 0, 0, 0);
+ if (rc >= 0) {
+ rc = wait_done(cynagora);
+ if (rc > 0)
+ rc = cynagora->reply.count >= 2 && !strcmp(cynagora->reply.fields[1], _on_);
+ }
+ }
+ cynagora->synclock = false;
return rc < 0 ? rc : cynagora->reply.count < 2 ? 0 : !strcmp(cynagora->reply.fields[1], _on_);
}
@@ -1008,15 +1062,23 @@ cynagora_enter(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
+ if (cynagora->entered)
+ return -ECANCELED;
+ if (cynagora->synclock)
+ return -EBUSY;
+
+ cynagora->synclock = true;
rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
+ if (rc >= 0) {
+ rc = putxkv(cynagora, _enter_, 0, 0, 0);
+ if (rc >= 0) {
+ rc = wait_done(cynagora);
+ if (rc >= 0)
+ cynagora->entered = true;
+ }
+ }
+ cynagora->synclock = false;
- rc = putxkv(cynagora, _enter_, 0, 0, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
return rc;
}
@@ -1030,15 +1092,23 @@ cynagora_leave(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
+ if (!cynagora->entered)
return -ECANCELED;
+ if (cynagora->synclock)
+ return -EBUSY;
+
+ cynagora->synclock = true;
rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
+ if (rc >= 0) {
+ rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0);
+ if (rc >= 0) {
+ rc = wait_done(cynagora);
+ if (rc >= 0)
+ cynagora->entered = false;
+ }
+ }
+ cynagora->synclock = false;
- rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
return rc;
}
@@ -1053,15 +1123,20 @@ cynagora_set(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
+ if (!cynagora->entered)
return -ECANCELED;
+ if (cynagora->synclock)
+ return -EBUSY;
+
+ cynagora->synclock = true;
rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
+ if (rc >= 0) {
+ rc = putxkv(cynagora, _set_, 0, key, value);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ }
+ cynagora->synclock = false;
- rc = putxkv(cynagora, _set_, 0, key, value);
- if (rc >= 0)
- rc = wait_done(cynagora);
return rc;
}
@@ -1075,15 +1150,20 @@ cynagora_drop(
if (cynagora->type != cynagora_Admin)
return -EPERM;
- if (cynagora->async.requests != NULL)
+ if (!cynagora->entered)
return -ECANCELED;
+
+ if (cynagora->synclock)
+ return -EBUSY;
+ cynagora->synclock = true;
rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
+ if (rc >= 0) {
+ rc = putxkv(cynagora, _drop_, 0, key, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ }
+ cynagora->synclock = false;
- rc = putxkv(cynagora, _drop_, 0, key, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
return rc;
}