summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/plugins/influxdb-writer.c274
-rw-r--r--src/plugins/influxdb.c191
2 files changed, 230 insertions, 235 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index be02e34..dca5dab 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -16,166 +16,164 @@
*/
#define _GNU_SOURCE
-#include <string.h>
#include <stdio.h>
+#include <string.h>
#include "influxdb.h"
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size)
{
- AFB_ReqT request = (AFB_ReqT)closure;
- long rep_code = curl_wrap_response_code_get(curl);
- switch(rep_code) {
- case 204:
- AFB_ReqDebug(request, "Request correctly written");
- AFB_ReqSucess(request, NULL, "Request has been successfully written");
- break;
- case 400:
- AFB_ReqFail(request, "Bad request", result);
- break;
- case 401:
- AFB_ReqFail(request, "Unauthorized access", result);
- break;
- case 404:
- AFB_ReqFail(request, "Not found", result);
- AFB_ReqNotice(request, "Attempt to create the DB '"DEFAULT_DB"'");
- create_database(request);
- break;
- case 500:
- AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result);
- break;
- default:
- AFB_ReqFail(request, "Failure", "Unexpected behavior.");
- break;
- }
+ AFB_ReqT request = (AFB_ReqT)closure;
+ long rep_code = curl_wrap_response_code_get(curl);
+ switch (rep_code) {
+ case 204:
+ AFB_ReqDebug(request, "Request correctly written");
+ AFB_ReqSucess(request, NULL, "Request has been successfully written");
+ break;
+ case 400:
+ AFB_ReqFail(request, "Bad request", result);
+ break;
+ case 401:
+ AFB_ReqFail(request, "Unauthorized access", result);
+ break;
+ case 404:
+ AFB_ReqFail(request, "Not found", result);
+ AFB_ReqNotice(request, "Attempt to create the DB '" DEFAULT_DB "'");
+ create_database(request);
+ break;
+ case 500:
+ AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result);
+ break;
+ default:
+ AFB_ReqFail(request, "Failure", "Unexpected behavior.");
+ break;
+ }
}
-static size_t format_write_args(char *query, struct series_t *serie)
+static size_t format_write_args(char* query, struct series_t* serie)
{
- char *ts;
- struct list *tags = serie->serie_columns.tags;
- struct list *fields = serie->serie_columns.fields;
+ char* ts;
+ struct list* tags = serie->serie_columns.tags;
+ struct list* fields = serie->serie_columns.fields;
concatenate(query, serie->name, NULL);
- if(tags) {
- while(tags != NULL) {
- concatenate(query, tags->key, ",");
- if(json_object_is_type(tags->value, json_type_string))
- concatenate(query, json_object_get_string(tags->value), "=");
- else
- concatenate(query, json_object_to_json_string(tags->value), "=");
- tags = tags->next;
- }
- }
- if(fields) {
- int i = 0;
- for(struct list *it = fields; it != NULL; it = it->next) {
- if(!i)
- concatenate(query, fields->key, " ");
- else
- concatenate(query, fields->key, ",");
- if(json_object_is_type(fields->value, json_type_string))
- concatenate(query, json_object_get_string(fields->value), "=");
- else
- concatenate(query, json_object_to_json_string(fields->value), "=");
- i++;
- }
- }
-
- asprintf(&ts, "%lu", serie->timestamp);
- concatenate(query, ts, " ");
-
- return strlen(query);
+ if (tags) {
+ while (tags != NULL) {
+ concatenate(query, tags->key, ",");
+ if (json_object_is_type(tags->value, json_type_string))
+ concatenate(query, json_object_get_string(tags->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(tags->value), "=");
+ tags = tags->next;
+ }
+ }
+
+ if (fields) {
+ int i = 0;
+ for (struct list* it = fields; it != NULL; it = it->next) {
+ if (!i)
+ concatenate(query, it->key, " ");
+ else
+ concatenate(query, it->key, ",");
+ if (json_object_is_type(it->value, json_type_string))
+ concatenate(query, json_object_get_string(it->value), "=");
+ else
+ concatenate(query, json_object_to_json_string(it->value), "=");
+ i++;
+ }
+ }
+
+ asprintf(&ts, "%lu", serie->timestamp);
+ concatenate(query, ts, " ");
+
+ return strlen(query);
}
-CURL *make_curl_write_post(AFB_ApiT apiHandle, const char *url, json_object *metricsJ)
+CURL* make_curl_write_post(AFB_ApiT apiHandle, const char* url, json_object* metricsJ)
{
- CURL *curl = NULL;
- size_t lpd = 0, len_write = 0, i = 0;
- char **post_data;
- char write[URL_MAXIMUM_LENGTH] = "";
- struct series_t *serie = NULL;
- json_object *metricsArrayJ = NULL;
-
- if(json_object_is_type(metricsJ, json_type_array)) {
- lpd = json_object_array_length(metricsJ);
- metricsArrayJ = metricsJ;
- }
- else {
- metricsArrayJ = json_object_new_array();
- json_object_array_add(metricsArrayJ, metricsJ);
- lpd = 1;
- }
-
- serie = malloc(sizeof(struct series_t));
- post_data = calloc(lpd + 1, sizeof(void*));
-
- for(i = 0; i < lpd; i++) {
- memset(serie, 0, sizeof(struct series_t));
-
- if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
- AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
- break;
- }
- else {
- if(! serie->name) {
- post_data[i] = NULL;
- }
- else {
- len_write = format_write_args(write, serie);
- if(len_write) {
- post_data[i] = malloc(len_write + 1);
- strcpy(post_data[i], write);
- memset(write, 0, len_write);
- }
- }
- }
- }
-
- AFB_ApiDebug(apiHandle, "influx curl: url=%s data=%s", url, (const char*) *post_data);
-
- /* Check that we just do not broke the for loop before trying preparing CURL
+ CURL* curl = NULL;
+ size_t lpd = 0, len_write = 0, i = 0;
+ char** post_data;
+ char write[URL_MAXIMUM_LENGTH] = "";
+ struct series_t* serie = NULL;
+ json_object* metricsArrayJ = NULL;
+
+ if (json_object_is_type(metricsJ, json_type_array)) {
+ lpd = json_object_array_length(metricsJ);
+ metricsArrayJ = metricsJ;
+ } else {
+ metricsArrayJ = json_object_new_array();
+ json_object_array_add(metricsArrayJ, metricsJ);
+ lpd = 1;
+ }
+
+ serie = malloc(sizeof(struct series_t));
+ post_data = calloc(lpd + 1, sizeof(void*));
+
+ for (i = 0; i < lpd; i++) {
+ memset(serie, 0, sizeof(struct series_t));
+
+ if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
+ AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
+ break;
+ } else {
+ if (!serie->name) {
+ post_data[i] = NULL;
+ } else {
+ // TODO: pass strlen(write) == URL_MAXIMUM_LENGTH to format_write_args and add write size overflow
+ len_write = format_write_args(write, serie);
+ if (len_write) {
+ post_data[i] = malloc(len_write + 1);
+ strcpy(post_data[i], write);
+ memset(write, 0, len_write);
+ }
+ }
+ }
+ }
+
+ AFB_ApiDebug(apiHandle, "influx curl: url=%s data=%s", url, (const char*)*post_data);
+
+ /* Check that we just do not broke the for loop before trying preparing CURL
request object */
- curl = i == lpd ?
- curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char * const*)post_data) : NULL;
- free(serie);
- for(i = 0; i < lpd; i++)
- free(post_data[i]);
- free(post_data);
-
- return curl;
+ curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL;
+ free(serie);
+ for (i = 0; i < lpd; i++)
+ free(post_data[i]);
+ free(post_data);
+
+ return curl;
}
-CURL *influxdb_write(AFB_ApiT apiHandle, const char* host, const char *port, json_object *metricJ)
+CURL* influxdb_write(AFB_ApiT apiHandle, const char* host, const char* port, json_object* metricJ)
{
- char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
- make_url(url, sizeof(url), host, port, "write");
- return make_curl_write_post(apiHandle, url, metricJ);
+ char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
+ make_url(url, sizeof(url), host, port, "write");
+ return make_curl_write_post(apiHandle, url, metricJ);
}
CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ)
{
- AFB_ReqT request = source->request;
- const char *port = NULL;
- const char *host = NULL;
- CURL *curl_request;
-
- json_object *req_args = AFB_ReqJson(request),
- *portJ = NULL,
- *metric = NULL;
-
- if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
- "host", &host,
- "port", &portJ,
- "metric", &metric) || ! metric)
- AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\
+ AFB_ReqT request = source->request;
+ const char* port = NULL;
+ const char* host = NULL;
+ CURL* curl_request;
+
+ json_object *req_args = AFB_ReqJson(request),
+ *portJ = NULL,
+ *metric = NULL;
+
+ if (wrap_json_unpack(req_args, "{s?s,s?o,so!}",
+ "host", &host,
+ "port", &portJ,
+ "metric", &metric)
+ || !metric)
+ AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\
JSON object or malformed");
- else
- port = json_object_is_type(portJ, json_type_null) ?
- NULL : json_object_to_json_string(portJ);
+ else
+ port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ);
- curl_request = influxdb_write(source->api, host, port, metric);
- curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
+ curl_request = influxdb_write(source->api, host, port, metric);
+ curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
- return 0;
+ return 0;
}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index ae39a2a..d3da39a 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -20,161 +20,158 @@
#include <stdio.h>
#include <unistd.h>
+#include "../utils/list.h"
#include "influxdb.h"
#include "tsdb.h"
#include "wrap-json.h"
-#include "../utils/list.h"
CTLP_CAPI_REGISTER("influxdb");
CTLP_ONLOAD(plugin, ret)
{
- int err = 0;
- char *result;
- size_t result_size;
- CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+ int err = 0;
+ char* result;
+ size_t result_size;
+ CURL* request = curl_wrap_prepare_get("localhost:" DEFAULT_DBPORT "/ping", NULL, NULL);
- struct reader_args r_args = {NULL, NULL, 1000000};
- plugin->context = (void*)&r_args;
+ struct reader_args r_args = { NULL, NULL, 1000000 };
+ plugin->context = (void*)&r_args;
- curl_wrap_perform(request, &result, &result_size);
+ curl_wrap_perform(request, &result, &result_size);
- if(curl_wrap_response_code_get(request) != 204) {
- AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it");
- err = ERROR;
- }
+ if (curl_wrap_response_code_get(request) != 204) {
+ AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it");
+ err = ERROR;
+ }
- curl_easy_cleanup(request);
- return err;
+ curl_easy_cleanup(request);
+ return err;
}
CTLP_CAPI(influxdb_ping, source, argsJ, eventJ)
{
- int ret = 0;
- char *result;
- size_t result_size;
+ int ret = 0;
+ char* result;
+ size_t result_size;
- CURL *curl_req = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+ CURL* curl_req = curl_wrap_prepare_get("localhost:" DEFAULT_DBPORT "/ping", NULL, NULL);
- curl_wrap_perform(curl_req, &result, &result_size);
+ curl_wrap_perform(curl_req, &result, &result_size);
- if(curl_wrap_response_code_get(curl_req) != 204) {
- AFB_ApiError(source->api, "InfluxDB is offline.");
- ret = ERROR;
- }
- else {
- AFB_ApiNotice(source->api, "InfluxDB is up and running.");
- }
+ if (curl_wrap_response_code_get(curl_req) != 204) {
+ AFB_ApiError(source->api, "InfluxDB is offline.");
+ ret = ERROR;
+ } else {
+ AFB_ApiNotice(source->api, "InfluxDB is up and running.");
+ }
- curl_easy_cleanup(curl_req);
+ curl_easy_cleanup(curl_req);
- return ret;
+ return ret;
}
-size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint)
+size_t make_url(char* url, size_t l_url, const char* host, const char* port, const char* endpoint)
{
- bzero(url, l_url);
+ bzero(url, l_url);
- /* Handle default host and port */
- host = host ? host : DEFAULT_DBHOST;
- port = port ? port : DEFAULT_DBPORT;
+ /* Handle default host and port */
+ host = host ? host : DEFAULT_DBHOST;
+ port = port ? port : DEFAULT_DBPORT;
- strncat(url, host, strlen(host));
- strcat(url, ":");
- strncat(url, port, strlen(port));
- strcat(url, "/");
- strncat(url, endpoint, strlen(endpoint));
- strcat(url, "?db="DEFAULT_DB);
+ strncat(url, host, strlen(host));
+ strcat(url, ":");
+ strncat(url, port, strlen(port));
+ strcat(url, "/");
+ strncat(url, endpoint, strlen(endpoint));
+ strcat(url, "?db=" DEFAULT_DB);
- return strlen(url);
+ return strlen(url);
}
-
int create_database(AFB_ReqT request)
{
- int ret = 0;
- char *result;
- size_t result_size;
+ int ret = 0;
+ char* result;
+ size_t result_size;
- // Declare query to be posted
- const char *post_data[2];
- post_data[0] = "q=CREATE DATABASE \""DEFAULT_DB"\"";
- post_data[1] = NULL;
+ // Declare query to be posted
+ const char* post_data[2];
+ post_data[0] = "q=CREATE DATABASE \"" DEFAULT_DB "\"";
+ post_data[1] = NULL;
- CURL *curl_req = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data);
- curl_wrap_perform(curl_req, &result, &result_size);
+ CURL* curl_req = curl_wrap_prepare_post_unescaped("localhost:" DEFAULT_DBPORT "/query", NULL, " ", post_data);
+ curl_wrap_perform(curl_req, &result, &result_size);
- if(curl_wrap_response_code_get(request) != 200) {
- AFB_ReqError(request, "Can't create database.");
- ret = ERROR;
- }
+ if (curl_wrap_response_code_get(request) != 200) {
+ AFB_ReqError(request, "Can't create database.");
+ ret = ERROR;
+ }
- curl_easy_cleanup(curl_req);
+ curl_easy_cleanup(curl_req);
- if(ret == 0)
- AFB_ReqNotice(request, "Database '"DEFAULT_DB"' created");
+ if (ret == 0)
+ AFB_ReqNotice(request, "Database '" DEFAULT_DB "' created");
- return ret;
+ return ret;
}
-void unpack_values(void *l, json_object *valuesJ, const char *key)
+void unpack_values(void* l, json_object* valuesJ, const char* key)
{
- struct list **oneList = (struct list **)l;
+ struct list** oneList = (struct list**)l;
- /* Append a suffix to be able to differentiate tags and fields at reading
+ /* Append a suffix to be able to differentiate tags and fields at reading
time */
- char *suffixed_key = calloc(1, strlen(key) + 3);
- strcpy(suffixed_key, key);
- strcat(suffixed_key, "_f");
+ char* suffixed_key = calloc(1, strlen(key) + 3);
+ strcpy(suffixed_key, key);
+ strcat(suffixed_key, "_f");
- add_elt(oneList, suffixed_key, valuesJ);
+ add_elt(oneList, suffixed_key, valuesJ);
}
-void unpack_metadata(void *l, json_object *valuesJ, const char *key)
+void unpack_metadata(void* l, json_object* valuesJ, const char* key)
{
- struct list **oneList = (struct list **)l;
+ struct list** oneList = (struct list**)l;
- /* Append a suffix to be able to differentiate tags and fields at reading
+ /* Append a suffix to be able to differentiate tags and fields at reading
time */
- char *suffixed_key = calloc(1, strlen(key) +3);
- strcat(suffixed_key, key);
- strcat(suffixed_key, "_t");
+ char* suffixed_key = calloc(1, strlen(key) + 3);
+ strcpy(suffixed_key, key);
+ strcat(suffixed_key, "_t");
- add_elt(oneList, suffixed_key, valuesJ);
+ add_elt(oneList, suffixed_key, valuesJ);
}
-void unpacking_from_api(void *s, json_object *valueJ, const char *key)
+void unpacking_from_api(void* s, json_object* valueJ, const char* key)
{
- size_t key_length = strlen(key);
- struct series_t *serie = (struct series_t*)s;
-
- /* Treat the 2 static key that could have been specified */
- if(strcasecmp("name", key) == 0)
- serie->name = json_object_get_string(valueJ);
- else if(strcasecmp("timestamp", key) == 0)
- serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ?
- json_object_get_int64(valueJ) : 0;
- else if(strcasecmp("metadata", key) == 0)
- wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags);
- else if(strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0)
- wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields);
- /* Treat all key looking for tag and field object. Those ones could be find
+ size_t key_length = strlen(key);
+ struct series_t* serie = (struct series_t*)s;
+
+ /* Treat the 2 static key that could have been specified */
+ if (strcasecmp("name", key) == 0)
+ serie->name = json_object_get_string(valueJ);
+ else if (strcasecmp("timestamp", key) == 0)
+ serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ? json_object_get_int64(valueJ) : 0;
+ else if (strcasecmp("metadata", key) == 0)
+ wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags);
+ else if (strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0)
+ wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields);
+ /* Treat all key looking for tag and field object. Those ones could be find
with the last 2 character. '_t' for tag and '_f' that are the keys that
could be indefinite. Cf influxdb documentation:
https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */
- else if(strncasecmp(&key[key_length-2], "_t", 2) == 0)
- add_elt(&serie->serie_columns.tags, key, valueJ);
- else if(strncasecmp(&key[key_length-2], "_f", 2) == 0)
- add_elt(&serie->serie_columns.fields, key, valueJ);
+ else if (strncasecmp(&key[key_length - 2], "_t", 2) == 0)
+ add_elt(&serie->serie_columns.tags, key, valueJ);
+ else if (strncasecmp(&key[key_length - 2], "_f", 2) == 0)
+ add_elt(&serie->serie_columns.fields, key, valueJ);
}
-int unpack_metric_from_api(json_object *m, struct series_t *serie)
+int unpack_metric_from_api(json_object* m, struct series_t* serie)
{
- wrap_json_object_for_all(m, unpacking_from_api, serie);
+ wrap_json_object_for_all(m, unpacking_from_api, serie);
- if(! serie->timestamp) {
- serie->timestamp = get_ts();
- }
+ if (!serie->timestamp) {
+ serie->timestamp = get_ts();
+ }
- return 0;
+ return 0;
}