diff options
-rw-r--r-- | src/cynagora.c | 186 | ||||
-rw-r--r-- | src/cynagora.h | 2 |
2 files changed, 126 insertions, 62 deletions
diff --git a/src/cynagora.c b/src/cynagora.c index aee6a4c..466592d 100644 --- a/src/cynagora.c +++ b/src/cynagora.c @@ -49,20 +49,34 @@ #define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0) typedef struct asreq asreq_t; +typedef struct ascb ascb_t; typedef struct agent agent_t; typedef struct query query_t; -/** recording of asynchronous requests */ -struct asreq +/** recording of asynchronous request callbacks */ +struct ascb { - /** link to the next pending request */ - asreq_t *next; + /** link to the next pending callback */ + ascb_t *next; /** callback function */ cynagora_async_check_cb_t *callback; /** closure of the callback */ void *closure; +}; + +/** recording of asynchronous requests */ +struct asreq +{ + /** link to the next pending request */ + asreq_t *next; + + /** callbacks */ + ascb_t *callbacks; + + /** key of the request */ + cynagora_key_t key; /** id of the request */ idgen_t id; @@ -134,7 +148,7 @@ struct cynagora /** the declared agents */ agent_t *agents; - /** the pending queries */ + /** the pending agent queries */ query_t *queries; /** id generator */ @@ -161,6 +175,7 @@ struct query }; static void agent_ask(cynagora_t *cynagora, int count, const char **fields); +static int async_reply_process(cynagora_t *cynagora, int count); /** * Flush the write buffer of the client @@ -334,22 +349,30 @@ get_reply( cynagora_t *cynagora ) { int rc; + const char *first; uint32_t cacheid; prot_next(cynagora->prot); rc = prot_get(cynagora->prot, &cynagora->reply.fields); if (rc > 0) { - if (0 == strcmp(cynagora->reply.fields[0], _clear_)) { + first = cynagora->reply.fields[0]; + if (0 == strcmp(first, _clear_)) { + /* clearing the cache */ cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0; cache_clear(cynagora->cache, cacheid); rc = 0; - } else if (0 == strcmp(cynagora->reply.fields[0], _ask_)) { + } else if (0 == strcmp(first, _ask_)) { /* on asking agent */ agent_ask(cynagora, rc - 1, &cynagora->reply.fields[1]); rc = 0; } else { - if (0 != strcmp(cynagora->reply.fields[0], _item_)) + if (0 != strcmp(cynagora->reply.fields[0], _item_)) { cynagora->pending--; + if (strcmp(first, _done_) && strcmp(first, _error_)) { + if (async_reply_process(cynagora, rc)) + rc = 0; + } + } } } cynagora->reply.count = rc; @@ -441,6 +464,7 @@ static int status_check( cynagora_t *cynagora, + int count, time_t *expire ) { int rc; @@ -454,7 +478,7 @@ status_check( else rc = -EPROTO; - if (cynagora->reply.count < 3) + if (count < 3) *expire = 0; else if (cynagora->reply.fields[2][0] == '-') *expire = -1; @@ -653,11 +677,11 @@ check_or_test( if (rc < 0) goto end; - /* ensure there is no clear cache pending */ - flushr(cynagora); - /* check cache item */ if (!force) { + /* ensure there is no clear cache pending */ + flushr(cynagora); + rc = cache_search(cynagora->cache, key); if (rc >= 0) goto end; @@ -669,7 +693,7 @@ check_or_test( /* get the response */ rc = wait_pending_reply(cynagora); if (rc >= 0) { - rc = status_check(cynagora, &expire); + rc = status_check(cynagora, rc, &expire); if (rc >= 0 && action == _check_) cache_put(cynagora->cache, key, rc, expire, true); } @@ -705,6 +729,37 @@ search_async_request( return ar; } +static +int +async_reply_process( + cynagora_t *cynagora, + int count +) { + int status; + const char *id; + asreq_t *ar; + ascb_t *ac; + time_t expire; + + id = count < 2 ? "" : cynagora->reply.fields[1]; + ar = search_async_request(cynagora, id, true); + + if (!ar) + return 0; + + /* emit the asynchronous answer */ + status = status_check(cynagora, count, &expire); + if (status >= 0) + cache_put(cynagora->cache, &ar->key, status, expire, true); + while((ac = ar->callbacks) != NULL) { + ar->callbacks = ac->next; + ac->callback(ac->closure, status); + free(ac); + } + free(ar); + return 1; +} + /******************************************************************************/ /*** PUBLIC COMMON METHODS ***/ /******************************************************************************/ @@ -803,11 +858,16 @@ cynagora_async_setup( void *closure ) { asreq_t *ar; + ascb_t *ac; /* cancel pending requests */ while((ar = cynagora->async.requests) != NULL) { cynagora->async.requests = ar->next; - ar->callback(ar->closure, -ECANCELED); + while((ac = ar->callbacks) != NULL) { + ar->callbacks = ac->next; + ac->callback(ac->closure, -ECANCELED); + free(ac); + } free(ar); } @@ -828,47 +888,12 @@ cynagora_async_process( cynagora_t *cynagora ) { int rc; - const char *first; - const char *id; - asreq_t *ar; - time_t expire; - cynagora_key_t key; for (;;) { /* non blocking wait for a reply */ rc = wait_reply(cynagora, false); if (rc < 0) return rc == -EAGAIN ? 0 : rc; - - /* skip empty replies */ - if (rc == 0) - continue; - - /* skip done/error replies */ - first = cynagora->reply.fields[0]; - if (!strcmp(first, _done_) - || !strcmp(first, _error_)) - continue; - - /* search the request */ - id = cynagora->reply.count < 2 ? "" : cynagora->reply.fields[1]; - ar = search_async_request(cynagora, id, true); - - /* ignore unexpected answers */ - if (ar == NULL) - continue; - - /* emit the asynchronous answer */ - rc = status_check(cynagora, &expire); - if (rc >= 0) { - key.client = (const char*)(ar + 1); - key.session = &key.client[1 + strlen(key.client)]; - key.user = &key.session[1 + strlen(key.session)]; - key.permission = &key.user[1 + strlen(key.user)]; - cache_put(cynagora->cache, &key, rc, expire, true); - } - ar->callback(ar->closure, rc); - free(ar); } } @@ -895,6 +920,8 @@ cynagora_cache_check( cynagora_t *cynagora, const cynagora_key_t *key ) { + /* ensure there is no clear cache pending */ + flushr(cynagora); return cache_search(cynagora->cache, key); } @@ -930,17 +957,19 @@ cynagora_async_check( ) { int rc; asreq_t *ar; + ascb_t *ac; + char *p; /* ensure connection */ rc = ensure_opened(cynagora); if (rc < 0) return rc; - /* ensure there is no clear cache pending */ - flushr(cynagora); - /* check cache item */ if (!force) { + /* ensure there is no clear cache pending */ + flushr(cynagora); + rc = cache_search(cynagora->cache, key); if (rc >= 0) { callback(closure, rc); @@ -948,32 +977,67 @@ cynagora_async_check( } } - /* allocate */ + /* allocates the callback */ + ac = malloc(sizeof *ac); + if (ac == NULL) + return -ENOMEM; + ac->callback = callback; + ac->closure = closure; + + /* search the request */ + ar = cynagora->async.requests; + while (ar && (strcmp(key->client, ar->key.client) + || strcmp(key->session, ar->key.session) + || strcmp(key->user, ar->key.user) + || strcmp(key->permission, ar->key.permission))) + ar = ar->next; + + /* a same request is pending, use it */ + if (ar) { + ac->next = ar->callbacks; + ar->callbacks = ac; + return 0; + } + + /* allocate for the request */ ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4); - if (ar == NULL) + if (ar == NULL) { + free(ac); return -ENOMEM; + } /* init */ + ac->next = NULL; + ar->callbacks = ac; + p = (char*)(ar + 1); + ar->key.client = p; + p = stpcpy(p, key->client) + 1; + ar->key.session = p; + p = stpcpy(p, key->session) + 1; + ar->key.user = p; + p = stpcpy(p, key->user) + 1; + ar->key.permission = p; + stpcpy(p, key->permission); do { idgen_next(cynagora->idgen); } while (search_async_request(cynagora, cynagora->idgen, false)); strcpy(ar->id, cynagora->idgen); - ar->callback = callback; - ar->closure = closure; - stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission); + ar->next = cynagora->async.requests; + cynagora->async.requests = ar; /* send the request */ rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0); - if (rc >= 0) - rc = flushw(cynagora); if (rc < 0) { + ar = search_async_request(cynagora, ar->id, true); + while((ac = ar->callbacks)) { + ar->callbacks = ac->next; + free(ac); + } free(ar); return rc; } /* record the request */ - ar->next = cynagora->async.requests; - cynagora->async.requests = ar; return 0; } diff --git a/src/cynagora.h b/src/cynagora.h index 363e725..167b9bc 100644 --- a/src/cynagora.h +++ b/src/cynagora.h @@ -257,7 +257,7 @@ cynagora_test( ); /** - * Check the key asynchronousely (async) + * Check the key asynchronously (async) * * @param cynagora the handler of the client * @param key the key to query |