summaryrefslogtreecommitdiffstats
path: root/src/plugins/influxdb-reader.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/influxdb-reader.c')
-rw-r--r--src/plugins/influxdb-reader.c67
1 files changed, 38 insertions, 29 deletions
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index a1fa3a0..052a3da 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -29,6 +29,7 @@
struct metrics_list {
struct series_t serie;
json_object *metricsJ;
+ AFB_ApiT api;
};
static void fill_n_send_values(void *c, json_object *valuesJ)
@@ -44,7 +45,7 @@ static void fill_n_send_values(void *c, json_object *valuesJ)
else {
if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) {
if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) {
- AFB_ERROR("No tags nor fields fits.");
+ AFB_ApiError(m_list->api, "No tags nor fields fits.");
}
j++;
}
@@ -85,7 +86,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
"name", &m_list->serie.name,
"columns", &columnsJ,
"values", &valuesJ)) {
- AFB_ERROR("Unpacking metric goes wrong");
+ AFB_ApiError(m_list->api, "Unpacking metric goes wrong");
return;
}
@@ -93,7 +94,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list);
}
-static json_object *unpack_series(json_object *seriesJ)
+static json_object *unpack_series(AFB_ApiT apiHandle, json_object *seriesJ)
{
struct metrics_list m_list = {
.serie = {
@@ -104,7 +105,8 @@ static json_object *unpack_series(json_object *seriesJ)
},
.timestamp = 0
},
- .metricsJ = json_object_new_array()
+ .metricsJ = json_object_new_array(),
+ .api = apiHandle
};
wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list);
@@ -112,7 +114,7 @@ static json_object *unpack_series(json_object *seriesJ)
return m_list.metricsJ;
}
-static void forward_to_garner(const char *result, size_t size)
+static void forward_to_garner(AFB_ApiT apiHandle, const char *result, size_t size)
{
int id = 0;
json_object *resultsJ = NULL,
@@ -131,39 +133,42 @@ static void forward_to_garner(const char *result, size_t size)
}
if(seriesJ) {
- metrics2send = unpack_series(seriesJ);
+ metrics2send = unpack_series(apiHandle, seriesJ);
if(json_object_array_length(metrics2send)) {
- if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) {
- AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
+ if(AFB_ServiceSync(apiHandle, "garner", "write", metrics2send, &call_resultJ)) {
+ AFB_ApiError(apiHandle, "Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
}
}
}
else {
- AFB_ERROR("Empty response. Request results was:\n%s", result);
+ AFB_ApiError(apiHandle, "Empty response. Request results was:\n%s", result);
}
}
static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
+ if(!closure)
+ return;
+ AFB_ApiT apiHandle = (AFB_ApiT)closure;
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
case 200:
- AFB_DEBUG("Read correctly done");
- forward_to_garner(result, size);
+ AFB_ApiDebug(apiHandle, "Read correctly done");
+ forward_to_garner(apiHandle, result, size);
break;
case 400:
- AFB_ERROR("Unacceptable request. %s", result);
+ AFB_ApiError(apiHandle, "Unacceptable request. %s", result);
break;
case 401:
- AFB_ERROR("Invalid authentication. %s", result);
+ AFB_ApiError(apiHandle, "Invalid authentication. %s", result);
break;
default:
- AFB_ERROR("Unexptected behavior. %s", result);
+ AFB_ApiError(apiHandle, "Unexptected behavior. %s", result);
break;
}
}
-static CURL *make_curl_query_get(const char *url)
+static CURL *make_curl_query_get(AFB_ApiT apiHandle, const char *url)
{
CURL *curl;
char *args[5];
@@ -179,7 +184,7 @@ static CURL *make_curl_query_get(const char *url)
args[4] = NULL;
length_now = asprintf(&now, "%lu", get_ts());
- int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int rootdir_fd = AFB_RootDirGetFD(apiHandle);
int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
if (fd_last_read < 0)
return NULL;
@@ -188,7 +193,7 @@ static CURL *make_curl_query_get(const char *url)
else write the last timestamp */
if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
if(write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s\n", strerror( errno ));
}
else {
strcat(query, " WHERE time >= ");
@@ -196,7 +201,7 @@ static CURL *make_curl_query_get(const char *url)
close(fd_last_read);
fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
if (write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s", strerror( errno ));
}
args[3] = query;
@@ -208,33 +213,37 @@ static CURL *make_curl_query_get(const char *url)
static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
- struct reader_args *a = NULL;
- if(userdata)
- a = (struct reader_args*)userdata;
- else
+ struct reader_args *r_args = NULL;
+ CtlSourceT *source = NULL;
+ if(userdata) {
+ source = (CtlSourceT*)userdata;
+ r_args = (struct reader_args*)source->context;
+ }
+ else {
return ERROR;
+ }
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
- make_url(url, sizeof(url), a->host, a->port, "query");
- curl = make_curl_query_get(url);
- curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ make_url(url, sizeof(url), r_args->host, r_args->port, "query");
+ curl = make_curl_query_get(source->api, url);
+ curl_wrap_do(curl, influxdb_read_curl_cb, (void*)source->api);
/* Reschedule next run */
- sd_event_source_set_time(s, usec + a->delay);
+ sd_event_source_set_time(s, usec + r_args->delay);
return 0;
}
-int influxdb_reader(void *args)
+CTLP_CAPI(read_from_influxdb, source, argsJ, eventJ)
{
int err = 0;
uint64_t usec;
struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
- sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ sd_event_now(AFB_GetEventLoop(source->api), CLOCK_MONOTONIC, &usec);
+ err = sd_event_add_time(AFB_GetEventLoop(source->api), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, (void*)source);
if(!err)
err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);