summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-04-17 18:25:12 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commitf93730e72eeeade82ca72d7a7ad8cde404ef95f1 (patch)
tree8569e6ab7d62e7d03aebab9442f091175f907558
parent4b6d97da2d4bc1a9e328cf861e75f5ee1bb968e3 (diff)
Handle query result and prepare to forward by API
First draft to be able to read at regular interval the TSDB InfluxDB. This prepare a JSON object formated to be handled as an input argument from a regular call to the API verb 'write' Then it is only needed to call the remote verb to forward all the results Change-Id: I80076dd9cf00ba43075d37dd4d15180e63e37289 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--src/plugins/influxdb-reader.c112
-rw-r--r--src/utils/list.c14
-rw-r--r--src/utils/list.h1
3 files changed, 97 insertions, 30 deletions
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index 3a6a2b1..b6bef25 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
#include "influxdb.h"
#include <errno.h>
@@ -27,72 +26,125 @@
#include "../utils/list.h"
-void fill_tag_n_fields(void *c, json_object *columnJ)
+struct metrics_list {
+ struct series_t serie;
+ json_object *metricsJ;
+};
+
+static void fill_n_send_values(void *c, json_object *valuesJ)
+{
+ struct list *it = NULL;
+ int length = json_object_get_string_len(valuesJ), i = 0, j = 0;
+ struct metrics_list *m_list = (struct metrics_list *)c;
+ json_object *one_metric = json_object_new_object();
+
+ for (i = 0; i < length; i++) {
+ if(!i)
+ m_list->serie.timestamp = json_object_get_int64(valuesJ);
+ else {
+ if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) {
+ if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) {
+ AFB_ERROR("No tags nor fields fits.");
+ }
+ j++;
+ }
+ }
+ }
+
+ /* Build a metric object to add in the JSON array */
+ json_object_object_add(one_metric, "name", json_object_new_string(m_list->serie.name));
+ json_object_object_add(one_metric, "timestamp", json_object_new_int64(m_list->serie.timestamp));
+ for(it = m_list->serie.serie_columns.tags; it != NULL; it = it->next)
+ json_object_object_add(one_metric, m_list->serie.serie_columns.tags->key, m_list->serie.serie_columns.tags->value);
+ for(it = m_list->serie.serie_columns.fields; it != NULL; it = it->next)
+ json_object_object_add(one_metric, m_list->serie.serie_columns.fields->key, m_list->serie.serie_columns.fields->value);
+
+ json_object_array_add(m_list->metricsJ, one_metric);
+}
+
+static void fill_key(void *c, json_object *columnJ)
{
int length = json_object_get_string_len(columnJ);
const char *column = json_object_get_string(columnJ);
- struct series_columns_t *s_col = (struct series_columns_t *)c;
+ struct metrics_list *m_list = (struct metrics_list *)c;
- if(strncasecmp(&column[length-1], "_t", 2) == 0) {
- add_key(s_col->tags, column);
+ if(strncasecmp(&column[length-2], "_t", 2) == 0) {
+ add_key(&m_list->serie.serie_columns.tags, column);
}
- else if(strncasecmp(&column[length-1], "_f", 2) == 0) {
- add_key(s_col->fields, column);
+ else if(strncasecmp(&column[length-2], "_f", 2) == 0) {
+ add_key(&m_list->serie.serie_columns.fields, column);
}
- return;
}
-void unpack_metric_from_db(void *series_processed, json_object *metricJ)
+static void unpack_metric_from_db(void *ml, json_object *metricJ)
{
- const char *name;
- int length = 0;
- struct series_columns_t col = {.tags = NULL, .fields = NULL};
+ struct metrics_list *m_list = (struct metrics_list*)ml;
json_object *columnsJ = NULL, *valuesJ = NULL;
if(wrap_json_unpack(metricJ, "{ss, so, so!}",
- "name", &name,
+ "name", &m_list->serie.name,
"columns", &columnsJ,
"values", &valuesJ)) {
AFB_ERROR("Unpacking metric goes wrong");
return;
}
- length = json_object_get_string_len(columnsJ);
- wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col);
+ wrap_json_array_for_all(columnsJ, fill_key, m_list);
+ wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list);
- /* Increment counter of series well processed */
- ++*((int*)series_processed);
}
-int unpack_series(json_object *seriesJ)
+static json_object *unpack_series(json_object *seriesJ)
{
- size_t series_processed = 0;
- wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed);
-
- return 0;
+ struct metrics_list m_list = {
+ .serie = {
+ .name = NULL,
+ .serie_columns = {
+ .tags = NULL,
+ .fields = NULL
+ },
+ .timestamp = 0
+ },
+ .metricsJ = json_object_new_array()
+ };
+
+ wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list);
+
+ return m_list.metricsJ;
}
-void forward_to_garner(const char *result, size_t size)
+static void forward_to_garner(const char *result, size_t size)
{
int id = 0;
-
json_object *resultsJ = NULL,
*seriesJ = NULL,
+ *metrics2send = NULL,
+ *call_resultJ = NULL,
*db_dumpJ = json_tokener_parse(result);
+
if( wrap_json_unpack(db_dumpJ, "{so!}",
"results", &resultsJ) ||
wrap_json_unpack(resultsJ, "[{si,so!}]",
"statement_id", &id,
"series", &seriesJ)) {
- AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result);
+// AFB_DEBUG("Unpacking results from influxdb request. Request results was:\n%s", result);
return;
}
- if(seriesJ)
- unpack_series(seriesJ);
+ if(seriesJ) {
+ metrics2send = unpack_series(seriesJ);
+ if(json_object_array_length(metrics2send)) {
+ if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) {
+ AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
+ }
+ }
+ }
+ else {
+ AFB_ERROR("Empty response. Request results was:\n%s", result);
+ }
}
-void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
@@ -112,7 +164,7 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re
}
}
-CURL *make_curl_query_get(const char *url)
+static CURL *make_curl_query_get(const char *url)
{
CURL *curl;
char *args[5];
@@ -154,7 +206,7 @@ CURL *make_curl_query_get(const char *url)
return curl;
}
-int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
+static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
struct reader_args *a = NULL;
diff --git a/src/utils/list.c b/src/utils/list.c
index bd7eeac..d47f761 100644
--- a/src/utils/list.c
+++ b/src/utils/list.c
@@ -64,6 +64,20 @@ void add_key(struct list **l, const char *key)
}
}
+int set_value(struct list *l, json_object *val, int index)
+{
+ int i;
+
+ for (i = 0; i < index; i++) {
+ l = l->next;
+ if ( l == NULL )
+ return -1;
+ }
+
+ l->value = val;
+ return 0;
+}
+
struct list *get_elt(struct list *l, int index)
{
int i;
diff --git a/src/utils/list.h b/src/utils/list.h
index 7cacbed..9f93cb6 100644
--- a/src/utils/list.h
+++ b/src/utils/list.h
@@ -29,6 +29,7 @@ struct list {
void destroy_list(struct list *l);
void add_elt(struct list **l, const char *key, json_object *value);
void add_key(struct list **l, const char *key);
+int set_value(struct list *l, json_object *val, int index);
struct list *get_elt(struct list *l, int index);
struct list *find_elt_from_key(struct list *l, const char *key);
json_object *find_key_value(struct list *l, const char *key);