aboutsummaryrefslogtreecommitdiffstats
path: root/src/cynagora.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cynagora.c')
-rw-r--r--src/cynagora.c636
1 files changed, 501 insertions, 135 deletions
diff --git a/src/cynagora.c b/src/cynagora.c
index 96c6466..b4bd7e9 100644
--- a/src/cynagora.c
+++ b/src/cynagora.c
@@ -31,6 +31,7 @@
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
+#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
@@ -46,11 +47,15 @@
#define MIN_CACHE_SIZE 400
#define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0)
+typedef struct asreq asreq_t;
+typedef struct agent agent_t;
+typedef struct query query_t;
+
/** recording of asynchronous requests */
struct asreq
{
/** link to the next pending request */
- struct asreq *next;
+ asreq_t *next;
/** callback function */
cynagora_async_check_cb_t *callback;
@@ -58,7 +63,22 @@ struct asreq
/** closure of the callback */
void *closure;
};
-typedef struct asreq asreq_t;
+
+/** structure to handle agents */
+struct agent
+{
+ /* Link to the next agent */
+ agent_t *next;
+
+ /* recorded callback of the agent */
+ cynagora_agent_cb_t *agentcb;
+
+ /* closure of the callback */
+ void *closure;
+
+ /* name of the agent */
+ char name[];
+};
/**
* structure recording a client
@@ -101,10 +121,34 @@ struct cynagora
asreq_t *requests;
} async;
+ /** the declared agents */
+ agent_t *agents;
+
+ /** the pending queries */
+ query_t *queries;
+
/** spec of the socket */
char socketspec[];
};
+/** structure of queries for agents */
+struct query
+{
+ /** public query */
+ cynagora_query_t query;
+
+ /** link to the next */
+ query_t *next;
+
+ /** the client of the query */
+ cynagora_t *cynagora;
+
+ /** the askid */
+ char *askid;
+};
+
+static void agent_ask(cynagora_t *cynagora, int count, const char **fields);
+
/**
* Flush the write buffer of the client
*
@@ -140,6 +184,59 @@ flushw(
}
/**
+ * Send a reply
+ *
+ * @param cynagora the client
+ * @param fields the fields to send
+ * @param count the count of fields
+ * @return 0 on success or a negative error code
+ */
+static
+int
+send_reply(
+ cynagora_t *cynagora,
+ const char **fields,
+ int count
+) {
+ int rc, trial, i;
+ prot_t *prot;
+
+ /* retrieves the protocol handler */
+ prot = cynagora->prot;
+ trial = 0;
+ for(;;) {
+
+ /* fill the fields */
+ for (i = rc = 0 ; i < count && rc == 0 ; i++)
+ rc = prot_put_field(prot, fields[i]);
+
+ /* send if done */
+ if (rc == 0) {
+ rc = prot_put_end(prot);
+ if (rc == 0) {
+ rc = flushw(cynagora);
+ break;
+ }
+ }
+
+ /* failed to fill protocol, cancel current composition */
+ prot_put_cancel(prot);
+
+ /* fail if was last trial */
+ if (trial)
+ break;
+
+ /* try to flush the output buffer */
+ rc = flushw(cynagora);
+ if (rc)
+ break;
+
+ trial = 1;
+ }
+ return rc;
+}
+
+/**
* Put the command made of arguments ...
* Increment the count of pending requests.
*
@@ -160,57 +257,33 @@ putxkv(
const cynagora_key_t *optkey,
const cynagora_value_t *optval
) {
- int rc, trial;
- prot_t *prot;
+ int nf, rc;
char text[30];
-
- /* retrieves the protocol handler */
- prot = cynagora->prot;
- for(trial = 0 ; ; trial++) {
- /* fill the protocol handler with command and its arguments */
- rc = prot_put_field(prot, command);
- if (!rc && optarg)
- rc = prot_put_field(prot, optarg);
- if (!rc && optkey) {
- rc = prot_put_field(prot, optkey->client);
- if (!rc)
- rc = prot_put_field(prot, optkey->session);
- if (!rc)
- rc = prot_put_field(prot, optkey->user);
- if (!rc)
- rc = prot_put_field(prot, optkey->permission);
- }
- if (!rc && optval) {
- rc = prot_put_field(prot, optval->value);
- if (!rc) {
- if (!optval->expire)
- text[0] = 0;
- else
- exp2txt(optval->expire, true, text, sizeof text);
- rc = prot_put_field(prot, text);
- }
- }
- if (!rc)
- rc = prot_put_end(prot);
- if (!rc) {
- /* success ! */
- /* client always flushes */
- cynagora->pending++;
- return flushw(cynagora);
+ const char *fields[8];
+
+ /* prepare fields */
+ fields[0] = command;
+ nf = 1;
+ if (optarg)
+ fields[nf++] = optarg;
+ if (optkey) {
+ fields[nf++] = optkey->client;
+ fields[nf++] = optkey->session;
+ fields[nf++] = optkey->user;
+ fields[nf++] = optkey->permission;
+ }
+ if (optval) {
+ fields[nf++] = optval->value;
+ if (optval->expire) {
+ exp2txt(optval->expire, true, text, sizeof text);
+ fields[nf++] = text;
}
-
- /* failed to fill protocol, cancel current composition */
- prot_put_cancel(prot);
-
- /* fail if was last trial */
- if (trial >= 1)
- return rc;
-
- /* try to flush the output buffer */
- rc = flushw(cynagora);
- if (rc)
- return rc;
}
+
+ /* send now */
+ rc = send_reply(cynagora, fields, nf);
+ cynagora->pending += !rc; /* one more pending if no error */
+ return rc;
}
/**
@@ -257,6 +330,10 @@ get_reply(
cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0;
cache_clear(cynagora->cache, cacheid);
rc = 0;
+ } else if (0 == strcmp(cynagora->reply.fields[0], _ask_)) {
+ /* on asking agent */
+ agent_ask(cynagora, rc - 1, &cynagora->reply.fields[1]);
+ rc = 0;
} else {
if (0 != strcmp(cynagora->reply.fields[0], _item_))
cynagora->pending--;
@@ -446,7 +523,17 @@ void
disconnection(
cynagora_t *cynagora
) {
+ query_t *query;
+
if (cynagora->fd >= 0) {
+ /* forget queries */
+ query = cynagora->queries;
+ cynagora->queries = 0;
+ while(query) {
+ query->cynagora = 0;
+ query = query->next;
+ }
+ /* drop connection */
async(cynagora, EPOLL_CTL_DEL, 0);
close(cynagora->fd);
cynagora->fd = -1;
@@ -466,6 +553,7 @@ connection(
cynagora_t *cynagora
) {
int rc;
+ agent_t *agent;
/* init the client */
cynagora->pending = 0;
@@ -487,8 +575,17 @@ connection(
cache_clear(cynagora->cache,
cynagora->reply.count > 2 ? (uint32_t)atol(cynagora->reply.fields[2]) : 0);
rc = async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
- if (rc >= 0)
+ /* reconnect agent */
+ agent = cynagora->agents;
+ while (agent && rc >= 0) {
+ rc = putxkv(cynagora, _agent_, agent->name, 0, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ agent = agent->next;
+ }
+ if (rc >= 0) {
return 0;
+ }
}
}
}
@@ -564,7 +661,7 @@ check_or_test(
}
/******************************************************************************/
-/*** PUBLIC METHODS ***/
+/*** PUBLIC COMMON METHODS ***/
/******************************************************************************/
/* see cynagora.h */
@@ -615,6 +712,7 @@ cynagora_create(
cynagora->async.controlcb = NULL;
cynagora->async.closure = 0;
cynagora->async.requests = NULL;
+ cynagora->agents = NULL;
/* lazy connection */
cynagora->fd = -1;
@@ -651,6 +749,80 @@ cynagora_destroy(
/* see cynagora.h */
int
+cynagora_async_setup(
+ cynagora_t *cynagora,
+ cynagora_async_ctl_cb_t *controlcb,
+ void *closure
+) {
+ asreq_t *ar;
+
+ /* cancel pending requests */
+ while((ar = cynagora->async.requests) != NULL) {
+ cynagora->async.requests = ar->next;
+ ar->callback(ar->closure, -ECANCELED);
+ free(ar);
+ }
+
+ /* remove existing polling */
+ async(cynagora, EPOLL_CTL_DEL, 0);
+
+ /* records new data */
+ cynagora->async.controlcb = controlcb;
+ cynagora->async.closure = closure;
+
+ /* record to polling */
+ return async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
+}
+
+/* see cynagora.h */
+int
+cynagora_async_process(
+ cynagora_t *cynagora
+) {
+ int rc;
+ const char *first;
+ asreq_t *ar;
+ time_t expire;
+ cynagora_key_t key;
+
+ for (;;) {
+ /* non blocking wait for a reply */
+ rc = wait_reply(cynagora, false);
+ if (rc < 0)
+ return rc == -EAGAIN ? 0 : rc;
+
+ /* skip empty replies */
+ if (rc == 0)
+ continue;
+
+ /* skip done/error replies */
+ first = cynagora->reply.fields[0];
+ if (!strcmp(first, _done_)
+ || !strcmp(first, _error_))
+ continue;
+
+ /* ignore unexpected answers */
+ ar = cynagora->async.requests;
+ if (ar == NULL)
+ continue;
+
+ /* emit the asynchronous answer */
+ cynagora->async.requests = ar->next;
+ rc = status_check(cynagora, &expire);
+ if (rc >= 0) {
+ key.client = (const char*)(ar + 1);
+ key.session = &key.client[1 + strlen(key.client)];
+ key.user = &key.session[1 + strlen(key.session)];
+ key.permission = &key.user[1 + strlen(key.user)];
+ cache_put(cynagora->cache, &key, rc, expire, true);
+ }
+ ar->callback(ar->closure, rc);
+ free(ar);
+ }
+}
+
+/* see cynagora.h */
+int
cynagora_cache_resize(
cynagora_t *cynagora,
uint32_t size
@@ -695,6 +867,55 @@ cynagora_test(
/* see cynagora.h */
int
+cynagora_async_check(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ int simple,
+ cynagora_async_check_cb_t *callback,
+ void *closure
+) {
+ int rc;
+ asreq_t **pr, *ar;
+
+ /* ensure connection */
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ /* allocate */
+ ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
+ if (ar == NULL)
+ return -ENOMEM;
+
+ /* init */
+ ar->next = NULL;
+ ar->callback = callback;
+ ar->closure = closure;
+ stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
+
+ /* send the request */
+ rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0);
+ if (rc >= 0)
+ rc = flushw(cynagora);
+ if (rc < 0) {
+ free(ar);
+ return rc;
+ }
+
+ /* record the request */
+ pr = &cynagora->async.requests;
+ while(*pr != NULL)
+ pr = &(*pr)->next;
+ *pr = ar;
+ return 0;
+}
+
+/******************************************************************************/
+/*** PUBLIC ADMIN METHODS ***/
+/******************************************************************************/
+
+/* see cynagora.h */
+int
cynagora_get(
cynagora_t *cynagora,
const cynagora_key_t *key,
@@ -847,121 +1068,266 @@ cynagora_drop(
return rc;
}
-/* see cynagora.h */
-int
-cynagora_async_setup(
- cynagora_t *cynagora,
- cynagora_async_ctl_cb_t *controlcb,
- void *closure
-) {
- asreq_t *ar;
+/******************************************************************************/
+/*** PRIVATE AGENT METHODS ***/
+/******************************************************************************/
- /* cancel pending requests */
- while((ar = cynagora->async.requests) != NULL) {
- cynagora->async.requests = ar->next;
- ar->callback(ar->closure, -ECANCELED);
- free(ar);
+/**
+ * Check the name and compute its length. Returns 0 in case of invalid name
+ * @param name the name to check
+ * @return the length of the name or zero if invalid
+ */
+static
+size_t
+agent_check_name(
+ const char *name
+) {
+ char c;
+ size_t length = 0;
+ if (name) {
+ while ((c = name[length])) {
+ if (length > UINT8_MAX
+ || (!isalnum(c) && !strchr("@_-$", c))) {
+ length = 0;
+ break;
+ }
+ length++;
+ }
}
+ return length;
+}
- /* remove existing polling */
- async(cynagora, EPOLL_CTL_DEL, 0);
-
- /* records new data */
- cynagora->async.controlcb = controlcb;
- cynagora->async.closure = closure;
-
- /* record to polling */
- return async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
+/**
+ * Search the recorded agent of name
+ *
+ * @param cynagora the client cynagora
+ * @param name the name of the agent
+ * @param unlink should unlink from the link
+ * @return the found agent or NULL
+ */
+static
+agent_t*
+agent_search(
+ cynagora_t *cynagora,
+ const char *name,
+ bool unlink
+) {
+ agent_t *r, **pr;
+
+ pr = &cynagora->agents;
+ while((r = *pr) && strcmp(name, r->name))
+ pr = &r->next;
+ if (r && unlink)
+ *pr = r->next;
+ return r;
}
-/* see cynagora.h */
+/**
+ * Send an agent reply
+ *
+ * @param cynagora the client
+ * @param askid the ask identifier
+ * @param value the value to return
+ * @param expire the expiration of the value
+ * @return 0 on success or a negative error code
+ */
+static
int
-cynagora_async_process(
- cynagora_t *cynagora
+agent_send_reply(
+ cynagora_t *cynagora,
+ const char *askid,
+ const char *value,
+ time_t expire
+) {
+ int nf;
+ char text[30];
+ const char *fields[4];
+
+ fields[0] = _reply_;
+ fields[1] = askid;
+ fields[2] = value;
+
+ /* format expiration */
+ if (!expire)
+ nf = 3;
+ else {
+ exp2txt(expire, true, text, sizeof text);
+ fields[3] = text;
+ nf = 4;
+ }
+ return send_reply(cynagora, fields, nf);
+}
+
+/**
+ * Dispatch a received agent request
+ *
+ * The received fields should be:
+ *
+ * ASKID NAME VALUE CLIENT SESSION USER PERMISSION
+ *
+ * @param cynagora the handler
+ * @param count the count of fields
+ * @param fields the fields
+ */
+static
+void
+agent_ask(
+ cynagora_t *cynagora,
+ int count,
+ const char **fields
) {
int rc;
- const char *first;
- asreq_t *ar;
- time_t expire;
- cynagora_key_t key;
+ size_t length;
+ agent_t *agent;
+ query_t *query;
+ char *p;
- for (;;) {
- /* non blocking wait for a reply */
- rc = wait_reply(cynagora, false);
- if (rc < 0)
- return rc == -EAGAIN ? 0 : rc;
+ if (count != 7)
+ goto error;
- /* skip empty replies */
- if (rc == 0)
- continue;
+ /* search the agent */
+ agent = agent_search(cynagora, fields[1], false);
+ if (!agent)
+ goto error;
- /* skip done/error replies */
- first = cynagora->reply.fields[0];
- if (!strcmp(first, _done_)
- || !strcmp(first, _error_))
- continue;
+ length = strlen(fields[0]);
+ length += strlen(fields[1]);
+ length += strlen(fields[2]);
+ length += strlen(fields[3]);
+ length += strlen(fields[4]);
+ length += strlen(fields[5]);
+ length += strlen(fields[6]);
- /* ignore unexpected answers */
- ar = cynagora->async.requests;
- if (ar == NULL)
- continue;
+ query = malloc(length + 7 + sizeof *query);
+ if (!query)
+ goto error;
+ p = (char *)&query[1];
- /* emit the asynchronous answer */
- cynagora->async.requests = ar->next;
- rc = status_check(cynagora, &expire);
- if (rc >= 0) {
- key.client = (const char*)(ar + 1);
- key.session = &key.client[1 + strlen(key.client)];
- key.user = &key.session[1 + strlen(key.session)];
- key.permission = &key.user[1 + strlen(key.user)];
- cache_put(cynagora->cache, &key, rc, expire, true);
- }
- ar->callback(ar->closure, rc);
- free(ar);
- }
+ query->askid = p;
+ p = 1 + stpcpy(p, fields[0]);
+
+ query->query.name = p;
+ p = 1 + stpcpy(p, fields[1]);
+
+ query->query.value = p;
+ p = 1 + stpcpy(p, fields[2]);
+
+ query->query.key.client = p;
+ p = 1 + stpcpy(p, fields[3]);
+
+ query->query.key.session = p;
+ p = 1 + stpcpy(p, fields[4]);
+
+ query->query.key.user = p;
+ p = 1 + stpcpy(p, fields[5]);
+
+ query->query.key.permission = p;
+ p = 1 + stpcpy(p, fields[6]);
+
+ query->cynagora = cynagora;
+ query->next = cynagora->queries;
+ cynagora->queries = query;
+
+ rc = agent->agentcb(agent->closure, &query->query);
+ if (rc < 0)
+ cynagora_agent_reply(&query->query, NULL);
+ return;
+error:
+ agent_send_reply(cynagora, count ? fields[0] : "0", _error_, -1);
+}
+
+/******************************************************************************/
+/*** PUBLIC AGENT METHODS ***/
+/******************************************************************************/
+
+int
+cynagora_agent_is_valid_name(
+ const char *name
+) {
+ return agent_check_name(name) != 0;
}
/* see cynagora.h */
int
-cynagora_async_check(
+cynagora_agent_create(
cynagora_t *cynagora,
- const cynagora_key_t *key,
- int simple,
- cynagora_async_check_cb_t *callback,
+ const char *name,
+ cynagora_agent_cb_t *agentcb,
void *closure
) {
int rc;
- asreq_t **pr, *ar;
+ size_t length;
+ agent_t *agent;
+
+ /* check validity */
+ if (cynagora->type != cynagora_Agent)
+ return -EPERM;
+ /* check name */
+ length = agent_check_name(name);
+ if (!length)
+ return -EINVAL;
+
+ /* ensure connection */
rc = ensure_opened(cynagora);
if (rc < 0)
return rc;
- /* allocate */
- ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
- if (ar == NULL)
+ /* allocate agent */
+ agent = malloc(length + 1 + sizeof *agent);
+ if (!agent)
return -ENOMEM;
- /* init */
- ar->next = NULL;
- ar->callback = callback;
- ar->closure = closure;
- stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
+ /* init the structure */
+ agent->agentcb = agentcb;
+ agent->closure = closure;
+ memcpy(agent->name, name, 1 + length);
+ agent->next = cynagora->agents;
+ cynagora->agents = agent;
- /* send the request */
- rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0);
+ /* send the command */
+ rc = putxkv(cynagora, _agent_, name, 0, 0);
if (rc >= 0)
- rc = flushw(cynagora);
+ rc = wait_done(cynagora);
if (rc < 0) {
- free(ar);
- return rc;
+ /* clean on error */
+ agent_search(cynagora, name, true);
+ free(agent);
}
- /* record the request */
- pr = &cynagora->async.requests;
- while(*pr != NULL)
- pr = &(*pr)->next;
- *pr = ar;
- return 0;
+ return rc;
}
+/* see cynagora.h */
+int
+cynagora_agent_reply(
+ cynagora_query_t *_query,
+ cynagora_value_t *value
+) {
+ int rc;
+ query_t *query = (query_t*)_query;
+ query_t **p;
+ cynagora_t *cynagora;
+
+ cynagora = query->cynagora;
+ if (!cynagora)
+ rc = -ECANCELED;
+ else {
+ /* unlink the query */
+ p = &cynagora->queries;
+ while (*p)
+ if (*p != query)
+ p = &(*p)->next;
+ else {
+ *p = query->next;
+ break;
+ }
+
+ /* send the reply */
+ rc = agent_send_reply(cynagora, query->askid,
+ value ? value->value : _error_,
+ value ? value->expire : -1);
+ }
+ free(query);
+ return rc;
+}