diff options
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/supervisor-api.c | 193 | ||||
-rw-r--r-- | src/plugins/supervisor.c | 52 | ||||
-rw-r--r-- | src/plugins/supervisor.h | 3 |
3 files changed, 177 insertions, 71 deletions
diff --git a/src/plugins/supervisor-api.c b/src/plugins/supervisor-api.c index c21cd8d..bfe448c 100644 --- a/src/plugins/supervisor-api.c +++ b/src/plugins/supervisor-api.c @@ -27,6 +27,20 @@ #include "supervisor.h" #include "wrap-json.h" +// TODO: replace by a chained list of DAEMON_T* +static int TracePidsNum = 0; +static int* TracePidsArr = NULL; + +#define TRACE_DAEMON(src, nb, dd, lvl, res) \ + { \ + nb++; \ + json_object_array_add(res, json_object_new_int(dd->pid)); \ + if ((rc = trace_daemon(src->api, dd, lvl)) < 0) { \ + AFB_ReqFailF(src->request, "failed", "Trace pid %d error %d", dd->pid, rc); \ + goto error; \ + } \ + } + CTLP_CAPI_REGISTER("supervisor"); CTLP_ONLOAD(plugin, ret) @@ -41,7 +55,7 @@ CTLP_CAPI(list, source, argsJ, eventJ) getDaemons(source->api, &daemons); if (daemons == NULL) { - AFB_ApiError(source->api, "failed"); + AFB_ReqFail(source->request, "Failed", "Error daemon list null"); return ERROR; } @@ -66,7 +80,7 @@ CTLP_CAPI(list, source, argsJ, eventJ) return 0; } -CTLP_CAPI(trace, source, argsJ, queryJ) +CTLP_CAPI(trace_start, source, argsJ, queryJ) { int rc; json_object* result = NULL; @@ -74,72 +88,167 @@ CTLP_CAPI(trace, source, argsJ, queryJ) const char* ws_name = ""; const char* level = NULL; pid_t pid = -1; + json_object* j_pids = NULL; + int nb_traced_daemons = 0; - if (wrap_json_unpack(queryJ, "{s:?i s:?s s:?s}", - "pid", &pid, "ws", &ws_name, "level", &level)) { + // User can ask to trace either + // a specific pid or an array of pids or a specific ws name + if (wrap_json_unpack(queryJ, "{s:?i s:?o s:?s s:?s s:?s}", + "pid", &pid, "pids", &j_pids, "ws", &ws_name, "level", &level)) { AFB_ReqFail(source->request, "Failed", "Error processing arguments."); return ERROR; } - if (pid == -1 && strlen(ws_name) == 0) { - AFB_ReqFail(source->request, "failed", "one of pid or ws parameter must be set"); + if (pid == -1 && j_pids == NULL && strlen(ws_name) == 0) { + AFB_ReqFail(source->request, "failed", "one of pid or pids or ws parameter must be set"); + return ERROR; + } + + if (TracePidsNum > 0) { + AFB_ReqFail(source->request, "failed", "already tracing"); return ERROR; } - AFB_ApiDebug(source->api, "Trace pid: %d ws: %s", pid, ws_name); + AFB_ApiDebug(source->api, "Trace ws=%s, pid=%d, pids=%s", ws_name, pid, json_object_to_json_string(j_pids)); + + if (j_pids != NULL && json_object_is_type(j_pids, json_type_array)) { + TracePidsNum = (int)json_object_array_length(j_pids); + TracePidsArr = malloc(sizeof(int) * TracePidsNum); + if (TracePidsArr == NULL) { + AFB_ReqFail(source->request, "failed", "out of memory"); + goto error; + } + int i = 0; + while (i < TracePidsNum) { + json_object* j_ppid = json_object_array_get_idx(j_pids, i); + TracePidsArr[i++] = json_object_get_int(j_ppid); + } + } getDaemons(source->api, &daemons); if (daemons == NULL || daemons->count <= 0) { AFB_ReqFail(source->request, "failed", "No daemon found"); - return ERROR; + goto error; } - // search server and client pid - DAEMON_T *pid_s = NULL, *pid_c = NULL; + // search which daemons should be traced const char* wsn; + nb_traced_daemons = 0; + result = json_object_new_array(); for (int i = 0; i < daemons->count; i++) { AFB_ApiDebug(source->api, "_DEBUG_ svr %s", json_object_to_json_string(daemons->daemons[i]->ws_servers)); AFB_ApiDebug(source->api, "_DEBUG_ cli %s", json_object_to_json_string(daemons->daemons[i]->ws_clients)); - json_object* ws_servers = daemons->daemons[i]->ws_servers; - for (int j = 0; j < json_object_array_length(ws_servers); j++) { - - wsn = json_object_get_string(json_object_array_get_idx(ws_servers, j++)); - if (wsn && strstr(wsn, ws_name) != NULL) { - pid_s = daemons->daemons[i]; - break; + if (pid != -1 && pid == daemons->daemons[i]->pid) { + // Trace a specific pid + TRACE_DAEMON(source, nb_traced_daemons, daemons->daemons[i], level, result); + TracePidsNum = 1; + TracePidsArr = malloc(sizeof(int) * TracePidsNum); + if (TracePidsArr == NULL) { + AFB_ReqFail(source->request, "failed", "out of memory"); + goto error; } - } + TracePidsArr[0] = daemons->daemons[i]->pid; + break; - json_object* ws_clients = daemons->daemons[i]->ws_clients; - for (int j = 0; j < json_object_array_length(ws_clients); j++) { - wsn = json_object_get_string(json_object_array_get_idx(ws_clients, j++)); - if (wsn && strstr(wsn, ws_name) != NULL) { - pid_c = daemons->daemons[i]; - break; + } else if (j_pids != NULL) { + // Trace from a list of pids + for (int j = 0; j < TracePidsNum; j++) { + if (TracePidsArr[j] == daemons->daemons[i]->pid) { + TRACE_DAEMON(source, nb_traced_daemons, daemons->daemons[i], level, result); + } + } + } else if (ws_name != NULL) { + // Trace based on WS name + json_object* ws_servers = daemons->daemons[i]->ws_servers; + for (int j = 0; j < json_object_array_length(ws_servers); j++) { + wsn = json_object_get_string(json_object_array_get_idx(ws_servers, j)); + if (wsn && strstr(wsn, ws_name) != NULL) { + TRACE_DAEMON(source, nb_traced_daemons, daemons->daemons[i], level, result); + + TracePidsNum++; + int* tmpPtr = realloc(TracePidsArr, sizeof(int) * TracePidsNum); + if (tmpPtr == NULL) { + AFB_ReqFail(source->request, "failed", "out of memory"); + goto error; + } + TracePidsArr = tmpPtr; + TracePidsArr[TracePidsNum - 1] = daemons->daemons[i]->pid; + break; + } } - } - if (pid_s != NULL && pid_c != NULL) { - if ((rc = trace_exchange(source->api, pid_s, pid_c, level)) < 0) { - AFB_ReqFailF(source->request, "failed", "Trace error %d", rc); + json_object* ws_clients = daemons->daemons[i]->ws_clients; + for (int j = 0; j < json_object_array_length(ws_clients); j++) { + wsn = json_object_get_string(json_object_array_get_idx(ws_clients, j)); + if (wsn && strstr(wsn, ws_name) != NULL) { + TRACE_DAEMON(source, nb_traced_daemons, daemons->daemons[i], level, result); + + TracePidsNum++; + int* tmpPtr = realloc(TracePidsArr, sizeof(int) * TracePidsNum); + if (tmpPtr == NULL) { + AFB_ReqFail(source->request, "failed", "out of memory"); + goto error; + } + TracePidsArr = tmpPtr; + TracePidsArr[TracePidsNum - 1] = daemons->daemons[i]->pid; + break; + } } - break; } } - if (pid_s == NULL || pid_c == NULL) { - AFB_ReqFail(source->request, "failed", "Cannot determine Server or Client"); + if (nb_traced_daemons == 0) { + AFB_ReqFail(source->request, "failed", "No daemon found match criteria"); + goto error; + } + + AFB_ReqSuccess(source->request, result, "Trace successfully started"); + + return 0; + +error: + if (TracePidsNum > 0) + free(TracePidsArr); + TracePidsNum = 0; + TracePidsArr = NULL; + return ERROR; +} + +CTLP_CAPI(trace_stop, source, argsJ, queryJ) +{ + int rc, nbErr = 0; + json_object* result = NULL; + + if (TracePidsNum == 0) { + AFB_ReqFail(source->request, "failed", "Trace already stopped"); return ERROR; } - AFB_ReqSuccessF(source->request, result, "Tracing Server pid=%d <-> Client pid=%d", pid_s->pid, pid_c->pid); + for (int j = 0; j < TracePidsNum; j++) { + rc = trace_drop(source->api, TracePidsArr[j]); + if (rc < 0) { + // FIMXE - return list of error messages + nbErr++; + } + } + if (TracePidsNum > 0) + free(TracePidsArr); + TracePidsNum = 0; + TracePidsArr = NULL; + + if (nbErr) { + AFB_ReqFailF(source->request, "failed", "Error while stopping tracing (%d)", nbErr); + return ERROR; + } + AFB_ReqSuccess(source->request, result, "Trace successfully stopped"); return 0; } +#if 0 // Not used, implemented in lua for now (see xds-supervisor.lua / _trace_events_) uint64_t get_ts() { struct timespec ts; @@ -153,7 +262,6 @@ static void cb_harvester_write(void* closure, int status, struct json_object* re AFB_ApiDebug(dynapi, "SEB cb_harvester_write"); } -/* SEB Cleanup if move in lua */ static int harvester_post_data(CtlSourceT* source, METRIC_T* metric) { int rc; @@ -177,19 +285,19 @@ static int harvester_post_data(CtlSourceT* source, METRIC_T* metric) "values", "value", metric->data, "timestamp", metric->timestamp); } else { - AFB_ApiError(source->api, "Unsupported dataType"); + AFB_ReqFail(source->request, "Failed", "Unsupported dataType"); return ERROR; } if (rc < 0) { - AFB_ApiError(source->api, "Error packing metric, rc=%d", rc); + AFB_ReqFail(source->request, "Failed", "Error packing metric, rc=%d", rc); return rc; } rc = wrap_json_pack(&j_query, "{s:s s:i s:o }", "host", "localhost", "port", 8086, "metric", j_metric); if (rc < 0) { - AFB_ApiError(source->api, "Error packing query, rc=%d", rc); + AFB_ReqFail(source->request, "Failed", "Error packing query, rc=%d", rc); return rc; } @@ -199,7 +307,7 @@ static int harvester_post_data(CtlSourceT* source, METRIC_T* metric) /* SEB rc = AFB_ServiceSync(source->api, SRV_HARVESTER_NAME, "write", j_query, &j_res); if (rc < 0) { - AFB_ApiError(source->api, "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc, + AFB_ReqFail(source->request, "Failed", "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc, json_object_to_json_string(j_res)); return rc; } @@ -221,22 +329,22 @@ CTLP_CAPI(tracing_events, source, argsJ, eventJ) AFB_ApiDebug(source->api, ">>> RECV Event uid %s : %s", source->uid, json_object_to_json_string(eventJ)); - if (strcmp(source->uid, "supervisor/trace") != 0) { + if (strcmp(source->uid, "xds/supervisor/trace") != 0) { AFB_ApiNotice(source->api, "WARNING: un-handle uid '%s'", source->uid); return 0; } if ((rc = wrap_json_unpack(eventJ, "{s:?s}", "type", &type)) < 0) { - AFB_ApiError(source->api, "Cannot decode event type"); + AFB_ReqFail(source->request, "Failed", "Cannot decode event type"); return ERROR; } if (strcmp(type, "request") != 0) { - AFB_ApiError(source->api, "Cannot retrieve request"); + AFB_ReqFail(source->request, "Failed", "Cannot retrieve request"); return ERROR; } if (!json_object_object_get_ex(eventJ, "request", &request)) { - AFB_ApiError(source->api, "Cannot decode event request"); + AFB_ReqFail(source->request, "Failed", "Cannot decode event request"); return ERROR; } @@ -253,9 +361,10 @@ CTLP_CAPI(tracing_events, source, argsJ, eventJ) rc = harvester_post_data(source, &metric); if (rc < 0) { - AFB_ApiError(source->api, "ERROR harvester_post_data: rc %d", rc); + AFB_ReqFail(source->request, "Failed", "ERROR harvester_post_data: rc %d", rc); return rc; } return 0; } +#endif diff --git a/src/plugins/supervisor.c b/src/plugins/supervisor.c index e3c71c0..598ba68 100644 --- a/src/plugins/supervisor.c +++ b/src/plugins/supervisor.c @@ -123,6 +123,14 @@ static void decode_daemons_cb(void* closure, json_object* obj, const char* fName if (json_object_object_get_ex(j_response, "response", &j_config) && json_object_object_get_ex(j_config, "apis", &j_apis)) { // Don't forward monitor config details json_object_object_del(j_apis, "monitor"); + + // Only forward apis verb without all details + /* TODO + j_newApis = json_object_new_object(); + json_object_object_foreach(json_object_object_get(j_apis, "apis"), key, val) { + ... + } + */ daemon->apis = j_apis; } } @@ -171,34 +179,26 @@ int getDaemons(AFB_ApiT apiHandle, DAEMONS_T** daemons) #define XDS_TAG_EVENT "xds:trace/event" #define XDS_TRACE_NAME "xds-trace" -int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli, const char* level) +int trace_daemon(AFB_ApiT apiHandle, DAEMON_T* dm, const char* level) { int rc; json_object *j_response, *j_query, *j_tracereq, *j_traceevt; - if (svr == NULL || cli == NULL) { + if (dm == NULL) { return -1; } // First drop previous traces - // monitor/trace({ "drop": { "tag": "trace/request" } }) // Note: ignored error (expected 1st time/when no trace exist) - wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", svr->pid, - "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT); - AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response); - - wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", cli->pid, - "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT); - AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response); + trace_drop(apiHandle, dm->pid); j_tracereq = json_object_new_array(); if (level && !strncmp(level, "all", 3)) { json_object_array_add(j_tracereq, json_object_new_string("all")); } else { json_object_array_add(j_tracereq, json_object_new_string("life")); - json_object_array_add(j_tracereq, json_object_new_string("result")); + json_object_array_add(j_tracereq, json_object_new_string("reply")); } - json_object_get(j_tracereq); // because use 2 times to configure both server and client j_traceevt = json_object_new_array(); if (level && !strncmp(level, "all", 3)) { @@ -206,35 +206,31 @@ int trace_exchange(AFB_ApiT apiHandle, DAEMON_T* svr, DAEMON_T* cli, const char* } else { json_object_array_add(j_traceevt, json_object_new_string("common")); } - json_object_get(j_traceevt); // because use 2 times to configure both server and client - // Configure trace for server daemon + // Configure tracing of specified daemon // request: monitor/trace({ "add": { "tag": "xds:trace/request", "name": "trace", "request": "all" } }) wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }", - "pid", svr->pid, "add", + "pid", dm->pid, "add", "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq, "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt); if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) { - AFB_ApiError(apiHandle, "ERROR trace %d result: %s", svr->pid, + AFB_ApiError(apiHandle, "ERROR tracing pid %d result: %s", dm->pid, json_object_to_json_string(j_response)); return rc; } - // Configure trace for client daemon(s) - // request: monitor/trace({ "pid": 1234, "add": { "event": "all" } }) - wrap_json_pack(&j_query, "{s:i, s: [{s:s s:s s:o}, {s:s s:s s:o}] }", - "pid", cli->pid, "add", - "tag", XDS_TAG_REQUEST, "name", XDS_TRACE_NAME, "request", j_tracereq, - "tag", XDS_TAG_EVENT, "name", XDS_TRACE_NAME, "event", j_traceevt); + return 0; +} - if ((rc = AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response)) < 0) { - AFB_ApiError(apiHandle, "ERROR trace %d result: %s", cli->pid, - json_object_to_json_string(j_response)); - return rc; - } +// FIXME prototype must be int trace_drop(AFB_ApiT apiHandle, DAEMON_T* dm) +int trace_drop(AFB_ApiT apiHandle, int pid) +{ + json_object *j_response, *j_query; - return 0; + // monitor/trace({ "drop": { "tag": "trace/request" } }) + wrap_json_pack(&j_query, "{s:i s:{s:[s s]}}", "pid", pid, "drop", "tag", XDS_TAG_REQUEST, XDS_TAG_EVENT); + return AFB_ServiceSync(apiHandle, SRV_SUPERVISOR_NAME, "trace", j_query, &j_response); } int supervisor_init(void) diff --git a/src/plugins/supervisor.h b/src/plugins/supervisor.h index 6d33151..a7779ce 100644 --- a/src/plugins/supervisor.h +++ b/src/plugins/supervisor.h @@ -49,5 +49,6 @@ typedef struct daemons_result_ extern int getDaemons(AFB_ApiT apiHandle, DAEMONS_T **daemons); -extern int trace_exchange(AFB_ApiT apiHandle, DAEMON_T *svr, DAEMON_T *cli, const char *level); +extern int trace_daemon(AFB_ApiT apiHandle, DAEMON_T *dm, const char *level); +extern int trace_drop(AFB_ApiT apiHandle, int pid); extern int supervisor_init(void); |