aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/supervisor-api.c157
-rw-r--r--src/plugins/supervisor-api.h18
-rw-r--r--src/plugins/supervisor.c93
-rw-r--r--src/plugins/supervisor.h2
4 files changed, 213 insertions, 57 deletions
diff --git a/src/plugins/supervisor-api.c b/src/plugins/supervisor-api.c
index 0c1a5bc..c21cd8d 100644
--- a/src/plugins/supervisor-api.c
+++ b/src/plugins/supervisor-api.c
@@ -19,6 +19,7 @@
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
+#include <time.h>
#include <unistd.h>
#include "ctl-plugin.h"
@@ -44,6 +45,7 @@ CTLP_CAPI(list, source, argsJ, eventJ)
return ERROR;
}
+ AFB_ApiInfo(source->api, "Build response (nb daemons %d)", daemons->count);
result = json_object_new_array();
for (int i = 0; i < daemons->count; i++) {
@@ -58,31 +60,43 @@ CTLP_CAPI(list, source, argsJ, eventJ)
//, "config", daemons->daemons[i]->config);
json_object_array_add(result, item);
}
- AFB_ReqSucess(source->request, result, NULL);
+
+ AFB_ApiInfo(source->api, "Send response");
+ AFB_ReqSuccess(source->request, result, NULL);
return 0;
}
-CTLP_CAPI(trace, source, argsJ, eventJ)
+CTLP_CAPI(trace, source, argsJ, queryJ)
{
int rc;
json_object* result = NULL;
DAEMONS_T* daemons = NULL;
- const char* ws_name;
- const char* wsn;
+ const char* ws_name = "";
+ const char* level = NULL;
+ pid_t pid = -1;
- if (wrap_json_unpack(argsJ, "{s:?s}", "ws", &ws_name)) {
+ if (wrap_json_unpack(queryJ, "{s:?i s:?s s:?s}",
+ "pid", &pid, "ws", &ws_name, "level", &level)) {
AFB_ReqFail(source->request, "Failed", "Error processing arguments.");
return ERROR;
}
- AFB_ApiNotice(source->api, "Trace ws: %s", ws_name);
+
+ if (pid == -1 && strlen(ws_name) == 0) {
+ AFB_ReqFail(source->request, "failed", "one of pid or ws parameter must be set");
+ return ERROR;
+ }
+
+ AFB_ApiDebug(source->api, "Trace pid: %d ws: %s", pid, ws_name);
getDaemons(source->api, &daemons);
if (daemons == NULL || daemons->count <= 0) {
AFB_ReqFail(source->request, "failed", "No daemon found");
+ return ERROR;
}
// search server and client pid
DAEMON_T *pid_s = NULL, *pid_c = NULL;
+ const char* wsn;
for (int i = 0; i < daemons->count; i++) {
AFB_ApiDebug(source->api, "_DEBUG_ svr %s",
json_object_to_json_string(daemons->daemons[i]->ws_servers));
@@ -109,7 +123,7 @@ CTLP_CAPI(trace, source, argsJ, eventJ)
}
if (pid_s != NULL && pid_c != NULL) {
- if ((rc = trace_exchange(source->api, pid_s, pid_c)) < 0) {
+ if ((rc = trace_exchange(source->api, pid_s, pid_c, level)) < 0) {
AFB_ReqFailF(source->request, "failed", "Trace error %d", rc);
}
break;
@@ -121,44 +135,127 @@ CTLP_CAPI(trace, source, argsJ, eventJ)
return ERROR;
}
- AFB_ReqSucessF(source->request, result, "Tracing Server pid=%d <-> Client pid=%d", pid_s->pid, pid_c->pid);
+ AFB_ReqSuccessF(source->request, result, "Tracing Server pid=%d <-> Client pid=%d", pid_s->pid, pid_c->pid);
+
+ return 0;
+}
+
+uint64_t get_ts()
+{
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+
+ return (uint64_t)(ts.tv_sec) * (uint64_t)1000000000 + (uint64_t)(ts.tv_nsec);
+}
+
+static void cb_harvester_write(void* closure, int status, struct json_object* result, struct afb_dynapi* dynapi)
+{
+ AFB_ApiDebug(dynapi, "SEB cb_harvester_write");
+}
+
+/* SEB Cleanup if move in lua */
+static int harvester_post_data(CtlSourceT* source, METRIC_T* metric)
+{
+ int rc;
+ json_object *j_res, *j_query, *j_metric;
+
+ if (!metric->timestamp) {
+ metric->timestamp = get_ts();
+ }
+
+ // To be REWORK
+ if (metric->dataType == SPVR_DATA_STRING) {
+ rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:s} s:I }",
+ "name", metric->name,
+ "metadata", "source", "my_source", "identity", source->uid,
+ "values", "value", metric->data,
+ "timestamp", metric->timestamp);
+ } else if (metric->dataType == SPVR_DATA_INT) {
+ rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:i} s:I }",
+ "name", metric->name,
+ "metadata", "source", "my_source", "identity", source->uid,
+ "values", "value", metric->data,
+ "timestamp", metric->timestamp);
+ } else {
+ AFB_ApiError(source->api, "Unsupported dataType");
+ return ERROR;
+ }
+
+ if (rc < 0) {
+ AFB_ApiError(source->api, "Error packing metric, rc=%d", rc);
+ return rc;
+ }
+
+ rc = wrap_json_pack(&j_query, "{s:s s:i s:o }", "host",
+ "localhost", "port", 8086, "metric", j_metric);
+ if (rc < 0) {
+ AFB_ApiError(source->api, "Error packing query, rc=%d", rc);
+ return rc;
+ }
+
+ AFB_ApiDebug(source->api, "%s write: %s", SRV_HARVESTER_NAME,
+ json_object_to_json_string(j_query));
+
+ /* SEB
+ rc = AFB_ServiceSync(source->api, SRV_HARVESTER_NAME, "write", j_query, &j_res);
+ if (rc < 0) {
+ AFB_ApiError(source->api, "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc,
+ json_object_to_json_string(j_res));
+ return rc;
+ }
+*/
+ AFB_ServiceCall(source->api, SRV_HARVESTER_NAME, "write", j_query, cb_harvester_write, &j_res);
return 0;
}
-/* SEB TODO
-void xds_event_cb(const char* evtname, json_object* j_event)
+CTLP_CAPI(tracing_events, source, argsJ, eventJ)
{
int rc;
- METRIC_T metric;
+ METRIC_T metric = { 0 };
const char* type = NULL;
struct json_object* request = NULL;
- AFB_NOTICE("RECV Event %s : %s", evtname,
- json_object_to_json_string(j_event));
+ //struct signalCBT* ctx = (struct signalCBT*)source->context;
+
+ AFB_ApiDebug(source->api, ">>> RECV Event uid %s : %s", source->uid,
+ json_object_to_json_string(eventJ));
- if (strcmp(evtname, "supervisor/trace") != 0) {
- return;
+ if (strcmp(source->uid, "supervisor/trace") != 0) {
+ AFB_ApiNotice(source->api, "WARNING: un-handle uid '%s'", source->uid);
+ return 0;
}
- if ((rc = wrap_json_unpack(j_event, "{s:?s}", "type", &type)) < 0) {
- AFB_ERROR("Cannot decode event type");
- return;
+ if ((rc = wrap_json_unpack(eventJ, "{s:?s}", "type", &type)) < 0) {
+ AFB_ApiError(source->api, "Cannot decode event type");
+ return ERROR;
}
- if (strcmp(type, "request") == 0) {
+ if (strcmp(type, "request") != 0) {
+ AFB_ApiError(source->api, "Cannot retrieve request");
+ return ERROR;
+ }
+ if (!json_object_object_get_ex(eventJ, "request", &request)) {
+ AFB_ApiError(source->api, "Cannot decode event request");
+ return ERROR;
+ }
- if (!json_object_object_get_ex(j_event, "request", &request)) {
- AFB_ERROR("Cannot decode event request");
- return;
- }
- metric.name = "trace";
- metric.data = request;
+ // TODO: decode request and build trace
- rc = harvester_post_data(&metric);
- if (rc < 0) {
- AFB_ERROR("ERROR harvester_post_data: rc %d", rc);
- }
+ metric.name = "trace";
+ /* FIXME string KO
+ metric.dataType = SPVR_DATA_STRING;
+ metric.data = "test1234";
+ */
+ metric.dataType = SPVR_DATA_INT;
+ int val = 54321;
+ metric.data = &val;
+
+ rc = harvester_post_data(source, &metric);
+ if (rc < 0) {
+ AFB_ApiError(source->api, "ERROR harvester_post_data: rc %d", rc);
+ return rc;
}
+
+ return 0;
}
-*/
diff --git a/src/plugins/supervisor-api.h b/src/plugins/supervisor-api.h
index beadeca..fddaf98 100644
--- a/src/plugins/supervisor-api.h
+++ b/src/plugins/supervisor-api.h
@@ -16,9 +16,9 @@
*/
#pragma once
-#include <stdbool.h>
-#include "wrap-json.h"
#include "filescan-utils.h"
+#include "wrap-json.h"
+#include <stdbool.h>
#define SRV_HARVESTER_NAME "harvester"
@@ -26,11 +26,19 @@
#define META_IDENTITY "" // FIXME
#ifndef ERROR
- #define ERROR -1
+#define ERROR -1
#endif
+typedef enum {
+ SPVR_DATA_STRING = 0,
+ SPVR_DATA_INT,
+ SPVR_DATA_BOOL,
+ SPVR_DATA_FLOAT,
+} SpvrDataTypeT;
+
typedef struct metric_t {
char* name;
- json_object* data;
- struct timespec timestamp;
+ SpvrDataTypeT dataType;
+ void* data;
+ uint64_t timestamp;
} METRIC_T;
diff --git a/src/plugins/supervisor.c b/src/plugins/supervisor.c
index 649713e..503609a 100644
--- a/src/plugins/supervisor.c
+++ b/src/plugins/supervisor.c
@@ -39,6 +39,7 @@ struct decode_daemon_str {
DAEMONS_T* daemons;
AFB_ApiT api;
const char* ignored_daemon;
+ int* ret_code;
};
static void decode_daemons_cb(void* closure, json_object* obj, const char* fName)
@@ -61,7 +62,7 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName
return;
}
- AFB_ApiInfo(clStr->api, "Get config of pid %d", cred.pid);
+ AFB_ApiInfo(clStr->api, "Get supervisor/config - pid %d", cred.pid);
daemon->pid = cred.pid;
// Get config
@@ -69,6 +70,7 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName
rc = AFB_ServiceSync(clStr->api, SRV_SUPERVISOR_NAME, "config", j_query, &j_response);
if (rc < 0) {
AFB_ApiError(clStr->api, "Cannot get config of pid %d", cred.pid);
+ *clStr->ret_code = rc;
return;
}
@@ -103,6 +105,8 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName
}
// Get apis
+ AFB_ApiInfo(clStr->api, "Get supervisor/do monitor get apis - pid %d", cred.pid);
+
// '{"pid":6262,"api":"monitor","verb":"get","args":{"apis":true}}
wrap_json_pack(&j_query, "{si ss ss s {sb}}",
"pid", cred.pid,
@@ -117,6 +121,8 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName
AFB_ApiDebug(clStr->api, "%s do ...get apis result, res=%s", SRV_SUPERVISOR_NAME, json_object_to_json_string(j_response));
if (json_object_object_get_ex(j_response, "response", &j_config) && json_object_object_get_ex(j_config, "apis", &j_apis)) {
+ // Don't forward monitor config details
+ json_object_object_del(j_apis, "monitor");
daemon->apis = j_apis;
}
}
@@ -131,59 +137,104 @@ int getDaemons(AFB_ApiT apiHandle, DAEMONS_T** daemons)
*daemons = calloc(sizeof(DAEMONS_T), 1);
+ AFB_ApiInfo(apiHandle, "Call supervisor/discover");
if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "discover", NULL,
&j_response))
< 0) {
return rc;
}
+ AFB_ApiInfo(apiHandle, "Call supervisor/list");
if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "list", NULL,
&j_response))
< 0) {
return rc;
}
+ AFB_ApiInfo(apiHandle, "Get details info for each daemon");
AFB_ApiDebug(apiHandle, "%s list result, res=%s", SRV_SUPERVISOR_NAME, json_object_to_json_string(j_response));
- if (json_object_object_get_ex(j_response, "response", &j_daemons)) {
- struct decode_daemon_str str = {
- *daemons,
- apiHandle,
- GetBinderName()
- };
- wrap_json_object_for_all(j_daemons, decode_daemons_cb, &str);
+ if (!json_object_object_get_ex(j_response, "response", &j_daemons)) {
}
-
- return 0;
+ struct decode_daemon_str cls = {
+ *daemons,
+ apiHandle,
+ apiHandle->apiname,
+ &rc
+ };
+ wrap_json_object_for_all(j_daemons, decode_daemons_cb, &cls);
+
+ return rc;
}
-int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli)
+#define XDS_TAG_REQUEST "xds:*/request"
+#define XDS_TAG_EVENT "xds:*/event"
+#define XDS_TRACE_NAME "xds-trace"
+
+int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli, const char* level)
{
int rc;
- json_object *j_response, *j_query;
+ json_object *j_response, *j_query, *j_tracereq, *j_traceevt;
if (svr == NULL || cli == NULL) {
return -1;
}
- wrap_json_pack(&j_query, "{s:i, s:{s:s}}", "pid", svr->pid, "add",
- "request", "common");
- if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query,
- &j_response))
- < 0) {
+ // First drop previous traces
+ // monitor/trace({ "drop": { "tag": "*/request" } })
+ // Note: ignored error (expected 1st time/when no trace exist)
+ wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", svr->pid,
+ "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT);
+ AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response);
+
+ wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", cli->pid,
+ "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT);
+ AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response);
+
+ j_tracereq = json_object_new_array();
+ if (level && !strncmp(level, "all", 3)) {
+ json_object_array_add(j_tracereq, json_object_new_string("all"));
+ } else {
+ json_object_array_add(j_tracereq, json_object_new_string("life"));
+ json_object_array_add(j_tracereq, json_object_new_string("result"));
+ }
+ json_object_get(j_tracereq); // because use 2 times to configure both server and client
+
+ j_traceevt = json_object_new_array();
+ if (level && !strncmp(level, "all", 3)) {
+ json_object_array_add(j_traceevt, json_object_new_string("all"));
+ } else {
+ json_object_array_add(j_traceevt, json_object_new_string("common"));
+ }
+ json_object_get(j_traceevt); // because use 2 times to configure both server and client
+
+ // Configure trace for server daemon
+ // request: monitor/trace({ "add": { "tag": "xds:*/request", "name": "trace", "request": "all" } })
+ wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }",
+ "pid", svr->pid, "add",
+ "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq,
+ "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt);
+
+ if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) {
AFB_ApiError(apiHandle, "ERROR trace %d result: %s", svr->pid,
json_object_to_json_string(j_response));
return rc;
}
- wrap_json_pack(&j_query, "{s:i}", "pid", cli->pid);
- if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query,
- &j_response))
- < 0) {
+ // Configure trace for client daemon(s)
+ // request: monitor/trace({ "pid": 1234, "add": { "event": "all" } })
+#if 1 // SEB
+ wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }",
+ "pid", cli->pid, "add",
+ "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq,
+ "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt);
+
+ if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) {
AFB_ApiError(apiHandle, "ERROR trace %d result: %s", cli->pid,
json_object_to_json_string(j_response));
return rc;
}
+#endif
return 0;
}
diff --git a/src/plugins/supervisor.h b/src/plugins/supervisor.h
index 3311734..6d33151 100644
--- a/src/plugins/supervisor.h
+++ b/src/plugins/supervisor.h
@@ -49,5 +49,5 @@ typedef struct daemons_result_
extern int getDaemons(AFB_ApiT apiHandle, DAEMONS_T **daemons);
-extern int trace_exchange(AFB_ApiT apiHandle, DAEMON_T *svr, DAEMON_T *cli);
+extern int trace_exchange(AFB_ApiT apiHandle, DAEMON_T *svr, DAEMON_T *cli, const char *level);
extern int supervisor_init(void);