diff options
Diffstat (limited to 'src/cynagora.c')
-rw-r--r-- | src/cynagora.c | 224 |
1 files changed, 152 insertions, 72 deletions
diff --git a/src/cynagora.c b/src/cynagora.c index 2928779..aee6a4c 100644 --- a/src/cynagora.c +++ b/src/cynagora.c @@ -43,6 +43,7 @@ #include "cache.h" #include "socket.h" #include "expire.h" +#include "idgen.h" #define MIN_CACHE_SIZE 400 #define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0) @@ -62,6 +63,9 @@ struct asreq /** closure of the callback */ void *closure; + + /** id of the request */ + idgen_t id; }; /** structure to handle agents */ @@ -91,6 +95,12 @@ struct cynagora /** count of pending requests */ int pending; + /** synchronous lock */ + bool synclock; + + /** entered in critical section */ + bool entered; + /** type of link */ cynagora_type_t type; @@ -127,6 +137,9 @@ struct cynagora /** the pending queries */ query_t *queries; + /** id generator */ + idgen_t idgen; + /** spec of the socket */ char socketspec[]; }; @@ -436,17 +449,17 @@ status_check( rc = 1; else if (!strcmp(cynagora->reply.fields[0], _no_)) rc = 0; - else if (!strcmp(cynagora->reply.fields[0], _done_)) + else if (!strcmp(cynagora->reply.fields[0], _ack_)) rc = -EEXIST; else rc = -EPROTO; - if (cynagora->reply.count < 2) + if (cynagora->reply.count < 3) *expire = 0; - else if (cynagora->reply.fields[1][0] == '-') + else if (cynagora->reply.fields[2][0] == '-') *expire = -1; else - txt2exp(cynagora->reply.fields[1], expire, true); + txt2exp(cynagora->reply.fields[2], expire, true); return rc; } @@ -570,7 +583,7 @@ connection( if (rc >= 0) { rc = -EPROTO; if (cynagora->reply.count >= 2 - && 0 == strcmp(cynagora->reply.fields[0], _yes_) + && 0 == strcmp(cynagora->reply.fields[0], _done_) && 0 == strcmp(cynagora->reply.fields[1], "1")) { cache_clear(cynagora->cache, cynagora->reply.count > 2 ? (uint32_t)atol(cynagora->reply.fields[2]) : 0); @@ -631,14 +644,14 @@ check_or_test( int rc; time_t expire; - /* forbids 2 queries interleaved */ - if (cynagora->async.requests != NULL) - return -EINPROGRESS; + if (cynagora->synclock) + return -EBUSY; + cynagora->synclock = true; /* ensure opened */ rc = ensure_opened(cynagora); if (rc < 0) - return rc; + goto end; /* ensure there is no clear cache pending */ flushr(cynagora); @@ -647,11 +660,11 @@ check_or_test( if (!force) { rc = cache_search(cynagora->cache, key); if (rc >= 0) - return rc; + goto end; } /* send the request */ - rc = putxkv(cynagora, action, 0, key, 0); + rc = putxkv(cynagora, action, "{sync}", key, 0); if (rc >= 0) { /* get the response */ rc = wait_pending_reply(cynagora); @@ -661,9 +674,37 @@ check_or_test( cache_put(cynagora->cache, key, rc, expire, true); } } +end: + cynagora->synclock = false; return rc; } +/** + * get the pending asynchrounous request + * + * @param cynagora the cynagora client + * @param id id of the request to find + * @param unlink if true, remove the request from the + * list of requests + * @return the found request of NULL + */ +static +asreq_t * +search_async_request( + cynagora_t *cynagora, + const char *id, + bool unlink +) { + asreq_t *ar, **par; + + par = &cynagora->async.requests; + while((ar = *par) && strcmp(ar->id, id)) + par = &ar->next; + if (ar && unlink) + *par = ar->next; + return ar; +} + /******************************************************************************/ /*** PUBLIC COMMON METHODS ***/ /******************************************************************************/ @@ -712,11 +753,14 @@ cynagora_create( /* record type and weakly create cache */ cache_create(&cynagora->cache, CACHESIZE(cache_size)); /* ignore errors */ + cynagora->entered = false; + cynagora->synclock = false; cynagora->type = type; cynagora->async.controlcb = NULL; cynagora->async.closure = 0; cynagora->async.requests = NULL; cynagora->agents = NULL; + idgen_init(cynagora->idgen); /* lazy connection */ cynagora->fd = -1; @@ -785,6 +829,7 @@ cynagora_async_process( ) { int rc; const char *first; + const char *id; asreq_t *ar; time_t expire; cynagora_key_t key; @@ -805,13 +850,15 @@ cynagora_async_process( || !strcmp(first, _error_)) continue; + /* search the request */ + id = cynagora->reply.count < 2 ? "" : cynagora->reply.fields[1]; + ar = search_async_request(cynagora, id, true); + /* ignore unexpected answers */ - ar = cynagora->async.requests; if (ar == NULL) continue; /* emit the asynchronous answer */ - cynagora->async.requests = ar->next; rc = status_check(cynagora, &expire); if (rc >= 0) { key.client = (const char*)(ar + 1); @@ -882,7 +929,7 @@ cynagora_async_check( void *closure ) { int rc; - asreq_t **pr, *ar; + asreq_t *ar; /* ensure connection */ rc = ensure_opened(cynagora); @@ -907,13 +954,16 @@ cynagora_async_check( return -ENOMEM; /* init */ - ar->next = NULL; + do { + idgen_next(cynagora->idgen); + } while (search_async_request(cynagora, cynagora->idgen, false)); + strcpy(ar->id, cynagora->idgen); ar->callback = callback; ar->closure = closure; stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission); /* send the request */ - rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0); + rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0); if (rc >= 0) rc = flushw(cynagora); if (rc < 0) { @@ -922,10 +972,8 @@ cynagora_async_check( } /* record the request */ - pr = &cynagora->async.requests; - while(*pr != NULL) - pr = &(*pr)->next; - *pr = ar; + ar->next = cynagora->async.requests; + cynagora->async.requests = ar; return 0; } @@ -947,30 +995,32 @@ cynagora_get( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) - return -EINPROGRESS; - rc = ensure_opened(cynagora); - if (rc < 0) - return rc; + if (cynagora->synclock) + return -EBUSY; - rc = putxkv(cynagora, _get_, 0, key, 0); + cynagora->synclock = true; + rc = ensure_opened(cynagora); if (rc >= 0) { - rc = wait_reply(cynagora, true); - while ((rc == 6 || rc == 7) && !strcmp(cynagora->reply.fields[0], _item_)) { - k.client = cynagora->reply.fields[1]; - k.session = cynagora->reply.fields[2]; - k.user = cynagora->reply.fields[3]; - k.permission = cynagora->reply.fields[4]; - v.value = cynagora->reply.fields[5]; - if (rc == 6) - v.expire = 0; - else if (!txt2exp(cynagora->reply.fields[6], &v.expire, true)) - v.expire = -1; - callback(closure, &k, &v); + rc = putxkv(cynagora, _get_, 0, key, 0); + if (rc >= 0) { rc = wait_reply(cynagora, true); + while ((rc == 6 || rc == 7) && !strcmp(cynagora->reply.fields[0], _item_)) { + k.client = cynagora->reply.fields[1]; + k.session = cynagora->reply.fields[2]; + k.user = cynagora->reply.fields[3]; + k.permission = cynagora->reply.fields[4]; + v.value = cynagora->reply.fields[5]; + if (rc == 6) + v.expire = 0; + else if (!txt2exp(cynagora->reply.fields[6], &v.expire, true)) + v.expire = -1; + callback(closure, &k, &v); + rc = wait_reply(cynagora, true); + } + rc = status_done(cynagora); } - rc = status_done(cynagora); } + cynagora->synclock = false; return rc; } @@ -985,16 +1035,20 @@ cynagora_log( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) - return -EINPROGRESS; + if (cynagora->synclock) + return -EBUSY; + cynagora->synclock = true; rc = ensure_opened(cynagora); - if (rc < 0) - return rc; - - rc = putxkv(cynagora, _log_, off ? _off_ : on ? _on_ : 0, 0, 0); - if (rc >= 0) - rc = wait_done(cynagora); + if (rc >= 0) { + rc = putxkv(cynagora, _log_, off ? _off_ : on ? _on_ : 0, 0, 0); + if (rc >= 0) { + rc = wait_done(cynagora); + if (rc > 0) + rc = cynagora->reply.count >= 2 && !strcmp(cynagora->reply.fields[1], _on_); + } + } + cynagora->synclock = false; return rc < 0 ? rc : cynagora->reply.count < 2 ? 0 : !strcmp(cynagora->reply.fields[1], _on_); } @@ -1008,15 +1062,23 @@ cynagora_enter( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) - return -EINPROGRESS; + if (cynagora->entered) + return -ECANCELED; + if (cynagora->synclock) + return -EBUSY; + + cynagora->synclock = true; rc = ensure_opened(cynagora); - if (rc < 0) - return rc; + if (rc >= 0) { + rc = putxkv(cynagora, _enter_, 0, 0, 0); + if (rc >= 0) { + rc = wait_done(cynagora); + if (rc >= 0) + cynagora->entered = true; + } + } + cynagora->synclock = false; - rc = putxkv(cynagora, _enter_, 0, 0, 0); - if (rc >= 0) - rc = wait_done(cynagora); return rc; } @@ -1030,15 +1092,23 @@ cynagora_leave( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) + if (!cynagora->entered) return -ECANCELED; + if (cynagora->synclock) + return -EBUSY; + + cynagora->synclock = true; rc = ensure_opened(cynagora); - if (rc < 0) - return rc; + if (rc >= 0) { + rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0); + if (rc >= 0) { + rc = wait_done(cynagora); + if (rc >= 0) + cynagora->entered = false; + } + } + cynagora->synclock = false; - rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0); - if (rc >= 0) - rc = wait_done(cynagora); return rc; } @@ -1053,15 +1123,20 @@ cynagora_set( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) + if (!cynagora->entered) return -ECANCELED; + if (cynagora->synclock) + return -EBUSY; + + cynagora->synclock = true; rc = ensure_opened(cynagora); - if (rc < 0) - return rc; + if (rc >= 0) { + rc = putxkv(cynagora, _set_, 0, key, value); + if (rc >= 0) + rc = wait_done(cynagora); + } + cynagora->synclock = false; - rc = putxkv(cynagora, _set_, 0, key, value); - if (rc >= 0) - rc = wait_done(cynagora); return rc; } @@ -1075,15 +1150,20 @@ cynagora_drop( if (cynagora->type != cynagora_Admin) return -EPERM; - if (cynagora->async.requests != NULL) + if (!cynagora->entered) return -ECANCELED; + + if (cynagora->synclock) + return -EBUSY; + cynagora->synclock = true; rc = ensure_opened(cynagora); - if (rc < 0) - return rc; + if (rc >= 0) { + rc = putxkv(cynagora, _drop_, 0, key, 0); + if (rc >= 0) + rc = wait_done(cynagora); + } + cynagora->synclock = false; - rc = putxkv(cynagora, _drop_, 0, key, 0); - if (rc >= 0) - rc = wait_done(cynagora); return rc; } |