diff options
-rw-r--r-- | .vscode/settings.json | 3 | ||||
-rw-r--r-- | src/plugins/influxdb-writer.c | 274 | ||||
-rw-r--r-- | src/plugins/influxdb.c | 191 |
3 files changed, 232 insertions, 236 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 8337700..9dedb11 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -22,7 +22,8 @@ "influxdb.h": "c", "tsdb.h": "c", "ctl-plugin.h": "c", - "escape.h": "c" + "escape.h": "c", + "ostream": "c" }, "C_Cpp.intelliSenseEngineFallback": "Disabled", "C_Cpp.errorSquiggles": "Disabled", diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c index be02e34..dca5dab 100644 --- a/src/plugins/influxdb-writer.c +++ b/src/plugins/influxdb-writer.c @@ -16,166 +16,164 @@ */ #define _GNU_SOURCE -#include <string.h> #include <stdio.h> +#include <string.h> #include "influxdb.h" -void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) +void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size) { - AFB_ReqT request = (AFB_ReqT)closure; - long rep_code = curl_wrap_response_code_get(curl); - switch(rep_code) { - case 204: - AFB_ReqDebug(request, "Request correctly written"); - AFB_ReqSucess(request, NULL, "Request has been successfully written"); - break; - case 400: - AFB_ReqFail(request, "Bad request", result); - break; - case 401: - AFB_ReqFail(request, "Unauthorized access", result); - break; - case 404: - AFB_ReqFail(request, "Not found", result); - AFB_ReqNotice(request, "Attempt to create the DB '"DEFAULT_DB"'"); - create_database(request); - break; - case 500: - AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result); - break; - default: - AFB_ReqFail(request, "Failure", "Unexpected behavior."); - break; - } + AFB_ReqT request = (AFB_ReqT)closure; + long rep_code = curl_wrap_response_code_get(curl); + switch (rep_code) { + case 204: + AFB_ReqDebug(request, "Request correctly written"); + AFB_ReqSucess(request, NULL, "Request has been successfully written"); + break; + case 400: + AFB_ReqFail(request, "Bad request", result); + break; + case 401: + AFB_ReqFail(request, "Unauthorized access", result); + break; + case 404: + AFB_ReqFail(request, "Not found", result); + AFB_ReqNotice(request, "Attempt to create the DB '" DEFAULT_DB "'"); + create_database(request); + break; + case 500: + AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result); + break; + default: + AFB_ReqFail(request, "Failure", "Unexpected behavior."); + break; + } } -static size_t format_write_args(char *query, struct series_t *serie) +static size_t format_write_args(char* query, struct series_t* serie) { - char *ts; - struct list *tags = serie->serie_columns.tags; - struct list *fields = serie->serie_columns.fields; + char* ts; + struct list* tags = serie->serie_columns.tags; + struct list* fields = serie->serie_columns.fields; concatenate(query, serie->name, NULL); - if(tags) { - while(tags != NULL) { - concatenate(query, tags->key, ","); - if(json_object_is_type(tags->value, json_type_string)) - concatenate(query, json_object_get_string(tags->value), "="); - else - concatenate(query, json_object_to_json_string(tags->value), "="); - tags = tags->next; - } - } - if(fields) { - int i = 0; - for(struct list *it = fields; it != NULL; it = it->next) { - if(!i) - concatenate(query, fields->key, " "); - else - concatenate(query, fields->key, ","); - if(json_object_is_type(fields->value, json_type_string)) - concatenate(query, json_object_get_string(fields->value), "="); - else - concatenate(query, json_object_to_json_string(fields->value), "="); - i++; - } - } - - asprintf(&ts, "%lu", serie->timestamp); - concatenate(query, ts, " "); - - return strlen(query); + if (tags) { + while (tags != NULL) { + concatenate(query, tags->key, ","); + if (json_object_is_type(tags->value, json_type_string)) + concatenate(query, json_object_get_string(tags->value), "="); + else + concatenate(query, json_object_to_json_string(tags->value), "="); + tags = tags->next; + } + } + + if (fields) { + int i = 0; + for (struct list* it = fields; it != NULL; it = it->next) { + if (!i) + concatenate(query, it->key, " "); + else + concatenate(query, it->key, ","); + if (json_object_is_type(it->value, json_type_string)) + concatenate(query, json_object_get_string(it->value), "="); + else + concatenate(query, json_object_to_json_string(it->value), "="); + i++; + } + } + + asprintf(&ts, "%lu", serie->timestamp); + concatenate(query, ts, " "); + + return strlen(query); } -CURL *make_curl_write_post(AFB_ApiT apiHandle, const char *url, json_object *metricsJ) +CURL* make_curl_write_post(AFB_ApiT apiHandle, const char* url, json_object* metricsJ) { - CURL *curl = NULL; - size_t lpd = 0, len_write = 0, i = 0; - char **post_data; - char write[URL_MAXIMUM_LENGTH] = ""; - struct series_t *serie = NULL; - json_object *metricsArrayJ = NULL; - - if(json_object_is_type(metricsJ, json_type_array)) { - lpd = json_object_array_length(metricsJ); - metricsArrayJ = metricsJ; - } - else { - metricsArrayJ = json_object_new_array(); - json_object_array_add(metricsArrayJ, metricsJ); - lpd = 1; - } - - serie = malloc(sizeof(struct series_t)); - post_data = calloc(lpd + 1, sizeof(void*)); - - for(i = 0; i < lpd; i++) { - memset(serie, 0, sizeof(struct series_t)); - - if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { - AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); - break; - } - else { - if(! serie->name) { - post_data[i] = NULL; - } - else { - len_write = format_write_args(write, serie); - if(len_write) { - post_data[i] = malloc(len_write + 1); - strcpy(post_data[i], write); - memset(write, 0, len_write); - } - } - } - } - - AFB_ApiDebug(apiHandle, "influx curl: url=%s data=%s", url, (const char*) *post_data); - - /* Check that we just do not broke the for loop before trying preparing CURL + CURL* curl = NULL; + size_t lpd = 0, len_write = 0, i = 0; + char** post_data; + char write[URL_MAXIMUM_LENGTH] = ""; + struct series_t* serie = NULL; + json_object* metricsArrayJ = NULL; + + if (json_object_is_type(metricsJ, json_type_array)) { + lpd = json_object_array_length(metricsJ); + metricsArrayJ = metricsJ; + } else { + metricsArrayJ = json_object_new_array(); + json_object_array_add(metricsArrayJ, metricsJ); + lpd = 1; + } + + serie = malloc(sizeof(struct series_t)); + post_data = calloc(lpd + 1, sizeof(void*)); + + for (i = 0; i < lpd; i++) { + memset(serie, 0, sizeof(struct series_t)); + + if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { + AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); + break; + } else { + if (!serie->name) { + post_data[i] = NULL; + } else { + // TODO: pass strlen(write) == URL_MAXIMUM_LENGTH to format_write_args and add write size overflow + len_write = format_write_args(write, serie); + if (len_write) { + post_data[i] = malloc(len_write + 1); + strcpy(post_data[i], write); + memset(write, 0, len_write); + } + } + } + } + + AFB_ApiDebug(apiHandle, "influx curl: url=%s data=%s", url, (const char*)*post_data); + + /* Check that we just do not broke the for loop before trying preparing CURL request object */ - curl = i == lpd ? - curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char * const*)post_data) : NULL; - free(serie); - for(i = 0; i < lpd; i++) - free(post_data[i]); - free(post_data); - - return curl; + curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL; + free(serie); + for (i = 0; i < lpd; i++) + free(post_data[i]); + free(post_data); + + return curl; } -CURL *influxdb_write(AFB_ApiT apiHandle, const char* host, const char *port, json_object *metricJ) +CURL* influxdb_write(AFB_ApiT apiHandle, const char* host, const char* port, json_object* metricJ) { - char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ - make_url(url, sizeof(url), host, port, "write"); - return make_curl_write_post(apiHandle, url, metricJ); + char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ + make_url(url, sizeof(url), host, port, "write"); + return make_curl_write_post(apiHandle, url, metricJ); } CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ) { - AFB_ReqT request = source->request; - const char *port = NULL; - const char *host = NULL; - CURL *curl_request; - - json_object *req_args = AFB_ReqJson(request), - *portJ = NULL, - *metric = NULL; - - if(wrap_json_unpack(req_args, "{s?s,s?o,so!}", - "host", &host, - "port", &portJ, - "metric", &metric) || ! metric) - AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\ + AFB_ReqT request = source->request; + const char* port = NULL; + const char* host = NULL; + CURL* curl_request; + + json_object *req_args = AFB_ReqJson(request), + *portJ = NULL, + *metric = NULL; + + if (wrap_json_unpack(req_args, "{s?s,s?o,so!}", + "host", &host, + "port", &portJ, + "metric", &metric) + || !metric) + AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\ JSON object or malformed"); - else - port = json_object_is_type(portJ, json_type_null) ? - NULL : json_object_to_json_string(portJ); + else + port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ); - curl_request = influxdb_write(source->api, host, port, metric); - curl_wrap_do(curl_request, influxdb_write_curl_cb, request); + curl_request = influxdb_write(source->api, host, port, metric); + curl_wrap_do(curl_request, influxdb_write_curl_cb, request); - return 0; + return 0; } diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c index ae39a2a..d3da39a 100644 --- a/src/plugins/influxdb.c +++ b/src/plugins/influxdb.c @@ -20,161 +20,158 @@ #include <stdio.h> #include <unistd.h> +#include "../utils/list.h" #include "influxdb.h" #include "tsdb.h" #include "wrap-json.h" -#include "../utils/list.h" CTLP_CAPI_REGISTER("influxdb"); CTLP_ONLOAD(plugin, ret) { - int err = 0; - char *result; - size_t result_size; - CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL); + int err = 0; + char* result; + size_t result_size; + CURL* request = curl_wrap_prepare_get("localhost:" DEFAULT_DBPORT "/ping", NULL, NULL); - struct reader_args r_args = {NULL, NULL, 1000000}; - plugin->context = (void*)&r_args; + struct reader_args r_args = { NULL, NULL, 1000000 }; + plugin->context = (void*)&r_args; - curl_wrap_perform(request, &result, &result_size); + curl_wrap_perform(request, &result, &result_size); - if(curl_wrap_response_code_get(request) != 204) { - AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it"); - err = ERROR; - } + if (curl_wrap_response_code_get(request) != 204) { + AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it"); + err = ERROR; + } - curl_easy_cleanup(request); - return err; + curl_easy_cleanup(request); + return err; } CTLP_CAPI(influxdb_ping, source, argsJ, eventJ) { - int ret = 0; - char *result; - size_t result_size; + int ret = 0; + char* result; + size_t result_size; - CURL *curl_req = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL); + CURL* curl_req = curl_wrap_prepare_get("localhost:" DEFAULT_DBPORT "/ping", NULL, NULL); - curl_wrap_perform(curl_req, &result, &result_size); + curl_wrap_perform(curl_req, &result, &result_size); - if(curl_wrap_response_code_get(curl_req) != 204) { - AFB_ApiError(source->api, "InfluxDB is offline."); - ret = ERROR; - } - else { - AFB_ApiNotice(source->api, "InfluxDB is up and running."); - } + if (curl_wrap_response_code_get(curl_req) != 204) { + AFB_ApiError(source->api, "InfluxDB is offline."); + ret = ERROR; + } else { + AFB_ApiNotice(source->api, "InfluxDB is up and running."); + } - curl_easy_cleanup(curl_req); + curl_easy_cleanup(curl_req); - return ret; + return ret; } -size_t make_url(char *url, size_t l_url, const char *host, const char *port, const char *endpoint) +size_t make_url(char* url, size_t l_url, const char* host, const char* port, const char* endpoint) { - bzero(url, l_url); + bzero(url, l_url); - /* Handle default host and port */ - host = host ? host : DEFAULT_DBHOST; - port = port ? port : DEFAULT_DBPORT; + /* Handle default host and port */ + host = host ? host : DEFAULT_DBHOST; + port = port ? port : DEFAULT_DBPORT; - strncat(url, host, strlen(host)); - strcat(url, ":"); - strncat(url, port, strlen(port)); - strcat(url, "/"); - strncat(url, endpoint, strlen(endpoint)); - strcat(url, "?db="DEFAULT_DB); + strncat(url, host, strlen(host)); + strcat(url, ":"); + strncat(url, port, strlen(port)); + strcat(url, "/"); + strncat(url, endpoint, strlen(endpoint)); + strcat(url, "?db=" DEFAULT_DB); - return strlen(url); + return strlen(url); } - int create_database(AFB_ReqT request) { - int ret = 0; - char *result; - size_t result_size; + int ret = 0; + char* result; + size_t result_size; - // Declare query to be posted - const char *post_data[2]; - post_data[0] = "q=CREATE DATABASE \""DEFAULT_DB"\""; - post_data[1] = NULL; + // Declare query to be posted + const char* post_data[2]; + post_data[0] = "q=CREATE DATABASE \"" DEFAULT_DB "\""; + post_data[1] = NULL; - CURL *curl_req = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data); - curl_wrap_perform(curl_req, &result, &result_size); + CURL* curl_req = curl_wrap_prepare_post_unescaped("localhost:" DEFAULT_DBPORT "/query", NULL, " ", post_data); + curl_wrap_perform(curl_req, &result, &result_size); - if(curl_wrap_response_code_get(request) != 200) { - AFB_ReqError(request, "Can't create database."); - ret = ERROR; - } + if (curl_wrap_response_code_get(request) != 200) { + AFB_ReqError(request, "Can't create database."); + ret = ERROR; + } - curl_easy_cleanup(curl_req); + curl_easy_cleanup(curl_req); - if(ret == 0) - AFB_ReqNotice(request, "Database '"DEFAULT_DB"' created"); + if (ret == 0) + AFB_ReqNotice(request, "Database '" DEFAULT_DB "' created"); - return ret; + return ret; } -void unpack_values(void *l, json_object *valuesJ, const char *key) +void unpack_values(void* l, json_object* valuesJ, const char* key) { - struct list **oneList = (struct list **)l; + struct list** oneList = (struct list**)l; - /* Append a suffix to be able to differentiate tags and fields at reading + /* Append a suffix to be able to differentiate tags and fields at reading time */ - char *suffixed_key = calloc(1, strlen(key) + 3); - strcpy(suffixed_key, key); - strcat(suffixed_key, "_f"); + char* suffixed_key = calloc(1, strlen(key) + 3); + strcpy(suffixed_key, key); + strcat(suffixed_key, "_f"); - add_elt(oneList, suffixed_key, valuesJ); + add_elt(oneList, suffixed_key, valuesJ); } -void unpack_metadata(void *l, json_object *valuesJ, const char *key) +void unpack_metadata(void* l, json_object* valuesJ, const char* key) { - struct list **oneList = (struct list **)l; + struct list** oneList = (struct list**)l; - /* Append a suffix to be able to differentiate tags and fields at reading + /* Append a suffix to be able to differentiate tags and fields at reading time */ - char *suffixed_key = calloc(1, strlen(key) +3); - strcat(suffixed_key, key); - strcat(suffixed_key, "_t"); + char* suffixed_key = calloc(1, strlen(key) + 3); + strcpy(suffixed_key, key); + strcat(suffixed_key, "_t"); - add_elt(oneList, suffixed_key, valuesJ); + add_elt(oneList, suffixed_key, valuesJ); } -void unpacking_from_api(void *s, json_object *valueJ, const char *key) +void unpacking_from_api(void* s, json_object* valueJ, const char* key) { - size_t key_length = strlen(key); - struct series_t *serie = (struct series_t*)s; - - /* Treat the 2 static key that could have been specified */ - if(strcasecmp("name", key) == 0) - serie->name = json_object_get_string(valueJ); - else if(strcasecmp("timestamp", key) == 0) - serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ? - json_object_get_int64(valueJ) : 0; - else if(strcasecmp("metadata", key) == 0) - wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags); - else if(strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0) - wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields); - /* Treat all key looking for tag and field object. Those ones could be find + size_t key_length = strlen(key); + struct series_t* serie = (struct series_t*)s; + + /* Treat the 2 static key that could have been specified */ + if (strcasecmp("name", key) == 0) + serie->name = json_object_get_string(valueJ); + else if (strcasecmp("timestamp", key) == 0) + serie->timestamp = (json_object_is_type(valueJ, json_type_int)) ? json_object_get_int64(valueJ) : 0; + else if (strcasecmp("metadata", key) == 0) + wrap_json_object_for_all(valueJ, unpack_metadata, (void*)&serie->serie_columns.tags); + else if (strcasecmp("value", key) == 0 || strcasecmp("values", key) == 0) + wrap_json_object_for_all(valueJ, unpack_values, (void*)&serie->serie_columns.fields); + /* Treat all key looking for tag and field object. Those ones could be find with the last 2 character. '_t' for tag and '_f' that are the keys that could be indefinite. Cf influxdb documentation: https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ */ - else if(strncasecmp(&key[key_length-2], "_t", 2) == 0) - add_elt(&serie->serie_columns.tags, key, valueJ); - else if(strncasecmp(&key[key_length-2], "_f", 2) == 0) - add_elt(&serie->serie_columns.fields, key, valueJ); + else if (strncasecmp(&key[key_length - 2], "_t", 2) == 0) + add_elt(&serie->serie_columns.tags, key, valueJ); + else if (strncasecmp(&key[key_length - 2], "_f", 2) == 0) + add_elt(&serie->serie_columns.fields, key, valueJ); } -int unpack_metric_from_api(json_object *m, struct series_t *serie) +int unpack_metric_from_api(json_object* m, struct series_t* serie) { - wrap_json_object_for_all(m, unpacking_from_api, serie); + wrap_json_object_for_all(m, unpacking_from_api, serie); - if(! serie->timestamp) { - serie->timestamp = get_ts(); - } + if (!serie->timestamp) { + serie->timestamp = get_ts(); + } - return 0; + return 0; } |