diff options
Diffstat (limited to 'src/plugins/supervisor-api.c')
-rw-r--r-- | src/plugins/supervisor-api.c | 211 |
1 files changed, 126 insertions, 85 deletions
diff --git a/src/plugins/supervisor-api.c b/src/plugins/supervisor-api.c index cb2b476..a649c1e 100644 --- a/src/plugins/supervisor-api.c +++ b/src/plugins/supervisor-api.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 "IoT.bzh" + * Copyright (C) 2018-2019 "IoT.bzh" * Author "Sebastien Douheret" <sebastien@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,14 +31,14 @@ static int TracePidsNum = 0; static int* TracePidsArr = NULL; -#define TRACE_DAEMON(src, nb, dd, lvl, res) \ - { \ - nb++; \ - json_object_array_add(res, json_object_new_int(dd->pid)); \ - if ((rc = trace_daemon(src->api, dd, lvl)) < 0) { \ +#define TRACE_DAEMON(src, nb, dd, lvl, res) \ + { \ + nb++; \ + json_object_array_add(res, json_object_new_int(dd->pid)); \ + if ((rc = trace_daemon(src->api, dd, lvl)) < 0) { \ afb_req_fail_f(src->request, "failed", "Trace pid %d error %d", dd->pid, rc); \ - goto error; \ - } \ + goto error; \ + } \ } CTLP_CAPI_REGISTER("supervisor"); @@ -251,8 +251,7 @@ CTLP_CAPI(trace_stop, source, argsJ, queryJ) return 0; } -#if 0 // Not used, implemented in lua for now (see xds-supervisor.lua / _trace_events_) -uint64_t get_ts() +static inline uint64_t get_ts() { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); @@ -262,112 +261,154 @@ uint64_t get_ts() static void cb_harvester_write(void* closure, int status, struct json_object* result, struct afb_dynapi* dynapi) { - AFB_API_DEBUG(dynapi, "SEB cb_harvester_write"); + json_object* err = NULL; + if (result) { + wrap_json_unpack(result, "{s:{s:o}}", "request", "info", &err); + } + + AFB_API_DEBUG(dynapi, "cb_harvester_write status: %d, %s", status, json_object_to_json_string(err)); } -static int harvester_post_data(CtlSourceT* source, METRIC_T* metric) +CTLP_CAPI(tracing_events, source, argsJ, eventJ) { int rc; + json_object* request = NULL; + json_object* event = NULL; + json_object* data = NULL; + json_object* timestamp = NULL; json_object *j_res, *j_query, *j_metric; + json_object *j_metadata, *j_values; + int id = 0; + const char* type = NULL; + const char* tag = NULL; + const char* cdata = NULL; + + AFB_API_DEBUG(source->api, ">>> RECV Event uid %s : %s", source->uid, + json_object_to_json_string(eventJ)); - if (!metric->timestamp) { - metric->timestamp = get_ts(); + if (strcmp(source->uid, "supervisor/xds-trace") != 0) { + AFB_API_NOTICE(source->api, "WARNING: un-handle uid '%s'", source->uid); + return 0; } - // To be REWORK - if (metric->dataType == SPVR_DATA_STRING) { - rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:s} s:I }", - "name", metric->name, - "metadata", "source", "my_source", "identity", source->uid, - "values", "value", metric->data, - "timestamp", metric->timestamp); - } else if (metric->dataType == SPVR_DATA_INT) { - rc = wrap_json_pack(&j_metric, "{s:s s:{s:s s:s} s:{s:i} s:I }", - "name", metric->name, - "metadata", "source", "my_source", "identity", source->uid, - "values", "value", metric->data, - "timestamp", metric->timestamp); - } else { - afb_req_fail(source->request, "Failed", "Unsupported dataType"); + if ((rc = wrap_json_unpack(eventJ, "{so ss si ss s?o s?o s?o}", + "time", ×tamp, + "tag", &tag, + "id", &id, + "type", &type, + "request", &request, + "event", &event, + "data", &data)) + < 0) { + AFB_API_ERROR(source->api, "Cannot decode event object : %s", wrap_json_get_error_string(rc)); return ERROR; } - if (rc < 0) { - afb_req_fail(source->request, "Failed", "Error packing metric, rc=%d", rc); - return rc; + if (!timestamp) { + timestamp = json_object_new_int64(get_ts()); } - rc = wrap_json_pack(&j_query, "{s:s s:i s:o }", "host", - "localhost", "port", 8086, "metric", j_metric); + rc = wrap_json_pack(&j_metadata, "{s:s s:s}", + "identity", "xds_supervisor", + "tag", tag); if (rc < 0) { - afb_req_fail(source->request, "Failed", "Error packing query, rc=%d", rc); - return rc; + AFB_API_ERROR(source->api, "Error packing j_metadata, rc=%d", rc); + goto _EXIT_TRACE_EVT; } - - AFB_API_DEBUG(source->api, "%s write: %s", SRV_HARVESTER_NAME, - json_object_to_json_string(j_query)); - - /* SEB - rc = afb_api_call_sync_legacy(source->api, SRV_HARVESTER_NAME, "write", j_query, &j_res); + rc = wrap_json_pack(&j_values, "{si}", "id", id); if (rc < 0) { - afb_req_fail(source->request, "Failed", "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc, - json_object_to_json_string(j_res)); - return rc; + AFB_API_ERROR(source->api, "Error packing j_values, rc=%d", rc); + goto _EXIT_TRACE_EVT; } -*/ - afb_api_call_legacy(source->api, SRV_HARVESTER_NAME, "write", j_query, cb_harvester_write, &j_res); - return 0; -} + if (!strncmp(type, "request", 6) && request) { + // Process "request" type -CTLP_CAPI(tracing_events, source, argsJ, eventJ) -{ - int rc; - METRIC_T metric = { 0 }; - const char* type = NULL; - struct json_object* request = NULL; + json_object* j_action = json_object_object_get(request, "action"); + + // Filter out some traces +#if 0 // TO Test + const char* action = json_object_get_string(j_action); + if (action && (!strncmp(action, "begin", 5) || !strncmp(action, "end", 3) || !strncmp(action, "json", 4))) { + AFB_API_DEBUG(source->api, "trace_events IGNORED event=%s", json_object_to_json_string(request)); + rc = 0; + free(action); + goto _EXIT_TRACE_EVT; + } + if (action) + free(action); +#endif - //struct signalCBT* ctx = (struct signalCBT*)source->context; + json_object_object_add(j_metadata, "type", json_object_new_string("request")); + json_object_object_add(j_metadata, "api", json_object_object_get(request, "api")); + json_object_object_add(j_metadata, "verb", json_object_object_get(request, "verb")); + json_object_object_add(j_metadata, "action", j_action); + json_object_object_add(j_metadata, "session", json_object_object_get(request, "session")); + json_object_object_add(j_metadata, "req_index", json_object_new_string(json_object_to_json_string(json_object_object_get(request, "index")))); - AFB_API_DEBUG(source->api, ">>> RECV Event uid %s : %s", source->uid, - json_object_to_json_string(eventJ)); + AFB_API_DEBUG(source->api, "Type request, j_metadata=%s", json_object_to_json_string(j_metadata)); - if (strcmp(source->uid, "xds/supervisor/trace") != 0) { - AFB_API_NOTICE(source->api, "WARNING: un-handle uid '%s'", source->uid); - return 0; - } + } else if (!strncmp(type, "event", 5) && event) { + // Process "event" type - if ((rc = wrap_json_unpack(eventJ, "{s:?s}", "type", &type)) < 0) { - afb_req_fail(source->request, "Failed", "Cannot decode event type"); - return ERROR; + json_object_object_add(j_metadata, "type", json_object_new_string("event")); + json_object_object_add(j_metadata, "id", json_object_object_get(event, "id")); + json_object_object_add(j_metadata, "name", json_object_object_get(event, "name")); + json_object_object_add(j_metadata, "action", json_object_object_get(event, "action")); + + AFB_API_DEBUG(source->api, "Type event, j_metadata=%s", json_object_to_json_string(j_metadata)); + } else { + AFB_API_ERROR(source->api, "Null request or event field"); + rc = ERROR; + goto _EXIT_TRACE_EVT; } - if (strcmp(type, "request") != 0) { - afb_req_fail(source->request, "Failed", "Cannot retrieve request"); - return ERROR; + if (data) { + // Serialized data (IOW an JSON object pass as a string) + cdata = json_object_to_json_string(data); + json_object_object_add(j_values, "data", json_object_new_string(cdata)); + json_object_object_add(j_values, "data_bytes", json_object_new_int((int32_t)strlen(cdata))); } - if (!json_object_object_get_ex(eventJ, "request", &request)) { - afb_req_fail(source->request, "Failed", "Cannot decode event request"); - return ERROR; + + rc = wrap_json_pack(&j_metric, "{s:s s:o s:o s:o }", + "name", "xds/supervisor/trace", + "metadata", j_metadata, + "values", j_values, + "timestamp", timestamp); + + if (rc < 0) { + AFB_API_ERROR(source->api, "Error packing j_metric, rc=%d", rc); + goto _EXIT_TRACE_EVT; } - // TODO: decode request and build trace + rc = wrap_json_pack(&j_query, "{s:s s:i s:[o] }", + "host", "localhost", // FIXME - configurable + "port", 8086, // FIXME - configurable + "metric", j_metric); + if (rc < 0) { + AFB_API_ERROR(source->api, "Error packing query, rc=%d", rc); + goto _EXIT_TRACE_EVT; + } - metric.name = "trace"; - /* FIXME string KO - metric.dataType = SPVR_DATA_STRING; - metric.data = "test1234"; - */ - metric.dataType = SPVR_DATA_INT; - int val = 54321; - metric.data = &val; + AFB_API_DEBUG(source->api, "%s write: %s", SRV_HARVESTER_NAME, + json_object_to_json_string(j_query)); - rc = harvester_post_data(source, &metric); +#if 0 // SYNC_CALL - for debugging purpose + rc = afb_api_call_sync_legacy(source->api, SRV_HARVESTER_NAME, "write", j_query, &j_res); if (rc < 0) { - afb_req_fail(source->request, "Failed", "ERROR harvester_post_data: rc %d", rc); - return rc; + AFB_API_ERROR(source->api, "Error %s write : rc=%d, j_res=%s", SRV_HARVESTER_NAME, rc, + json_object_to_json_string(j_res)); + goto _EXIT_TRACE_EVT; } +#else + afb_api_call_legacy(source->api, SRV_HARVESTER_NAME, "write", j_query, cb_harvester_write, &j_res); +#endif - return 0; + // No error + rc = 0; + +_EXIT_TRACE_EVT: + // no need to free type, tag and cdata (already managed by json_object_generic_delete) + + return rc; } -#endif |