aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Bollo <jose.bollo@iot.bzh>2019-10-24 19:24:33 +0200
committerJose Bollo <jose.bollo@iot.bzh>2019-10-25 13:55:28 +0200
commitde2f16aeefe44b27f870e2b6d964b0fc11aa1b93 (patch)
treef51a48d1e02203cb7240eea52ab452a53425fe98
parentdf80ab073c4b6c668e5ebedac01a0cfda7ff7c69 (diff)
Implements sub queries of agents
Change-Id: I30e40521d8f8a2694df00a5c9f55adfe748fbd68 Signed-off-by: José Bollo <jose.bollo@iot.bzh>
-rw-r--r--src/cyn-protocol.c1
-rw-r--r--src/cyn-protocol.h1
-rw-r--r--src/cyn-server.c72
-rw-r--r--src/cynagora-protocol.txt6
-rw-r--r--src/cynagora.c222
-rw-r--r--src/cynagora.h20
-rw-r--r--src/expire.c1
-rw-r--r--src/main-cynagora-admin.c48
-rw-r--r--src/main-cynagora-agent.c641
9 files changed, 691 insertions, 321 deletions
diff --git a/src/cyn-protocol.c b/src/cyn-protocol.c
index f5d4825..d5391ab 100644
--- a/src/cyn-protocol.c
+++ b/src/cyn-protocol.c
@@ -46,6 +46,7 @@ const char
_reply_[] = "reply",
_rollback_[] = "rollback",
_set_[] = "set",
+ _sub_[] = "sub",
_test_[] = "test",
_yes_[] = "yes";
diff --git a/src/cyn-protocol.h b/src/cyn-protocol.h
index 2c55df9..65a69d1 100644
--- a/src/cyn-protocol.h
+++ b/src/cyn-protocol.h
@@ -44,6 +44,7 @@ extern const char
_reply_[],
_rollback_[],
_set_[],
+ _sub_[],
_test_[],
_yes_[];
diff --git a/src/cyn-server.c b/src/cyn-server.c
index 6c0e373..abf37d7 100644
--- a/src/cyn-server.c
+++ b/src/cyn-server.c
@@ -406,6 +406,27 @@ checkcb(
free(check);
}
+/** allocate the check */
+static
+check_t *
+alloccheck(
+ client_t *cli,
+ const char *id,
+ bool ischeck
+) {
+ check_t *check;
+
+ check = malloc(sizeof *check + 1 + strlen(id));
+ if (check) {
+ strcpy(check->id, id);
+ check->ischeck = ischeck;
+ check->client = cli;
+ check->next = cli->checks;
+ cli->checks = check;
+ }
+ return check;
+}
+
/** initiate the check */
static
void
@@ -417,17 +438,13 @@ makecheck(
) {
data_key_t key;
check_t *check;
+ const char *id;
- check = malloc(sizeof *check + 1 + strlen(args[1]));
+ id = args[1];
+ check = alloccheck(cli, id, ischeck);
if (!check)
- replycheck(cli, args[1], NULL, ischeck);
+ replycheck(cli, id, NULL, ischeck);
else {
- strcpy(check->id, args[1]);
- check->ischeck = ischeck;
- check->client = cli;
- check->next = cli->checks;
- cli->checks = check;
-
key.client = args[2];
key.session = args[3];
key.user = args[4];
@@ -506,6 +523,7 @@ agentcb(
return 0;
}
+/* treat the reply to an agent query */
static
void
replycb(
@@ -529,6 +547,38 @@ replycb(
}
}
+/** initiate the check */
+static
+void
+makesub(
+ client_t *cli,
+ const char *args[]
+) {
+ ask_t *ask;
+ data_key_t key;
+ check_t *check;
+ const char *id;
+ const char *askid;
+
+ id = args[2];
+ askid = args[1];
+ ask = searchask(cli, askid, false);
+ if (ask) {
+ check = alloccheck(cli, id, true);
+ if (check) {
+ key.client = args[3];
+ key.session = args[4];
+ key.user = args[5];
+ key.permission = args[6];
+
+ cyn_query_subquery_async(
+ ask->query, checkcb, check, &key);
+ return;
+ }
+ }
+ replycheck(cli, id, NULL, true);
+}
+
/** handle a request */
static
void
@@ -676,6 +726,12 @@ onrequest(
send_done_or_error(cli, rc);
return;
}
+ if (ckarg(args[0], _sub_, 1) && count == 7) {
+ if (cli->type != server_Agent)
+ break;
+ makesub(cli, args);
+ return;
+ }
break;
case 't': /* test */
if (ckarg(args[0], _test_, 1) && count == 6) {
diff --git a/src/cynagora-protocol.txt b/src/cynagora-protocol.txt
index ebefde8..bf255d9 100644
--- a/src/cynagora-protocol.txt
+++ b/src/cynagora-protocol.txt
@@ -118,12 +118,12 @@ synopsis:
s->c ask ASKID NAME VALUE CLIENT SESSION USER PERMISSION
c->s reply ASKID ([yes|no] [always|session|one-time|EXPIRE])
-### sub queries (agent):
+### sub check (agent):
synopsis:
- c->s sub ASKID (test|check) ID CLIENT SESSION USER PERMISSION
- s->c (ack|yes|no) ID [EXPIRE]
+ c->s sub ASKID ID CLIENT SESSION USER PERMISSION
+ s->c (yes|no) ID [EXPIRE]
Notes
-----
diff --git a/src/cynagora.c b/src/cynagora.c
index 2e35876..7f3f001 100644
--- a/src/cynagora.c
+++ b/src/cynagora.c
@@ -756,6 +756,121 @@ async_reply_process(
return 1;
}
+static
+int
+async_check(
+ cynagora_t *cynagora,
+ const cynagora_key_t *key,
+ int force,
+ int simple,
+ cynagora_async_check_cb_t *callback,
+ void *closure,
+ const char *askid
+) {
+ int rc;
+ asreq_t *ar;
+ ascb_t *ac;
+ char *p;
+ int nf;
+ const char *fields[8];
+
+ /* ensure connection */
+ rc = ensure_opened(cynagora);
+ if (rc < 0)
+ return rc;
+
+ /* check cache item */
+ if (!force) {
+ /* ensure there is no clear cache pending */
+ flushr(cynagora);
+
+ rc = cache_search(cynagora->cache, key);
+ if (rc >= 0) {
+ callback(closure, rc);
+ return 0;
+ }
+ }
+
+ /* allocates the callback */
+ ac = malloc(sizeof *ac);
+ if (ac == NULL)
+ return -ENOMEM;
+ ac->callback = callback;
+ ac->closure = closure;
+
+ /* common request only if not subqueries of agents */
+ if (!askid) {
+ /* search the request */
+ ar = cynagora->async.requests;
+ while (ar && (strcmp(key->client, ar->key.client)
+ || strcmp(key->session, ar->key.session)
+ || strcmp(key->user, ar->key.user)
+ || strcmp(key->permission, ar->key.permission)))
+ ar = ar->next;
+
+ /* a same request is pending, use it */
+ if (ar) {
+ ac->next = ar->callbacks;
+ ar->callbacks = ac;
+ return 0;
+ }
+ }
+
+ /* allocate for the request */
+ ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
+ if (ar == NULL) {
+ free(ac);
+ return -ENOMEM;
+ }
+
+ /* init */
+ ac->next = NULL;
+ ar->callbacks = ac;
+ p = (char*)(ar + 1);
+ ar->key.client = p;
+ p = stpcpy(p, key->client) + 1;
+ ar->key.session = p;
+ p = stpcpy(p, key->session) + 1;
+ ar->key.user = p;
+ p = stpcpy(p, key->user) + 1;
+ ar->key.permission = p;
+ stpcpy(p, key->permission);
+ do {
+ idgen_next(cynagora->idgen);
+ } while (search_async_request(cynagora, cynagora->idgen, false));
+ strcpy(ar->id, cynagora->idgen);
+ ar->next = cynagora->async.requests;
+ cynagora->async.requests = ar;
+
+ /* send the request */
+ if (askid) {
+ fields[0] = _sub_;
+ fields[1] = askid;
+ nf = 2;
+ } else {
+ fields[0] = simple ? _test_ : _check_;
+ nf = 1;
+ }
+ fields[nf++] = ar->id;
+ fields[nf++] = key->client;
+ fields[nf++] = key->session;
+ fields[nf++] = key->user;
+ fields[nf++] = key->permission;
+ rc = send_reply(cynagora, fields, nf);
+ if (rc < 0) {
+ ar = search_async_request(cynagora, ar->id, true);
+ while((ac = ar->callbacks)) {
+ ar->callbacks = ac->next;
+ free(ac);
+ }
+ free(ar);
+ return rc;
+ }
+
+ /* record the request */
+ return 0;
+}
+
/******************************************************************************/
/*** PUBLIC COMMON METHODS ***/
/******************************************************************************/
@@ -951,90 +1066,7 @@ cynagora_async_check(
cynagora_async_check_cb_t *callback,
void *closure
) {
- int rc;
- asreq_t *ar;
- ascb_t *ac;
- char *p;
-
- /* ensure connection */
- rc = ensure_opened(cynagora);
- if (rc < 0)
- return rc;
-
- /* check cache item */
- if (!force) {
- /* ensure there is no clear cache pending */
- flushr(cynagora);
-
- rc = cache_search(cynagora->cache, key);
- if (rc >= 0) {
- callback(closure, rc);
- return 0;
- }
- }
-
- /* allocates the callback */
- ac = malloc(sizeof *ac);
- if (ac == NULL)
- return -ENOMEM;
- ac->callback = callback;
- ac->closure = closure;
-
- /* search the request */
- ar = cynagora->async.requests;
- while (ar && (strcmp(key->client, ar->key.client)
- || strcmp(key->session, ar->key.session)
- || strcmp(key->user, ar->key.user)
- || strcmp(key->permission, ar->key.permission)))
- ar = ar->next;
-
- /* a same request is pending, use it */
- if (ar) {
- ac->next = ar->callbacks;
- ar->callbacks = ac;
- return 0;
- }
-
- /* allocate for the request */
- ar = malloc(sizeof *ar + strlen(key->client) + strlen(key->session) + strlen(key->user) + strlen(key->permission) + 4);
- if (ar == NULL) {
- free(ac);
- return -ENOMEM;
- }
-
- /* init */
- ac->next = NULL;
- ar->callbacks = ac;
- p = (char*)(ar + 1);
- ar->key.client = p;
- p = stpcpy(p, key->client) + 1;
- ar->key.session = p;
- p = stpcpy(p, key->session) + 1;
- ar->key.user = p;
- p = stpcpy(p, key->user) + 1;
- ar->key.permission = p;
- stpcpy(p, key->permission);
- do {
- idgen_next(cynagora->idgen);
- } while (search_async_request(cynagora, cynagora->idgen, false));
- strcpy(ar->id, cynagora->idgen);
- ar->next = cynagora->async.requests;
- cynagora->async.requests = ar;
-
- /* send the request */
- rc = putxkv(cynagora, simple ? _test_ : _check_, ar->id, key, 0);
- if (rc < 0) {
- ar = search_async_request(cynagora, ar->id, true);
- while((ac = ar->callbacks)) {
- ar->callbacks = ac->next;
- free(ac);
- }
- free(ar);
- return rc;
- }
-
- /* record the request */
- return 0;
+ return async_check(cynagora, key, force, simple, callback, closure, NULL);
}
/******************************************************************************/
@@ -1465,3 +1497,25 @@ cynagora_agent_reply(
free(query);
return rc;
}
+
+/* see cynagora.h */
+int
+cynagora_agent_subquery_async(
+ cynagora_query_t *_query,
+ const cynagora_key_t *key,
+ int force,
+ cynagora_async_check_cb_t *callback,
+ void *closure
+) {
+ int rc;
+ query_t *query = (query_t*)_query;
+ cynagora_t *cynagora;
+
+ cynagora = query->cynagora;
+ if (!cynagora)
+ rc = -ECANCELED;
+ else
+ rc = async_check(cynagora, key, force, false,
+ callback, closure, query->askid);
+ return rc;
+}
diff --git a/src/cynagora.h b/src/cynagora.h
index 167b9bc..7ae166a 100644
--- a/src/cynagora.h
+++ b/src/cynagora.h
@@ -499,3 +499,23 @@ cynagora_agent_reply(
cynagora_query_t *query,
cynagora_value_t *value
);
+
+/**
+ * Check a rule as a sub query of the agent
+ *
+ * @param query the related agent query
+ * @param key the key to check
+ * @param force if true forbids cache check
+ * @param callback the callback to handle the asynchronous reply
+ * @param closure the closure to the callback
+ * @return 0 on success or a negative -errno code
+ */
+extern
+int
+cynagora_agent_subquery_async(
+ cynagora_query_t *query,
+ const cynagora_key_t *key,
+ int force,
+ cynagora_async_check_cb_t *callback,
+ void *closure
+);
diff --git a/src/expire.c b/src/expire.c
index a90a8de..baf9b6b 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -97,7 +97,6 @@ static bool parse_time_spec(const char *txt, time_t *time_out)
return true;
}
-
/* see expire.h */
bool txt2exp(const char *txt, time_t *time_out, bool absolute)
{
diff --git a/src/main-cynagora-admin.c b/src/main-cynagora-admin.c
index 210084c..aea022a 100644
--- a/src/main-cynagora-admin.c
+++ b/src/main-cynagora-admin.c
@@ -91,7 +91,7 @@ static
const char
help__text[] =
"\n"
- "Commands are: list, set, drop, acheck, check, atest, test, cache, clear, quit, log, help\n"
+ "Commands are: list, set, drop, acheck, check, test, stest, cache, clear, quit, log, help\n"
"Type 'help command' to get help on the command\n"
"Type 'help expiration' to get help on expirations\n"
"\n"
@@ -180,12 +180,12 @@ help_check_text[] =
static
const char
-help_acheck_text[] =
+help_scheck_text[] =
"\n"
- "Command: acheck client session user permission\n"
+ "Command: scheck client session user permission\n"
"\n"
- "Check asynchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
- "Same as check but don't wait the answer.\n"
+ "Check synchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
+ "Same as check but wait the answer.\n"
"\n"
;
@@ -209,12 +209,12 @@ help_test_text[] =
static
const char
-help_atest_text[] =
+help_stest_text[] =
"\n"
- "Command: atest client session user permission\n"
+ "Command: stest client session user permission\n"
"\n"
- "Test asynchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
- "Same as test but don't wait the answer.\n"
+ "Test synchronously authorization for the given 'client', 'session', 'user', 'permission'.\n"
+ "Same as test but wait the answer.\n"
"\n"
;
@@ -518,7 +518,7 @@ int do_drop(int ac, char **av)
return uc;
}
-int do_check(int ac, char **av, int (*f)(cynagora_t*,const cynagora_key_t*,int))
+int do_scheck(int ac, char **av, int (*f)(cynagora_t*,const cynagora_key_t*,int))
{
int uc, rc;
@@ -556,7 +556,7 @@ int do_cache_check(int ac, char **av)
return uc;
}
-void acheck_cb(void *closure, int status)
+void check_cb(void *closure, int status)
{
if (status > 0)
fprintf(stdout, "allowed\n");
@@ -567,14 +567,14 @@ void acheck_cb(void *closure, int status)
pending--;
}
-int do_acheck(int ac, char **av, bool simple)
+int do_check(int ac, char **av, bool simple)
{
int uc, rc;
rc = get_csup(ac, av, &uc, NULL);
if (rc == 0) {
pending++;
- rc = cynagora_async_check(cynagora, &key, 0, simple, acheck_cb, NULL);
+ rc = cynagora_async_check(cynagora, &key, 0, simple, check_cb, NULL);
if (rc < 0) {
fprintf(stderr, "error %s\n", strerror(-rc));
pending--;
@@ -615,12 +615,12 @@ int do_help(int ac, char **av)
fprintf(stdout, "%s", help_drop_text);
else if (ac > 1 && !strcmp(av[1], "check"))
fprintf(stdout, "%s", help_check_text);
- else if (ac > 1 && !strcmp(av[1], "acheck"))
- fprintf(stdout, "%s", help_acheck_text);
+ else if (ac > 1 && !strcmp(av[1], "scheck"))
+ fprintf(stdout, "%s", help_scheck_text);
else if (ac > 1 && !strcmp(av[1], "test"))
fprintf(stdout, "%s", help_test_text);
- else if (ac > 1 && !strcmp(av[1], "atest"))
- fprintf(stdout, "%s", help_atest_text);
+ else if (ac > 1 && !strcmp(av[1], "stest"))
+ fprintf(stdout, "%s", help_stest_text);
else if (ac > 1 && !strcmp(av[1], "cache"))
fprintf(stdout, "%s", help_cache_text);
else if (ac > 1 && !strcmp(av[1], "clear"))
@@ -654,17 +654,17 @@ int do_any(int ac, char **av)
if (!strcmp(av[0], "drop"))
return do_drop(ac, av);
+ if (!strcmp(av[0], "scheck"))
+ return do_scheck(ac, av, cynagora_check);
+
if (!strcmp(av[0], "check"))
- return do_check(ac, av, cynagora_check);
+ return do_check(ac, av, 0);
- if (!strcmp(av[0], "acheck"))
- return do_acheck(ac, av, 0);
+ if (!strcmp(av[0], "stest"))
+ return do_scheck(ac, av, cynagora_test);
if (!strcmp(av[0], "test"))
- return do_check(ac, av, cynagora_test);
-
- if (!strcmp(av[0], "atest"))
- return do_acheck(ac, av, 1);
+ return do_check(ac, av, 1);
if (!strcmp(av[0], "cache"))
return do_cache_check(ac, av);
diff --git a/src/main-cynagora-agent.c b/src/main-cynagora-agent.c
index 88ed995..99d7338 100644
--- a/src/main-cynagora-agent.c
+++ b/src/main-cynagora-agent.c
@@ -23,6 +23,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
+#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
@@ -38,6 +39,16 @@
#include "cynagora.h"
#include "expire.h"
+#define BUFFER_SIZE 500
+#define ARGUMENT_COUNT 10
+#define QUERY_COUNT 40
+#define SUBQUERY_COUNT 80
+
+#define FD_FOR_STDIN -1
+#define FD_FOR_CYNAGORA -2
+
+
+#define _LONGHELP_ 'H'
#define _HELP_ 'h'
#define _SOCKET_ 's'
#define _VERSION_ 'v'
@@ -45,11 +56,12 @@
static
const char
-shortopts[] = "hps:v";
+shortopts[] = "Hhps:v";
static
const struct option
longopts[] = {
+ { "long-help", 0, NULL, _LONGHELP_ },
{ "help", 0, NULL, _HELP_ },
{ "piped", 0, NULL, _PIPED_ },
{ "socket", 1, NULL, _SOCKET_ },
@@ -61,20 +73,66 @@ static
const char
helptxt[] =
"\n"
- "usage: cynagora-agent [options]... name [program]\n"
+ "usage: cynagora-agent [options]... name [program [args]...]\n"
"\n"
- "otpions:\n"
+ "options:\n"
" -s, --socket xxx set the base xxx for sockets\n"
" -p, --piped replace stdin/out by out/in of program"
- " -h, --help print this help and exit\n"
+ " -h, --help print short help and exit\n"
+ " -H, --long-help print long help and exit\n"
" -v, --version print the version and exit\n"
"\n"
- "When program is given, cynagora-agent performs invoke it with\n"
- "arguments VALUE CLIENT SESSION USER PERMISSION and expect it\n"
- "to echo the result with optional expiration\n"
- "Otherwise cynagora-agent echo its requests on one line:\n"
- "ID VALUE CLIENT SESSION USER PERMISSION and expect to read the\n"
- "replies: ID RESULT [EXPIRE]\n"
+;
+
+static
+const char
+longhelptxt[] =
+ "When no program is given, cynagora-agent output queries as below\n"
+ "\n"
+ " ID VALUE CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "where ID is a numeric identifier, VALUE is the value associated\n"
+ "to the agent and client, session, user and permission are from the\n"
+ "query.\n"
+ "\n"
+ "For the replies, it reads from its input:\n"
+ "\n"
+ " ID (yes|no) [expire]\n"
+ "\n"
+ "For the sub queries, it reads from its input:\n"
+ "\n"
+ " ID sub NUM CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+ "\n"
+ " reply NUM (yes|no)\n"
+ "\n"
+ "When the option --piped is given, the input and output are connected\n"
+ "to the output and input of the given program.\n"
+ "\n"
+ "When program is given but not the option --piped then an instance of\n"
+ "program is invoked for each agent query with predefined environment\n"
+ "variable set: \n"
+ " - CYAG_VALUE value associated to the agent\n"
+ " - CYAG_CLIENT client of the query\n"
+ " - CYAG_SESSION session of the query\n"
+ " - CYAG_USER user of the query\n"
+ " - CYAG_PERMISSION permission of the query\n"
+ "\n"
+ "The program will reply\n"
+ "\n"
+ " (yes|no) [expire]\n"
+ "\n"
+ "and then terminates quickly.\n"
+ "It can also ask for sub-queries:\n"
+ "\n"
+ " sub NUM CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+ "\n"
+ " reply NUM (yes|no)\n"
+ "\n"
+ "And the program terminates quickly.\n"
"\n"
;
@@ -84,50 +142,23 @@ versiontxt[] =
"cynagora-agent version 1.99.99\n"
;
-typedef struct {
- int filled;
- char buffer[4000];
-} buf_t;
-
-typedef struct {
- int count;
- char *args[20];
-} args_t;
-
-typedef struct {
- int id;
- cynagora_query_t *query;
-} query_t;
-
-typedef struct {
- pid_t pid;
- int id;
- int io[2];
-} proc_t;
-
static char *name;
static char **prog;
static int piped;
static cynagora_t *cynagora;
-static int nexid;
-static buf_t buffer;
-static query_t queries[200];
-static proc_t procs[200];
+/******************************************************************************/
+/******************************************************************************/
-int qidx(int id)
-{
- int r = sizeof queries / sizeof *queries;
- while (r-- && queries[r].id != id);
- return r;
-}
+typedef struct {
+ int filled;
+ char buffer[BUFFER_SIZE];
+} buf_t;
-int pidx(pid_t pid)
-{
- int r = sizeof procs / sizeof *procs;
- while (r-- && procs[r].pid != pid);
- return r;
-}
+typedef struct {
+ int count;
+ char *args[ARGUMENT_COUNT];
+} args_t;
int buf_parse(buf_t *buf, args_t *args)
{
@@ -176,6 +207,7 @@ void buf_unprefix(buf_t *buf, int count)
}
}
+/* read the 'buf' from 'fd' and call 'fun' if line of args exists */
void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
{
int n;
@@ -196,6 +228,63 @@ void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
}
}
+/******************************************************************************/
+/******************************************************************************/
+
+typedef struct {
+ cynagora_query_t *query;
+ pid_t pid;
+ int id;
+ int io[2];
+ buf_t buf;
+} query_t;
+
+typedef struct {
+ int me;
+ int id;
+ int num;
+} subq_t;
+
+static query_t queries[QUERY_COUNT];
+static subq_t subqs[SUBQUERY_COUNT];
+
+static int nexid;
+static int nexme;
+static int qx;
+static int efd;
+
+int qidx(int id)
+{
+ int r = sizeof queries / sizeof *queries;
+ while (r-- && queries[r].id != id);
+ return r;
+}
+
+int pidx(pid_t pid)
+{
+ int r = sizeof queries / sizeof *queries;
+ while (r-- && queries[r].pid != pid);
+ return r;
+}
+
+int sidx(int me)
+{
+ int r = sizeof subqs / sizeof *subqs;
+ while (r-- && subqs[r].me != me);
+ return r;
+}
+
+/*
+ * pipes and forks
+ *
+ * for the parent it returns the pid > 0 of the child and in io[0] the input
+ * from child and in io[1] to output to child.
+ *
+ * for the child it return 0 and stdin (0) comes from the prent and stout (1)
+ * outputs to parent (io is not used)
+ *
+ * returns -1 in case of error with errno appropriately
+ */
pid_t split(int io[2])
{
int rc;
@@ -230,95 +319,232 @@ pid_t split(int io[2])
return pid;
}
-void deadchild(int sig, siginfo_t *info, void *item)
+/* like printf but with fd and sync */
+int emit(int fd, const char *fmt, ...)
{
- int i;
- pid_t pid;
- buf_t buf;
- args_t args;
- ssize_t sz;
+ int n, w, p;
+ va_list ap;
+ char buffer[2000];
- pid = info->si_pid;
- i = pidx(pid);
- if (i >= 0) {
- args.count = 0;
- sz = read(procs[i].io[0], buf.buffer, sizeof buf.buffer);
- if (sz > 0) {
- buf.filled = (int)sz;
- buf_parse(&buf, &args);
- }
- if (!args.count) {
- args.args[0] = "no";
- args.args[1] = "-";
- args.count = 2;
+ va_start(ap, fmt);
+ n = vsnprintf(buffer, sizeof buffer, fmt, ap);
+ va_end(ap);
+
+ if (n >= (int)sizeof buffer) {
+ errno = EINVAL;
+ return -1;
+ }
+ p = 0;
+ while (p < n) {
+ w = (int)write(fd, &buffer[p], (size_t)(n - p));
+ if (w > 0)
+ p += w;
+ else if (errno != EINTR)
+ return -1;
+ }
+ fsync(fd);
+ return 0;
+
+}
+
+void clear(int id)
+{
+ int r;
+ struct epoll_event e;
+
+ r = sizeof subqs / sizeof *subqs;
+ while (r)
+ if (subqs[--r].id == id)
+ memset(&subqs[r], 0, sizeof subqs[r]);
+
+ r = sizeof queries / sizeof *queries;
+ while (r)
+ if (queries[--r].id == id) {
+ if (queries[r].pid) {
+ memset(&e, 0, sizeof e);
+ epoll_ctl(efd, EPOLL_CTL_DEL, queries[r].io[0], &e);
+ close(queries[r].io[0]);
+ close(queries[r].io[1]);
+ }
+ memset(&queries[r], 0, sizeof queries[r]);
}
- if (args.count == 1)
- printf("%d %s\n", procs[i].id, args.args[0]);
+}
+
+int reply(int q, char *diag, char *expire)
+{
+ cynagora_value_t value;
+ cynagora_query_t *query;
+
+ query = queries[q].query;
+ value.value = strdupa(diag ?: "no");
+ txt2exp(expire ?: "*", &value.expire, true);
+ clear(queries[q].id);
+ return cynagora_agent_reply(query, &value);
+}
+
+void terminate(int id)
+{
+ int q;
+
+ if (id) {
+ q = qidx(id);
+ if (q >= 0)
+ reply(q, NULL, NULL);
else
- printf("%d %s %s\n", procs[i].id, args.args[0], args.args[1]);
- fflush(stdout);
- close(procs[i].io[0]);
- close(procs[i].io[1]);
- procs[i].pid = 0;
+ clear(id);
}
- waitpid(pid, NULL, 0);
}
-void onquery(int ac, char **av)
+void on_subquery_reply(void *closure, int status)
{
- int i;
- pid_t pid;
+ int me, s, q, fd;
- i = pidx(0);
- if (ac == 6 && i >= 0) {
- procs[i].pid = pid = split(procs[i].io);
- if (pid >= 0) {
- procs[i].id = atoi(av[0]);
- if (!pid) {
- setenv("CYAG_VALUE", av[1], 1);
- setenv("CYAG_CLIENT", av[2], 1);
- setenv("CYAG_SESSION", av[3], 1);
- setenv("CYAG_USER", av[4], 1);
- setenv("CYAG_PERMISSION", av[5], 1);
- execvp(prog[0], prog);
- fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
- exit(1);
- }
- return;
- close(procs[i].io[0]);
- close(procs[i].io[1]);
+
+ me = (int)(intptr_t)closure;
+ s = sidx(me);
+ if (s >= 0) {
+ q = qidx(subqs[s].id);
+ if (q > 0) {
+ fd = prog ? queries[q].io[1] : 1;
+ emit(fd, "reply %d %s\n", subqs[s].num, status ? "yes" : "no");
}
- procs[i].pid = 0;
+ memset(&subqs[s], 0, sizeof subqs[s]);
}
- fprintf(stdout, "%s no -\n", av[0]);
+
}
-int runloop()
+int subquery(int q, int num, char *client, char *session, char *user, char *permission)
+{
+ int rc, me, s;
+ cynagora_key_t key;
+
+ /* get an id */
+ do {
+ me = ++nexme;
+ if (me < 0)
+ me = nexme = 1;
+ } while (sidx(me) >= 0);
+
+ s = sidx(0);
+ subqs[s].me = me;
+ subqs[s].id = queries[q].id;
+ subqs[s].num = num;
+
+ key.client = client ?: "?";
+ key.session = session ?: "?";
+ key.user = user ?: "?";
+ key.permission = permission ?: "?";
+
+ rc = cynagora_agent_subquery_async(queries[q].query, &key, 0, on_subquery_reply, (void*)(intptr_t)me);
+ return rc;
+}
+
+int launch(int q)
+{
+ int rc;
+ int io[2];
+ pid_t pid;
+
+ pid = split(io);
+ if (pid < 0)
+ rc = -1;
+ else if (pid == 0) {
+ setenv("CYAG_VALUE", queries[q].query->value, 1);
+ setenv("CYAG_CLIENT", queries[q].query->key.client, 1);
+ setenv("CYAG_SESSION", queries[q].query->key.session, 1);
+ setenv("CYAG_USER", queries[q].query->key.user, 1);
+ setenv("CYAG_PERMISSION", queries[q].query->key.permission, 1);
+ execvp(prog[0], prog);
+ emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
+ exit(1);
+ } else {
+ queries[q].pid = pid;
+ queries[q].io[0] = io[0];
+ queries[q].io[1] = io[1];
+ rc = 0;
+ }
+ return rc;
+}
+
+void dispatch(int q, int ac, char **av)
+{
+ if (q < 0)
+ return;
+
+ if (ac < 1 || strcmp(av[0], "sub")) {
+ reply(q, av[0], ac > 1 ? av[1] : NULL);
+ } else {
+ subquery(q, ac > 1 ? atoi(av[1]) : 1,
+ ac > 2 ? av[2] : NULL,
+ ac > 3 ? av[3] : NULL,
+ ac > 4 ? av[4] : NULL,
+ ac > 5 ? av[5] : NULL);
+ }
+}
+
+void dispatch_direct(int ac, char **av)
+{
+ int q, qid;
+
+ qid = atoi(av[0]);
+ q = qidx(qid);
+ if (q < 0)
+ return;
+
+ dispatch(q, ac - 1, &av[1]);
+}
+
+void dispatch_fork(int ac, char **av)
+{
+ dispatch(qx, ac, av);
+}
+
+void process_fork(int id)
+{
+ int q = qidx(id);
+ if (q >= 0) {
+ qx = q;
+ read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+ }
+}
+
+/* handles death of a child */
+void deadchild(int sig, siginfo_t *info, void *item)
+{
+ pid_t pid;
+
+ if (piped) {
+ exit(info->si_code == CLD_EXITED ? info->si_status : 127);
+ return;
+ }
+
+ pid = info->si_pid;
+/*
+ int q, id;
+
+ q = pidx(pid);
+ if (q >= 0) {
+ id = queries[q].id;
+ qx = q;
+ read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+ terminate(id);
+ }
+*/
+ waitpid(pid, NULL, 0);
+}
+
+int setup_deadchild()
{
- struct pollfd pfd;
struct sigaction sigact;
/* set the signal handler */
sigact.sa_sigaction = deadchild;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
- sigaction(SIGCHLD, &sigact, NULL);
-
- pfd.fd = 0;
- pfd.events = POLLIN;
- for(;;) {
- pfd.revents = 0;
- poll(&pfd, 1, -1);
- if (pfd.revents & POLLIN)
- read_and_dispatch(0, &buffer, onquery);
- if (pfd.revents & POLLHUP)
- break;
- }
-
- return 0;
+ return sigaction(SIGCHLD, &sigact, NULL);
}
-int setup_child()
+int run_piped_program()
{
int rc, io[2];
pid_t pid;
@@ -327,6 +553,7 @@ int setup_child()
pid = split(io);
if (pid < 0)
return -1;
+
if (pid) {
close(0);
dup(io[0]);
@@ -338,20 +565,25 @@ int setup_child()
}
/* run piped if required */
- if (piped) {
- rc = execvp(prog[0], prog);
- fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
- } else {
- rc = runloop();
- if (rc)
- fprintf(stderr, "error: can't loop: %s\n", strerror(errno));
- }
+ rc = execvp(prog[0], prog);
+ emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
exit(!!rc);
+ return rc;
+}
+
+int async_ctl(void *closure, int op, int fd, uint32_t events)
+{
+ struct epoll_event e;
+ memset(&e, 0, sizeof e);
+ e.events = events;
+ e.data.fd = FD_FOR_CYNAGORA;
+ return epoll_ctl(efd, op, fd, &e);
}
int agentcb(void *closure, cynagora_query_t *query)
{
- int i, id, rc;
+ int q, id, rc;
+ struct epoll_event e;
/* get an id */
do {
@@ -361,62 +593,35 @@ int agentcb(void *closure, cynagora_query_t *query)
} while (qidx(id) >= 0);
/* get an index */
- i = qidx(0);
- if (i < 0)
+ q = qidx(0);
+ if (q < 0)
return -ECANCELED;
- queries[i].id = id;
- queries[i].query = query;
+ queries[q].id = id;
+ queries[q].query = query;
/* compose the value */
- rc = fprintf(stdout, "%d %s %s %s %s %s\n",
- id, query->value, query->key.client, query->key.session,
- query->key.user, query->key.permission);
+ if (prog) {
+ rc = launch(q);
+ if (rc == 0) {
+ memset(&e, 0, sizeof e);
+ e.events = EPOLLIN;
+ e.data.fd = id;
+ rc = epoll_ctl(efd, EPOLL_CTL_ADD, queries[q].io[0], &e);
+ }
+ } else {
+ rc = emit(1, "%d %s %s %s %s %s\n",
+ id, query->value, query->key.client, query->key.session,
+ query->key.user, query->key.permission);
+ }
if (rc < 0) {
- queries[i].query = NULL;
- queries[i].id = 0;
+ clear(id);
return -ECANCELED;
}
return 0;
}
-void onreply(int ac, char **av)
-{
- int i, id;
- cynagora_value_t value;
-
- id = atoi(av[0]);
- i = qidx(id);
- if (i >= 0) {
- value.value = "no";
- value.expire = 0;
- if (ac > 1)
- value.value = av[1];
- if (ac > 2)
- txt2exp(av[2], &value.expire, true);
- cynagora_agent_reply(queries[i].query, &value);
- queries[i].query = NULL;
- queries[i].id = 0;
- }
-}
-
-int async_ctl(void *closure, int op, int fd, uint32_t events)
-{
- int *pfd = closure;
-
- switch(op) {
- case EPOLL_CTL_ADD:
- case EPOLL_CTL_MOD:
- *pfd = fd;
- break;
- case EPOLL_CTL_DEL:
- *pfd = -1;
- break;
- }
- return 0;
-}
-
int main(int ac, char **av)
{
int opt;
@@ -424,8 +629,9 @@ int main(int ac, char **av)
int help = 0;
int version = 0;
int error = 0;
+ struct epoll_event e;
char *socket = NULL;
- struct pollfd fds[2];
+ buf_t buffer = { .filled = 0 };
/* scan arguments */
for (;;) {
@@ -434,8 +640,11 @@ int main(int ac, char **av)
break;
switch(opt) {
+ case _LONGHELP_:
+ help = 2;
+ break;
case _HELP_:
- help = 1;
+ help++;
break;
case _PIPED_:
piped = 1;
@@ -454,81 +663,111 @@ int main(int ac, char **av)
/* handles help, version, error */
if (help) {
- fprintf(stdout, helptxt);
+ printf("%s", helptxt);
+ if (help > 1)
+ printf("%s", longhelptxt);
return 0;
}
if (version) {
- fprintf(stdout, versiontxt);
+ printf("%s", versiontxt);
return 0;
}
/* check agent name */
if (optind == ac) {
- fprintf(stderr, "error: name missing\n");
+ emit(2, "error: name missing\n");
error = 1;
} else {
name = av[optind++];
if (!cynagora_agent_is_valid_name(name)) {
- fprintf(stderr, "error: invalid agent name %s\n", name);
+ emit(2, "error: invalid agent name %s\n", name);
error = 1;
- } else if (optind == ac) {
- prog = NULL;
- } else {
+ } else if (optind < ac) {
prog = &av[optind++];
+ }else if (piped) {
+ emit(2, "error: piped without program\n");
+ error = 1;
+ } else {
+ prog = NULL;
}
}
if (error)
return 1;
+ /* create the polling */
+ efd = epoll_create1(EPOLL_CLOEXEC);
+ if (efd < 0) {
+ emit(2, "error: epoll_create failed, %s\n", strerror(errno));
+ return 1;
+ }
+
/* initialize server */
signal(SIGPIPE, SIG_IGN); /* avoid SIGPIPE! */
rc = cynagora_create(&cynagora, cynagora_Agent, 0, socket);
if (rc < 0) {
- fprintf(stderr, "error: initialization failed, %s\n", strerror(-rc));
+ emit(2, "error: initialization failed, %s\n", strerror(-rc));
return 1;
}
- fds[1].fd = -1;
- rc = cynagora_async_setup(cynagora, async_ctl, &fds[1].fd);
+ rc = cynagora_async_setup(cynagora, async_ctl, NULL);
if (rc < 0) {
- fprintf(stderr, "error: asynchronous setup failed, %s\n", strerror(-rc));
+ emit(2, "error: asynchronous setup failed, %s\n", strerror(-rc));
return 1;
}
/* create the agent */
rc = cynagora_agent_create(cynagora, name, agentcb, NULL);
if (rc < 0) {
- fprintf(stderr, "error: creation of agent failed, %s\n", strerror(-rc));
+ emit(2, "error: creation of agent failed, %s\n", strerror(-rc));
return 1;
}
/* setup piped */
- if (prog) {
- rc = setup_child();
+ setup_deadchild();
+ if (piped) {
+ rc = run_piped_program();
if (rc < 0) {
- fprintf(stderr, "error: can't setup child, %s\n", strerror(errno));
+ emit(2, "error: can't run piped program, %s\n", strerror(errno));
return 1;
}
+ prog = NULL;
}
- /* setup output */
- setlinebuf(stdout);
+ /* catch input if needed */
+ if (!prog) {
+ memset(&e, 0, sizeof e);
+ e.events = EPOLLIN;
+ e.data.fd = FD_FOR_STDIN;
+ rc = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &e);
+ if (rc < 0) {
+ emit(2, "error: set epoll, %s\n", strerror(errno));
+ return 1;
+ }
+ }
- fcntl(0, F_SETFL, O_NONBLOCK);
- fds[0].fd = 0;
- fds[0].events = fds[1].events = POLLIN;
for(;;) {
- rc = poll(fds, 2, -1);
- if (fds[0].revents & POLLIN)
- read_and_dispatch(0, &buffer, onreply);
- if (fds[1].revents & POLLIN) {
- rc = cynagora_async_process(cynagora);
- if (rc < 0)
- fprintf(stderr, "asynchronous processing failed: %s\n", strerror(-rc));
+ rc = epoll_wait(efd, &e, 1, -1);
+ if (rc == 1) {
+ if (e.events & EPOLLIN) {
+ if (e.data.fd == FD_FOR_STDIN) {
+ read_and_dispatch(0, &buffer, dispatch_direct);
+ } else if (e.data.fd == FD_FOR_CYNAGORA) {
+ rc = cynagora_async_process(cynagora);
+ if (rc < 0)
+ emit(2, "asynchronous processing failed: %s\n", strerror(-rc));
+ } else {
+ process_fork(e.data.fd);
+ }
+ }
+ if (e.events & EPOLLHUP) {
+ if (e.data.fd == FD_FOR_STDIN) {
+ break;
+ } else if (e.data.fd == FD_FOR_CYNAGORA) {
+ break;
+ } else {
+ terminate(e.data.fd);
+ }
+ }
}
- if (fds[0].revents & POLLHUP)
- break;
- if (fds[1].revents & POLLHUP)
- break;
}
return 0;
}