summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/influxdb-writer.c79
-rw-r--r--src/plugins/influxdb.c9
-rw-r--r--src/plugins/influxdb.h6
-rw-r--r--src/plugins/tsdb.h20
4 files changed, 61 insertions, 53 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index c4929f8..a560cb7 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -16,16 +16,16 @@
*/
#define _GNU_SOURCE
-#include <string.h>
#include <stdio.h>
+#include <string.h>
#include "influxdb.h"
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size)
{
afb_req_t request = (afb_req_t)closure;
long rep_code = curl_wrap_response_code_get(curl);
- switch(rep_code) {
+ switch (rep_code) {
case 204:
AFB_REQ_DEBUG(request, "Request correctly written");
afb_req_success(request, NULL, "Request has been successfully written");
@@ -38,14 +38,14 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r
break;
case 404:
afb_req_fail(request, "Not found", result);
- AFB_REQ_NOTICE(request, "Attempt to create the DB '"DEFAULT_DB"'");
+ AFB_REQ_NOTICE(request, "Attempt to create the DB '" DEFAULT_DB "'");
create_database(request);
break;
case 500:
afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result);
break;
default:
- afb_req_fail(request, "Failure", "Unexpected behavior.");
+ afb_req_fail_f(request, "Failure", "Unexpected behavior (code %ld).", rep_code);
break;
}
}
@@ -57,8 +57,8 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r
static void serialize_list_to_query(char *query, struct list *node, const char *list_sep, const char *item_sep) {
bool first = true;
- while(node != NULL) {
- if(first) {
+ while (node != NULL) {
+ if (first) {
concatenate(query, node->key, list_sep);
first = false;
} else {
@@ -73,10 +73,9 @@ static void serialize_list_to_query(char *query, struct list *node, const char *
}
}
-
-static size_t format_write_args(char *query, struct series_t *serie)
+static size_t format_write_args(char* query, struct series_t* serie)
{
- char *ts;
+ char* ts;
strncat(query, serie->name, strlen(serie->name));
@@ -91,20 +90,20 @@ static size_t format_write_args(char *query, struct series_t *serie)
return -1;
}
-CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *metricsJ)
+CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* metricsJ)
{
- CURL *curl = NULL;
+ CURL* curl = NULL;
size_t lpd = 0, len_write = 0, i = 0;
- char **post_data;
- char write[URL_MAXIMUM_LENGTH] = "";
- struct series_t *serie = NULL;
- json_object *metricsArrayJ = NULL;
+ char** post_data;
+ char write[URL_MAXIMUM_LENGTH] = "";
+ struct series_t* serie = NULL;
+ json_object* metricsArrayJ = NULL;
+
- if(json_object_is_type(metricsJ, json_type_array)) {
+ if (json_object_is_type(metricsJ, json_type_array)) {
lpd = json_object_array_length(metricsJ);
metricsArrayJ = metricsJ;
- }
- else {
+ } else {
metricsArrayJ = json_object_new_array();
json_object_array_add(metricsArrayJ, metricsJ);
lpd = 1;
@@ -113,20 +112,18 @@ CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *me
serie = malloc(sizeof(struct series_t));
post_data = calloc(lpd + 1, sizeof(void*));
- for(i = 0; i < lpd; i++) {
+ for (i = 0; i < lpd; i++) {
memset(serie, 0, sizeof(struct series_t));
- if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
+ if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
break;
- }
- else {
- if(! serie->name) {
+ } else {
+ if (!serie->name) {
post_data[i] = NULL;
- }
- else {
+ } else {
len_write = format_write_args(write, serie);
- if(len_write > 0) {
+ if (len_write > 0) {
post_data[i] = malloc(len_write + 1);
strcpy(post_data[i], write);
memset(write, 0, len_write);
@@ -135,19 +132,23 @@ CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *me
}
}
+ AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, *post_data);
+
/* Check that we just do not broke the for loop before trying preparing CURL
request object */
- curl = i == lpd ?
- curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char * const*)post_data) : NULL;
+ curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL;
free(serie);
- for(i = 0; i < lpd; i++)
- if (post_data[i]) free(post_data[i]);
+ for (i = 0; i < lpd; i++) {
+ if (post_data[i]) {
+ free(post_data[i]);
+ }
+ }
free(post_data);
return curl;
}
-CURL *influxdb_write(afb_api_t apiHandle, const char* host, const char *port, json_object *metricJ)
+CURL* influxdb_write(afb_api_t apiHandle, const char* host, const char* port, json_object* metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url(url, sizeof(url), host, port, "write");
@@ -157,25 +158,25 @@ CURL *influxdb_write(afb_api_t apiHandle, const char* host, const char *port, js
CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ)
{
afb_req_t request = source->request;
- const char *port = NULL;
- const char *host = NULL;
- CURL *curl_request;
+ const char* port = NULL;
+ const char* host = NULL;
+ CURL* curl_request;
int rc = -1;
json_object *req_args = afb_req_json(request),
*portJ = NULL,
*metric = NULL;
- if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
+ if (wrap_json_unpack(req_args, "{s?s,s?o,so!}",
"host", &host,
"port", &portJ,
- "metric", &metric) || ! metric) {
+ "metric", &metric)
+ || !metric) {
afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\
JSON object or malformed");
rc = -1;
} else {
- port = json_object_is_type(portJ, json_type_null) ?
- NULL : json_object_to_json_string(portJ);
+ port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ);
curl_request = influxdb_write(source->api, host, port, metric);
curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
rc = 0;
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index b8f85d2..f2f8fe1 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018 "IoT.bzh"
+ * Copyright (C) 2018-2019 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -133,18 +133,25 @@ void unpacking_from_api(void* s, json_object* valueJ, const char* key)
/* Treat the 2 static key that could have been specified */
if (strcasecmp("name", key) == 0)
serie->name = json_object_get_string(valueJ);
+
else if (strcasecmp("timestamp", key) == 0)
serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ? json_object_get_int64(valueJ) : 0;
+
+ // metadata (AKA tags) are indexed into influxdb
else if (strcasecmp("metadata", key) == 0)
wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags);
+
+ // value (AKA fields) are NOT indexed into influxdb
else if (strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0)
wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields);
+
/* Treat all key looking for tag and field object. Those ones could be find
with the last 2 character. '_t' for tag and '_f' that are the keys that
could be indefinite. Cf influxdb documentation:
https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */
else if (strncasecmp(&key[key_length - 2], "_t", 2) == 0)
add_elt(&serie->serie_columns.tags, key, "", valueJ);
+
else if (strncasecmp(&key[key_length - 2], "_f", 2) == 0)
add_elt(&serie->serie_columns.fields, key, "", valueJ);
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index 3c60e27..781ab29 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018 "IoT.bzh"
+ * Copyright (C) 2018-2019 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,8 +19,8 @@
#define _INFLUXDB_H_
#define _GNU_SOURCE
-#include <string.h>
#include <stdbool.h>
+#include <string.h>
#include "../utils/list.h"
#include "ctl-plugin.h"
@@ -113,7 +113,7 @@ static inline void concatenate(char* dest, const char* source, const char* sep)
static inline void concatenate_str(char* dest, const char* source, const char* sep)
{
- char* esc_source;
+ char* esc_source;
if (sep)
strncat(dest, sep, strlen(sep));
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 8469ac0..ac5a103 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018 "IoT.bzh"
+ * Copyright (C) 2018-2019 "IoT.bzh"
* Author "Romain Forlot" <romain.forlot@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,27 +18,27 @@
#ifndef _TSDB_H
#define _TSDB_H
-#include <json-c/json.h>
#include "curl-wrap.h"
+#include <json-c/json.h>
#define ERROR -1
#define DEFAULT_DB "agl-garner"
#define DEFAULT_DBHOST "localhost"
#define DEFAULT_DBPORT "8086"
-#define URL_MAXIMUM_LENGTH 2047
+#define URL_MAXIMUM_LENGTH 4096
enum db_available {
- NODB = 0,
- INFLUX = 1,
- GRAPHITE = 2,
- OPENTSDB = 4
+ NODB = 0,
+ INFLUX = 1,
+ GRAPHITE = 2,
+ OPENTSDB = 4
};
struct reader_args {
- const char *host;
- const char *port;
- u_int32_t delay;
+ const char* host;
+ const char* port;
+ u_int32_t delay;
};
u_int64_t get_ts();