aboutsummaryrefslogtreecommitdiffstats
path: root/src/cynagora.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cynagora.c')
-rw-r--r--src/cynagora.c416
1 files changed, 277 insertions, 139 deletions
diff --git a/src/cynagora.c b/src/cynagora.c
index 1165e64..ee55dc6 100644
--- a/src/cynagora.c
+++ b/src/cynagora.c
@@ -52,9 +52,7 @@ struct asreq
struct asreq *next;
/** callback function */
- void (*callback)(
- void *closure,
- int status);
+ cynagora_async_check_cb_t *callback;
/** closure of the callback */
void *closure;
@@ -75,9 +73,6 @@ struct cynagora
/** type of link */
cynagora_type_t type;
- /** spec of the socket */
- const char *socketspec;
-
/** protocol manager object */
prot_t *prot;
@@ -96,7 +91,7 @@ struct cynagora
/** async */
struct {
/** control callback */
- cynagora_async_ctl_t controlcb;
+ cynagora_async_ctl_cb_t *controlcb;
/** closure */
void *closure;
@@ -104,12 +99,17 @@ struct cynagora
/** requests */
asreq_t *requests;
} async;
-};
-static void disconnection(cynagora_t *cynagora);
+ /** spec of the socket */
+ char socketspec[];
+};
/**
- * Flush the write buffer
+ * Flush the write buffer of the client
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
*/
static
int
@@ -141,7 +141,14 @@ flushw(
/**
* Put the command made of arguments ...
* Increment the count of pending requests.
- * Return 0 in case of success or a negative number on error.
+ *
+ * @param cynagora the handler of the client
+ * @param command the command to send
+ * @param optarg an optional argument or NULL
+ * @param optkey an optional key or NULL
+ * @param optval an optional value or NULL
+ *
+ * @return 0 in case of success or a negative -errno value
*/
static
int
@@ -156,8 +163,10 @@ putxkv(
prot_t *prot;
char text[30];
+ /* retrieves the protocol handler */
prot = cynagora->prot;
for(trial = 0 ; ; trial++) {
+ /* fill the protocol handler with command and its arguments */
rc = prot_put_field(prot, command);
if (!rc && optarg)
rc = prot_put_field(prot, optarg);
@@ -183,19 +192,33 @@ putxkv(
if (!rc)
rc = prot_put_end(prot);
if (!rc) {
+ /* success ! */
/* client always flushes */
cynagora->pending++;
return flushw(cynagora);
}
+
+ /* failed to fill protocol, cancel current composition */
prot_put_cancel(prot);
+
+ /* fail if was last trial */
if (trial >= 1)
return rc;
+
+ /* try to flush the output buffer */
rc = flushw(cynagora);
if (rc)
return rc;
}
}
+/**
+ * Wait some input event
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
wait_input(
@@ -210,19 +233,28 @@ wait_input(
return rc < 0 ? -errno : 0;
}
+/**
+ * Get the next reply if any
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return the count of field of the reply (can be 0)
+ * or -EAGAIN if there is no reply
+ */
static
int
get_reply(
cynagora_t *cynagora
) {
int rc;
+ uint32_t cacheid;
prot_next(cynagora->prot);
rc = prot_get(cynagora->prot, &cynagora->reply.fields);
if (rc > 0) {
if (0 == strcmp(cynagora->reply.fields[0], _clear_)) {
- cache_clear(cynagora->cache,
- rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0);
+ cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0;
+ cache_clear(cynagora->cache, cacheid);
rc = 0;
} else {
if (0 != strcmp(cynagora->reply.fields[0], _item_))
@@ -233,6 +265,16 @@ get_reply(
return rc;
}
+/**
+ * Wait for a reply
+ *
+ * @param cynagora the handler of the client
+ * @param block
+ *
+ * @return the count of fields greater than 0 or a negative -errno value
+ * or -EAGAIN if nothing and block == 0
+ * or -EPIPE if broken link
+ */
static
int
wait_reply(
@@ -242,11 +284,13 @@ wait_reply(
int rc;
for(;;) {
- prot_next(cynagora->prot);
+ /* get the next reply if any */
rc = get_reply(cynagora);
if (rc > 0)
return rc;
+
if (rc < 0) {
+ /* wait for an answer */
rc = prot_read(cynagora->prot, cynagora->fd);
while (rc <= 0) {
if (rc == 0)
@@ -261,6 +305,13 @@ wait_reply(
}
}
+/**
+ * Read and process any input data
+ *
+ * @param cynagora the client handler
+ *
+ * @return 0 on success or a negative -errno error code
+ */
static
int
flushr(
@@ -272,6 +323,13 @@ flushr(
return rc;
}
+/**
+ * Test if the first field is "done"
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 if done or -ECANCELED otherwise
+ */
static
int
status_done(
@@ -280,6 +338,14 @@ status_done(
return strcmp(cynagora->reply.fields[0], _done_) ? -ECANCELED : 0;
}
+/**
+ * Translates the check/test reply to a forbiden/granted status
+ *
+ * @param cynagora the handler of the client
+ * @param expire where to store the expiration read
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
status_check(
@@ -305,6 +371,13 @@ status_check(
return rc;
}
+/**
+ * Wait for a reply
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
wait_pending_reply(
@@ -320,6 +393,14 @@ wait_pending_reply(
}
}
+/**
+ * Wait the reply "done"
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
+ * -ECANCELED when received an error status
+ */
static
int
wait_done(
@@ -331,6 +412,15 @@ wait_done(
return rc;
}
+/**
+ * Calls the asynchronous control callback with operation and the given events
+ *
+ * @param cynagora the handler of the client
+ * @param op operation (see epoll_ctl)
+ * @param events the events (see epoll_ctl)
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
async(
@@ -343,6 +433,11 @@ async(
: 0;
}
+/**
+ * Disconnect the client
+ *
+ * @param cynagora the handler of the client
+ */
static
void
disconnection(
@@ -355,6 +450,13 @@ disconnection(
}
}
+/**
+ * connect the client
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
connection(
@@ -391,6 +493,13 @@ connection(
return rc;
}
+/**
+ * ensure the connection is opened
+ *
+ * @param cynagora the handler of the client
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
static
int
ensure_opened(
@@ -401,10 +510,63 @@ ensure_opened(
return cynagora->fd < 0 ? connection(cynagora) : 0;
}
-/************************************************************************************/
+/**
+ * Check or test synchronously
+ *
+ * @param cynagora
+ * @param key
+ * @param action
+ *
+ * @return 0 in case of success or a negative -errno value
+ */
+static
+int
+check_or_test(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ const char *action
+) {
+ int rc;
+ time_t expire;
+
+ /* forbids 2 queries interleaved */
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+
+ /* ensure opened */
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
+
+ /* check cache item */
+ rc = cache_search(cynagora->cache, key);
+ if (rc >= 0)
+ return rc;
+
+ /* send the request */
+ rc = putxkv(cynagora, action, 0, key, 0);
+ if (rc >= 0) {
+ /* get the response */
+ rc = wait_pending_reply(cynagora);
+ if (rc >= 0) {
+ rc = status_check(cynagora, &expire);
+ if (rc >= 0 && action == _check_ && cynagora->cache)
+ cache_put(cynagora->cache, key, rc, expire);
+ }
+ }
+ return rc;
+}
+
+/******************************************************************************/
+/*** PUBLIC METHODS ***/
+/******************************************************************************/
+/* see cynagora.h */
int
-cynagora_open(
+cynagora_create(
cynagora_t **prcyn,
cynagora_type_t type,
uint32_t cache_size,
@@ -415,10 +577,18 @@ cynagora_open(
/* socket spec */
switch(type) {
+ case cynagora_Admin:
+ socketspec = cyn_get_socket_admin(socketspec);
+ break;
+
+ case cynagora_Agent:
+ socketspec = cyn_get_socket_agent(socketspec);
+ break;
+
+ case cynagora_Check:
default:
- case cynagora_Check: socketspec = cyn_get_socket_check(socketspec); break;
- case cynagora_Admin: socketspec = cyn_get_socket_admin(socketspec); break;
- case cynagora_Agent: socketspec = cyn_get_socket_agent(socketspec); break;
+ socketspec = cyn_get_socket_check(socketspec);
+ break;
}
/* allocate the structure */
@@ -434,12 +604,11 @@ cynagora_open(
goto error2;
/* socket spec */
- strcpy((char*)(cynagora+1), socketspec);
+ strcpy(cynagora->socketspec, socketspec);
/* record type and weakly create cache */
- cache_create(&cynagora->cache, CACHESIZE(cache_size));
+ cache_create(&cynagora->cache, CACHESIZE(cache_size)); /* ignore errors */
cynagora->type = type;
- cynagora->socketspec = socketspec;
cynagora->async.controlcb = NULL;
cynagora->async.closure = 0;
cynagora->async.requests = NULL;
@@ -457,6 +626,7 @@ error:
return rc;
}
+/* see cynagora.h */
void
cynagora_disconnect(
cynagora_t *cynagora
@@ -464,8 +634,9 @@ cynagora_disconnect(
disconnection(cynagora);
}
+/* see cynagora.h */
void
-cynagora_close(
+cynagora_destroy(
cynagora_t *cynagora
) {
cynagora_async_setup(cynagora, NULL, NULL);
@@ -475,85 +646,33 @@ cynagora_close(
free(cynagora);
}
+/* see cynagora.h */
int
-cynagora_enter(
- cynagora_t *cynagora
+cynagora_cache_resize(
+ cynagora_t *cynagora,
+ uint32_t size
) {
- int rc;
-
- if (cynagora->type != cynagora_Admin)
- return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- rc = putxkv(cynagora, _enter_, 0, 0, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
- return rc;
+ return cache_resize(&cynagora->cache, CACHESIZE(size));
}
-int
-cynagora_leave(
- cynagora_t *cynagora,
- bool commit
+/* see cynagora.h */
+void
+cynagora_cache_clear(
+ cynagora_t *cynagora
) {
- int rc;
-
- if (cynagora->type != cynagora_Admin)
- return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0);
- if (rc >= 0)
- rc = wait_done(cynagora);
- return rc;
+ cache_clear(cynagora->cache, 0);
}
-static
+/* see cynagora.h */
int
-check_or_test(
+cynagora_cache_check(
cynagora_t *cynagora,
- const cynagora_key_t *key,
- const char *action
+ const cynagora_key_t *key
) {
- int rc;
- time_t expire;
-
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- /* ensure there is no clear cache pending */
- flushr(cynagora);
-
- /* check cache item */
- rc = cache_search(cynagora->cache, key);
- if (rc >= 0)
- return rc;
-
- /* send the request */
- rc = putxkv(cynagora, action, 0, key, 0);
- if (rc >= 0) {
- /* get the response */
- rc = wait_pending_reply(cynagora);
- if (rc >= 0) {
- rc = status_check(cynagora, &expire);
- if (cynagora->cache && rc >= 0)
- cache_put(cynagora->cache, key, rc, expire);
- }
- }
- return rc;
+ return cache_search(cynagora->cache, key);
}
+/* see cynagora.h */
int
cynagora_check(
cynagora_t *cynagora,
@@ -562,6 +681,7 @@ cynagora_check(
return check_or_test(cynagora, key, _check_);
}
+/* see cynagora.h */
int
cynagora_test(
cynagora_t *cynagora,
@@ -570,37 +690,12 @@ cynagora_test(
return check_or_test(cynagora, key, _test_);
}
-int
-cynagora_set(
- cynagora_t *cynagora,
- const cynagora_key_t *key,
- const cynagora_value_t *value
-) {
- int rc;
-
- if (cynagora->type != cynagora_Admin)
- return -EPERM;
- if (cynagora->async.requests != NULL)
- return -EINPROGRESS;
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- rc = putxkv(cynagora, _set_, 0, key, value);
- if (rc >= 0)
- rc = wait_done(cynagora);
- return rc;
-}
-
+/* see cynagora.h */
int
cynagora_get(
cynagora_t *cynagora,
const cynagora_key_t *key,
- void (*callback)(
- void *closure,
- const cynagora_key_t *key,
- const cynagora_value_t *value
- ),
+ cynagora_get_cb_t *callback,
void *closure
) {
int rc;
@@ -633,6 +728,7 @@ cynagora_get(
return rc;
}
+/* see cynagora.h */
int
cynagora_log(
cynagora_t *cynagora,
@@ -657,11 +753,10 @@ cynagora_log(
return rc < 0 ? rc : cynagora->reply.count < 2 ? 0 : !strcmp(cynagora->reply.fields[1], _on_);
}
-
+/* see cynagora.h */
int
-cynagora_drop(
- cynagora_t *cynagora,
- const cynagora_key_t *key
+cynagora_enter(
+ cynagora_t *cynagora
) {
int rc;
@@ -673,44 +768,84 @@ cynagora_drop(
if (rc < 0)
return rc;
- rc = putxkv(cynagora, _drop_, 0, key, 0);
+ rc = putxkv(cynagora, _enter_, 0, 0, 0);
if (rc >= 0)
rc = wait_done(cynagora);
return rc;
}
-/************************************************************************************/
-
+/* see cynagora.h */
int
-cynagora_cache_resize(
+cynagora_leave(
cynagora_t *cynagora,
- uint32_t size
+ int commit
) {
- return cache_resize(&cynagora->cache, CACHESIZE(size));
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -ECANCELED;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
}
-void
-cynagora_cache_clear(
- cynagora_t *cynagora
+/* see cynagora.h */
+int
+cynagora_set(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ const cynagora_value_t *value
) {
- cache_clear(cynagora->cache, 0);
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -ECANCELED;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _set_, 0, key, value);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
}
+/* see cynagora.h */
int
-cynagora_cache_check(
+cynagora_drop(
cynagora_t *cynagora,
const cynagora_key_t *key
) {
- return cache_search(cynagora->cache, key);
-}
+ int rc;
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -ECANCELED;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
-/************************************************************************************/
+ rc = putxkv(cynagora, _drop_, 0, key, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
+}
+/* see cynagora.h */
int
cynagora_async_setup(
cynagora_t *cynagora,
- cynagora_async_ctl_t controlcb,
+ cynagora_async_ctl_cb_t *controlcb,
void *closure
) {
asreq_t *ar;
@@ -721,15 +856,19 @@ cynagora_async_setup(
ar->callback(ar->closure, -ECANCELED);
free(ar);
}
+
/* remove existing polling */
async(cynagora, EPOLL_CTL_DEL, 0);
+
/* records new data */
cynagora->async.controlcb = controlcb;
cynagora->async.closure = closure;
+
/* record to polling */
return async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
}
+/* see cynagora.h */
int
cynagora_async_process(
cynagora_t *cynagora
@@ -776,14 +915,13 @@ cynagora_async_process(
}
}
+/* see cynagora.h */
int
cynagora_async_check(
cynagora_t *cynagora,
const cynagora_key_t *key,
int simple,
- void (*callback)(
- void *closure,
- int status),
+ cynagora_async_check_cb_t *callback,
void *closure
) {
int rc;