summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cynagora.c186
-rw-r--r--src/cynagora.h2
2 files changed, 126 insertions, 62 deletions
diff --git a/src/cynagora.c b/src/cynagora.c
index aee6a4c..466592d 100644
--- a/src/cynagora.c
+++ b/src/cynagora.c
@@ -49,20 +49,34 @@
#define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0)
typedef struct asreq asreq_t;
+typedef struct ascb ascb_t;
typedef struct agent agent_t;
typedef struct query query_t;
-/** recording of asynchronous requests */
-struct asreq
+/** recording of asynchronous request callbacks */
+struct ascb
{
- /** link to the next pending request */
- asreq_t *next;
+ /** link to the next pending callback */
+ ascb_t *next;
/** callback function */
cynagora_async_check_cb_t *callback;
/** closure of the callback */
void *closure;
+};
+
+/** recording of asynchronous requests */
+struct asreq
+{
+ /** link to the next pending request */
+ asreq_t *next;
+
+ /** callbacks */
+ ascb_t *callbacks;
+
+ /** key of the request */
+ cynagora_key_t key;
/** id of the request */
idgen_t id;
@@ -134,7 +148,7 @@ struct cynagora
/** the declared agents */
agent_t *agents;
- /** the pending queries */
+ /** the pending agent queries */
query_t *queries;
/** id generator */
@@ -161,6 +175,7 @@ struct query
};
static void agent_ask(cynagora_t *cynagora, int count, const char **fields);
+static int async_reply_process(cynagora_t *cynagora, int count);
/**
* Flush the write buffer of the client
@@ -334,22 +349,30 @@ get_reply(
cynagora_t *cynagora
) {
int rc;
+ const char *first;
uint32_t cacheid;
prot_next(cynagora->prot);
rc = prot_get(cynagora->prot, &cynagora->reply.fields);
if (rc > 0) {
- if (0 == strcmp(cynagora->reply.fields[0], _clear_)) {
+ first = cynagora->reply.fields[0];
+ if (0 == strcmp(first, _clear_)) {
+ /* clearing the cache */
cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0;
cache_clear(cynagora->cache, cacheid);
rc = 0;
- } else if (0 == strcmp(cynagora->reply.fields[0], _ask_)) {
+ } else if (0 == strcmp(first, _ask_)) {
/* on asking agent */
agent_ask(cynagora, rc - 1, &cynagora->reply.fields[1]);
rc = 0;
} else {
- if (0 != strcmp(cynagora->reply.fields[0], _item_))
+ if (0 != strcmp(cynagora->reply.fields[0], _item_)) {
cynagora->pending--;
+ if (strcmp(first, _done_) && strcmp(first, _error_)) {
+ if (async_reply_process(cynagora, rc))
+ rc = 0;
+ }
+ }
}
}
cynagora->reply.count = rc;
@@ -441,6 +464,7 @@ static
int
status_check(
cynagora_t *cynagora,
+ int count,
time_t *expire
) {
int rc;
@@ -454,7 +478,7 @@ status_check(
else
rc = -EPROTO;
- if (cynagora->reply.count < 3)
+ if (count < 3)
*expire = 0;
else if (cynagora->reply.fields[2][0] == '-')
*expire = -1;
@@ -653,11 +677,11 @@ check_or_test(
if (rc < 0)
goto end;
- /* ensure there is no clear cache pending */
- flushr(cynagora);
-
/* check cache item */
if (!force) {
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
+
rc = cache_search(cynagora->cache, key);
if (rc >= 0)
goto end;
@@ -669,7 +693,7 @@ check_or_test(
/* get the response */
rc = wait_pending_reply(cynagora);
if (rc >= 0) {
- rc = status_check(cynagora, &expire);
+ rc = status_check(cynagora, rc, &expire);
if (rc >= 0 && action == _check_)
cache_put(cynagora->cache, key, rc, expire, true);
}
@@ -705,6 +729,37 @@ search_async_request(
return ar;
}
+static
+int
+async_reply_process(
+ cynagora_t *cynagora,
+ int count
+) {
+ int status;
+ const char *id;
+ asreq_t *ar;
+ ascb_t *ac;
+ time_t expire;
+
+ id = count < 2 ? "" : cynagora->reply.fields[1];
+ ar = search_async_request(cynagora, id, true);
+
+ if (!ar)
+ return 0;
+
+ /* emit the asynchronous answer */
+ status = status_check(cynagora, count, &expire);
+ if (status >= 0)
+ cache_put(cynagora->cache, &ar->key, status, expire, true);
+ while((ac = ar->callbacks) != NULL) {
+ ar->callbacks = ac->next;
+ ac->callback(ac->closure, status);
+ free(ac);
+ }
+ free(ar);
+ return 1;
+}
+
/******************************************************************************/
/*** PUBLIC COMMON METHODS ***/
/******************************************************************************/
@@ -803,11 +858,16 @@ cynagora_async_setup(
void *closure
) {
asreq_t *ar;
+ ascb_t *ac;
/* cancel pending requests */
while((ar = cynagora->async.requests) != NULL) {
cynagora->async.requests = ar->next;
- ar->callback(ar->closure, -ECANCELED);
+ while((ac = ar->callbacks) != NULL) {
+ ar->callbacks = ac->next;
+ ac->callback(ac->closure, -ECANCELED);
+ free(ac);
+ }
free(ar);
}
@@ -828,47 +888,12 @@ cynagora_async_process(
cynagora_t *cynagora
) {
int rc;
- const char *first;
- const char *id;
- asreq_t *ar;
- time_t expire;
- cynagora_key_t key;
for (;;) {
/* non blocking wait for a reply */
rc = wait_reply(cynagora, false);
if (rc < 0)
return rc == -EAGAIN ? 0 : rc;
-
- /* skip empty replies */
- if (rc == 0)
- continue;
-
- /* skip done/error replies */
- first = cynagora->reply.fields[0];
- if (!strcmp(first, _done_)
- || !strcmp(first, _error_))
- continue;
-
- /* search the request */
- id = cynagora->reply.count < 2 ? "" : cynagora->reply.fields[1];
- ar = search_async_request(cynagora, id, true);
-
- /* ignore unexpected answers */
- if (ar == NULL)
- continue;
-
- /* emit the asynchronous answer */
- rc = status_check(cynagora, &expire);
- if (rc >= 0) {
- key.client = (const char*)(ar + 1);
- key.session = &key.client[1 + strlen(key.client)];
- key.user = &key.session[1 + strlen(key.session)];
- key.permission = &key.user[1 + strlen(key.user)];
- cache_put(cynagora->cache, &key, rc, expire, true);
- }
- ar->callback(ar->closure, rc);
- free(ar);
}
}
@@ -895,6 +920,8 @@ cynagora_cache_check(
cynagora_t *cynagora,
const cynagora_key_t *key
) {
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
return cache_search(cynagora->cache, key);
}
@@ -930,17 +957,19 @@ cynagora_async_check(
) {
int rc;
asreq_t *ar;
+ ascb_t *ac;
+ char *p;
/* ensure connection */
rc = ensure_opened(cynagora);
if (rc < 0)
return rc;
- /* ensure there is no clear cache pending */
- flushr(cynagora);
-
/* check cache item */
if (!force) {
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
+
rc = cache_search(cynagora->cache, key);
if (rc >= 0) {
callback(closure, rc);
@@ -948,32 +977,67 @@ cynagora_async_check(
}
}
- /* allocate */
+ /* allocates the callback */
+ ac = malloc(sizeof *ac);
+ if (ac == NULL)
+ return -ENOMEM;
+ ac->callback = callback;
+ ac->closure = closure;
+
+ /* search the request */
+ ar = cynagora->async.requests;
+ while (ar && (strcmp(key->client, ar->key.client)
+ || strcmp(key->session, ar->key.session)
+ || strcmp(key->user, ar->key.user)
+ || strcmp(key->permission, ar->key.permission)))
+ ar = ar->next;
+
+ /* a same request is pending, use it */
+ if (ar) {
+ ac->next = ar->callbacks;
+ ar->callbacks = ac;
+ return 0;
+ }
+
+ /* allocate for the request */
ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
- if (ar == NULL)
+ if (ar == NULL) {
+ free(ac);
return -ENOMEM;
+ }
/* init */
+ ac->next = NULL;
+ ar->callbacks = ac;
+ p = (char*)(ar + 1);
+ ar->key.client = p;
+ p = stpcpy(p, key->client) + 1;
+ ar->key.session = p;
+ p = stpcpy(p, key->session) + 1;
+ ar->key.user = p;
+ p = stpcpy(p, key->user) + 1;
+ ar->key.permission = p;
+ stpcpy(p, key->permission);
do {
idgen_next(cynagora->idgen);
} while (search_async_request(cynagora, cynagora->idgen, false));
strcpy(ar->id, cynagora->idgen);
- ar->callback = callback;
- ar->closure = closure;
- stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
+ ar->next = cynagora->async.requests;
+ cynagora->async.requests = ar;
/* send the request */
rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0);
- if (rc >= 0)
- rc = flushw(cynagora);
if (rc < 0) {
+ ar = search_async_request(cynagora, ar->id, true);
+ while((ac = ar->callbacks)) {
+ ar->callbacks = ac->next;
+ free(ac);
+ }
free(ar);
return rc;
}
/* record the request */
- ar->next = cynagora->async.requests;
- cynagora->async.requests = ar;
return 0;
}
diff --git a/src/cynagora.h b/src/cynagora.h
index 363e725..167b9bc 100644
--- a/src/cynagora.h
+++ b/src/cynagora.h
@@ -257,7 +257,7 @@ cynagora_test(
);
/**
- * Check the key asynchronousely (async)
+ * Check the key asynchronously (async)
*
* @param cynagora the handler of the client
* @param key the key to query