diff options
Diffstat (limited to 'src/plugins/influxdb-writer.c')
-rw-r--r-- | src/plugins/influxdb-writer.c | 172 |
1 files changed, 88 insertions, 84 deletions
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c index c114ab5..45f5fb2 100644 --- a/src/plugins/influxdb-writer.c +++ b/src/plugins/influxdb-writer.c @@ -23,77 +23,78 @@ void influxdb_write_curl_cb(void* closure, int status, CURL* curl, const char* result, size_t size) { - afb_req_t request = (afb_req_t)closure; - long rep_code = curl_wrap_response_code_get(curl); + afb_req_t request = (afb_req_t)closure; + long rep_code = curl_wrap_response_code_get(curl); switch (rep_code) { - case 204: - AFB_REQ_DEBUG(request, "Request correctly written"); - afb_req_success(request, NULL, "Request has been successfully written"); - break; - case 400: - afb_req_fail(request, "Bad request", result); - break; - case 401: - afb_req_fail(request, "Unauthorized access", result); - break; - case 404: - afb_req_fail(request, "Not found", result); + case 204: + AFB_REQ_DEBUG(request, "Request correctly written"); + afb_req_success(request, NULL, "Request has been successfully written"); + break; + case 400: + afb_req_fail(request, "Bad request", result); + break; + case 401: + afb_req_fail(request, "Unauthorized access", result); + break; + case 404: + afb_req_fail(request, "Not found", result); AFB_REQ_NOTICE(request, "Attempt to create the DB '" DEFAULT_DB "'"); - create_database(request); - break; - case 500: - afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result); - break; - default: + create_database(request); + break; + case 500: + afb_req_fail_f(request, "Timeout", "Overloaded server: %s", result); + break; + default: afb_req_fail_f(request, "Failure", "Unexpected behavior (code %ld).", rep_code); - break; - } + break; + } } // query: destination buffer // node: head node of the list // list_sep: separator to append in front of the list // item_sep: separator to append between items of the list -static void serialize_list_to_query(char *query, struct list *node, const char *list_sep, const char *item_sep) { - bool first = true; +static void serialize_list_to_query(char* query, struct list* node, bool quoteString, const char* list_sep, const char* item_sep) +{ + bool first = true; while (node != NULL) { if (first) { - concatenate(query, node->key, list_sep); - first = false; - } else { - concatenate(query, node->key, item_sep); - } - - if(json_object_is_type(node->value, json_type_string)) - concatenate_str(query, json_object_get_string(node->value), "="); - else - concatenate(query, json_object_to_json_string(node->value), "="); - node = node->next; - } + concatenate(query, node->key, list_sep); + first = false; + } else { + concatenate(query, node->key, item_sep); + } + + if (json_object_is_type(node->value, json_type_string)) + concatenate_str(query, json_object_get_string(node->value), "=", quoteString); + else + concatenate(query, json_object_to_json_string(node->value), "="); + node = node->next; + } } static size_t format_write_args(char* query, struct series_t* serie) { char* ts; - strncat(query, serie->name, strlen(serie->name)); + strncat(query, serie->name, strlen(serie->name)); - serialize_list_to_query(query, serie->serie_columns.tags, ",", ","); - serialize_list_to_query(query, serie->serie_columns.fields, " ", ","); + serialize_list_to_query(query, serie->serie_columns.tags, false, ",", ","); + serialize_list_to_query(query, serie->serie_columns.fields, true, " ", ","); - if (asprintf(&ts, "%lu", serie->timestamp) > 0) { - concatenate(query, ts, " "); - free(ts); - return strlen(query); - } - return -1; + if (asprintf(&ts, "%lu", serie->timestamp) > 0) { + concatenate(query, ts, " "); + free(ts); + return strlen(query); + } + return -1; } CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* metricsJ) { CURL* curl = NULL; - size_t lpd = 0, len_write = 0, i = 0; + size_t lpd = 0, len_write = 0, i = 0; char** post_data; char* write = alloca(URL_MAXIMUM_LENGTH); // FIXME: better to use malloc and relloc bigger when needed struct series_t* serie = NULL; @@ -102,40 +103,43 @@ CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* me write[0] = '\0'; if (json_object_is_type(metricsJ, json_type_array)) { - lpd = json_object_array_length(metricsJ); - metricsArrayJ = metricsJ; + lpd = json_object_array_length(metricsJ); + metricsArrayJ = metricsJ; } else { - metricsArrayJ = json_object_new_array(); - json_object_array_add(metricsArrayJ, metricsJ); - lpd = 1; - } + metricsArrayJ = json_object_new_array(); + json_object_array_add(metricsArrayJ, metricsJ); + lpd = 1; + } - serie = malloc(sizeof(struct series_t)); - post_data = calloc(lpd + 1, sizeof(void*)); + serie = malloc(sizeof(struct series_t)); + post_data = calloc(lpd + 1, sizeof(void*)); for (i = 0; i < lpd; i++) { - memset(serie, 0, sizeof(struct series_t)); + memset(serie, 0, sizeof(struct series_t)); if (unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { - AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); - break; + AFB_API_ERROR(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); + break; } else { if (!serie->name) { - post_data[i] = NULL; + post_data[i] = NULL; } else { - len_write = format_write_args(write, serie); + len_write = format_write_args(write, serie); if (len_write > 0) { - post_data[i] = malloc(len_write + 1); + post_data[i] = malloc(len_write + 1); strncpy(post_data[i], write, len_write+1); write[0] = '\0'; - } - } - } - } + } + } + } + } - AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, *post_data); + // Debugging purpose + // for (i = 0; i < lpd; i++) { + // AFB_API_DEBUG(apiHandle, "curl POST '%s' '%s' ", url, post_data[i]); + // } - /* Check that we just do not broke the for loop before trying preparing CURL + /* Check that we just do not broke the for loop before trying preparing CURL request object */ curl = i == lpd ? curl_wrap_prepare_post_unescaped(url, NULL, "\n", (const char* const*)post_data) : NULL; free(serie); @@ -146,42 +150,42 @@ CURL* make_curl_write_post(afb_api_t apiHandle, const char* url, json_object* me } free(post_data); - return curl; + return curl; } CURL* influxdb_write(afb_api_t apiHandle, const char* host, const char* port, json_object* metricJ) { - char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ + char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ make_url_db(url, sizeof(url), host, port, "write", DEFAULT_DB); - return make_curl_write_post(apiHandle, url, metricJ); + return make_curl_write_post(apiHandle, url, metricJ); } CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ) { - afb_req_t request = source->request; + afb_req_t request = source->request; const char* port = NULL; const char* host = NULL; CURL* curl_request; - int rc = -1; + int rc = -1; - json_object *req_args = afb_req_json(request), - *portJ = NULL, - *metric = NULL; + json_object *req_args = afb_req_json(request), + *portJ = NULL, + *metric = NULL; if (wrap_json_unpack(req_args, "{s?s,s?o,so!}", - "host", &host, - "port", &portJ, + "host", &host, + "port", &portJ, "metric", &metric) || !metric) { - afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\ + afb_req_fail(request, "Failed", "Error processing arguments. Miss metric\ JSON object or malformed"); - rc = -1; - } else { + rc = -1; + } else { port = json_object_is_type(portJ, json_type_null) ? NULL : json_object_to_json_string(portJ); - curl_request = influxdb_write(source->api, host, port, metric); - curl_wrap_do(curl_request, influxdb_write_curl_cb, request); - rc = 0; - } + curl_request = influxdb_write(source->api, host, port, metric); + curl_wrap_do(curl_request, influxdb_write_curl_cb, request); + rc = 0; + } - return rc; + return rc; } |