aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-04-17 10:23:07 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commit4b6d97da2d4bc1a9e328cf861e75f5ee1bb968e3 (patch)
tree5d05a7fef6ddb17ec938dd9959ed33b856cfb607
parent5d50f0426699a06b5720e10f1feaef35c8b59f57 (diff)
Fully handle array of metrics to be writen in DB
Move a function code. Change-Id: I35a90e08e7019e634676915858aa0a666d46f711 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--src/plugins/influxdb-writer.c72
-rw-r--r--src/plugins/influxdb.c15
-rw-r--r--src/plugins/influxdb.h6
3 files changed, 49 insertions, 44 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index 673fbe5..a3763d5 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -24,8 +24,8 @@
static size_t format_write_args(char *query, struct series_t *serie)
{
char *ts;
- struct list *tags = serie->series_columns.tags;
- struct list *fields = serie->series_columns.fields;
+ struct list *tags = serie->serie_columns.tags;
+ struct list *fields = serie->serie_columns.fields;
strncat(query, serie->name, strlen(serie->name));
if(tags) {
@@ -60,36 +60,6 @@ static size_t format_write_args(char *query, struct series_t *serie)
return strlen(query);
}
-CURL *make_curl_write_post(const char *url, json_object *metricJ)
-{
- CURL *curl;
- struct series_t *serie = NULL;
-
- size_t lpd = json_object_is_type(metricJ, json_type_array) ?
- json_object_array_length(metricJ) + 1 : 2;
-
- char **post_data;
- post_data = malloc(lpd);
-
- char write[URL_MAXIMUM_LENGTH];
- bzero(write, URL_MAXIMUM_LENGTH);
-
- if(unpack_metric_from_api(metricJ, &serie)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricJ));
- curl = NULL;
- }
- else {
- for(long int i = --lpd; i >= 0; i--) {
- format_write_args(write, serie);
- post_data[i] = i == lpd-1 ? NULL : write;
- }
- curl = curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data);
- }
- free(post_data);
-
- return curl;
-}
-
void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
struct afb_req *req = (struct afb_req*)closure;
@@ -119,6 +89,44 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r
}
}
+CURL *make_curl_write_post(const char *url, json_object *metricsJ)
+{
+ CURL *curl = NULL;
+ size_t lpd = 0, i = 0;
+ char **post_data;
+ char write[URL_MAXIMUM_LENGTH];
+ struct series_t *serie = NULL;
+
+ lpd = json_object_is_type(metricsJ, json_type_array) ?
+ json_object_array_length(metricsJ) + 1 : 2;
+
+ serie = malloc(sizeof(struct series_t));
+ post_data = malloc(lpd);
+
+ for(i = 0; i < lpd; i++) {
+ bzero(serie, sizeof(struct series_t));
+
+ if(unpack_metric_from_api(json_object_array_get_idx(metricsJ, i), serie)) {
+ AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricsJ));
+ break;
+ }
+ else {
+ bzero(write, URL_MAXIMUM_LENGTH);
+ format_write_args(write, serie);
+ post_data[i] = i == lpd - 1 ? NULL : write;
+ }
+ }
+
+ /* Check that we just do not broke the for loop before trying preparing CURL
+ request object */
+ curl = i == lpd ?
+ curl_wrap_prepare_post(url, NULL, 1, (const char * const*)post_data):NULL;
+ free(serie);
+ free(post_data);
+
+ return curl;
+}
+
CURL *influxdb_write(const char* host, const char *port, json_object *metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index b8a733b..c33d167 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -93,20 +93,17 @@ void unpacking_from_api(void *s, json_object *valueJ, const char *key)
could be indefinite. Cf influxdb documentation:
https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */
else if(strncasecmp(&key[key_length-2], "_t", 2) == 0)
- add_elt(&serie->series_columns.tags, key, valueJ);
+ add_elt(&serie->serie_columns.tags, key, valueJ);
else if(strncasecmp(&key[key_length-2], "_f", 2) == 0)
- add_elt(&serie->series_columns.fields, key, valueJ);
+ add_elt(&serie->serie_columns.fields, key, valueJ);
}
-int unpack_metric_from_api(json_object *m, struct series_t **serie)
+int unpack_metric_from_api(json_object *m, struct series_t *serie)
{
- *serie = malloc(sizeof(struct series_t));
- bzero(*serie, sizeof(struct series_t));
+ wrap_json_object_for_all(m, unpacking_from_api, serie);
- wrap_json_object_for_all(m, unpacking_from_api, *serie);
-
- if(!(*serie)->timestamp)
- (*serie)->timestamp = get_ts();
+ if(!serie->timestamp)
+ serie->timestamp = get_ts();
return 0;
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index 859e101..ae40e77 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -23,20 +23,20 @@
#include "tsdb.h"
#include "../utils/list.h"
-struct series_columns_t {
+struct serie_columns_t {
struct list *tags;
struct list *fields;
};
struct series_t {
const char *name;
- struct series_columns_t series_columns;
+ struct serie_columns_t serie_columns;
uint64_t timestamp;
};
int create_database();
-int unpack_metric_from_api(json_object *m, struct series_t **serie);
+int unpack_metric_from_api(json_object *m, struct series_t *serie);
void concatenate(char* dest, const char* source, const char *sep);