summaryrefslogtreecommitdiffstats
path: root/src/afb-supervision.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/afb-supervision.c')
-rw-r--r--src/afb-supervision.c369
1 files changed, 369 insertions, 0 deletions
diff --git a/src/afb-supervision.c b/src/afb-supervision.c
new file mode 100644
index 00000000..a7099a4a
--- /dev/null
+++ b/src/afb-supervision.c
@@ -0,0 +1,369 @@
+/*
+ * Copyright (C) 2016, 2017 "IoT.bzh"
+ * Author José Bollo <jose.bollo@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+#define AFB_BINDING_PRAGMA_NO_VERBOSE_MACRO
+
+#include <string.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <json-c/json.h>
+#include <afb/afb-binding-v2.h>
+
+#include "afb-cred.h"
+#include "afb-api.h"
+#include "afb-apiset.h"
+#include "afb-api-so-v2.h"
+#include "afb-xreq.h"
+#include "afb-trace.h"
+#include "afb-session.h"
+#include "afb-supervision.h"
+#include "afs-supervision.h"
+#include "afb-stub-ws.h"
+#include "afb-debug.h"
+#include "verbose.h"
+#include "wrap-json.h"
+
+/* api and apiset name */
+static const char supervision_apiname[] = AFS_SURPERVISION_APINAME;
+
+/* path of the supervision socket */
+static const char supervisor_socket_path[] = AFS_SURPERVISION_SOCKET;
+
+/* mutual exclusion */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* the standard apiset */
+extern struct afb_apiset *main_apiset;
+
+/* the supervision apiset (not exported) */
+static struct afb_apiset *supervision_apiset;
+
+/* local api implementation */
+static void on_supervision_call(void *closure, struct afb_xreq *xreq);
+static struct afb_api_itf supervision_api_itf =
+{
+ .call = on_supervision_call
+};
+
+/* the supervisor link */
+static struct afb_stub_ws *supervisor;
+
+/* the trace api */
+static struct afb_trace *trace;
+
+/* open the socket */
+static int open_supervisor_socket(const char *path)
+{
+ int fd, rc;
+ struct sockaddr_un addr;
+ size_t length;
+
+ /* check path length */
+ length = strlen(path);
+ if (length >= 108) {
+ errno = ENAMETOOLONG;
+ return -1;
+ }
+
+ /* create the unix socket */
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ return fd;
+
+ /* prepare the connection address */
+ memset(&addr, 0, sizeof addr);
+ addr.sun_family = AF_UNIX;
+ strcpy(addr.sun_path, path);
+ if (addr.sun_path[0] == '@')
+ addr.sun_path[0] = 0; /* implement abstract sockets */
+
+ /* connect the socket */
+ rc = connect(fd, (struct sockaddr *) &addr, (socklen_t)(sizeof addr));
+ if (rc < 0) {
+ close(fd);
+ return rc;
+ }
+ return fd;
+}
+
+static void cleanup_supervisor(void *nada)
+{
+ struct afb_trace *t = __atomic_exchange_n(&trace, NULL, __ATOMIC_RELAXED);
+ if (t)
+ afb_trace_unref(t);
+ supervisor = NULL;
+}
+
+static void disconnect_supervisor()
+{
+ struct afb_stub_ws *s = __atomic_exchange_n(&supervisor, NULL, __ATOMIC_RELAXED);
+
+ if (s)
+ afb_stub_ws_unref(s);
+}
+
+/* try to connect to supervisor */
+static void try_connect_supervisor()
+{
+ int fd;
+ ssize_t srd;
+ struct afs_supervision_initiator initiator;
+
+ /* get the mutex */
+ pthread_mutex_lock(&mutex);
+
+ /* needs to connect? */
+ if (supervisor || !supervision_apiset)
+ goto end;
+
+ /* check existing path */
+ if (supervisor_socket_path[0] != '@' && access(supervisor_socket_path, F_OK)) {
+ NOTICE("Can't acces socket path %s: %m", supervisor_socket_path);
+ goto end;
+ }
+
+ /* socket connection */
+ fd = open_supervisor_socket(supervisor_socket_path);
+ if (fd < 0) {
+ NOTICE("Can't connect supervision socket to %s: %m", supervisor_socket_path);
+ goto end;
+ }
+
+ /* negociation */
+ do { srd = read(fd, &initiator, sizeof initiator); } while(srd < 0 && errno == EINTR);
+ if (srd < 0) {
+ NOTICE("Can't read supervisor %s: %m", supervisor_socket_path);
+ goto end2;
+ }
+ if ((size_t)srd != sizeof initiator) {
+ ERROR("When reading supervisor %s: %m", supervisor_socket_path);
+ goto end2;
+ }
+ if (strnlen(initiator.interface, sizeof initiator.interface) == sizeof initiator.interface) {
+ ERROR("Bad interface of supervisor %s", supervisor_socket_path);
+ goto end2;
+ }
+ if (strcmp(initiator.interface, AFS_SURPERVISION_INTERFACE_1)) {
+ ERROR("Unknown interface %s for supervisor %s", initiator.interface, supervisor_socket_path);
+ goto end2;
+ }
+ if (strnlen(initiator.extra, sizeof initiator.extra) == sizeof initiator.extra) {
+ ERROR("Bad extra of supervisor %s", supervisor_socket_path);
+ goto end2;
+ }
+
+ /* interprets extras */
+ if (!strcmp(initiator.extra, "CLOSE")) {
+ INFO("Supervisor asks to CLOSE");
+ goto end2;
+ }
+ if (!strcmp(initiator.extra, "WAIT")) {
+ afb_debug_wait("supervisor");
+ }
+ if (!strcmp(initiator.extra, "BREAK")) {
+ afb_debug_break("supervisor");
+ }
+
+ /* make the supervisor link */
+ supervisor = afb_stub_ws_create_server(fd, supervision_apiname, supervision_apiset);
+ if (!supervisor) {
+ ERROR("Creation of supervisor failed: %m");
+ goto end2;
+ }
+
+ /* successful termination */
+ goto end;
+
+end2:
+ close(fd);
+end:
+ pthread_mutex_unlock(&mutex);
+}
+
+static void on_sighup(int signum)
+{
+ try_connect_supervisor();
+}
+
+/**
+ * initalize the supervision
+ */
+int afb_supervision_init()
+{
+ int rc;
+ struct sigaction sa;
+
+ /* don't reinit */
+ if (supervision_apiset)
+ return 0;
+
+ /* create the apiset */
+ supervision_apiset = afb_apiset_create(supervision_apiname, 0);
+ if (!supervision_apiset) {
+ ERROR("Can't create supervision's apiset");
+ return -1;
+ }
+
+ /* init the apiset */
+ rc = afb_apiset_add(supervision_apiset, supervision_apiname,
+ (struct afb_api){ .itf = &supervision_api_itf, .closure = NULL});
+ if (rc < 0) {
+ ERROR("Can't create supervision's apiset: %m");
+ afb_apiset_unref(supervision_apiset);
+ supervision_apiset = NULL;
+ return rc;
+ }
+
+ /* get SIGHUP */
+ memset(&sa, 0, sizeof sa);
+ sa.sa_handler = on_sighup;
+ rc = sigaction(SIGHUP, &sa, NULL);
+ if (rc < 0)
+ ERROR("Can't connect supervision to SIGHUP: %m");
+
+ /* connect to supervision */
+ try_connect_supervisor();
+ return 0;
+}
+
+/******************************************************************************
+**** Implementation monitoring verbs
+******************************************************************************/
+static void slist(void *closure, struct afb_session *session)
+{
+ struct json_object *list = closure;
+ struct json_object *item;
+
+ wrap_json_pack(&item, "{ss}", "token", afb_session_token(session));
+ json_object_object_add(list, afb_session_uuid(session), item);
+}
+
+/******************************************************************************
+**** Implementation monitoring verbs
+******************************************************************************/
+
+static const char *verbs[] = {
+ "break", "do", "exit", "sclose", "slist", "trace", "wait" };
+enum { Break , Do , Exit , Sclose , Slist , Trace , Wait };
+
+
+static void on_supervision_call(void *closure, struct afb_xreq *xreq)
+{
+ int i, rc;
+ struct json_object *args, *drop, *add, *sub, *list;
+ const char *api, *verb, *uuid;
+ struct afb_session *session;
+ const struct afb_api *xapi;
+ struct afb_req req;
+
+ /* search the verb */
+ i = (int)(sizeof verbs / sizeof *verbs);
+ while(--i >= 0 && strcasecmp(verbs[i], xreq->request.verb));
+ if (i < 0) {
+ afb_xreq_fail_unknown_verb(xreq);
+ return;
+ }
+
+ /* process */
+ args = afb_xreq_json(xreq);
+ switch(i) {
+ case Exit:
+ i = 0;
+ if (wrap_json_unpack(args, "i", &i))
+ wrap_json_unpack(args, "{si}", "code", &i);
+ ERROR("existing from supervision with code %d -> %d", i, i & 127);
+ exit(i & 127);
+ break;
+ case Sclose:
+ uuid = NULL;
+ if (wrap_json_unpack(args, "s", &uuid))
+ wrap_json_unpack(args, "{ss}", "uuid", &uuid);
+ if (!uuid)
+ afb_xreq_fail(xreq, "invalid", NULL);
+ else {
+ session = afb_session_search(uuid);
+ if (!session)
+ afb_xreq_fail(xreq, "not-found", NULL);
+ else {
+ afb_session_close(session);
+ afb_session_unref(session);
+ afb_session_purge();
+ afb_xreq_success(xreq, NULL, NULL);
+ }
+ }
+ break;
+ case Slist:
+ list = json_object_new_object();
+ afb_session_foreach(slist, list);
+ afb_xreq_success(xreq, list, NULL);
+ break;
+ case Trace:
+ if (!trace)
+ trace = afb_trace_create(supervision_apiname, NULL /* not bound to any session */);
+
+ req = afb_xreq_unstore((struct afb_stored_req*)xreq);
+ wrap_json_unpack(args, "{s?o s?o}", "add", &add, "drop", &drop);
+ if (add) {
+ rc = afb_trace_add(req, add, trace);
+ if (rc)
+ return;
+ }
+ if (drop) {
+ rc = afb_trace_drop(req, drop, trace);
+ if (rc)
+ return;
+ }
+ afb_req_success(req, NULL, NULL);
+ break;
+ case Do:
+ sub = NULL;
+ if (wrap_json_unpack(args, "{ss ss s?o*}", "api", &api, "verb", &verb, "args", &sub))
+ afb_xreq_fail(xreq, "error", "bad request");
+ else {
+ xapi = afb_apiset_lookup_started(main_apiset, api, 1);
+ if (!xapi)
+ afb_xreq_fail_unknown_api(xreq);
+ else {
+ afb_cred_unref(xreq->cred);
+ xreq->cred = NULL;
+ xreq->request.api = api;
+ xreq->request.verb = verb;
+ xreq->json = json_object_get(sub);
+ xapi->itf->call(xapi->closure, xreq);
+ json_object_put(args);
+ }
+ }
+ break;
+ case Wait:
+ afb_req_success(req, NULL, NULL);
+ afb_debug_wait("supervisor");
+ break;
+ case Break:
+ afb_req_success(req, NULL, NULL);
+ afb_debug_break("supervisor");
+ break;
+ }
+}
+