diff options
author | Romain Forlot <romain.forlot@iot.bzh> | 2018-04-14 01:57:17 +0200 |
---|---|---|
committer | Sebastien Douheret <sebastien.douheret@iot.bzh> | 2018-07-10 23:41:14 +0200 |
commit | 5d50f0426699a06b5720e10f1feaef35c8b59f57 (patch) | |
tree | 34641293f0eef907c89ffa1cb6d6b80b863106fc /src/plugins/influxdb-reader.c | |
parent | be6447ca2038c84d2e946e27b897815e95c48e34 (diff) |
Improve writer/reader processing
- Handle indefinite number and kind of tags and fields for a metric
- Include only once header files
- Cleaning and ordering code
Change-Id: I14a4f0e6e1626971bff73ce7d9ac067bda69cfc4
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'src/plugins/influxdb-reader.c')
-rw-r--r-- | src/plugins/influxdb-reader.c | 144 |
1 files changed, 105 insertions, 39 deletions
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c index bb56c93..3a6a2b1 100644 --- a/src/plugins/influxdb-reader.c +++ b/src/plugins/influxdb-reader.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh" + * Copyright (C) 2018 "IoT.bzh" * Author "Romain Forlot" <romain.forlot@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,72 +15,88 @@ * limitations under the License. */ + +#include "influxdb.h" + +#include <errno.h> #include <fcntl.h> #include <stdio.h> +#include <string.h> #include <unistd.h> #include <systemd/sd-event.h> -#include "influxdb.h" +#include "../utils/list.h" -CURL *make_curl_query_get(const char *url) +void fill_tag_n_fields(void *c, json_object *columnJ) { - CURL *curl; - char *args[5]; - char *last_ts[30]; + int length = json_object_get_string_len(columnJ); + const char *column = json_object_get_string(columnJ); + struct series_columns_t *s_col = (struct series_columns_t *)c; - int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL); - if (fd_last_read < 0) - return NULL; - - read(fd_last_read, last_ts, sizeof(last_ts)); - - args[0] = "epoch"; - args[1] = "ns"; - args[2] = "q"; - args[3] = "SELECT * FROM /^.*$/"; - args[4] = NULL; - - curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args); - - return curl; + if(strncasecmp(&column[length-1], "_t", 2) == 0) { + add_key(s_col->tags, column); + } + else if(strncasecmp(&column[length-1], "_f", 2) == 0) { + add_key(s_col->fields, column); + } + return; } -int unpack_metric_from_db(json_object *metric) +void unpack_metric_from_db(void *series_processed, json_object *metricJ) { const char *name; - json_object *columns = NULL, *values = NULL; + int length = 0; + struct series_columns_t col = {.tags = NULL, .fields = NULL}; + json_object *columnsJ = NULL, *valuesJ = NULL; - wrap_json_unpack(metric, "{ss, so, so!}", + if(wrap_json_unpack(metricJ, "{ss, so, so!}", "name", &name, - "columns", &columns, - "values", &values); + "columns", &columnsJ, + "values", &valuesJ)) { + AFB_ERROR("Unpacking metric goes wrong"); + return; + } + + length = json_object_get_string_len(columnsJ); + wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col); + + /* Increment counter of series well processed */ + ++*((int*)series_processed); } -int unpack_series(json_object *series) +int unpack_series(json_object *seriesJ) { - size_t length_series = json_object_array_length(series); + size_t series_processed = 0; + wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed); + return 0; } void forward_to_garner(const char *result, size_t size) { int id = 0; - json_object *series = NULL, - *db_dump = json_tokener_parse(result); - wrap_json_unpack(db_dump, "{s[{si,so}]}", - "id", &id, - "series", &series); - - unpack_series(series); + json_object *resultsJ = NULL, + *seriesJ = NULL, + *db_dumpJ = json_tokener_parse(result); + if( wrap_json_unpack(db_dumpJ, "{so!}", + "results", &resultsJ) || + wrap_json_unpack(resultsJ, "[{si,so!}]", + "statement_id", &id, + "series", &seriesJ)) { + AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result); + return; + } + if(seriesJ) + unpack_series(seriesJ); } void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) { long rep_code = curl_wrap_response_code_get(curl); switch(rep_code) { - case 204: + case 200: AFB_DEBUG("Read correctly done"); forward_to_garner(result, size); break; @@ -96,6 +112,48 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re } } +CURL *make_curl_query_get(const char *url) +{ + CURL *curl; + char *args[5]; + char query[255] = {}; + char last_ts[30] = {}; + char *now; + int length_now; + + args[0] = "epoch"; + args[1] = "ns"; + args[2] = "q"; + strncat(query, "SELECT * FROM /^.*$/", strlen("SELECT * FROM /^.*$/")); + args[4] = NULL; + length_now = asprintf(&now, "%lu", get_ts()); + + int rootdir_fd = afb_daemon_rootdir_get_fd(); + int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU); + if (fd_last_read < 0) + return NULL; + + /* Reading last timestamp recorded and get metric from that point until now + else write the last timestamp */ + if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) { + if(write(fd_last_read, now, length_now) != length_now) + AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno )); + } + else { + strncat(query, " WHERE time >= ", strlen(" WHERE time >= ")); + strncat(query, last_ts, strlen(last_ts)); + close(fd_last_read); + fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR); + if (write(fd_last_read, now, length_now) != length_now) + AFB_ERROR("Error writing last_db_read file: %s", strerror( errno )); + } + + args[3] = query; + curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args); + + return curl; +} + int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) { CURL *curl; @@ -103,7 +161,7 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) if(userdata) a = (struct reader_args*)userdata; else - return -1; + return ERROR; char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ @@ -111,15 +169,23 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) curl = make_curl_query_get(url); curl_wrap_do(curl, influxdb_read_curl_cb, NULL); + /* Reschedule next run */ + sd_event_source_set_time(s, usec + a->delay); + return 0; } int influxdb_reader(void *args) { + int err = 0; uint64_t usec; - struct sd_event_source *evtSource; + struct sd_event_source *evtSource = NULL; /* Set a cyclic cb call each 1s to call the read callback */ sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec); - return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args); + err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args); + if(!err) + err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON); + + return err; } |