aboutsummaryrefslogtreecommitdiffstats
path: root/src/cynagora.c
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-10-03 17:58:02 +0200
committerJosé Bollo <jose.bollo@iot.bzh>2019-10-04 16:02:32 +0200
commit9cffedba0dfcf845c34901051d24a4b967ec824d (patch)
treefb648b8c7056cd7f46e0c14589f12c5a857df150 /src/cynagora.c
parentb5bd40e1e68b739307e20e19d2164c5b370846df (diff)
Move to CYNAGORA
Rename everything to match the new name of cynagora Change-Id: I7e529def486ca5721164b338d2c615996fbc6ca4 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
Diffstat (limited to 'src/cynagora.c')
-rw-r--r--src/cynagora.c823
1 files changed, 823 insertions, 0 deletions
diff --git a/src/cynagora.c b/src/cynagora.c
new file mode 100644
index 0000000..1165e64
--- /dev/null
+++ b/src/cynagora.c
@@ -0,0 +1,823 @@
+/*
+ * Copyright (C) 2018 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/******************************************************************************/
+/******************************************************************************/
+/* IMPLEMENTATION OF CLIENT PART OF CYNAGORA-PROTOCOL */
+/******************************************************************************/
+/******************************************************************************/
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+
+#include "prot.h"
+#include "cyn-protocol.h"
+#include "cynagora.h"
+#include "cache.h"
+#include "socket.h"
+
+#define MIN_CACHE_SIZE 400
+#define CACHESIZE(x) ((x) >= MIN_CACHE_SIZE ? (x) : (x) ? MIN_CACHE_SIZE : 0)
+
+/** recording of asynchronous requests */
+struct asreq
+{
+ /** link to the next pending request */
+ struct asreq *next;
+
+ /** callback function */
+ void (*callback)(
+ void *closure,
+ int status);
+
+ /** closure of the callback */
+ void *closure;
+};
+typedef struct asreq asreq_t;
+
+/**
+ * structure recording a client
+ */
+struct cynagora
+{
+ /** file descriptor of the socket */
+ int fd;
+
+ /** count of pending requests */
+ int pending;
+
+ /** type of link */
+ cynagora_type_t type;
+
+ /** spec of the socket */
+ const char *socketspec;
+
+ /** protocol manager object */
+ prot_t *prot;
+
+ /** cache object */
+ cache_t *cache;
+
+ /** copy of the reply */
+ struct {
+ /** count of fields of the reply */
+ int count;
+
+ /** fields (or fields) of the reply */
+ const char **fields;
+ } reply;
+
+ /** async */
+ struct {
+ /** control callback */
+ cynagora_async_ctl_t controlcb;
+
+ /** closure */
+ void *closure;
+
+ /** requests */
+ asreq_t *requests;
+ } async;
+};
+
+static void disconnection(cynagora_t *cynagora);
+
+/**
+ * Flush the write buffer
+ */
+static
+int
+flushw(
+ cynagora_t *cynagora
+) {
+ int rc;
+ struct pollfd pfd;
+
+ for (;;) {
+ rc = prot_should_write(cynagora->prot);
+ if (!rc)
+ break;
+ rc = prot_write(cynagora->prot, cynagora->fd);
+ if (rc == -EAGAIN) {
+ pfd.fd = cynagora->fd;
+ pfd.events = POLLOUT;
+ do { rc = poll(&pfd, 1, -1); } while (rc < 0 && errno == EINTR);
+ if (rc < 0)
+ rc = -errno;
+ }
+ if (rc < 0) {
+ break;
+ }
+ }
+ return rc;
+}
+
+/**
+ * Put the command made of arguments ...
+ * Increment the count of pending requests.
+ * Return 0 in case of success or a negative number on error.
+ */
+static
+int
+putxkv(
+ cynagora_t *cynagora,
+ const char *command,
+ const char *optarg,
+ const cynagora_key_t *optkey,
+ const cynagora_value_t *optval
+) {
+ int rc, trial;
+ prot_t *prot;
+ char text[30];
+
+ prot = cynagora->prot;
+ for(trial = 0 ; ; trial++) {
+ rc = prot_put_field(prot, command);
+ if (!rc && optarg)
+ rc = prot_put_field(prot, optarg);
+ if (!rc && optkey) {
+ rc = prot_put_field(prot, optkey->client);
+ if (!rc)
+ rc = prot_put_field(prot, optkey->session);
+ if (!rc)
+ rc = prot_put_field(prot, optkey->user);
+ if (!rc)
+ rc = prot_put_field(prot, optkey->permission);
+ }
+ if (!rc && optval) {
+ rc = prot_put_field(prot, optval->value);
+ if (!rc) {
+ if (!optval->expire)
+ text[0] = 0;
+ else
+ snprintf(text, sizeof text, "%lld", (long long)optval->expire);
+ rc = prot_put_field(prot, text);
+ }
+ }
+ if (!rc)
+ rc = prot_put_end(prot);
+ if (!rc) {
+ /* client always flushes */
+ cynagora->pending++;
+ return flushw(cynagora);
+ }
+ prot_put_cancel(prot);
+ if (trial >= 1)
+ return rc;
+ rc = flushw(cynagora);
+ if (rc)
+ return rc;
+ }
+}
+
+static
+int
+wait_input(
+ cynagora_t *cynagora
+) {
+ int rc;
+ struct pollfd pfd;
+
+ pfd.fd = cynagora->fd;
+ pfd.events = POLLIN;
+ do { rc = poll(&pfd, 1, -1); } while (rc < 0 && errno == EINTR);
+ return rc < 0 ? -errno : 0;
+}
+
+static
+int
+get_reply(
+ cynagora_t *cynagora
+) {
+ int rc;
+
+ prot_next(cynagora->prot);
+ rc = prot_get(cynagora->prot, &cynagora->reply.fields);
+ if (rc > 0) {
+ if (0 == strcmp(cynagora->reply.fields[0], _clear_)) {
+ cache_clear(cynagora->cache,
+ rc > 1 ? (uint32_t)atol(cynagora->reply.fields[1]) : 0);
+ rc = 0;
+ } else {
+ if (0 != strcmp(cynagora->reply.fields[0], _item_))
+ cynagora->pending--;
+ }
+ }
+ cynagora->reply.count = rc;
+ return rc;
+}
+
+static
+int
+wait_reply(
+ cynagora_t *cynagora,
+ bool block
+) {
+ int rc;
+
+ for(;;) {
+ prot_next(cynagora->prot);
+ rc = get_reply(cynagora);
+ if (rc > 0)
+ return rc;
+ if (rc < 0) {
+ rc = prot_read(cynagora->prot, cynagora->fd);
+ while (rc <= 0) {
+ if (rc == 0)
+ return -(errno = EPIPE);
+ if (rc == -EAGAIN && block)
+ rc = wait_input(cynagora);
+ if (rc < 0)
+ return rc;
+ rc = prot_read(cynagora->prot, cynagora->fd);
+ }
+ }
+ }
+}
+
+static
+int
+flushr(
+ cynagora_t *cynagora
+) {
+ int rc;
+
+ do { rc = wait_reply(cynagora, false); } while(rc > 0);
+ return rc;
+}
+
+static
+int
+status_done(
+ cynagora_t *cynagora
+) {
+ return strcmp(cynagora->reply.fields[0], _done_) ? -ECANCELED : 0;
+}
+
+static
+int
+status_check(
+ cynagora_t *cynagora,
+ time_t *expire
+) {
+ int rc;
+
+ if (!strcmp(cynagora->reply.fields[0], _yes_))
+ rc = 1;
+ else if (!strcmp(cynagora->reply.fields[0], _no_))
+ rc = 0;
+ else if (!strcmp(cynagora->reply.fields[0], _done_))
+ rc = -EEXIST;
+ else
+ rc = -EPROTO;
+
+ if (cynagora->reply.count >= 2)
+ *expire = strtoll(cynagora->reply.fields[1], NULL, 10);
+ else
+ *expire = 0;
+
+ return rc;
+}
+
+static
+int
+wait_pending_reply(
+ cynagora_t *cynagora
+) {
+ int rc;
+ for (;;) {
+ rc = wait_reply(cynagora, true);
+ if (rc < 0)
+ return rc;
+ if (rc > 0 && cynagora->pending == 0)
+ return rc;
+ }
+}
+
+static
+int
+wait_done(
+ cynagora_t *cynagora
+) {
+ int rc = wait_pending_reply(cynagora);
+ if (rc > 0)
+ rc = status_done(cynagora);
+ return rc;
+}
+
+static
+int
+async(
+ cynagora_t *cynagora,
+ int op,
+ uint32_t events
+) {
+ return cynagora->async.controlcb && cynagora->fd >= 0
+ ? cynagora->async.controlcb(cynagora->async.closure, op, cynagora->fd, events)
+ : 0;
+}
+
+static
+void
+disconnection(
+ cynagora_t *cynagora
+) {
+ if (cynagora->fd >= 0) {
+ async(cynagora, EPOLL_CTL_DEL, 0);
+ close(cynagora->fd);
+ cynagora->fd = -1;
+ }
+}
+
+static
+int
+connection(
+ cynagora_t *cynagora
+) {
+ int rc;
+
+ /* init the client */
+ cynagora->pending = 0;
+ cynagora->reply.count = -1;
+ prot_reset(cynagora->prot);
+ cynagora->fd = socket_open(cynagora->socketspec, 0);
+ if (cynagora->fd < 0)
+ return -errno;
+
+ /* negociate the protocol */
+ rc = putxkv(cynagora, _cynagora_, "1", 0, 0);
+ if (rc >= 0) {
+ rc = wait_pending_reply(cynagora);
+ if (rc >= 0) {
+ rc = -EPROTO;
+ if (cynagora->reply.count >= 2
+ && 0 == strcmp(cynagora->reply.fields[0], _yes_)
+ && 0 == strcmp(cynagora->reply.fields[1], "1")) {
+ cache_clear(cynagora->cache,
+ cynagora->reply.count > 2 ? (uint32_t)atol(cynagora->reply.fields[2]) : 0);
+ rc = async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
+ if (rc >= 0)
+ return 0;
+ }
+ }
+ }
+ disconnection(cynagora);
+ return rc;
+}
+
+static
+int
+ensure_opened(
+ cynagora_t *cynagora
+) {
+ if (cynagora->fd >= 0 && write(cynagora->fd, NULL, 0) < 0)
+ disconnection(cynagora);
+ return cynagora->fd < 0 ? connection(cynagora) : 0;
+}
+
+/************************************************************************************/
+
+int
+cynagora_open(
+ cynagora_t **prcyn,
+ cynagora_type_t type,
+ uint32_t cache_size,
+ const char *socketspec
+) {
+ cynagora_t *cynagora;
+ int rc;
+
+ /* socket spec */
+ switch(type) {
+ default:
+ case cynagora_Check: socketspec = cyn_get_socket_check(socketspec); break;
+ case cynagora_Admin: socketspec = cyn_get_socket_admin(socketspec); break;
+ case cynagora_Agent: socketspec = cyn_get_socket_agent(socketspec); break;
+ }
+
+ /* allocate the structure */
+ *prcyn = cynagora = malloc(sizeof *cynagora + 1 + strlen(socketspec));
+ if (cynagora == NULL) {
+ rc = -ENOMEM;
+ goto error;
+ }
+
+ /* create a protocol object */
+ rc = prot_create(&cynagora->prot);
+ if (rc < 0)
+ goto error2;
+
+ /* socket spec */
+ strcpy((char*)(cynagora+1), socketspec);
+
+ /* record type and weakly create cache */
+ cache_create(&cynagora->cache, CACHESIZE(cache_size));
+ cynagora->type = type;
+ cynagora->socketspec = socketspec;
+ cynagora->async.controlcb = NULL;
+ cynagora->async.closure = 0;
+ cynagora->async.requests = NULL;
+
+ /* lazy connection */
+ cynagora->fd = -1;
+
+ /* done */
+ return 0;
+
+error2:
+ free(cynagora);
+error:
+ *prcyn = NULL;
+ return rc;
+}
+
+void
+cynagora_disconnect(
+ cynagora_t *cynagora
+) {
+ disconnection(cynagora);
+}
+
+void
+cynagora_close(
+ cynagora_t *cynagora
+) {
+ cynagora_async_setup(cynagora, NULL, NULL);
+ disconnection(cynagora);
+ prot_destroy(cynagora->prot);
+ free(cynagora->cache);
+ free(cynagora);
+}
+
+int
+cynagora_enter(
+ cynagora_t *cynagora
+) {
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _enter_, 0, 0, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
+}
+
+int
+cynagora_leave(
+ cynagora_t *cynagora,
+ bool commit
+) {
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _leave_, commit ? _commit_ : 0/*default: rollback*/, 0, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
+}
+
+static
+int
+check_or_test(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ const char *action
+) {
+ int rc;
+ time_t expire;
+
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
+
+ /* check cache item */
+ rc = cache_search(cynagora->cache, key);
+ if (rc >= 0)
+ return rc;
+
+ /* send the request */
+ rc = putxkv(cynagora, action, 0, key, 0);
+ if (rc >= 0) {
+ /* get the response */
+ rc = wait_pending_reply(cynagora);
+ if (rc >= 0) {
+ rc = status_check(cynagora, &expire);
+ if (cynagora->cache && rc >= 0)
+ cache_put(cynagora->cache, key, rc, expire);
+ }
+ }
+ return rc;
+}
+
+int
+cynagora_check(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key
+) {
+ return check_or_test(cynagora, key, _check_);
+}
+
+int
+cynagora_test(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key
+) {
+ return check_or_test(cynagora, key, _test_);
+}
+
+int
+cynagora_set(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ const cynagora_value_t *value
+) {
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _set_, 0, key, value);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
+}
+
+int
+cynagora_get(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ void (*callback)(
+ void *closure,
+ const cynagora_key_t *key,
+ const cynagora_value_t *value
+ ),
+ void *closure
+) {
+ int rc;
+ cynagora_key_t k;
+ cynagora_value_t v;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _get_, 0, key, 0);
+ if (rc >= 0) {
+ rc = wait_reply(cynagora, true);
+ while ((rc == 6 || rc == 7) && !strcmp(cynagora->reply.fields[0], _item_)) {
+ k.client = cynagora->reply.fields[1];
+ k.session = cynagora->reply.fields[2];
+ k.user = cynagora->reply.fields[3];
+ k.permission = cynagora->reply.fields[4];
+ v.value = cynagora->reply.fields[5];
+ v.expire = rc == 6 ? 0 : (time_t)strtoll(cynagora->reply.fields[6], NULL, 10);
+ callback(closure, &k, &v);
+ rc = wait_reply(cynagora, true);
+ }
+ rc = status_done(cynagora);
+ }
+ return rc;
+}
+
+int
+cynagora_log(
+ cynagora_t *cynagora,
+ int on,
+ int off
+) {
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _log_, off ? _off_ : on ? _on_ : 0, 0, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+
+ return rc < 0 ? rc : cynagora->reply.count < 2 ? 0 : !strcmp(cynagora->reply.fields[1], _on_);
+}
+
+
+int
+cynagora_drop(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key
+) {
+ int rc;
+
+ if (cynagora->type != cynagora_Admin)
+ return -EPERM;
+ if (cynagora->async.requests != NULL)
+ return -EINPROGRESS;
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ rc = putxkv(cynagora, _drop_, 0, key, 0);
+ if (rc >= 0)
+ rc = wait_done(cynagora);
+ return rc;
+}
+
+/************************************************************************************/
+
+int
+cynagora_cache_resize(
+ cynagora_t *cynagora,
+ uint32_t size
+) {
+ return cache_resize(&cynagora->cache, CACHESIZE(size));
+}
+
+void
+cynagora_cache_clear(
+ cynagora_t *cynagora
+) {
+ cache_clear(cynagora->cache, 0);
+}
+
+int
+cynagora_cache_check(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key
+) {
+ return cache_search(cynagora->cache, key);
+}
+
+
+/************************************************************************************/
+
+int
+cynagora_async_setup(
+ cynagora_t *cynagora,
+ cynagora_async_ctl_t controlcb,
+ void *closure
+) {
+ asreq_t *ar;
+
+ /* cancel pending requests */
+ while((ar = cynagora->async.requests) != NULL) {
+ cynagora->async.requests = ar->next;
+ ar->callback(ar->closure, -ECANCELED);
+ free(ar);
+ }
+ /* remove existing polling */
+ async(cynagora, EPOLL_CTL_DEL, 0);
+ /* records new data */
+ cynagora->async.controlcb = controlcb;
+ cynagora->async.closure = closure;
+ /* record to polling */
+ return async(cynagora, EPOLL_CTL_ADD, EPOLLIN);
+}
+
+int
+cynagora_async_process(
+ cynagora_t *cynagora
+) {
+ int rc;
+ const char *first;
+ asreq_t *ar;
+ time_t expire;
+ cynagora_key_t key;
+
+ for (;;) {
+ /* non blocking wait for a reply */
+ rc = wait_reply(cynagora, false);
+ if (rc < 0)
+ return rc == -EAGAIN ? 0 : rc;
+
+ /* skip empty replies */
+ if (rc == 0)
+ continue;
+
+ /* skip done/error replies */
+ first = cynagora->reply.fields[0];
+ if (!strcmp(first, _done_)
+ || !strcmp(first, _error_))
+ continue;
+
+ /* ignore unexpected answers */
+ ar = cynagora->async.requests;
+ if (ar == NULL)
+ continue;
+
+ /* emit the asynchronous answer */
+ cynagora->async.requests = ar->next;
+ rc = status_check(cynagora, &expire);
+ if (rc >= 0) {
+ key.client = (const char*)(ar + 1);
+ key.session = &key.client[1 + strlen(key.client)];
+ key.user = &key.session[1 + strlen(key.session)];
+ key.permission = &key.user[1 + strlen(key.user)];
+ cache_put(cynagora->cache, &key, rc, expire);
+ }
+ ar->callback(ar->closure, rc);
+ free(ar);
+ }
+}
+
+int
+cynagora_async_check(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ int simple,
+ void (*callback)(
+ void *closure,
+ int status),
+ void *closure
+) {
+ int rc;
+ asreq_t **pr, *ar;
+
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ /* allocate */
+ ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
+ if (ar == NULL)
+ return -ENOMEM;
+
+ /* init */
+ ar->next = NULL;
+ ar->callback = callback;
+ ar->closure = closure;
+ stpcpy(1 + stpcpy(1 + stpcpy(1 + stpcpy((char*)(ar + 1), key->client), key->session), key->user), key->permission);
+
+ /* send the request */
+ rc = putxkv(cynagora, simple ? _test_ : _check_, 0, key, 0);
+ if (rc >= 0)
+ rc = flushw(cynagora);
+ if (rc < 0) {
+ free(ar);
+ return rc;
+ }
+
+ /* record the request */
+ pr = &cynagora->async.requests;
+ while(*pr != NULL)
+ pr = &(*pr)->next;
+ *pr = ar;
+ return 0;
+}
+