summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-04-14 01:57:17 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commit5d50f0426699a06b5720e10f1feaef35c8b59f57 (patch)
tree34641293f0eef907c89ffa1cb6d6b80b863106fc
parentbe6447ca2038c84d2e946e27b897815e95c48e34 (diff)
Improve writer/reader processing
- Handle indefinite number and kind of tags and fields for a metric - Include only once header files - Cleaning and ordering code Change-Id: I14a4f0e6e1626971bff73ce7d9ac067bda69cfc4 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--.vscode/settings.json5
-rw-r--r--src/harvester-apidef.json6
-rw-r--r--src/harvester.c12
-rw-r--r--src/harvester.h13
-rw-r--r--src/plugins/influxdb-reader.c144
-rw-r--r--src/plugins/influxdb-writer.c71
-rw-r--r--src/plugins/influxdb.c91
-rw-r--r--src/plugins/influxdb.h22
-rw-r--r--src/plugins/tsdb.c8
-rw-r--r--src/plugins/tsdb.h10
-rw-r--r--src/utils/list.c24
-rw-r--r--src/utils/list.h4
12 files changed, 265 insertions, 145 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 844d362..389c8a8 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -13,7 +13,10 @@
"stdio.h": "c",
"ratio": "c",
"system_error": "c",
- "type_traits": "c"
+ "type_traits": "c",
+ "istream": "c",
+ "optional": "c",
+ "ostream": "c"
},
"C_Cpp.intelliSenseEngineFallback": "Disabled",
"C_Cpp.errorSquiggles": "Disabled"
diff --git a/src/harvester-apidef.json b/src/harvester-apidef.json
index c8612d3..1601f35 100644
--- a/src/harvester-apidef.json
+++ b/src/harvester-apidef.json
@@ -8,7 +8,7 @@
"x-binding-c-generator": {
"api": "harvester",
"version": 2,
- "prefix": "",
+ "prefix": "afv_",
"postfix": "",
"start": null ,
"onevent": null,
@@ -82,7 +82,7 @@
},
"x-permissions": {
"monitor": {
- "permission": "urn:AGL:permission:low-can:public:monitor"
+ "permission": "urn:AGL:permission:harvester:public:monitor"
},
"write": {
"permission": "urn:AGL:permission::platform:can:write "
@@ -113,7 +113,7 @@
}
}
},
- "/record": {
+ "/write": {
"description": "Record a metric to local TSDB and Cloud service if available",
"get": {
"x-permissions": {
diff --git a/src/harvester.c b/src/harvester.c
index b137e9c..30ba486 100644
--- a/src/harvester.c
+++ b/src/harvester.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,12 +23,10 @@
#include "curl-wrap.h"
#include "wrap-json.h"
-#define ERROR -1
-
CURL* (*tsdb_write)(const char* host, const char *port, json_object *metric);
void (*write_curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
-struct reader_args r_args = {NULL, NULL};
+struct reader_args r_args = {NULL, NULL, 1000000};
int do_write(struct afb_req req, const char* host, const char *port, json_object *metric)
{
@@ -40,7 +38,7 @@ int do_write(struct afb_req req, const char* host, const char *port, json_object
return 0;
}
-void record(struct afb_req req)
+void afv_write(struct afb_req req)
{
const char *port = NULL;
const char *host = NULL;
@@ -64,7 +62,7 @@ Malformed !");
}
}
-void auth(struct afb_req request)
+void afv_auth(struct afb_req request)
{
afb_req_session_set_LOA(request, 1);
afb_req_success(request, NULL, NULL);
@@ -94,6 +92,6 @@ int init()
return ERROR;
break;
}
-
+ /* TODO:*/
return 0;
}
diff --git a/src/harvester.h b/src/harvester.h
index 224ec71..b11fffe 100644
--- a/src/harvester.h
+++ b/src/harvester.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,17 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#ifndef _HARVESTER_H_
+#define _HARVESTER_H_
#define AFB_BINDING_VERSION 2
#include <afb/afb-binding.h>
-union port {
- int i_port;
- char c_port[5]; // Available ports 1-65535
-};
-
+enum metric_type {b = 0, i, d, str} type;
union metric_value {
- enum metric_type {b = 0, i, d, str} type;
int b_value;
int i_value;
double d_value;
@@ -32,3 +29,5 @@ union metric_value {
};
int init();
+
+#endif
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index bb56c93..3a6a2b1 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,72 +15,88 @@
* limitations under the License.
*/
+
+#include "influxdb.h"
+
+#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
+#include <string.h>
#include <unistd.h>
#include <systemd/sd-event.h>
-#include "influxdb.h"
+#include "../utils/list.h"
-CURL *make_curl_query_get(const char *url)
+void fill_tag_n_fields(void *c, json_object *columnJ)
{
- CURL *curl;
- char *args[5];
- char *last_ts[30];
+ int length = json_object_get_string_len(columnJ);
+ const char *column = json_object_get_string(columnJ);
+ struct series_columns_t *s_col = (struct series_columns_t *)c;
- int fd_last_read = afb_daemon_rootdir_open_locale("last_db_read", O_CREAT, NULL);
- if (fd_last_read < 0)
- return NULL;
-
- read(fd_last_read, last_ts, sizeof(last_ts));
-
- args[0] = "epoch";
- args[1] = "ns";
- args[2] = "q";
- args[3] = "SELECT * FROM /^.*$/";
- args[4] = NULL;
-
- curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
-
- return curl;
+ if(strncasecmp(&column[length-1], "_t", 2) == 0) {
+ add_key(s_col->tags, column);
+ }
+ else if(strncasecmp(&column[length-1], "_f", 2) == 0) {
+ add_key(s_col->fields, column);
+ }
+ return;
}
-int unpack_metric_from_db(json_object *metric)
+void unpack_metric_from_db(void *series_processed, json_object *metricJ)
{
const char *name;
- json_object *columns = NULL, *values = NULL;
+ int length = 0;
+ struct series_columns_t col = {.tags = NULL, .fields = NULL};
+ json_object *columnsJ = NULL, *valuesJ = NULL;
- wrap_json_unpack(metric, "{ss, so, so!}",
+ if(wrap_json_unpack(metricJ, "{ss, so, so!}",
"name", &name,
- "columns", &columns,
- "values", &values);
+ "columns", &columnsJ,
+ "values", &valuesJ)) {
+ AFB_ERROR("Unpacking metric goes wrong");
+ return;
+ }
+
+ length = json_object_get_string_len(columnsJ);
+ wrap_json_array_for_all(columnsJ, fill_tag_n_fields, &col);
+
+ /* Increment counter of series well processed */
+ ++*((int*)series_processed);
}
-int unpack_series(json_object *series)
+int unpack_series(json_object *seriesJ)
{
- size_t length_series = json_object_array_length(series);
+ size_t series_processed = 0;
+ wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&series_processed);
+ return 0;
}
void forward_to_garner(const char *result, size_t size)
{
int id = 0;
- json_object *series = NULL,
- *db_dump = json_tokener_parse(result);
- wrap_json_unpack(db_dump, "{s[{si,so}]}",
- "id", &id,
- "series", &series);
-
- unpack_series(series);
+ json_object *resultsJ = NULL,
+ *seriesJ = NULL,
+ *db_dumpJ = json_tokener_parse(result);
+ if( wrap_json_unpack(db_dumpJ, "{so!}",
+ "results", &resultsJ) ||
+ wrap_json_unpack(resultsJ, "[{si,so!}]",
+ "statement_id", &id,
+ "series", &seriesJ)) {
+ AFB_ERROR("Unpacking results from influxdb request. Request results was:\n%s", result);
+ return;
+ }
+ if(seriesJ)
+ unpack_series(seriesJ);
}
void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
- case 204:
+ case 200:
AFB_DEBUG("Read correctly done");
forward_to_garner(result, size);
break;
@@ -96,6 +112,48 @@ void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *re
}
}
+CURL *make_curl_query_get(const char *url)
+{
+ CURL *curl;
+ char *args[5];
+ char query[255] = {};
+ char last_ts[30] = {};
+ char *now;
+ int length_now;
+
+ args[0] = "epoch";
+ args[1] = "ns";
+ args[2] = "q";
+ strncat(query, "SELECT * FROM /^.*$/", strlen("SELECT * FROM /^.*$/"));
+ args[4] = NULL;
+ length_now = asprintf(&now, "%lu", get_ts());
+
+ int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
+ if (fd_last_read < 0)
+ return NULL;
+
+ /* Reading last timestamp recorded and get metric from that point until now
+ else write the last timestamp */
+ if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
+ if(write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ }
+ else {
+ strncat(query, " WHERE time >= ", strlen(" WHERE time >= "));
+ strncat(query, last_ts, strlen(last_ts));
+ close(fd_last_read);
+ fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
+ if (write(fd_last_read, now, length_now) != length_now)
+ AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ }
+
+ args[3] = query;
+ curl = curl_wrap_prepare_get(url, NULL, (const char * const*)args);
+
+ return curl;
+}
+
int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
@@ -103,7 +161,7 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
if(userdata)
a = (struct reader_args*)userdata;
else
- return -1;
+ return ERROR;
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
@@ -111,15 +169,23 @@ int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
curl = make_curl_query_get(url);
curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ /* Reschedule next run */
+ sd_event_source_set_time(s, usec + a->delay);
+
return 0;
}
int influxdb_reader(void *args)
{
+ int err = 0;
uint64_t usec;
- struct sd_event_source *evtSource;
+ struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- return sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ if(!err)
+ err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);
+
+ return err;
}
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index 3c46b57..673fbe5 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,46 +16,57 @@
*/
#define _GNU_SOURCE
-#include <stdio.h>
#include <string.h>
+#include <stdio.h>
+
#include "influxdb.h"
-size_t format_write_args(char *query, const char *name, const char *source, const char *unit, const char *identity, json_object *jv, uint64_t timestamp)
+static size_t format_write_args(char *query, struct series_t *serie)
{
char *ts;
- memset(query, 0, URL_MAXIMUM_LENGTH);
+ struct list *tags = serie->series_columns.tags;
+ struct list *fields = serie->series_columns.fields;
- strncat(query, name, strlen(name));
- if (source) {
- concatenate(query, source, ",");
- }
- if (unit) {
- concatenate(query, unit, ",");
+ strncat(query, serie->name, strlen(serie->name));
+ if(tags) {
+ while(tags != NULL) {
+ concatenate(query, tags->key, ",");
+ if(json_object_is_type(tags->value, json_type_string))
+ concatenate(query, json_object_get_string(tags->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(tags->value), "=");
+ tags = tags->next;
+ }
}
- if (identity) {
- concatenate(query, identity, ",");
+ if(fields) {
+ int i = 0;
+ for(struct list *it = fields; it != NULL; it = it->next) {
+ if(!i)
+ concatenate(query, fields->key, " ");
+ else
+ concatenate(query, fields->key, ",");
+ if(json_object_is_type(fields->value, json_type_string))
+ concatenate(query, json_object_get_string(fields->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(fields->value), "=");
+ i++;
+ }
}
- concatenate(query, "value", " ");
- concatenate(query, json_object_to_json_string(jv), "=");
- asprintf(&ts, "%lu", timestamp);
+ asprintf(&ts, "%lu", serie->timestamp);
concatenate(query, ts, " ");
+ strcat(query, "\n");
return strlen(query);
}
-CURL *make_curl_write_post(const char *url, json_object *metric)
+CURL *make_curl_write_post(const char *url, json_object *metricJ)
{
CURL *curl;
- json_object *jv = NULL;
- uint64_t timestamp = 0;
- const char *name = NULL,
- *source = NULL,
- *unit = NULL,
- *identity = NULL;
+ struct series_t *serie = NULL;
- size_t lpd = json_object_is_type(metric, json_type_array) ?
- json_object_array_length(metric) + 1 : 2;
+ size_t lpd = json_object_is_type(metricJ, json_type_array) ?
+ json_object_array_length(metricJ) + 1 : 2;
char **post_data;
post_data = malloc(lpd);
@@ -63,14 +74,14 @@ CURL *make_curl_write_post(const char *url, json_object *metric)
char write[URL_MAXIMUM_LENGTH];
bzero(write, URL_MAXIMUM_LENGTH);
- if(unpack_metric_from_binding(metric, &name, &source, &unit, &identity, &jv, &timestamp)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metric));
+ if(unpack_metric_from_api(metricJ, &serie)) {
+ AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricJ));
curl = NULL;
}
else {
for(long int i = --lpd; i >= 0; i--) {
- format_write_args(write, name, source, unit, identity, jv, timestamp);
- post_data[i] = i == lpd ? NULL : write;
+ format_write_args(write, serie);
+ post_data[i] = i == lpd-1 ? NULL : write;
}
curl = curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data);
}
@@ -108,9 +119,9 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r
}
}
-CURL *influxdb_write(const char* host, const char *port, json_object *metric)
+CURL *influxdb_write(const char* host, const char *port, json_object *metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url(url, sizeof(url), host, port, "write");
- return make_curl_write_post(url, metric);
+ return make_curl_write_post(url, metricJ);
}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index 8a8a112..b8a733b 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,8 +21,35 @@
#include <unistd.h>
#include <string.h>
+#include "influxdb.h"
#include "tsdb.h"
#include "wrap-json.h"
+#include "../utils/list.h"
+
+void concatenate(char* dest, const char* source, const char *sep)
+{
+ strncat(dest, sep, strlen(sep));
+ strncat(dest, source, strlen(source));
+}
+
+size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
+{
+ bzero(url, l_url);
+
+ /* Handle default host and port */
+ host = host ? host : DEFAULT_DBHOST;
+ port = port ? port : DEFAULT_DBPORT;
+
+ strncat(url, host, strlen(host));
+ strncat(url, ":", 1);
+ strncat(url, port, strlen(port));
+ strncat(url, "/", 1);
+ strncat(url, endpoint, strlen(endpoint));
+ strncat(url, "?db="DEFAULT_DB, strlen("?db="DEFAULT_DB));
+
+ return strlen(url);
+}
+
int create_database()
{
@@ -40,56 +67,46 @@ int create_database()
if(curl_wrap_response_code_get(request) != 200) {
AFB_ERROR("Can't create database.");
- ret = -1;
+ ret = ERROR;
}
curl_easy_cleanup(request);
if(ret == 0)
- AFB_NOTICE("Database 'agl-collector' created");
+ AFB_NOTICE("Database '"DEFAULT_DB"' created");
return ret;
}
-int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp)
+void unpacking_from_api(void *s, json_object *valueJ, const char *key)
{
- if (wrap_json_unpack(m, "{ss,s?s,s?s,s?s,so,sI!}",
- "name", name,
- "source", source,
- "unit", unit,
- "identity", identity,
- "value", jv,
- "timestamp", timestamp))
- return -1;
- else if (!json_object_is_type(*jv, json_type_boolean) &&
- !json_object_is_type(*jv, json_type_double) &&
- !json_object_is_type(*jv, json_type_int) &&
- !json_object_is_type(*jv, json_type_string))
- return -1;
-
- return 0;
+ size_t key_length = strlen(key);
+ struct series_t *serie = (struct series_t*)s;
+
+ /* Treat the 2 static key that could have been specified */
+ if(strcasecmp("name", key) == 0)
+ serie->name = json_object_get_string(valueJ);
+ else if(strcasecmp("timestamp", key) == 0)
+ serie->timestamp = get_ts();
+ /* Treat all key looking for tag and field object. Those ones could be find
+ with the last 2 character. '_t' for tag and '_f' that are the keys that
+ could be indefinite. Cf influxdb documentation:
+ https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */
+ else if(strncasecmp(&key[key_length-2], "_t", 2) == 0)
+ add_elt(&serie->series_columns.tags, key, valueJ);
+ else if(strncasecmp(&key[key_length-2], "_f", 2) == 0)
+ add_elt(&serie->series_columns.fields, key, valueJ);
}
-void concatenate(char* dest, const char* source, const char *sep)
+int unpack_metric_from_api(json_object *m, struct series_t **serie)
{
- strncat(dest, sep, strlen(sep));
- strncat(dest, source, strlen(source));
-}
+ *serie = malloc(sizeof(struct series_t));
+ bzero(*serie, sizeof(struct series_t));
-size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
-{
- bzero(url, l_url);
+ wrap_json_object_for_all(m, unpacking_from_api, *serie);
- /* Handle default host and port */
- host = host ? host : DEFAULT_DBHOST;
- port = port ? port : DEFAULT_DBPORT;
+ if(!(*serie)->timestamp)
+ (*serie)->timestamp = get_ts();
- strncat(url, host, strlen(host));
- strncat(url, ":", 1);
- strncat(url, port, strlen(port));
- strncat(url, "/", 1);
- strncat(url, endpoint, strlen(endpoint));
- strncat(url, "?db="DEFAULT_DB, strlen("?db="DEFAULT_DB));
-
- return strlen(url);
+ return 0;
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index 6c775b7..859e101 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,13 +15,31 @@
* limitations under the License.
*/
+#ifndef _INFLUXDB_H_
+#define _INFLUXDB_H_
+
+#define _GNU_SOURCE
#include "wrap-json.h"
#include "tsdb.h"
+#include "../utils/list.h"
+
+struct series_columns_t {
+ struct list *tags;
+ struct list *fields;
+};
+
+struct series_t {
+ const char *name;
+ struct series_columns_t series_columns;
+ uint64_t timestamp;
+};
int create_database();
-int unpack_metric_from_binding(json_object *m, const char **name, const char **source, const char **unit, const char **identity, json_object **jv, uint64_t *timestamp);
+int unpack_metric_from_api(json_object *m, struct series_t **serie);
void concatenate(char* dest, const char* source, const char *sep);
size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint);
+
+#endif
diff --git a/src/plugins/tsdb.c b/src/plugins/tsdb.c
index e393109..7e10a74 100644
--- a/src/plugins/tsdb.c
+++ b/src/plugins/tsdb.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,7 +28,7 @@ int influxdb_ping()
if(curl_wrap_response_code_get(request) != 204) {
AFB_ERROR("TimeSeries DB not reachable");
- ret = -1;
+ ret = ERROR;
}
curl_easy_cleanup(request);
@@ -43,10 +43,10 @@ int db_ping()
return ret;
}
-u_int64_t get_ts()
+uint64_t get_ts()
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
- return (int64_t)(ts.tv_sec) * (int64_t)1000000000 + (int64_t)(ts.tv_nsec);
+ return (uint64_t)(ts.tv_sec) * (uint64_t)1000000000 + (uint64_t)(ts.tv_nsec);
}
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 5c0245c..437c5dc 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015, 2016, 2017, 2018 "IoT.bzh"
+ * Copyright (C) 2018 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,12 +15,17 @@
* limitations under the License.
*/
+#ifndef _TSDB_H
+#define _TSDB_H
+
#define AFB_BINDING_VERSION 2
#include <afb/afb-binding.h>
#include <json-c/json.h>
#include "curl-wrap.h"
+#define ERROR -1
+
#define DEFAULT_DB "agl-garner"
#define DEFAULT_DBHOST "localhost"
#define DEFAULT_DBPORT "8086"
@@ -35,6 +40,7 @@ enum db_available {
struct reader_args {
const char *host;
const char *port;
+ u_int32_t delay;
};
CURL *influxdb_write(const char* host, const char *port, json_object *metric);
@@ -45,3 +51,5 @@ int influxdb_reader(void *args);
int db_ping();
u_int64_t get_ts();
+
+#endif
diff --git a/src/utils/list.c b/src/utils/list.c
index da97e42..bd7eeac 100644
--- a/src/utils/list.c
+++ b/src/utils/list.c
@@ -28,39 +28,39 @@ void destroy_list(struct list *l)
}
}
-void add_elt(struct list *l, const char *key, json_object *value)
+void add_elt(struct list **l, const char *key, json_object *value)
{
struct list *new_elt = malloc(sizeof(struct list));
new_elt->key = key;
new_elt->value = value;
new_elt->next = NULL;
- if(l) {
- while(l->next != NULL) {
- l = l->next;
+ if(*l) {
+ while((*l)->next != NULL) {
+ *l = (*l)->next;
}
- l->next = new_elt;
+ (*l)->next = new_elt;
}
else {
- l = new_elt;
+ *l = new_elt;
}
}
-void add_key(struct list *l, const char *key)
+void add_key(struct list **l, const char *key)
{
struct list *new_elt = malloc(sizeof(struct list));
new_elt->key = key;
new_elt->value = NULL;
new_elt->next = NULL;
- if(l) {
- while(l->next != NULL) {
- l = l->next;
+ if(*l) {
+ while((*l)->next != NULL) {
+ *l = (*l)->next;
}
- l->next = new_elt;
+ (*l)->next = new_elt;
}
else {
- l = new_elt;
+ *l = new_elt;
}
}
diff --git a/src/utils/list.h b/src/utils/list.h
index dcb01bd..7cacbed 100644
--- a/src/utils/list.h
+++ b/src/utils/list.h
@@ -27,8 +27,8 @@ struct list {
};
void destroy_list(struct list *l);
-void add_elt(struct list *l, const char *key, json_object *value);
-void add_key(struct list *l, const char *key);
+void add_elt(struct list **l, const char *key, json_object *value);
+void add_key(struct list **l, const char *key);
struct list *get_elt(struct list *l, int index);
struct list *find_elt_from_key(struct list *l, const char *key);
json_object *find_key_value(struct list *l, const char *key);