summaryrefslogtreecommitdiffstats
path: root/src/plugins/influxdb-writer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/influxdb-writer.c')
-rw-r--r--src/plugins/influxdb-writer.c172
1 files changed, 88 insertions, 84 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index c114ab5..45f5fb2 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -23,77 +23,78 @@
void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size)
{
- afb_req_t request = (afb_req_t)closure;
- long rep_code = curl_wrap_response_code_get(curl);
+ afb_req_t request = (afb_req_t)closure;
+ long rep_code = curl_wrap_response_code_get(curl);
switch (rep_code) {
- case 204:
- AFB_REQ_DEBUG(request, "Request correctly written");
- afb_req_success(request, NULL, "Request has been successfully written");
- break;
- case 400:
- afb_req_fail(request, "Bad request", result);
- break;
- case 401:
- afb_req_fail(request, "Unauthorized access", result);
- break;
- case 404:
- afb_req_fail(request, "Not found", result);
+ case 204:
+ AFB_REQ_DEBUG(request, "Request correctly written");
+ afb_req_success(request, NULL, "Request has been successfully written");
+ break;
+ case 400:
+ afb_req_fail(request, "Bad request", result);
+ break;
+ case 401:
+ afb_req_fail(request, "Unauthorized access", result);
+ break;
+ case 404:
+ afb_req_fail(request, "Not found", result);
AFB_REQ_NOTICE(request, "Attempt to create the DB '" DEFAULT_DB "'");
- create_database(request);
- break;
- case 500:
- afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result);
- break;
- default:
+ create_database(request);
+ break;
+ case 500:
+ afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result);
+ break;
+ default:
afb_req_fail_f(request, "Failure", "Unexpected behavior (code %ld).", rep_code);
- break;
- }
+ break;
+ }
}
// query: destination buffer
// node: head node of the list
// list_sep: separator to append in front of the list
// item_sep: separator to append between items of the list
-static void serialize_list_to_query(char *query, struct list *node, const char *list_sep, const char *item_sep) {
- bool first = true;
+static void serialize_list_to_query(char* query, struct list* node, bool quoteString, const char* list_sep, const char* item_sep)
+{
+ bool first = true;
while (node != NULL) {
if (first) {
- concatenate(query, node->key, list_sep);
- first = false;
- } else {
- concatenate(query, node->key, item_sep);
- }
-
- if(json_object_is_type(node->value, json_type_string))
- concatenate_str(query, json_object_get_string(node->value), "=");
- else
- concatenate(query, json_object_to_json_string(node->value), "=");
- node = node->next;
- }
+ concatenate(query, node->key, list_sep);
+ first = false;
+ } else {
+ concatenate(query, node->key, item_sep);
+ }
+
+ if (json_object_is_type(node->value, json_type_string))
+ concatenate_str(query, json_object_get_string(node->value), "=", quoteString);
+ else
+ concatenate(query, json_object_to_json_string(node->value), "=");
+ node = node->next;
+ }
}
static size_t format_write_args(char* query, struct series_t* serie)
{
char* ts;
- strncat(query, serie->name, strlen(serie->name));
+ strncat(query, serie->name, strlen(serie->name));
- serialize_list_to_query(query, serie->serie_columns.tags, ",", ",");
- serialize_list_to_query(query, serie->serie_columns.fields, " ", ",");
+ serialize_list_to_query(query, serie->serie_columns.tags, false, ",", ",");
+ serialize_list_to_query(query, serie->serie_columns.fields, true, " ", ",");
- if (asprintf(&ts, "%lu", serie->timestamp) > 0) {
- concatenate(query, ts, " ");
- free(ts);
- return strlen(query);
- }
- return -1;
+ if (asprintf(&ts, "%lu", serie->timestamp) > 0) {
+ concatenate(query, ts, " ");
+ free(ts);
+ return strlen(query);
+ }
+ return -1;
}
CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* metricsJ)
{
CURL* curl = NULL;
- size_t lpd = 0, len_write = 0, i = 0;
+ size_t lpd = 0, len_write = 0, i = 0;
char** post_data;
char* write = alloca(URL_MAXIMUM_LENGTH); // FIXME: better to use malloc and relloc bigger when needed
struct series_t* serie = NULL;
@@ -102,40 +103,43 @@ CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* me
write[0] = '\0';
if (json_object_is_type(metricsJ, json_type_array)) {
- lpd = json_object_array_length(metricsJ);
- metricsArrayJ = metricsJ;
+ lpd = json_object_array_length(metricsJ);
+ metricsArrayJ = metricsJ;
} else {
- metricsArrayJ = json_object_new_array();
- json_object_array_add(metricsArrayJ, metricsJ);
- lpd = 1;
- }
+ metricsArrayJ = json_object_new_array();
+ json_object_array_add(metricsArrayJ, metricsJ);
+ lpd = 1;
+ }
- serie = malloc(sizeof(struct series_t));
- post_data = calloc(lpd + 1, sizeof(void*));
+ serie = malloc(sizeof(struct series_t));
+ post_data = calloc(lpd + 1, sizeof(void*));
for (i = 0; i < lpd; i++) {
- memset(serie, 0, sizeof(struct series_t));
+ memset(serie, 0, sizeof(struct series_t));
if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
- AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
- break;
+ AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
+ break;
} else {
if (!serie->name) {
- post_data[i] = NULL;
+ post_data[i] = NULL;
} else {
- len_write = format_write_args(write, serie);
+ len_write = format_write_args(write, serie);
if (len_write > 0) {
- post_data[i] = malloc(len_write + 1);
+ post_data[i] = malloc(len_write + 1);
strncpy(post_data[i], write, len_write+1);
write[0] = '\0';
- }
- }
- }
- }
+ }
+ }
+ }
+ }
- AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, *post_data);
+ // Debugging purpose
+ // for (i = 0; i < lpd; i++) {
+ // AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, post_data[i]);
+ // }
- /* Check that we just do not broke the for loop before trying preparing CURL
+ /* Check that we just do not broke the for loop before trying preparing CURL
request object */
curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL;
free(serie);
@@ -146,42 +150,42 @@ CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* me
}
free(post_data);
- return curl;
+ return curl;
}
CURL* influxdb_write(afb_api_t apiHandle, const char* host, const char* port, json_object* metricJ)
{
- char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
+ char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url_db(url, sizeof(url), host, port, "write", DEFAULT_DB);
- return make_curl_write_post(apiHandle, url, metricJ);
+ return make_curl_write_post(apiHandle, url, metricJ);
}
CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ)
{
- afb_req_t request = source->request;
+ afb_req_t request = source->request;
const char* port = NULL;
const char* host = NULL;
CURL* curl_request;
- int rc = -1;
+ int rc = -1;
- json_object *req_args = afb_req_json(request),
- *portJ = NULL,
- *metric = NULL;
+ json_object *req_args = afb_req_json(request),
+ *portJ = NULL,
+ *metric = NULL;
if (wrap_json_unpack(req_args, "{s?s,s?o,so!}",
- "host", &host,
- "port", &portJ,
+ "host", &host,
+ "port", &portJ,
"metric", &metric)
|| !metric) {
- afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\
+ afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\
JSON object or malformed");
- rc = -1;
- } else {
+ rc = -1;
+ } else {
port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ);
- curl_request = influxdb_write(source->api, host, port, metric);
- curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
- rc = 0;
- }
+ curl_request = influxdb_write(source->api, host, port, metric);
+ curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
+ rc = 0;
+ }
- return rc;
+ return rc;
}