diff options
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/supervisor-api.c | 157 | ||||
-rw-r--r-- | src/plugins/supervisor-api.h | 18 | ||||
-rw-r--r-- | src/plugins/supervisor.c | 93 | ||||
-rw-r--r-- | src/plugins/supervisor.h | 2 |
4 files changed, 213 insertions, 57 deletions
diff --git a/src/plugins/supervisor-api.c b/src/plugins/supervisor-api.c index 0c1a5bc..c21cd8d 100644 --- a/src/plugins/supervisor-api.c +++ b/src/plugins/supervisor-api.c @@ -19,6 +19,7 @@ #include <fcntl.h> #include <stdio.h> #include <string.h> +#include <time.h> #include <unistd.h> #include "ctl-plugin.h" @@ -44,6 +45,7 @@ CTLP_CAPI(list, source, argsJ, eventJ) return ERROR; } + AFB_ApiInfo(source->api, "Build response (nb daemons %d)", daemons->count); result = json_object_new_array(); for (int i = 0; i < daemons->count; i++) { @@ -58,31 +60,43 @@ CTLP_CAPI(list, source, argsJ, eventJ) //, "config", daemons->daemons[i]->config); json_object_array_add(result, item); } - AFB_ReqSucess(source->request, result, NULL); + + AFB_ApiInfo(source->api, "Send response"); + AFB_ReqSuccess(source->request, result, NULL); return 0; } -CTLP_CAPI(trace, source, argsJ, eventJ) +CTLP_CAPI(trace, source, argsJ, queryJ) { int rc; json_object* result = NULL; DAEMONS_T* daemons = NULL; - const char* ws_name; - const char* wsn; + const char* ws_name = ""; + const char* level = NULL; + pid_t pid = -1; - if (wrap_json_unpack(argsJ, "{s:?s}", "ws", &ws_name)) { + if (wrap_json_unpack(queryJ, "{s:?i s:?s s:?s}", + "pid", &pid, "ws", &ws_name, "level", &level)) { AFB_ReqFail(source->request, "Failed", "Error processing arguments."); return ERROR; } - AFB_ApiNotice(source->api, "Trace ws: %s", ws_name); + + if (pid == -1 && strlen(ws_name) == 0) { + AFB_ReqFail(source->request, "failed", "one of pid or ws parameter must be set"); + return ERROR; + } + + AFB_ApiDebug(source->api, "Trace pid: %d ws: %s", pid, ws_name); getDaemons(source->api, &daemons); if (daemons == NULL || daemons->count <= 0) { AFB_ReqFail(source->request, "failed", "No daemon found"); + return ERROR; } // search server and client pid DAEMON_T *pid_s = NULL, *pid_c = NULL; + const char* wsn; for (int i = 0; i < daemons->count; i++) { AFB_ApiDebug(source->api, "_DEBUG_ svr %s", json_object_to_json_string(daemons->daemons[i]->ws_servers)); @@ -109,7 +123,7 @@ CTLP_CAPI(trace, source, argsJ, eventJ) } if (pid_s != NULL && pid_c != NULL) { - if ((rc = trace_exchange(source->api, pid_s, pid_c)) < 0) { + if ((rc = trace_exchange(source->api, pid_s, pid_c, level)) < 0) { AFB_ReqFailF(source->request, "failed", "Trace error %d", rc); } break; @@ -121,44 +135,127 @@ CTLP_CAPI(trace, source, argsJ, eventJ) return ERROR; } - AFB_ReqSucessF(source->request, result, "Tracing Server pid=%d <-> Client pid=%d", pid_s->pid, pid_c->pid); + AFB_ReqSuccessF(source->request, result, "Tracing Server pid=%d <-> Client pid=%d", pid_s->pid, pid_c->pid); + + return 0; +} + +uint64_t get_ts() +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + + return (uint64_t)(ts.tv_sec) * (uint64_t)1000000000 + (uint64_t)(ts.tv_nsec); +} + +static void cb_harvester_write(void* closure, int status, struct json_object* result, struct afb_dynapi* dynapi) +{ + AFB_ApiDebug(dynapi, "SEB cb_harvester_write"); +} + +/* SEB Cleanup if move in lua */ +static int harvester_post_data(CtlSourceT* source, METRIC_T* metric) +{ + int rc; + json_object *j_res, *j_query, *j_metric; + + if (!metric->timestamp) { + metric->timestamp = get_ts(); + } + + // To be REWORK + if (metric->dataType == SPVR_DATA_STRING) { + rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:s} s:I }", + "name", metric->name, + "metadata", "source", "my_source", "identity", source->uid, + "values", "value", metric->data, + "timestamp", metric->timestamp); + } else if (metric->dataType == SPVR_DATA_INT) { + rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:i} s:I }", + "name", metric->name, + "metadata", "source", "my_source", "identity", source->uid, + "values", "value", metric->data, + "timestamp", metric->timestamp); + } else { + AFB_ApiError(source->api, "Unsupported dataType"); + return ERROR; + } + + if (rc < 0) { + AFB_ApiError(source->api, "Error packing metric, rc=%d", rc); + return rc; + } + + rc = wrap_json_pack(&j_query, "{s:s s:i s:o }", "host", + "localhost", "port", 8086, "metric", j_metric); + if (rc < 0) { + AFB_ApiError(source->api, "Error packing query, rc=%d", rc); + return rc; + } + + AFB_ApiDebug(source->api, "%s write: %s", SRV_HARVESTER_NAME, + json_object_to_json_string(j_query)); + + /* SEB + rc = AFB_ServiceSync(source->api, SRV_HARVESTER_NAME, "write", j_query, &j_res); + if (rc < 0) { + AFB_ApiError(source->api, "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc, + json_object_to_json_string(j_res)); + return rc; + } +*/ + AFB_ServiceCall(source->api, SRV_HARVESTER_NAME, "write", j_query, cb_harvester_write, &j_res); return 0; } -/* SEB TODO -void xds_event_cb(const char* evtname, json_object* j_event) +CTLP_CAPI(tracing_events, source, argsJ, eventJ) { int rc; - METRIC_T metric; + METRIC_T metric = { 0 }; const char* type = NULL; struct json_object* request = NULL; - AFB_NOTICE("RECV Event %s : %s", evtname, - json_object_to_json_string(j_event)); + //struct signalCBT* ctx = (struct signalCBT*)source->context; + + AFB_ApiDebug(source->api, ">>> RECV Event uid %s : %s", source->uid, + json_object_to_json_string(eventJ)); - if (strcmp(evtname, "supervisor/trace") != 0) { - return; + if (strcmp(source->uid, "supervisor/trace") != 0) { + AFB_ApiNotice(source->api, "WARNING: un-handle uid '%s'", source->uid); + return 0; } - if ((rc = wrap_json_unpack(j_event, "{s:?s}", "type", &type)) < 0) { - AFB_ERROR("Cannot decode event type"); - return; + if ((rc = wrap_json_unpack(eventJ, "{s:?s}", "type", &type)) < 0) { + AFB_ApiError(source->api, "Cannot decode event type"); + return ERROR; } - if (strcmp(type, "request") == 0) { + if (strcmp(type, "request") != 0) { + AFB_ApiError(source->api, "Cannot retrieve request"); + return ERROR; + } + if (!json_object_object_get_ex(eventJ, "request", &request)) { + AFB_ApiError(source->api, "Cannot decode event request"); + return ERROR; + } - if (!json_object_object_get_ex(j_event, "request", &request)) { - AFB_ERROR("Cannot decode event request"); - return; - } - metric.name = "trace"; - metric.data = request; + // TODO: decode request and build trace - rc = harvester_post_data(&metric); - if (rc < 0) { - AFB_ERROR("ERROR harvester_post_data: rc %d", rc); - } + metric.name = "trace"; + /* FIXME string KO + metric.dataType = SPVR_DATA_STRING; + metric.data = "test1234"; + */ + metric.dataType = SPVR_DATA_INT; + int val = 54321; + metric.data = &val; + + rc = harvester_post_data(source, &metric); + if (rc < 0) { + AFB_ApiError(source->api, "ERROR harvester_post_data: rc %d", rc); + return rc; } + + return 0; } -*/ diff --git a/src/plugins/supervisor-api.h b/src/plugins/supervisor-api.h index beadeca..fddaf98 100644 --- a/src/plugins/supervisor-api.h +++ b/src/plugins/supervisor-api.h @@ -16,9 +16,9 @@ */ #pragma once -#include <stdbool.h> -#include "wrap-json.h" #include "filescan-utils.h" +#include "wrap-json.h" +#include <stdbool.h> #define SRV_HARVESTER_NAME "harvester" @@ -26,11 +26,19 @@ #define META_IDENTITY "" // FIXME #ifndef ERROR - #define ERROR -1 +#define ERROR -1 #endif +typedef enum { + SPVR_DATA_STRING = 0, + SPVR_DATA_INT, + SPVR_DATA_BOOL, + SPVR_DATA_FLOAT, +} SpvrDataTypeT; + typedef struct metric_t { char* name; - json_object* data; - struct timespec timestamp; + SpvrDataTypeT dataType; + void* data; + uint64_t timestamp; } METRIC_T; diff --git a/src/plugins/supervisor.c b/src/plugins/supervisor.c index 649713e..503609a 100644 --- a/src/plugins/supervisor.c +++ b/src/plugins/supervisor.c @@ -39,6 +39,7 @@ struct decode_daemon_str { DAEMONS_T* daemons; AFB_ApiT api; const char* ignored_daemon; + int* ret_code; }; static void decode_daemons_cb(void* closure, json_object* obj, const char* fName) @@ -61,7 +62,7 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName return; } - AFB_ApiInfo(clStr->api, "Get config of pid %d", cred.pid); + AFB_ApiInfo(clStr->api, "Get supervisor/config - pid %d", cred.pid); daemon->pid = cred.pid; // Get config @@ -69,6 +70,7 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName rc = AFB_ServiceSync(clStr->api, SRV_SUPERVISOR_NAME, "config", j_query, &j_response); if (rc < 0) { AFB_ApiError(clStr->api, "Cannot get config of pid %d", cred.pid); + *clStr->ret_code = rc; return; } @@ -103,6 +105,8 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName } // Get apis + AFB_ApiInfo(clStr->api, "Get supervisor/do monitor get apis - pid %d", cred.pid); + // '{"pid":6262,"api":"monitor","verb":"get","args":{"apis":true}} wrap_json_pack(&j_query, "{si ss ss s {sb}}", "pid", cred.pid, @@ -117,6 +121,8 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName AFB_ApiDebug(clStr->api, "%s do ...get apis result, res=%s", SRV_SUPERVISOR_NAME, json_object_to_json_string(j_response)); if (json_object_object_get_ex(j_response, "response", &j_config) && json_object_object_get_ex(j_config, "apis", &j_apis)) { + // Don't forward monitor config details + json_object_object_del(j_apis, "monitor"); daemon->apis = j_apis; } } @@ -131,59 +137,104 @@ int getDaemons(AFB_ApiT apiHandle, DAEMONS_T** daemons) *daemons = calloc(sizeof(DAEMONS_T), 1); + AFB_ApiInfo(apiHandle, "Call supervisor/discover"); if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "discover", NULL, &j_response)) < 0) { return rc; } + AFB_ApiInfo(apiHandle, "Call supervisor/list"); if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "list", NULL, &j_response)) < 0) { return rc; } + AFB_ApiInfo(apiHandle, "Get details info for each daemon"); AFB_ApiDebug(apiHandle, "%s list result, res=%s", SRV_SUPERVISOR_NAME, json_object_to_json_string(j_response)); - if (json_object_object_get_ex(j_response, "response", &j_daemons)) { - struct decode_daemon_str str = { - *daemons, - apiHandle, - GetBinderName() - }; - wrap_json_object_for_all(j_daemons, decode_daemons_cb, &str); + if (!json_object_object_get_ex(j_response, "response", &j_daemons)) { } - - return 0; + struct decode_daemon_str cls = { + *daemons, + apiHandle, + apiHandle->apiname, + &rc + }; + wrap_json_object_for_all(j_daemons, decode_daemons_cb, &cls); + + return rc; } -int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli) +#define XDS_TAG_REQUEST "xds:*/request" +#define XDS_TAG_EVENT "xds:*/event" +#define XDS_TRACE_NAME "xds-trace" + +int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli, const char* level) { int rc; - json_object *j_response, *j_query; + json_object *j_response, *j_query, *j_tracereq, *j_traceevt; if (svr == NULL || cli == NULL) { return -1; } - wrap_json_pack(&j_query, "{s:i, s:{s:s}}", "pid", svr->pid, "add", - "request", "common"); - if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, - &j_response)) - < 0) { + // First drop previous traces + // monitor/trace({ "drop": { "tag": "*/request" } }) + // Note: ignored error (expected 1st time/when no trace exist) + wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", svr->pid, + "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT); + AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response); + + wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", cli->pid, + "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT); + AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response); + + j_tracereq = json_object_new_array(); + if (level && !strncmp(level, "all", 3)) { + json_object_array_add(j_tracereq, json_object_new_string("all")); + } else { + json_object_array_add(j_tracereq, json_object_new_string("life")); + json_object_array_add(j_tracereq, json_object_new_string("result")); + } + json_object_get(j_tracereq); // because use 2 times to configure both server and client + + j_traceevt = json_object_new_array(); + if (level && !strncmp(level, "all", 3)) { + json_object_array_add(j_traceevt, json_object_new_string("all")); + } else { + json_object_array_add(j_traceevt, json_object_new_string("common")); + } + json_object_get(j_traceevt); // because use 2 times to configure both server and client + + // Configure trace for server daemon + // request: monitor/trace({ "add": { "tag": "xds:*/request", "name": "trace", "request": "all" } }) + wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }", + "pid", svr->pid, "add", + "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq, + "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt); + + if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) { AFB_ApiError(apiHandle, "ERROR trace %d result: %s", svr->pid, json_object_to_json_string(j_response)); return rc; } - wrap_json_pack(&j_query, "{s:i}", "pid", cli->pid); - if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, - &j_response)) - < 0) { + // Configure trace for client daemon(s) + // request: monitor/trace({ "pid": 1234, "add": { "event": "all" } }) +#if 1 // SEB + wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }", + "pid", cli->pid, "add", + "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq, + "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt); + + if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) { AFB_ApiError(apiHandle, "ERROR trace %d result: %s", cli->pid, json_object_to_json_string(j_response)); return rc; } +#endif return 0; } diff --git a/src/plugins/supervisor.h b/src/plugins/supervisor.h index 3311734..6d33151 100644 --- a/src/plugins/supervisor.h +++ b/src/plugins/supervisor.h @@ -49,5 +49,5 @@ typedef struct daemons_result_ extern int getDaemons(AFB_ApiT apiHandle, DAEMONS_T **daemons); -extern int trace_exchange(AFB_ApiT apiHandle, DAEMON_T *svr, DAEMON_T *cli); +extern int trace_exchange(AFB_ApiT apiHandle, DAEMON_T *svr, DAEMON_T *cli, const char *level); extern int supervisor_init(void); |