aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/influxdb-reader.c
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-04-14 01:57:17 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commit5d50f0426699a06b5720e10f1feaef35c8b59f57 (patch)
tree34641293f0eef907c89ffa1cb6d6b80b863106fc /src/plugins/influxdb-reader.c
parentbe6447ca2038c84d2e946e27b897815e95c48e34 (diff)
Improve writer/reader processing
- Handle indefinite number and kind of tags and fields for a metric - Include only once header files - Cleaning and ordering code Change-Id: I14a4f0e6e1626971bff73ce7d9ac067bda69cfc4 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'src/plugins/influxdb-reader.c')
-rw-r--r--src/plugins/influxdb-reader.c144
1 files changed, 105 insertions, 39 deletions
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index bb56c93..3a6a2b1 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,72 +15,88 @@
* limitations under the License.
*/
+
+#include "influxdb.h"
+
+#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
+#include <string.h>
#include <unistd.h>
#include <systemd/sd-event.h>
-#include "influxdb.h"
+#include "../utils/list.h"
-CURL *make_curl_query_get(const char *url)
+void fill_tag_n_fields(void *c, json_object *columnJ)
{
- CURL *curl;
- char *args[5];
- char *last_ts[30];
+ int length = json_object_get_string_len(columnJ);
+ const char *column = json_object_get_string(columnJ);
+ struct series_columns_t *s_col = (struct series_columns_t *)c;
- int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL);
- if (fd_last_read < 0)
- return NULL;
-
- read(fd_last_read, last_ts, sizeof(last_ts));
-
- args[0] = "epoch";
- args[1] = "ns";
- args[2] = "q";
- args[3] = "SELECT * FROM /^.*$/";
- args[4] = NULL;
-
- curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
-
- return curl;
+ if(strncasecmp(&column[length-1], "_t", 2) == 0) {
+ add_key(s_col->tags, column);
+ }
+ else if(strncasecmp(&column[length-1], "_f", 2) == 0) {
+ add_key(s_col->fields, column);
+ }
+ return;
}
-int unpack_metric_from_db(json_object *metric)
+void unpack_metric_from_db(void *series_processed, json_object *metricJ)
{
const char *name;
- json_object *columns = NULL, *values = NULL;
+ int length = 0;
+ struct series_columns_t col = {.tags = NULL, .fields = NULL};
+ json_object *columnsJ = NULL, *valuesJ = NULL;
- wrap_json_unpack(metric, "{ss, so, so!}",
+ if(wrap_json_unpack(metricJ, "{ss, so, so!}",
"name", &name,
- "columns", &columns,
- "values", &values);
+ "columns", &columnsJ,
+ "values", &valuesJ)) {
+ AFB_ERROR("Unpacking metric goes wrong");
+ return;
+ }
+
+ length = json_object_get_string_len(columnsJ);
+ wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col);
+
+ /* Increment counter of series well processed */
+ ++*((int*)series_processed);
}
-int unpack_series(json_object *series)
+int unpack_series(json_object *seriesJ)
{
- size_t length_series = json_object_array_length(series);
+ size_t series_processed = 0;
+ wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed);
+ return 0;
}
void forward_to_garner(const char *result, size_t size)
{
int id = 0;
- json_object *series = NULL,
- *db_dump = json_tokener_parse(result);
- wrap_json_unpack(db_dump, "{s[{si,so}]}",
- "id", &id,
- "series", &series);
-
- unpack_series(series);
+ json_object *resultsJ = NULL,
+ *seriesJ = NULL,
+ *db_dumpJ = json_tokener_parse(result);
+ if( wrap_json_unpack(db_dumpJ, "{so!}",
+ "results", &resultsJ) ||
+ wrap_json_unpack(resultsJ, "[{si,so!}]",
+ "statement_id", &id,
+ "series", &seriesJ)) {
+ AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result);
+ return;
+ }
+ if(seriesJ)
+ unpack_series(seriesJ);
}
void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
- case 204:
+ case 200:
AFB_DEBUG("Read correctly done");
forward_to_garner(result, size);
break;
@@ -96,6 +112,48 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re
}
}
+CURL *make_curl_query_get(const char *url)
+{
+ CURL *curl;
+ char *args[5];
+ char query[255] = {};
+ char last_ts[30] = {};
+ char *now;
+ int length_now;
+
+ args[0] = "epoch";
+ args[1] = "ns";
+ args[2] = "q";
+ strncat(query, "SELECT * FROM /^.*$/", strlen("SELECT * FROM /^.*$/"));
+ args[4] = NULL;
+ length_now = asprintf(&now, "%lu", get_ts());
+
+ int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
+ if (fd_last_read < 0)
+ return NULL;
+
+ /* Reading last timestamp recorded and get metric from that point until now
+ else write the last timestamp */
+ if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
+ if(write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ }
+ else {
+ strncat(query, " WHERE time >= ", strlen(" WHERE time >= "));
+ strncat(query, last_ts, strlen(last_ts));
+ close(fd_last_read);
+ fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
+ if (write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ }
+
+ args[3] = query;
+ curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
+
+ return curl;
+}
+
int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
@@ -103,7 +161,7 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
if(userdata)
a = (struct reader_args*)userdata;
else
- return -1;
+ return ERROR;
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
@@ -111,15 +169,23 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
curl = make_curl_query_get(url);
curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ /* Reschedule next run */
+ sd_event_source_set_time(s, usec + a->delay);
+
return 0;
}
int influxdb_reader(void *args)
{
+ int err = 0;
uint64_t usec;
- struct sd_event_source *evtSource;
+ struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ if(!err)
+ err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);
+
+ return err;
}