From f93730e72eeeade82ca72d7a7ad8cde404ef95f1 Mon Sep 17 00:00:00 2001 From: Romain Forlot Date: Tue, 17 Apr 2018 18:25:12 +0200 Subject: Handle query result and prepare to forward by API First draft to be able to read at regular interval the TSDB InfluxDB. This prepare a JSON object formated to be handled as an input argument from a regular call to the API verb 'write' Then it is only needed to call the remote verb to forward all the results Change-Id: I80076dd9cf00ba43075d37dd4d15180e63e37289 Signed-off-by: Romain Forlot --- src/plugins/influxdb-reader.c | 112 +++++++++++++++++++++++++++++++----------- src/utils/list.c | 14 ++++++ src/utils/list.h | 1 + 3 files changed, 97 insertions(+), 30 deletions(-) diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c index 3a6a2b1..b6bef25 100644 --- a/src/plugins/influxdb-reader.c +++ b/src/plugins/influxdb-reader.c @@ -15,7 +15,6 @@ * limitations under the License. */ - #include "influxdb.h" #include @@ -27,72 +26,125 @@ #include "../utils/list.h" -void fill_tag_n_fields(void *c, json_object *columnJ) +struct metrics_list { + struct series_t serie; + json_object *metricsJ; +}; + +static void fill_n_send_values(void *c, json_object *valuesJ) +{ + struct list *it = NULL; + int length = json_object_get_string_len(valuesJ), i = 0, j = 0; + struct metrics_list *m_list = (struct metrics_list *)c; + json_object *one_metric = json_object_new_object(); + + for (i = 0; i < length; i++) { + if(!i) + m_list->serie.timestamp = json_object_get_int64(valuesJ); + else { + if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) { + if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) { + AFB_ERROR("No tags nor fields fits."); + } + j++; + } + } + } + + /* Build a metric object to add in the JSON array */ + json_object_object_add(one_metric, "name", json_object_new_string(m_list->serie.name)); + json_object_object_add(one_metric, "timestamp", json_object_new_int64(m_list->serie.timestamp)); + for(it = m_list->serie.serie_columns.tags; it != NULL; it = it->next) + json_object_object_add(one_metric, m_list->serie.serie_columns.tags->key, m_list->serie.serie_columns.tags->value); + for(it = m_list->serie.serie_columns.fields; it != NULL; it = it->next) + json_object_object_add(one_metric, m_list->serie.serie_columns.fields->key, m_list->serie.serie_columns.fields->value); + + json_object_array_add(m_list->metricsJ, one_metric); +} + +static void fill_key(void *c, json_object *columnJ) { int length = json_object_get_string_len(columnJ); const char *column = json_object_get_string(columnJ); - struct series_columns_t *s_col = (struct series_columns_t *)c; + struct metrics_list *m_list = (struct metrics_list *)c; - if(strncasecmp(&column[length-1], "_t", 2) == 0) { - add_key(s_col->tags, column); + if(strncasecmp(&column[length-2], "_t", 2) == 0) { + add_key(&m_list->serie.serie_columns.tags, column); } - else if(strncasecmp(&column[length-1], "_f", 2) == 0) { - add_key(s_col->fields, column); + else if(strncasecmp(&column[length-2], "_f", 2) == 0) { + add_key(&m_list->serie.serie_columns.fields, column); } - return; } -void unpack_metric_from_db(void *series_processed, json_object *metricJ) +static void unpack_metric_from_db(void *ml, json_object *metricJ) { - const char *name; - int length = 0; - struct series_columns_t col = {.tags = NULL, .fields = NULL}; + struct metrics_list *m_list = (struct metrics_list*)ml; json_object *columnsJ = NULL, *valuesJ = NULL; if(wrap_json_unpack(metricJ, "{ss, so, so!}", - "name", &name, + "name", &m_list->serie.name, "columns", &columnsJ, "values", &valuesJ)) { AFB_ERROR("Unpacking metric goes wrong"); return; } - length = json_object_get_string_len(columnsJ); - wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col); + wrap_json_array_for_all(columnsJ, fill_key, m_list); + wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list); - /* Increment counter of series well processed */ - ++*((int*)series_processed); } -int unpack_series(json_object *seriesJ) +static json_object *unpack_series(json_object *seriesJ) { - size_t series_processed = 0; - wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed); - - return 0; + struct metrics_list m_list = { + .serie = { + .name = NULL, + .serie_columns = { + .tags = NULL, + .fields = NULL + }, + .timestamp = 0 + }, + .metricsJ = json_object_new_array() + }; + + wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list); + + return m_list.metricsJ; } -void forward_to_garner(const char *result, size_t size) +static void forward_to_garner(const char *result, size_t size) { int id = 0; - json_object *resultsJ = NULL, *seriesJ = NULL, + *metrics2send = NULL, + *call_resultJ = NULL, *db_dumpJ = json_tokener_parse(result); + if( wrap_json_unpack(db_dumpJ, "{so!}", "results", &resultsJ) || wrap_json_unpack(resultsJ, "[{si,so!}]", "statement_id", &id, "series", &seriesJ)) { - AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result); +// AFB_DEBUG("Unpacking results from influxdb request. Request results was:\n%s", result); return; } - if(seriesJ) - unpack_series(seriesJ); + if(seriesJ) { + metrics2send = unpack_series(seriesJ); + if(json_object_array_length(metrics2send)) { + if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) { + AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ)); + } + } + } + else { + AFB_ERROR("Empty response. Request results was:\n%s", result); + } } -void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) +static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) { long rep_code = curl_wrap_response_code_get(curl); switch(rep_code) { @@ -112,7 +164,7 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re } } -CURL *make_curl_query_get(const char *url) +static CURL *make_curl_query_get(const char *url) { CURL *curl; char *args[5]; @@ -154,7 +206,7 @@ CURL *make_curl_query_get(const char *url) return curl; } -int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) +static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) { CURL *curl; struct reader_args *a = NULL; diff --git a/src/utils/list.c b/src/utils/list.c index bd7eeac..d47f761 100644 --- a/src/utils/list.c +++ b/src/utils/list.c @@ -64,6 +64,20 @@ void add_key(struct list **l, const char *key) } } +int set_value(struct list *l, json_object *val, int index) +{ + int i; + + for (i = 0; i < index; i++) { + l = l->next; + if ( l == NULL ) + return -1; + } + + l->value = val; + return 0; +} + struct list *get_elt(struct list *l, int index) { int i; diff --git a/src/utils/list.h b/src/utils/list.h index 7cacbed..9f93cb6 100644 --- a/src/utils/list.h +++ b/src/utils/list.h @@ -29,6 +29,7 @@ struct list { void destroy_list(struct list *l); void add_elt(struct list **l, const char *key, json_object *value); void add_key(struct list **l, const char *key); +int set_value(struct list *l, json_object *val, int index); struct list *get_elt(struct list *l, int index); struct list *find_elt_from_key(struct list *l, const char *key); json_object *find_key_value(struct list *l, const char *key); -- cgit 1.2.3-korg