From d927b8c4d931b3fa4c5744778976081e9218a838 Mon Sep 17 00:00:00 2001 From: Jose Bollo Date: Mon, 14 Oct 2019 18:02:38 +0200 Subject: Implementation of agent protocol and tool Signed-off-by: Jose Bollo --- src/cynagora.c | 636 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 501 insertions(+), 135 deletions(-) (limited to 'src/cynagora.c') diff --git a/src/cynagora.c b/src/cynagora.c index 96c6466..b4bd7e9 100644 --- a/src/cynagora.c +++ b/src/cynagora.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -46,11 +47,15 @@ #define MIN_CACHE_SIZE 400 #define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0) +typedef struct asreq asreq_t; +typedef struct agent agent_t; +typedef struct query query_t; + /** recording of asynchronous requests */ struct asreq { /** link to the next pending request */ - struct asreq *next; + asreq_t *next; /** callback function */ cynagora_async_check_cb_t *callback; @@ -58,7 +63,22 @@ struct asreq /** closure of the callback */ void *closure; }; -typedef struct asreq asreq_t; + +/** structure to handle agents */ +struct agent +{ + /* Link to the next agent */ + agent_t *next; + + /* recorded callback of the agent */ + cynagora_agent_cb_t *agentcb; + + /* closure of the callback */ + void *closure; + + /* name of the agent */ + char name[]; +}; /** * structure recording a client @@ -101,10 +121,34 @@ struct cynagora asreq_t *requests; } async; + /** the declared agents */ + agent_t *agents; + + /** the pending queries */ + query_t *queries; + /** spec of the socket */ char socketspec[]; }; +/** structure of queries for agents */ +struct query +{ + /** public query */ + cynagora_query_t query; + + /** link to the next */ + query_t *next; + + /** the client of the query */ + cynagora_t *cynagora; + + /** the askid */ + char *askid; +}; + +static void agent_ask(cynagora_t *cynagora, int count, const char **fields); + /** * Flush the write buffer of the client * @@ -139,6 +183,59 @@ flushw( return rc; } +/** + * Send a reply + * + * @param cynagora the client + * @param fields the fields to send + * @param count the count of fields + * @return 0 on success or a negative error code + */ +static +int +send_reply( + cynagora_t *cynagora, + const char **fields, + int count +) { + int rc, trial, i; + prot_t *prot; + + /* retrieves the protocol handler */ + prot = cynagora->prot; + trial = 0; + for(;;) { + + /* fill the fields */ + for (i = rc = 0 ; i < count && rc == 0 ; i++) + rc = prot_put_field(prot, fields[i]); + + /* send if done */ + if (rc == 0) { + rc = prot_put_end(prot); + if (rc == 0) { + rc = flushw(cynagora); + break; + } + } + + /* failed to fill protocol, cancel current composition */ + prot_put_cancel(prot); + + /* fail if was last trial */ + if (trial) + break; + + /* try to flush the output buffer */ + rc = flushw(cynagora); + if (rc) + break; + + trial = 1; + } + return rc; +} + /** * Put the command made of arguments ... * Increment the count of pending requests. @@ -160,57 +257,33 @@ putxkv( const cynagora_key_t *optkey, const cynagora_value_t *optval ) { - int rc, trial; - prot_t *prot; + int nf, rc; char text[30]; - - /* retrieves the protocol handler */ - prot = cynagora->prot; - for(trial = 0 ; ; trial++) { - /* fill the protocol handler with command and its arguments */ - rc = prot_put_field(prot, command); - if (!rc && optarg) - rc = prot_put_field(prot, optarg); - if (!rc && optkey) { - rc = prot_put_field(prot, optkey->client); - if (!rc) - rc = prot_put_field(prot, optkey->session); - if (!rc) - rc = prot_put_field(prot, optkey->user); - if (!rc) - rc = prot_put_field(prot, optkey->permission); - } - if (!rc && optval) { - rc = prot_put_field(prot, optval->value); - if (!rc) { - if (!optval->expire) - text[0] = 0; - else - exp2txt(optval->expire, true, text, sizeof text); - rc = prot_put_field(prot, text); - } - } - if (!rc) - rc = prot_put_end(prot); - if (!rc) { - /* success ! */ - /* client always flushes */ - cynagora->pending++; - return flushw(cynagora); + const char *fields[8]; + + /* prepare fields */ + fields[0] = command; + nf = 1; + if (optarg) + fields[nf++] = optarg; + if (optkey) { + fields[nf++] = optkey->client; + fields[nf++] = optkey->session; + fields[nf++] = optkey->user; + fields[nf++] = optkey->permission; + } + if (optval) { + fields[nf++] = optval->value; + if (optval->expire) { + exp2txt(optval->expire, true, text, sizeof text); + fields[nf++] = text; } - - /* failed to fill protocol, cancel current composition */ - prot_put_cancel(prot); - - /* fail if was last trial */ - if (trial >= 1) - return rc; - - /* try to flush the output buffer */ - rc = flushw(cynagora); - if (rc) - return rc; } + + /* send now */ + rc = send_reply(cynagora, fields, nf); + cynagora->pending += !rc; /* one more pending if no error */ + return rc; } /** @@ -257,6 +330,10 @@ get_reply( cacheid = rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0; cache_clear(cynagora->cache, cacheid); rc = 0; + } else if (0 == strcmp(cynagora->reply.fields[0], _ask_)) { + /* on asking agent */ + agent_ask(cynagora, rc - 1, &cynagora->reply.fields[1]); + rc = 0; } else { if (0 != strcmp(cynagora->reply.fields[0], _item_)) cynagora->pending--; @@ -446,7 +523,17 @@ void disconnection( cynagora_t *cynagora ) { + query_t *query; + if (cynagora->fd >= 0) { + /* forget queries */ + query = cynagora->queries; + cynagora->queries = 0; + while(query) { + query->cynagora = 0; + query = query->next; + } + /* drop connection */ async(cynagora, EPOLL_CTL_DEL, 0); close(cynagora->fd); cynagora->fd = -1; @@ -466,6 +553,7 @@ connection( cynagora_t *cynagora ) { int rc; + agent_t *agent; /* init the client */ cynagora->pending = 0; @@ -487,8 +575,17 @@ connection( cache_clear(cynagora->cache, cynagora->reply.count > 2 ? (uint32_t)atol(cynagora->reply.fields[2]) : 0); rc = async(cynagora, EPOLL_CTL_ADD, EPOLLIN); - if (rc >= 0) + /* reconnect agent */ + agent = cynagora->agents; + while (agent && rc >= 0) { + rc = putxkv(cynagora, _agent_, agent->name, 0, 0); + if (rc >= 0) + rc = wait_done(cynagora); + agent = agent->next; + } + if (rc >= 0) { return 0; + } } } } @@ -564,7 +661,7 @@ check_or_test( } /******************************************************************************/ -/*** PUBLIC METHODS ***/ +/*** PUBLIC COMMON METHODS ***/ /******************************************************************************/ /* see cynagora.h */ @@ -615,6 +712,7 @@ cynagora_create( cynagora->async.controlcb = NULL; cynagora->async.closure = 0; cynagora->async.requests = NULL; + cynagora->agents = NULL; /* lazy connection */ cynagora->fd = -1; @@ -649,6 +747,80 @@ cynagora_destroy( free(cynagora); } +/* see cynagora.h */ +int +cynagora_async_setup( + cynagora_t *cynagora, + cynagora_async_ctl_cb_t *controlcb, + void *closure +) { + asreq_t *ar; + + /* cancel pending requests */ + while((ar = cynagora->async.requests) != NULL) { + cynagora->async.requests = ar->next; + ar->callback(ar->closure, -ECANCELED); + free(ar); + } + + /* remove existing polling */ + async(cynagora, EPOLL_CTL_DEL, 0); + + /* records new data */ + cynagora->async.controlcb = controlcb; + cynagora->async.closure = closure; + + /* record to polling */ + return async(cynagora, EPOLL_CTL_ADD, EPOLLIN); +} + +/* see cynagora.h */ +int +cynagora_async_process( + cynagora_t *cynagora +) { + int rc; + const char *first; + asreq_t *ar; + time_t expire; + cynagora_key_t key; + + for (;;) { + /* non blocking wait for a reply */ + rc = wait_reply(cynagora, false); + if (rc < 0) + return rc == -EAGAIN ? 0 : rc; + + /* skip empty replies */ + if (rc == 0) + continue; + + /* skip done/error replies */ + first = cynagora->reply.fields[0]; + if (!strcmp(first, _done_) + || !strcmp(first, _error_)) + continue; + + /* ignore unexpected answers */ + ar = cynagora->async.requests; + if (ar == NULL) + continue; + + /* emit the asynchronous answer */ + cynagora->async.requests = ar->next; + rc = status_check(cynagora, &expire); + if (rc >= 0) { + key.client = (const char*)(ar + 1); + key.session = &key.client[1 + strlen(key.client)]; + key.user = &key.session[1 + strlen(key.session)]; + key.permission = &key.user[1 + strlen(key.user)]; + cache_put(cynagora->cache, &key, rc, expire, true); + } + ar->callback(ar->closure, rc); + free(ar); + } +} + /* see cynagora.h */ int cynagora_cache_resize( @@ -693,6 +865,55 @@ cynagora_test( return check_or_test(cynagora, key, _test_); } +/* see cynagora.h */ +int +cynagora_async_check( + cynagora_t *cynagora, + const cynagora_key_t *key, + int simple, + cynagora_async_check_cb_t *callback, + void *closure +) { + int rc; + asreq_t **pr, *ar; + + /* ensure connection */ + rc = ensure_opened(cynagora); + if (rc < 0) + return rc; + + /* allocate */ + ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4); + if (ar == NULL) + return -ENOMEM; + + /* init */ + ar->next = NULL; + ar->callback = callback; + ar->closure = closure; + stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission); + + /* send the request */ + rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0); + if (rc >= 0) + rc = flushw(cynagora); + if (rc < 0) { + free(ar); + return rc; + } + + /* record the request */ + pr = &cynagora->async.requests; + while(*pr != NULL) + pr = &(*pr)->next; + *pr = ar; + return 0; +} + +/******************************************************************************/ +/*** PUBLIC ADMIN METHODS ***/ +/******************************************************************************/ + /* see cynagora.h */ int cynagora_get( @@ -847,121 +1068,266 @@ cynagora_drop( return rc; } -/* see cynagora.h */ -int -cynagora_async_setup( - cynagora_t *cynagora, - cynagora_async_ctl_cb_t *controlcb, - void *closure -) { - asreq_t *ar; +/******************************************************************************/ +/*** PRIVATE AGENT METHODS ***/ +/******************************************************************************/ - /* cancel pending requests */ - while((ar = cynagora->async.requests) != NULL) { - cynagora->async.requests = ar->next; - ar->callback(ar->closure, -ECANCELED); - free(ar); +/** + * Check the name and compute its length. Returns 0 in case of invalid name + * @param name the name to check + * @return the length of the name or zero if invalid + */ +static +size_t +agent_check_name( + const char *name +) { + char c; + size_t length = 0; + if (name) { + while ((c = name[length])) { + if (length > UINT8_MAX + || (!isalnum(c) && !strchr("@_-$", c))) { + length = 0; + break; + } + length++; + } } + return length; +} - /* remove existing polling */ - async(cynagora, EPOLL_CTL_DEL, 0); - - /* records new data */ - cynagora->async.controlcb = controlcb; - cynagora->async.closure = closure; - - /* record to polling */ - return async(cynagora, EPOLL_CTL_ADD, EPOLLIN); +/** + * Search the recorded agent of name + * + * @param cynagora the client cynagora + * @param name the name of the agent + * @param unlink should unlink from the link + * @return the found agent or NULL + */ +static +agent_t* +agent_search( + cynagora_t *cynagora, + const char *name, + bool unlink +) { + agent_t *r, **pr; + + pr = &cynagora->agents; + while((r = *pr) && strcmp(name, r->name)) + pr = &r->next; + if (r && unlink) + *pr = r->next; + return r; } -/* see cynagora.h */ +/** + * Send an agent reply + * + * @param cynagora the client + * @param askid the ask identifier + * @param value the value to return + * @param expire the expiration of the value + * @return 0 on success or a negative error code + */ +static int -cynagora_async_process( - cynagora_t *cynagora +agent_send_reply( + cynagora_t *cynagora, + const char *askid, + const char *value, + time_t expire +) { + int nf; + char text[30]; + const char *fields[4]; + + fields[0] = _reply_; + fields[1] = askid; + fields[2] = value; + + /* format expiration */ + if (!expire) + nf = 3; + else { + exp2txt(expire, true, text, sizeof text); + fields[3] = text; + nf = 4; + } + return send_reply(cynagora, fields, nf); +} + +/** + * Dispatch a received agent request + * + * The received fields should be: + * + * ASKID NAME VALUE CLIENT SESSION USER PERMISSION + * + * @param cynagora the handler + * @param count the count of fields + * @param fields the fields + */ +static +void +agent_ask( + cynagora_t *cynagora, + int count, + const char **fields ) { int rc; - const char *first; - asreq_t *ar; - time_t expire; - cynagora_key_t key; + size_t length; + agent_t *agent; + query_t *query; + char *p; - for (;;) { - /* non blocking wait for a reply */ - rc = wait_reply(cynagora, false); - if (rc < 0) - return rc == -EAGAIN ? 0 : rc; + if (count != 7) + goto error; - /* skip empty replies */ - if (rc == 0) - continue; + /* search the agent */ + agent = agent_search(cynagora, fields[1], false); + if (!agent) + goto error; - /* skip done/error replies */ - first = cynagora->reply.fields[0]; - if (!strcmp(first, _done_) - || !strcmp(first, _error_)) - continue; + length = strlen(fields[0]); + length += strlen(fields[1]); + length += strlen(fields[2]); + length += strlen(fields[3]); + length += strlen(fields[4]); + length += strlen(fields[5]); + length += strlen(fields[6]); - /* ignore unexpected answers */ - ar = cynagora->async.requests; - if (ar == NULL) - continue; + query = malloc(length + 7 + sizeof *query); + if (!query) + goto error; + p = (char *)&query[1]; - /* emit the asynchronous answer */ - cynagora->async.requests = ar->next; - rc = status_check(cynagora, &expire); - if (rc >= 0) { - key.client = (const char*)(ar + 1); - key.session = &key.client[1 + strlen(key.client)]; - key.user = &key.session[1 + strlen(key.session)]; - key.permission = &key.user[1 + strlen(key.user)]; - cache_put(cynagora->cache, &key, rc, expire, true); - } - ar->callback(ar->closure, rc); - free(ar); - } + query->askid = p; + p = 1 + stpcpy(p, fields[0]); + + query->query.name = p; + p = 1 + stpcpy(p, fields[1]); + + query->query.value = p; + p = 1 + stpcpy(p, fields[2]); + + query->query.key.client = p; + p = 1 + stpcpy(p, fields[3]); + + query->query.key.session = p; + p = 1 + stpcpy(p, fields[4]); + + query->query.key.user = p; + p = 1 + stpcpy(p, fields[5]); + + query->query.key.permission = p; + p = 1 + stpcpy(p, fields[6]); + + query->cynagora = cynagora; + query->next = cynagora->queries; + cynagora->queries = query; + + rc = agent->agentcb(agent->closure, &query->query); + if (rc < 0) + cynagora_agent_reply(&query->query, NULL); + return; +error: + agent_send_reply(cynagora, count ? fields[0] : "0", _error_, -1); +} + +/******************************************************************************/ +/*** PUBLIC AGENT METHODS ***/ +/******************************************************************************/ + +int +cynagora_agent_is_valid_name( + const char *name +) { + return agent_check_name(name) != 0; } /* see cynagora.h */ int -cynagora_async_check( +cynagora_agent_create( cynagora_t *cynagora, - const cynagora_key_t *key, - int simple, - cynagora_async_check_cb_t *callback, + const char *name, + cynagora_agent_cb_t *agentcb, void *closure ) { int rc; - asreq_t **pr, *ar; + size_t length; + agent_t *agent; + + /* check validity */ + if (cynagora->type != cynagora_Agent) + return -EPERM; + /* check name */ + length = agent_check_name(name); + if (!length) + return -EINVAL; + + /* ensure connection */ rc = ensure_opened(cynagora); if (rc < 0) return rc; - /* allocate */ - ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4); - if (ar == NULL) + /* allocate agent */ + agent = malloc(length + 1 + sizeof *agent); + if (!agent) return -ENOMEM; - /* init */ - ar->next = NULL; - ar->callback = callback; - ar->closure = closure; - stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission); + /* init the structure */ + agent->agentcb = agentcb; + agent->closure = closure; + memcpy(agent->name, name, 1 + length); + agent->next = cynagora->agents; + cynagora->agents = agent; - /* send the request */ - rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0); + /* send the command */ + rc = putxkv(cynagora, _agent_, name, 0, 0); if (rc >= 0) - rc = flushw(cynagora); + rc = wait_done(cynagora); if (rc < 0) { - free(ar); - return rc; + /* clean on error */ + agent_search(cynagora, name, true); + free(agent); } - /* record the request */ - pr = &cynagora->async.requests; - while(*pr != NULL) - pr = &(*pr)->next; - *pr = ar; - return 0; + return rc; } +/* see cynagora.h */ +int +cynagora_agent_reply( + cynagora_query_t *_query, + cynagora_value_t *value +) { + int rc; + query_t *query = (query_t*)_query; + query_t **p; + cynagora_t *cynagora; + + cynagora = query->cynagora; + if (!cynagora) + rc = -ECANCELED; + else { + /* unlink the query */ + p = &cynagora->queries; + while (*p) + if (*p != query) + p = &(*p)->next; + else { + *p = query->next; + break; + } + + /* send the reply */ + rc = agent_send_reply(cynagora, query->askid, + value ? value->value : _error_, + value ? value->expire : -1); + } + free(query); + return rc; +} -- cgit 1.2.3-korg