summaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-04-14 01:57:17 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commit5d50f0426699a06b5720e10f1feaef35c8b59f57 (patch)
tree34641293f0eef907c89ffa1cb6d6b80b863106fc /src/plugins
parentbe6447ca2038c84d2e946e27b897815e95c48e34 (diff)
Improve writer/reader processing
- Handle indefinite number and kind of tags and fields for a metric - Include only once header files - Cleaning and ordering code Change-Id: I14a4f0e6e1626971bff73ce7d9ac067bda69cfc4 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/influxdb-reader.c144
-rw-r--r--src/plugins/influxdb-writer.c71
-rw-r--r--src/plugins/influxdb.c91
-rw-r--r--src/plugins/influxdb.h22
-rw-r--r--src/plugins/tsdb.c8
-rw-r--r--src/plugins/tsdb.h10
6 files changed, 233 insertions, 113 deletions
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index bb56c93..3a6a2b1 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,72 +15,88 @@
* limitations under the License.
*/
+
+#include "influxdb.h"
+
+#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
+#include <string.h>
#include <unistd.h>
#include <systemd/sd-event.h>
-#include "influxdb.h"
+#include "../utils/list.h"
-CURL *make_curl_query_get(const char *url)
+void fill_tag_n_fields(void *c, json_object *columnJ)
{
- CURL *curl;
- char *args[5];
- char *last_ts[30];
+ int length = json_object_get_string_len(columnJ);
+ const char *column = json_object_get_string(columnJ);
+ struct series_columns_t *s_col = (struct series_columns_t *)c;
- int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL);
- if (fd_last_read < 0)
- return NULL;
-
- read(fd_last_read, last_ts, sizeof(last_ts));
-
- args[0] = "epoch";
- args[1] = "ns";
- args[2] = "q";
- args[3] = "SELECT * FROM /^.*$/";
- args[4] = NULL;
-
- curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
-
- return curl;
+ if(strncasecmp(&column[length-1], "_t", 2) == 0) {
+ add_key(s_col->tags, column);
+ }
+ else if(strncasecmp(&column[length-1], "_f", 2) == 0) {
+ add_key(s_col->fields, column);
+ }
+ return;
}
-int unpack_metric_from_db(json_object *metric)
+void unpack_metric_from_db(void *series_processed, json_object *metricJ)
{
const char *name;
- json_object *columns = NULL, *values = NULL;
+ int length = 0;
+ struct series_columns_t col = {.tags = NULL, .fields = NULL};
+ json_object *columnsJ = NULL, *valuesJ = NULL;
- wrap_json_unpack(metric, "{ss, so, so!}",
+ if(wrap_json_unpack(metricJ, "{ss, so, so!}",
"name", &name,
- "columns", &columns,
- "values", &values);
+ "columns", &columnsJ,
+ "values", &valuesJ)) {
+ AFB_ERROR("Unpacking metric goes wrong");
+ return;
+ }
+
+ length = json_object_get_string_len(columnsJ);
+ wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col);
+
+ /* Increment counter of series well processed */
+ ++*((int*)series_processed);
}
-int unpack_series(json_object *series)
+int unpack_series(json_object *seriesJ)
{
- size_t length_series = json_object_array_length(series);
+ size_t series_processed = 0;
+ wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed);
+ return 0;
}
void forward_to_garner(const char *result, size_t size)
{
int id = 0;
- json_object *series = NULL,
- *db_dump = json_tokener_parse(result);
- wrap_json_unpack(db_dump, "{s[{si,so}]}",
- "id", &id,
- "series", &series);
-
- unpack_series(series);
+ json_object *resultsJ = NULL,
+ *seriesJ = NULL,
+ *db_dumpJ = json_tokener_parse(result);
+ if( wrap_json_unpack(db_dumpJ, "{so!}",
+ "results", &resultsJ) ||
+ wrap_json_unpack(resultsJ, "[{si,so!}]",
+ "statement_id", &id,
+ "series", &seriesJ)) {
+ AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result);
+ return;
+ }
+ if(seriesJ)
+ unpack_series(seriesJ);
}
void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
- case 204:
+ case 200:
AFB_DEBUG("Read correctly done");
forward_to_garner(result, size);
break;
@@ -96,6 +112,48 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re
}
}
+CURL *make_curl_query_get(const char *url)
+{
+ CURL *curl;
+ char *args[5];
+ char query[255] = {};
+ char last_ts[30] = {};
+ char *now;
+ int length_now;
+
+ args[0] = "epoch";
+ args[1] = "ns";
+ args[2] = "q";
+ strncat(query, "SELECT * FROM /^.*$/", strlen("SELECT * FROM /^.*$/"));
+ args[4] = NULL;
+ length_now = asprintf(&now, "%lu", get_ts());
+
+ int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
+ if (fd_last_read < 0)
+ return NULL;
+
+ /* Reading last timestamp recorded and get metric from that point until now
+ else write the last timestamp */
+ if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
+ if(write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ }
+ else {
+ strncat(query, " WHERE time >= ", strlen(" WHERE time >= "));
+ strncat(query, last_ts, strlen(last_ts));
+ close(fd_last_read);
+ fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
+ if (write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ }
+
+ args[3] = query;
+ curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
+
+ return curl;
+}
+
int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
@@ -103,7 +161,7 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
if(userdata)
a = (struct reader_args*)userdata;
else
- return -1;
+ return ERROR;
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
@@ -111,15 +169,23 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
curl = make_curl_query_get(url);
curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ /* Reschedule next run */
+ sd_event_source_set_time(s, usec + a->delay);
+
return 0;
}
int influxdb_reader(void *args)
{
+ int err = 0;
uint64_t usec;
- struct sd_event_source *evtSource;
+ struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ if(!err)
+ err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);
+
+ return err;
}
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index 3c46b57..673fbe5 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,46 +16,57 @@
*/
#define _GNU_SOURCE
-#include <stdio.h>
#include <string.h>
+#include <stdio.h>
+
#include "influxdb.h"
-size_t format_write_args(char *query, const char *name, const char *source, const char *unit, const char *identity, json_object *jv, uint64_t timestamp)
+static size_t format_write_args(char *query, struct series_t *serie)
{
char *ts;
- memset(query, 0, URL_MAXIMUM_LENGTH);
+ struct list *tags = serie->series_columns.tags;
+ struct list *fields = serie->series_columns.fields;
- strncat(query, name, strlen(name));
- if (source) {
- concatenate(query, source, ",");
- }
- if (unit) {
- concatenate(query, unit, ",");
+ strncat(query, serie->name, strlen(serie->name));
+ if(tags) {
+ while(tags != NULL) {
+ concatenate(query, tags->key, ",");
+ if(json_object_is_type(tags->value, json_type_string))
+ concatenate(query, json_object_get_string(tags->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(tags->value), "=");
+ tags = tags->next;
+ }
}
- if (identity) {
- concatenate(query, identity, ",");
+ if(fields) {
+ int i = 0;
+ for(struct list *it = fields; it != NULL; it = it->next) {
+ if(!i)
+ concatenate(query, fields->key, " ");
+ else
+ concatenate(query, fields->key, ",");
+ if(json_object_is_type(fields->value, json_type_string))
+ concatenate(query, json_object_get_string(fields->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(fields->value), "=");
+ i++;
+ }
}
- concatenate(query, "value", " ");
- concatenate(query, json_object_to_json_string(jv), "=");
- asprintf(&ts, "%lu", timestamp);
+ asprintf(&ts, "%lu", serie->timestamp);
concatenate(query, ts, " ");
+ strcat(query, "\n");
return strlen(query);
}
-CURL *make_curl_write_post(const char *url, json_object *metric)
+CURL *make_curl_write_post(const char *url, json_object *metricJ)
{
CURL *curl;
- json_object *jv = NULL;
- uint64_t timestamp = 0;
- const char *name = NULL,
- *source = NULL,
- *unit = NULL,
- *identity = NULL;
+ struct series_t *serie = NULL;
- size_t lpd = json_object_is_type(metric, json_type_array) ?
- json_object_array_length(metric) + 1 : 2;
+ size_t lpd = json_object_is_type(metricJ, json_type_array) ?
+ json_object_array_length(metricJ) + 1 : 2;
char **post_data;
post_data = malloc(lpd);
@@ -63,14 +74,14 @@ CURL *make_curl_write_post(const char *url, json_object *metric)
char write[URL_MAXIMUM_LENGTH];
bzero(write, URL_MAXIMUM_LENGTH);
- if(unpack_metric_from_binding(metric, &name, &source, &unit, &identity, &jv, &timestamp)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metric));
+ if(unpack_metric_from_api(metricJ, &serie)) {
+ AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricJ));
curl = NULL;
}
else {
for(long int i = --lpd; i >= 0; i--) {
- format_write_args(write, name, source, unit, identity, jv, timestamp);
- post_data[i] = i == lpd ? NULL : write;
+ format_write_args(write, serie);
+ post_data[i] = i == lpd-1 ? NULL : write;
}
curl = curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data);
}
@@ -108,9 +119,9 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r
}
}
-CURL *influxdb_write(const char* host, const char *port, json_object *metric)
+CURL *influxdb_write(const char* host, const char *port, json_object *metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url(url, sizeof(url), host, port, "write");
- return make_curl_write_post(url, metric);
+ return make_curl_write_post(url, metricJ);
}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index 8a8a112..b8a733b 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,8 +21,35 @@
#include <unistd.h>
#include <string.h>
+#include "influxdb.h"
#include "tsdb.h"
#include "wrap-json.h"
+#include "../utils/list.h"
+
+void concatenate(char* dest, const char* source, const char *sep)
+{
+ strncat(dest, sep, strlen(sep));
+ strncat(dest, source, strlen(source));
+}
+
+size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
+{
+ bzero(url, l_url);
+
+ /* Handle default host and port */
+ host = host ? host : DEFAULT_DBHOST;
+ port = port ? port : DEFAULT_DBPORT;
+
+ strncat(url, host, strlen(host));
+ strncat(url, ":", 1);
+ strncat(url, port, strlen(port));
+ strncat(url, "/", 1);
+ strncat(url, endpoint, strlen(endpoint));
+ strncat(url, "?db="DEFAULT_DB, strlen("?db="DEFAULT_DB));
+
+ return strlen(url);
+}
+
int create_database()
{
@@ -40,56 +67,46 @@ int create_database()
if(curl_wrap_response_code_get(request) != 200) {
AFB_ERROR("Can't create database.");
- ret = -1;
+ ret = ERROR;
}
curl_easy_cleanup(request);
if(ret == 0)
- AFB_NOTICE("Database 'agl-collector' created");
+ AFB_NOTICE("Database '"DEFAULT_DB"' created");
return ret;
}
-int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp)
+void unpacking_from_api(void *s, json_object *valueJ, const char *key)
{
- if (wrap_json_unpack(m, "{ss,s?s,s?s,s?s,so,sI!}",
- "name", name,
- "source", source,
- "unit", unit,
- "identity", identity,
- "value", jv,
- "timestamp", timestamp))
- return -1;
- else if (!json_object_is_type(*jv, json_type_boolean) &&
- !json_object_is_type(*jv, json_type_double) &&
- !json_object_is_type(*jv, json_type_int) &&
- !json_object_is_type(*jv, json_type_string))
- return -1;
-
- return 0;
+ size_t key_length = strlen(key);
+ struct series_t *serie = (struct series_t*)s;
+
+ /* Treat the 2 static key that could have been specified */
+ if(strcasecmp("name", key) == 0)
+ serie->name = json_object_get_string(valueJ);
+ else if(strcasecmp("timestamp", key) == 0)
+ serie->timestamp = get_ts();
+ /* Treat all key looking for tag and field object. Those ones could be find
+ with the last 2 character. '_t' for tag and '_f' that are the keys that
+ could be indefinite. Cf influxdb documentation:
+ https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */
+ else if(strncasecmp(&key[key_length-2], "_t", 2) == 0)
+ add_elt(&serie->series_columns.tags, key, valueJ);
+ else if(strncasecmp(&key[key_length-2], "_f", 2) == 0)
+ add_elt(&serie->series_columns.fields, key, valueJ);
}
-void concatenate(char* dest, const char* source, const char *sep)
+int unpack_metric_from_api(json_object *m, struct series_t **serie)
{
- strncat(dest, sep, strlen(sep));
- strncat(dest, source, strlen(source));
-}
+ *serie = malloc(sizeof(struct series_t));
+ bzero(*serie, sizeof(struct series_t));
-size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
-{
- bzero(url, l_url);
+ wrap_json_object_for_all(m, unpacking_from_api, *serie);
- /* Handle default host and port */
- host = host ? host : DEFAULT_DBHOST;
- port = port ? port : DEFAULT_DBPORT;
+ if(!(*serie)->timestamp)
+ (*serie)->timestamp = get_ts();
- strncat(url, host, strlen(host));
- strncat(url, ":", 1);
- strncat(url, port, strlen(port));
- strncat(url, "/", 1);
- strncat(url, endpoint, strlen(endpoint));
- strncat(url, "?db="DEFAULT_DB, strlen("?db="DEFAULT_DB));
-
- return strlen(url);
+ return 0;
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index 6c775b7..859e101 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,13 +15,31 @@
* limitations under the License.
*/
+#ifndef _INFLUXDB_H_
+#define _INFLUXDB_H_
+
+#define _GNU_SOURCE
#include "wrap-json.h"
#include "tsdb.h"
+#include "../utils/list.h"
+
+struct series_columns_t {
+ struct list *tags;
+ struct list *fields;
+};
+
+struct series_t {
+ const char *name;
+ struct series_columns_t series_columns;
+ uint64_t timestamp;
+};
int create_database();
-int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp);
+int unpack_metric_from_api(json_object *m, struct series_t **serie);
void concatenate(char* dest, const char* source, const char *sep);
size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint);
+
+#endif
diff --git a/src/plugins/tsdb.c b/src/plugins/tsdb.c
index e393109..7e10a74 100644
--- a/src/plugins/tsdb.c
+++ b/src/plugins/tsdb.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,7 +28,7 @@ int influxdb_ping()
if(curl_wrap_response_code_get(request) != 204) {
AFB_ERROR("TimeSeries DB not reachable");
- ret = -1;
+ ret = ERROR;
}
curl_easy_cleanup(request);
@@ -43,10 +43,10 @@ int db_ping()
return ret;
}
-u_int64_t get_ts()
+uint64_t get_ts()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
- return (int64_t)(ts.tv_sec) * (int64_t)1000000000 + (int64_t)(ts.tv_nsec);
+ return (uint64_t)(ts.tv_sec) * (uint64_t)1000000000 + (uint64_t)(ts.tv_nsec);
}
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 5c0245c..437c5dc 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,12 +15,17 @@
* limitations under the License.
*/
+#ifndef _TSDB_H
+#define _TSDB_H
+
#define AFB_BINDING_VERSION 2
#include <afb/afb-binding.h>
#include <json-c/json.h>
#include "curl-wrap.h"
+#define ERROR -1
+
#define DEFAULT_DB "agl-garner"
#define DEFAULT_DBHOST "localhost"
#define DEFAULT_DBPORT "8086"
@@ -35,6 +40,7 @@ enum db_available {
struct reader_args {
const char *host;
const char *port;
+ u_int32_t delay;
};
CURL *influxdb_write(const char* host, const char *port, json_object *metric);
@@ -45,3 +51,5 @@ int influxdb_reader(void *args);
int db_ping();
u_int64_t get_ts();
+
+#endif