aboutsummaryrefslogtreecommitdiffstats
path: root/src/cyn-server.c
diff options
context:
space:
mode:
authorJose Bollo <jose.bollo@iot.bzh>2019-10-14 18:02:38 +0200
committerJose Bollo <jose.bollo@iot.bzh>2019-10-16 11:05:46 +0200
commitd927b8c4d931b3fa4c5744778976081e9218a838 (patch)
tree41a8403daf9d90caa46429d625da4ff92b51261c /src/cyn-server.c
parentf53a76ce91ab83c7345a104b57f148738101c58d (diff)
Implementation of agent protocol and tool
Signed-off-by: Jose Bollo <jose.bollo@iot.bzh>
Diffstat (limited to 'src/cyn-server.c')
-rw-r--r--src/cyn-server.c174
1 files changed, 160 insertions, 14 deletions
diff --git a/src/cyn-server.c b/src/cyn-server.c
index be72e92..97a44d1 100644
--- a/src/cyn-server.c
+++ b/src/cyn-server.c
@@ -44,6 +44,11 @@
#include "socket.h"
#include "pollitem.h"
#include "expire.h"
+#include "cynagora.h"
+
+typedef struct client client_t;
+typedef struct agent agent_t;
+typedef struct ask ask_t;
#define MAX_PUTX_ITEMS 15
@@ -87,8 +92,26 @@ struct client
/** polling callback */
pollitem_t pollitem;
+
+ /** list of pending ask */
+ ask_t *asks;
+
+ /** last askid */
+ uint32_t askid;
+};
+
+/** structure for pending asks */
+struct ask
+{
+ /** chained list */
+ ask_t *next;
+
+ /** query */
+ cynagora_query_t *query;
+
+ /** id of the ask */
+ uint32_t id;
};
-typedef struct client client_t;
/** structure for servers */
struct cyn_server
@@ -265,8 +288,8 @@ entercb(
) {
client_t *cli = closure;
- cli->entered = true;
- cli->entering = false;
+ cli->entered = 1;
+ cli->entering = 0;
send_done(cli);
}
@@ -371,6 +394,96 @@ getcb(
value->value, exp2get(value->expire, text, sizeof text), NULL);
}
+/** search the request of askid */
+static
+ask_t*
+searchask(
+ client_t *cli,
+ uint32_t askid,
+ bool unlink
+) {
+ ask_t *r, **prv;
+
+ prv = &cli->asks;
+ while ((r = *prv) && r->id != askid)
+ prv = &r->next;
+ if (r && unlink)
+ *prv = r->next;
+ return r;
+}
+
+/** callback of agents */
+static
+int
+agentcb(
+ const char *name,
+ void *closure,
+ const data_key_t *key,
+ const char *value,
+ cynagora_query_t *query
+) {
+ client_t *cli = closure;
+ uint32_t askid;
+ ask_t *ask;
+ char buffer[30];
+ int rc;
+
+ /* search a valid id */
+ do {
+ askid = ++cli->askid;
+ } while (!askid && searchask(cli, askid, false));
+ rc = snprintf(buffer, sizeof buffer, "%lu", (long unsigned)askid);
+ if (rc < 0)
+ return -errno;
+ if (rc >= (int)sizeof buffer)
+ return -ECANCELED;
+
+ /* allocate the ask structure */
+ ask = malloc(sizeof *ask);
+ if (!ask)
+ return -ENOMEM;
+ ask->id = askid;
+ ask->query = query;
+
+ /* link the ask */
+ ask->next = cli->asks;
+ cli->asks = ask;
+
+ /* make the query */
+ putx(cli, _ask_, buffer, name, value,
+ key->client, key->session, key->user, key->permission,
+ NULL);
+ flushw(cli);
+ return 0;
+}
+
+static
+void
+replycb(
+ client_t *cli,
+ const char *askid,
+ const char *yesno,
+ const char *expire
+) {
+ ask_t *ask;
+ unsigned long int ul;
+ data_value_t value;
+
+ ul = strtoul(askid, NULL, 10);
+ if (ul <= UINT32_MAX) {
+ ask = searchask(cli, (uint32_t)ul, true);
+ if (ask) {
+ value.value = yesno;
+ if (!expire)
+ value.expire = 0;
+ else if (!txt2exp(expire, &value.expire, true))
+ value.expire = -1;
+ cyn_query_reply(ask->query, &value);
+ free(ask);
+ }
+ }
+}
+
/** handle a request */
static
void
@@ -408,10 +521,11 @@ onrequest(
switch(args[0][0]) {
case 'a': /* agent */
- if (ckarg(args[0], _agent_, 1) && count == 5) {
+ if (ckarg(args[0], _agent_, 1) && count == 2) {
if (cli->type != server_Agent)
break;
- /* TODO */ break;
+ rc = cyn_agent_add(args[1], agentcb, cli);
+ send_done_or_error(cli, rc);
return;
}
break;
@@ -447,7 +561,7 @@ onrequest(
break;
if (cli->entered || cli->entering)
break;
- cli->entering = true;
+ cli->entering = 1;
/* TODO: remove from polling until entered? */
cyn_enter_async(entercb, cli);
return;
@@ -475,7 +589,7 @@ onrequest(
if (!cli->entered)
break;
rc = cyn_leave(cli, count == 2 && ckarg(args[1], _commit_, 0));
- cli->entered = false;
+ cli->entered = 0;
send_done_or_error(cli, rc);
return;
}
@@ -494,6 +608,14 @@ onrequest(
return;
}
break;
+ case 'r': /* reply */
+ if (ckarg(args[0], _reply_, 1) && (count == 3 || count == 4)) {
+ if (cli->type != server_Agent)
+ break;
+ replycb(cli, args[1], args[2], count == 4 ? args[3] : NULL);
+ return;
+ }
+ break;
case 's': /* set */
if (ckarg(args[0], _set_, 1) && (count == 6 || count == 7)) {
if (cli->type != server_Admin)
@@ -541,7 +663,7 @@ onchange(
) {
client_t *cli = closure;
if (cli->checked) {
- cli->checked = false;
+ cli->checked = 0;
putx(cli, _clear_, cyn_changeid_string(), NULL);
flushw(cli);
}
@@ -554,13 +676,35 @@ destroy_client(
client_t *cli,
bool closefds
) {
+ ask_t *ask;
+ data_value_t value;
+
+ /* remove observers */
+ cyn_on_change_remove(onchange, cli);
+
+ /* close protocol */
if (closefds)
close(cli->pollitem.fd);
if (cli->entering)
cyn_enter_async_cancel(entercb, cli);
if (cli->entered)
cyn_leave(cli, false);
- cyn_on_change_remove(onchange, cli);
+
+ /* clean of asks */
+ ask = cli->asks;
+ if (ask) {
+ value.value = _no_;
+ value.expire = -1;
+ do {
+ cyn_query_reply(ask->query, &value);
+ cli->asks = ask->next;
+ free(ask);
+ ask = cli->asks;
+ } while (ask);
+ }
+
+ /* clean of agents */
+ cyn_agent_remove_by_cc(agentcb, cli);
prot_destroy(cli->prot);
free(cli);
}
@@ -634,14 +778,16 @@ create_client(
/* records the file descriptor */
cli->type = type;
cli->version = 0; /* version not set */
- cli->relax = true; /* relax on error */
- cli->invalid = false; /* not invalid */
- cli->entered = false; /* not entered */
- cli->entering = false; /* not entering */
- cli->checked = false; /* no check made */
+ cli->relax = 0; /* relax on error */
+ cli->invalid = 0; /* not invalid */
+ cli->entered = 0; /* not entered */
+ cli->entering = 0; /* not entering */
+ cli->checked = 0; /* no check made */
cli->pollitem.handler = on_client_event;
cli->pollitem.closure = cli;
cli->pollitem.fd = fd;
+ cli->asks = NULL;
+ cli->askid = 0;
return 0;
error3:
prot_destroy(cli->prot);