summaryrefslogtreecommitdiffstats
path: root/src/main-cynagora-agent.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main-cynagora-agent.c')
-rw-r--r--src/main-cynagora-agent.c641
1 files changed, 440 insertions, 201 deletions
diff --git a/src/main-cynagora-agent.c b/src/main-cynagora-agent.c
index 88ed995..99d7338 100644
--- a/src/main-cynagora-agent.c
+++ b/src/main-cynagora-agent.c
@@ -23,6 +23,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
+#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
@@ -38,6 +39,16 @@
#include "cynagora.h"
#include "expire.h"
+#define BUFFER_SIZE 500
+#define ARGUMENT_COUNT 10
+#define QUERY_COUNT 40
+#define SUBQUERY_COUNT 80
+
+#define FD_FOR_STDIN -1
+#define FD_FOR_CYNAGORA -2
+
+
+#define _LONGHELP_ 'H'
#define _HELP_ 'h'
#define _SOCKET_ 's'
#define _VERSION_ 'v'
@@ -45,11 +56,12 @@
static
const char
-shortopts[] = "hps:v";
+shortopts[] = "Hhps:v";
static
const struct option
longopts[] = {
+ { "long-help", 0, NULL, _LONGHELP_ },
{ "help", 0, NULL, _HELP_ },
{ "piped", 0, NULL, _PIPED_ },
{ "socket", 1, NULL, _SOCKET_ },
@@ -61,20 +73,66 @@ static
const char
helptxt[] =
"\n"
- "usage: cynagora-agent [options]... name [program]\n"
+ "usage: cynagora-agent [options]... name [program [args]...]\n"
"\n"
- "otpions:\n"
+ "options:\n"
" -s, --socket xxx set the base xxx for sockets\n"
" -p, --piped replace stdin/out by out/in of program"
- " -h, --help print this help and exit\n"
+ " -h, --help print short help and exit\n"
+ " -H, --long-help print long help and exit\n"
" -v, --version print the version and exit\n"
"\n"
- "When program is given, cynagora-agent performs invoke it with\n"
- "arguments VALUE CLIENT SESSION USER PERMISSION and expect it\n"
- "to echo the result with optional expiration\n"
- "Otherwise cynagora-agent echo its requests on one line:\n"
- "ID VALUE CLIENT SESSION USER PERMISSION and expect to read the\n"
- "replies: ID RESULT [EXPIRE]\n"
+;
+
+static
+const char
+longhelptxt[] =
+ "When no program is given, cynagora-agent output queries as below\n"
+ "\n"
+ " ID VALUE CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "where ID is a numeric identifier, VALUE is the value associated\n"
+ "to the agent and client, session, user and permission are from the\n"
+ "query.\n"
+ "\n"
+ "For the replies, it reads from its input:\n"
+ "\n"
+ " ID (yes|no) [expire]\n"
+ "\n"
+ "For the sub queries, it reads from its input:\n"
+ "\n"
+ " ID sub NUM CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+ "\n"
+ " reply NUM (yes|no)\n"
+ "\n"
+ "When the option --piped is given, the input and output are connected\n"
+ "to the output and input of the given program.\n"
+ "\n"
+ "When program is given but not the option --piped then an instance of\n"
+ "program is invoked for each agent query with predefined environment\n"
+ "variable set: \n"
+ " - CYAG_VALUE value associated to the agent\n"
+ " - CYAG_CLIENT client of the query\n"
+ " - CYAG_SESSION session of the query\n"
+ " - CYAG_USER user of the query\n"
+ " - CYAG_PERMISSION permission of the query\n"
+ "\n"
+ "The program will reply\n"
+ "\n"
+ " (yes|no) [expire]\n"
+ "\n"
+ "and then terminates quickly.\n"
+ "It can also ask for sub-queries:\n"
+ "\n"
+ " sub NUM CLIENT SESSION USER PERMISSION\n"
+ "\n"
+ "Where NUM is a numeric identifier. It will reply to sub queries with:\n"
+ "\n"
+ " reply NUM (yes|no)\n"
+ "\n"
+ "And the program terminates quickly.\n"
"\n"
;
@@ -84,50 +142,23 @@ versiontxt[] =
"cynagora-agent version 1.99.99\n"
;
-typedef struct {
- int filled;
- char buffer[4000];
-} buf_t;
-
-typedef struct {
- int count;
- char *args[20];
-} args_t;
-
-typedef struct {
- int id;
- cynagora_query_t *query;
-} query_t;
-
-typedef struct {
- pid_t pid;
- int id;
- int io[2];
-} proc_t;
-
static char *name;
static char **prog;
static int piped;
static cynagora_t *cynagora;
-static int nexid;
-static buf_t buffer;
-static query_t queries[200];
-static proc_t procs[200];
+/******************************************************************************/
+/******************************************************************************/
-int qidx(int id)
-{
- int r = sizeof queries / sizeof *queries;
- while (r-- && queries[r].id != id);
- return r;
-}
+typedef struct {
+ int filled;
+ char buffer[BUFFER_SIZE];
+} buf_t;
-int pidx(pid_t pid)
-{
- int r = sizeof procs / sizeof *procs;
- while (r-- && procs[r].pid != pid);
- return r;
-}
+typedef struct {
+ int count;
+ char *args[ARGUMENT_COUNT];
+} args_t;
int buf_parse(buf_t *buf, args_t *args)
{
@@ -176,6 +207,7 @@ void buf_unprefix(buf_t *buf, int count)
}
}
+/* read the 'buf' from 'fd' and call 'fun' if line of args exists */
void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
{
int n;
@@ -196,6 +228,63 @@ void read_and_dispatch(int fd, buf_t *buf, void (*fun)(int,char**))
}
}
+/******************************************************************************/
+/******************************************************************************/
+
+typedef struct {
+ cynagora_query_t *query;
+ pid_t pid;
+ int id;
+ int io[2];
+ buf_t buf;
+} query_t;
+
+typedef struct {
+ int me;
+ int id;
+ int num;
+} subq_t;
+
+static query_t queries[QUERY_COUNT];
+static subq_t subqs[SUBQUERY_COUNT];
+
+static int nexid;
+static int nexme;
+static int qx;
+static int efd;
+
+int qidx(int id)
+{
+ int r = sizeof queries / sizeof *queries;
+ while (r-- && queries[r].id != id);
+ return r;
+}
+
+int pidx(pid_t pid)
+{
+ int r = sizeof queries / sizeof *queries;
+ while (r-- && queries[r].pid != pid);
+ return r;
+}
+
+int sidx(int me)
+{
+ int r = sizeof subqs / sizeof *subqs;
+ while (r-- && subqs[r].me != me);
+ return r;
+}
+
+/*
+ * pipes and forks
+ *
+ * for the parent it returns the pid > 0 of the child and in io[0] the input
+ * from child and in io[1] to output to child.
+ *
+ * for the child it return 0 and stdin (0) comes from the prent and stout (1)
+ * outputs to parent (io is not used)
+ *
+ * returns -1 in case of error with errno appropriately
+ */
pid_t split(int io[2])
{
int rc;
@@ -230,95 +319,232 @@ pid_t split(int io[2])
return pid;
}
-void deadchild(int sig, siginfo_t *info, void *item)
+/* like printf but with fd and sync */
+int emit(int fd, const char *fmt, ...)
{
- int i;
- pid_t pid;
- buf_t buf;
- args_t args;
- ssize_t sz;
+ int n, w, p;
+ va_list ap;
+ char buffer[2000];
- pid = info->si_pid;
- i = pidx(pid);
- if (i >= 0) {
- args.count = 0;
- sz = read(procs[i].io[0], buf.buffer, sizeof buf.buffer);
- if (sz > 0) {
- buf.filled = (int)sz;
- buf_parse(&buf, &args);
- }
- if (!args.count) {
- args.args[0] = "no";
- args.args[1] = "-";
- args.count = 2;
+ va_start(ap, fmt);
+ n = vsnprintf(buffer, sizeof buffer, fmt, ap);
+ va_end(ap);
+
+ if (n >= (int)sizeof buffer) {
+ errno = EINVAL;
+ return -1;
+ }
+ p = 0;
+ while (p < n) {
+ w = (int)write(fd, &buffer[p], (size_t)(n - p));
+ if (w > 0)
+ p += w;
+ else if (errno != EINTR)
+ return -1;
+ }
+ fsync(fd);
+ return 0;
+
+}
+
+void clear(int id)
+{
+ int r;
+ struct epoll_event e;
+
+ r = sizeof subqs / sizeof *subqs;
+ while (r)
+ if (subqs[--r].id == id)
+ memset(&subqs[r], 0, sizeof subqs[r]);
+
+ r = sizeof queries / sizeof *queries;
+ while (r)
+ if (queries[--r].id == id) {
+ if (queries[r].pid) {
+ memset(&e, 0, sizeof e);
+ epoll_ctl(efd, EPOLL_CTL_DEL, queries[r].io[0], &e);
+ close(queries[r].io[0]);
+ close(queries[r].io[1]);
+ }
+ memset(&queries[r], 0, sizeof queries[r]);
}
- if (args.count == 1)
- printf("%d %s\n", procs[i].id, args.args[0]);
+}
+
+int reply(int q, char *diag, char *expire)
+{
+ cynagora_value_t value;
+ cynagora_query_t *query;
+
+ query = queries[q].query;
+ value.value = strdupa(diag ?: "no");
+ txt2exp(expire ?: "*", &value.expire, true);
+ clear(queries[q].id);
+ return cynagora_agent_reply(query, &value);
+}
+
+void terminate(int id)
+{
+ int q;
+
+ if (id) {
+ q = qidx(id);
+ if (q >= 0)
+ reply(q, NULL, NULL);
else
- printf("%d %s %s\n", procs[i].id, args.args[0], args.args[1]);
- fflush(stdout);
- close(procs[i].io[0]);
- close(procs[i].io[1]);
- procs[i].pid = 0;
+ clear(id);
}
- waitpid(pid, NULL, 0);
}
-void onquery(int ac, char **av)
+void on_subquery_reply(void *closure, int status)
{
- int i;
- pid_t pid;
+ int me, s, q, fd;
- i = pidx(0);
- if (ac == 6 && i >= 0) {
- procs[i].pid = pid = split(procs[i].io);
- if (pid >= 0) {
- procs[i].id = atoi(av[0]);
- if (!pid) {
- setenv("CYAG_VALUE", av[1], 1);
- setenv("CYAG_CLIENT", av[2], 1);
- setenv("CYAG_SESSION", av[3], 1);
- setenv("CYAG_USER", av[4], 1);
- setenv("CYAG_PERMISSION", av[5], 1);
- execvp(prog[0], prog);
- fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
- exit(1);
- }
- return;
- close(procs[i].io[0]);
- close(procs[i].io[1]);
+
+ me = (int)(intptr_t)closure;
+ s = sidx(me);
+ if (s >= 0) {
+ q = qidx(subqs[s].id);
+ if (q > 0) {
+ fd = prog ? queries[q].io[1] : 1;
+ emit(fd, "reply %d %s\n", subqs[s].num, status ? "yes" : "no");
}
- procs[i].pid = 0;
+ memset(&subqs[s], 0, sizeof subqs[s]);
}
- fprintf(stdout, "%s no -\n", av[0]);
+
}
-int runloop()
+int subquery(int q, int num, char *client, char *session, char *user, char *permission)
+{
+ int rc, me, s;
+ cynagora_key_t key;
+
+ /* get an id */
+ do {
+ me = ++nexme;
+ if (me < 0)
+ me = nexme = 1;
+ } while (sidx(me) >= 0);
+
+ s = sidx(0);
+ subqs[s].me = me;
+ subqs[s].id = queries[q].id;
+ subqs[s].num = num;
+
+ key.client = client ?: "?";
+ key.session = session ?: "?";
+ key.user = user ?: "?";
+ key.permission = permission ?: "?";
+
+ rc = cynagora_agent_subquery_async(queries[q].query, &key, 0, on_subquery_reply, (void*)(intptr_t)me);
+ return rc;
+}
+
+int launch(int q)
+{
+ int rc;
+ int io[2];
+ pid_t pid;
+
+ pid = split(io);
+ if (pid < 0)
+ rc = -1;
+ else if (pid == 0) {
+ setenv("CYAG_VALUE", queries[q].query->value, 1);
+ setenv("CYAG_CLIENT", queries[q].query->key.client, 1);
+ setenv("CYAG_SESSION", queries[q].query->key.session, 1);
+ setenv("CYAG_USER", queries[q].query->key.user, 1);
+ setenv("CYAG_PERMISSION", queries[q].query->key.permission, 1);
+ execvp(prog[0], prog);
+ emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
+ exit(1);
+ } else {
+ queries[q].pid = pid;
+ queries[q].io[0] = io[0];
+ queries[q].io[1] = io[1];
+ rc = 0;
+ }
+ return rc;
+}
+
+void dispatch(int q, int ac, char **av)
+{
+ if (q < 0)
+ return;
+
+ if (ac < 1 || strcmp(av[0], "sub")) {
+ reply(q, av[0], ac > 1 ? av[1] : NULL);
+ } else {
+ subquery(q, ac > 1 ? atoi(av[1]) : 1,
+ ac > 2 ? av[2] : NULL,
+ ac > 3 ? av[3] : NULL,
+ ac > 4 ? av[4] : NULL,
+ ac > 5 ? av[5] : NULL);
+ }
+}
+
+void dispatch_direct(int ac, char **av)
+{
+ int q, qid;
+
+ qid = atoi(av[0]);
+ q = qidx(qid);
+ if (q < 0)
+ return;
+
+ dispatch(q, ac - 1, &av[1]);
+}
+
+void dispatch_fork(int ac, char **av)
+{
+ dispatch(qx, ac, av);
+}
+
+void process_fork(int id)
+{
+ int q = qidx(id);
+ if (q >= 0) {
+ qx = q;
+ read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+ }
+}
+
+/* handles death of a child */
+void deadchild(int sig, siginfo_t *info, void *item)
+{
+ pid_t pid;
+
+ if (piped) {
+ exit(info->si_code == CLD_EXITED ? info->si_status : 127);
+ return;
+ }
+
+ pid = info->si_pid;
+/*
+ int q, id;
+
+ q = pidx(pid);
+ if (q >= 0) {
+ id = queries[q].id;
+ qx = q;
+ read_and_dispatch(queries[q].io[0], &queries[q].buf, dispatch_fork);
+ terminate(id);
+ }
+*/
+ waitpid(pid, NULL, 0);
+}
+
+int setup_deadchild()
{
- struct pollfd pfd;
struct sigaction sigact;
/* set the signal handler */
sigact.sa_sigaction = deadchild;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = SA_NOCLDSTOP | SA_SIGINFO;
- sigaction(SIGCHLD, &sigact, NULL);
-
- pfd.fd = 0;
- pfd.events = POLLIN;
- for(;;) {
- pfd.revents = 0;
- poll(&pfd, 1, -1);
- if (pfd.revents & POLLIN)
- read_and_dispatch(0, &buffer, onquery);
- if (pfd.revents & POLLHUP)
- break;
- }
-
- return 0;
+ return sigaction(SIGCHLD, &sigact, NULL);
}
-int setup_child()
+int run_piped_program()
{
int rc, io[2];
pid_t pid;
@@ -327,6 +553,7 @@ int setup_child()
pid = split(io);
if (pid < 0)
return -1;
+
if (pid) {
close(0);
dup(io[0]);
@@ -338,20 +565,25 @@ int setup_child()
}
/* run piped if required */
- if (piped) {
- rc = execvp(prog[0], prog);
- fprintf(stderr, "error: can't exec %s: %s\n", prog[0], strerror(errno));
- } else {
- rc = runloop();
- if (rc)
- fprintf(stderr, "error: can't loop: %s\n", strerror(errno));
- }
+ rc = execvp(prog[0], prog);
+ emit(2, "error: can't exec %s: %s\n", prog[0], strerror(errno));
exit(!!rc);
+ return rc;
+}
+
+int async_ctl(void *closure, int op, int fd, uint32_t events)
+{
+ struct epoll_event e;
+ memset(&e, 0, sizeof e);
+ e.events = events;
+ e.data.fd = FD_FOR_CYNAGORA;
+ return epoll_ctl(efd, op, fd, &e);
}
int agentcb(void *closure, cynagora_query_t *query)
{
- int i, id, rc;
+ int q, id, rc;
+ struct epoll_event e;
/* get an id */
do {
@@ -361,62 +593,35 @@ int agentcb(void *closure, cynagora_query_t *query)
} while (qidx(id) >= 0);
/* get an index */
- i = qidx(0);
- if (i < 0)
+ q = qidx(0);
+ if (q < 0)
return -ECANCELED;
- queries[i].id = id;
- queries[i].query = query;
+ queries[q].id = id;
+ queries[q].query = query;
/* compose the value */
- rc = fprintf(stdout, "%d %s %s %s %s %s\n",
- id, query->value, query->key.client, query->key.session,
- query->key.user, query->key.permission);
+ if (prog) {
+ rc = launch(q);
+ if (rc == 0) {
+ memset(&e, 0, sizeof e);
+ e.events = EPOLLIN;
+ e.data.fd = id;
+ rc = epoll_ctl(efd, EPOLL_CTL_ADD, queries[q].io[0], &e);
+ }
+ } else {
+ rc = emit(1, "%d %s %s %s %s %s\n",
+ id, query->value, query->key.client, query->key.session,
+ query->key.user, query->key.permission);
+ }
if (rc < 0) {
- queries[i].query = NULL;
- queries[i].id = 0;
+ clear(id);
return -ECANCELED;
}
return 0;
}
-void onreply(int ac, char **av)
-{
- int i, id;
- cynagora_value_t value;
-
- id = atoi(av[0]);
- i = qidx(id);
- if (i >= 0) {
- value.value = "no";
- value.expire = 0;
- if (ac > 1)
- value.value = av[1];
- if (ac > 2)
- txt2exp(av[2], &value.expire, true);
- cynagora_agent_reply(queries[i].query, &value);
- queries[i].query = NULL;
- queries[i].id = 0;
- }
-}
-
-int async_ctl(void *closure, int op, int fd, uint32_t events)
-{
- int *pfd = closure;
-
- switch(op) {
- case EPOLL_CTL_ADD:
- case EPOLL_CTL_MOD:
- *pfd = fd;
- break;
- case EPOLL_CTL_DEL:
- *pfd = -1;
- break;
- }
- return 0;
-}
-
int main(int ac, char **av)
{
int opt;
@@ -424,8 +629,9 @@ int main(int ac, char **av)
int help = 0;
int version = 0;
int error = 0;
+ struct epoll_event e;
char *socket = NULL;
- struct pollfd fds[2];
+ buf_t buffer = { .filled = 0 };
/* scan arguments */
for (;;) {
@@ -434,8 +640,11 @@ int main(int ac, char **av)
break;
switch(opt) {
+ case _LONGHELP_:
+ help = 2;
+ break;
case _HELP_:
- help = 1;
+ help++;
break;
case _PIPED_:
piped = 1;
@@ -454,81 +663,111 @@ int main(int ac, char **av)
/* handles help, version, error */
if (help) {
- fprintf(stdout, helptxt);
+ printf("%s", helptxt);
+ if (help > 1)
+ printf("%s", longhelptxt);
return 0;
}
if (version) {
- fprintf(stdout, versiontxt);
+ printf("%s", versiontxt);
return 0;
}
/* check agent name */
if (optind == ac) {
- fprintf(stderr, "error: name missing\n");
+ emit(2, "error: name missing\n");
error = 1;
} else {
name = av[optind++];
if (!cynagora_agent_is_valid_name(name)) {
- fprintf(stderr, "error: invalid agent name %s\n", name);
+ emit(2, "error: invalid agent name %s\n", name);
error = 1;
- } else if (optind == ac) {
- prog = NULL;
- } else {
+ } else if (optind < ac) {
prog = &av[optind++];
+ }else if (piped) {
+ emit(2, "error: piped without program\n");
+ error = 1;
+ } else {
+ prog = NULL;
}
}
if (error)
return 1;
+ /* create the polling */
+ efd = epoll_create1(EPOLL_CLOEXEC);
+ if (efd < 0) {
+ emit(2, "error: epoll_create failed, %s\n", strerror(errno));
+ return 1;
+ }
+
/* initialize server */
signal(SIGPIPE, SIG_IGN); /* avoid SIGPIPE! */
rc = cynagora_create(&cynagora, cynagora_Agent, 0, socket);
if (rc < 0) {
- fprintf(stderr, "error: initialization failed, %s\n", strerror(-rc));
+ emit(2, "error: initialization failed, %s\n", strerror(-rc));
return 1;
}
- fds[1].fd = -1;
- rc = cynagora_async_setup(cynagora, async_ctl, &fds[1].fd);
+ rc = cynagora_async_setup(cynagora, async_ctl, NULL);
if (rc < 0) {
- fprintf(stderr, "error: asynchronous setup failed, %s\n", strerror(-rc));
+ emit(2, "error: asynchronous setup failed, %s\n", strerror(-rc));
return 1;
}
/* create the agent */
rc = cynagora_agent_create(cynagora, name, agentcb, NULL);
if (rc < 0) {
- fprintf(stderr, "error: creation of agent failed, %s\n", strerror(-rc));
+ emit(2, "error: creation of agent failed, %s\n", strerror(-rc));
return 1;
}
/* setup piped */
- if (prog) {
- rc = setup_child();
+ setup_deadchild();
+ if (piped) {
+ rc = run_piped_program();
if (rc < 0) {
- fprintf(stderr, "error: can't setup child, %s\n", strerror(errno));
+ emit(2, "error: can't run piped program, %s\n", strerror(errno));
return 1;
}
+ prog = NULL;
}
- /* setup output */
- setlinebuf(stdout);
+ /* catch input if needed */
+ if (!prog) {
+ memset(&e, 0, sizeof e);
+ e.events = EPOLLIN;
+ e.data.fd = FD_FOR_STDIN;
+ rc = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &e);
+ if (rc < 0) {
+ emit(2, "error: set epoll, %s\n", strerror(errno));
+ return 1;
+ }
+ }
- fcntl(0, F_SETFL, O_NONBLOCK);
- fds[0].fd = 0;
- fds[0].events = fds[1].events = POLLIN;
for(;;) {
- rc = poll(fds, 2, -1);
- if (fds[0].revents & POLLIN)
- read_and_dispatch(0, &buffer, onreply);
- if (fds[1].revents & POLLIN) {
- rc = cynagora_async_process(cynagora);
- if (rc < 0)
- fprintf(stderr, "asynchronous processing failed: %s\n", strerror(-rc));
+ rc = epoll_wait(efd, &e, 1, -1);
+ if (rc == 1) {
+ if (e.events & EPOLLIN) {
+ if (e.data.fd == FD_FOR_STDIN) {
+ read_and_dispatch(0, &buffer, dispatch_direct);
+ } else if (e.data.fd == FD_FOR_CYNAGORA) {
+ rc = cynagora_async_process(cynagora);
+ if (rc < 0)
+ emit(2, "asynchronous processing failed: %s\n", strerror(-rc));
+ } else {
+ process_fork(e.data.fd);
+ }
+ }
+ if (e.events & EPOLLHUP) {
+ if (e.data.fd == FD_FOR_STDIN) {
+ break;
+ } else if (e.data.fd == FD_FOR_CYNAGORA) {
+ break;
+ } else {
+ terminate(e.data.fd);
+ }
+ }
}
- if (fds[0].revents & POLLHUP)
- break;
- if (fds[1].revents & POLLHUP)
- break;
}
return 0;
}