diff options
-rw-r--r-- | src/plugins/influxdb-writer.c | 79 | ||||
-rw-r--r-- | src/plugins/influxdb.c | 9 | ||||
-rw-r--r-- | src/plugins/influxdb.h | 6 | ||||
-rw-r--r-- | src/plugins/tsdb.h | 20 |
4 files changed, 61 insertions, 53 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c index c4929f8..a560cb7 100644 --- a/src/plugins/influxdb-writer.c +++ b/src/plugins/influxdb-writer.c @@ -16,16 +16,16 @@ */ #define _GNU_SOURCE -#include <string.h> #include <stdio.h> +#include <string.h> #include "influxdb.h" -void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) +void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size) { afb_req_t request = (afb_req_t)closure; long rep_code = curl_wrap_response_code_get(curl); - switch(rep_code) { + switch (rep_code) { case 204: AFB_REQ_DEBUG(request, "Request correctly written"); afb_req_success(request, NULL, "Request has been successfully written"); @@ -38,14 +38,14 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r break; case 404: afb_req_fail(request, "Not found", result); - AFB_REQ_NOTICE(request, "Attempt to create the DB '"DEFAULT_DB"'"); + AFB_REQ_NOTICE(request, "Attempt to create the DB '" DEFAULT_DB "'"); create_database(request); break; case 500: afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result); break; default: - afb_req_fail(request, "Failure", "Unexpected behavior."); + afb_req_fail_f(request, "Failure", "Unexpected behavior (code %ld).", rep_code); break; } } @@ -57,8 +57,8 @@ void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *r static void serialize_list_to_query(char *query, struct list *node, const char *list_sep, const char *item_sep) { bool first = true; - while(node != NULL) { - if(first) { + while (node != NULL) { + if (first) { concatenate(query, node->key, list_sep); first = false; } else { @@ -73,10 +73,9 @@ static void serialize_list_to_query(char *query, struct list *node, const char * } } - -static size_t format_write_args(char *query, struct series_t *serie) +static size_t format_write_args(char* query, struct series_t* serie) { - char *ts; + char* ts; strncat(query, serie->name, strlen(serie->name)); @@ -91,20 +90,20 @@ static size_t format_write_args(char *query, struct series_t *serie) return -1; } -CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *metricsJ) +CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* metricsJ) { - CURL *curl = NULL; + CURL* curl = NULL; size_t lpd = 0, len_write = 0, i = 0; - char **post_data; - char write[URL_MAXIMUM_LENGTH] = ""; - struct series_t *serie = NULL; - json_object *metricsArrayJ = NULL; + char** post_data; + char write[URL_MAXIMUM_LENGTH] = ""; + struct series_t* serie = NULL; + json_object* metricsArrayJ = NULL; + - if(json_object_is_type(metricsJ, json_type_array)) { + if (json_object_is_type(metricsJ, json_type_array)) { lpd = json_object_array_length(metricsJ); metricsArrayJ = metricsJ; - } - else { + } else { metricsArrayJ = json_object_new_array(); json_object_array_add(metricsArrayJ, metricsJ); lpd = 1; @@ -113,20 +112,18 @@ CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *me serie = malloc(sizeof(struct series_t)); post_data = calloc(lpd + 1, sizeof(void*)); - for(i = 0; i < lpd; i++) { + for (i = 0; i < lpd; i++) { memset(serie, 0, sizeof(struct series_t)); - if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { + if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); break; - } - else { - if(! serie->name) { + } else { + if (!serie->name) { post_data[i] = NULL; - } - else { + } else { len_write = format_write_args(write, serie); - if(len_write > 0) { + if (len_write > 0) { post_data[i] = malloc(len_write + 1); strcpy(post_data[i], write); memset(write, 0, len_write); @@ -135,19 +132,23 @@ CURL *make_curl_write_post(afb_api_t apiHandle, const char *url, json_object *me } } + AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, *post_data); + /* Check that we just do not broke the for loop before trying preparing CURL request object */ - curl = i == lpd ? - curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char * const*)post_data) : NULL; + curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL; free(serie); - for(i = 0; i < lpd; i++) - if (post_data[i]) free(post_data[i]); + for (i = 0; i < lpd; i++) { + if (post_data[i]) { + free(post_data[i]); + } + } free(post_data); return curl; } -CURL *influxdb_write(afb_api_t apiHandle, const char* host, const char *port, json_object *metricJ) +CURL* influxdb_write(afb_api_t apiHandle, const char* host, const char* port, json_object* metricJ) { char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ make_url(url, sizeof(url), host, port, "write"); @@ -157,25 +158,25 @@ CURL *influxdb_write(afb_api_t apiHandle, const char* host, const char *port, js CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ) { afb_req_t request = source->request; - const char *port = NULL; - const char *host = NULL; - CURL *curl_request; + const char* port = NULL; + const char* host = NULL; + CURL* curl_request; int rc = -1; json_object *req_args = afb_req_json(request), *portJ = NULL, *metric = NULL; - if(wrap_json_unpack(req_args, "{s?s,s?o,so!}", + if (wrap_json_unpack(req_args, "{s?s,s?o,so!}", "host", &host, "port", &portJ, - "metric", &metric) || ! metric) { + "metric", &metric) + || !metric) { afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\ JSON object or malformed"); rc = -1; } else { - port = json_object_is_type(portJ, json_type_null) ? - NULL : json_object_to_json_string(portJ); + port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ); curl_request = influxdb_write(source->api, host, port, metric); curl_wrap_do(curl_request, influxdb_write_curl_cb, request); rc = 0; diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c index b8f85d2..f2f8fe1 100644 --- a/src/plugins/influxdb.c +++ b/src/plugins/influxdb.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 "IoT.bzh" + * Copyright (C) 2018-2019 "IoT.bzh" * Author "Romain Forlot" <romain.forlot@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -133,18 +133,25 @@ void unpacking_from_api(void* s, json_object* valueJ, const char* key) /* Treat the 2 static key that could have been specified */ if (strcasecmp("name", key) == 0) serie->name = json_object_get_string(valueJ); + else if (strcasecmp("timestamp", key) == 0) serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ? json_object_get_int64(valueJ) : 0; + + // metadata (AKA tags) are indexed into influxdb else if (strcasecmp("metadata", key) == 0) wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags); + + // value (AKA fields) are NOT indexed into influxdb else if (strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0) wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields); + /* Treat all key looking for tag and field object. Those ones could be find with the last 2 character. '_t' for tag and '_f' that are the keys that could be indefinite. Cf influxdb documentation: https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */ else if (strncasecmp(&key[key_length - 2], "_t", 2) == 0) add_elt(&serie->serie_columns.tags, key, "", valueJ); + else if (strncasecmp(&key[key_length - 2], "_f", 2) == 0) add_elt(&serie->serie_columns.fields, key, "", valueJ); } diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h index 3c60e27..781ab29 100644 --- a/src/plugins/influxdb.h +++ b/src/plugins/influxdb.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 "IoT.bzh" + * Copyright (C) 2018-2019 "IoT.bzh" * Author "Romain Forlot" <romain.forlot@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,8 +19,8 @@ #define _INFLUXDB_H_ #define _GNU_SOURCE -#include <string.h> #include <stdbool.h> +#include <string.h> #include "../utils/list.h" #include "ctl-plugin.h" @@ -113,7 +113,7 @@ static inline void concatenate(char* dest, const char* source, const char* sep) static inline void concatenate_str(char* dest, const char* source, const char* sep) { - char* esc_source; + char* esc_source; if (sep) strncat(dest, sep, strlen(sep)); diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h index 8469ac0..ac5a103 100644 --- a/src/plugins/tsdb.h +++ b/src/plugins/tsdb.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 "IoT.bzh" + * Copyright (C) 2018-2019 "IoT.bzh" * Author "Romain Forlot" <romain.forlot@iot.bzh> * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,27 +18,27 @@ #ifndef _TSDB_H #define _TSDB_H -#include <json-c/json.h> #include "curl-wrap.h" +#include <json-c/json.h> #define ERROR -1 #define DEFAULT_DB "agl-garner" #define DEFAULT_DBHOST "localhost" #define DEFAULT_DBPORT "8086" -#define URL_MAXIMUM_LENGTH 2047 +#define URL_MAXIMUM_LENGTH 4096 enum db_available { - NODB = 0, - INFLUX = 1, - GRAPHITE = 2, - OPENTSDB = 4 + NODB = 0, + INFLUX = 1, + GRAPHITE = 2, + OPENTSDB = 4 }; struct reader_args { - const char *host; - const char *port; - u_int32_t delay; + const char* host; + const char* port; + u_int32_t delay; }; u_int64_t get_ts(); |