From fef008b549c9c6d6ee1e564312787c3d3f14f0c0 Mon Sep 17 00:00:00 2001 From: Romain Forlot Date: Wed, 16 May 2018 00:58:28 +0200 Subject: Convert the binding to use the controller Ease Time series DB abstraction layer by using Dyn API that implemente the API defined by the JSON schema. Change-Id: I67de4fbca10048201fdd2da683732a5f4f5b5368 Signed-off-by: Romain Forlot --- src/plugins/influxdb-reader.c | 67 ++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 29 deletions(-) (limited to 'src/plugins/influxdb-reader.c') diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c index a1fa3a0..052a3da 100644 --- a/src/plugins/influxdb-reader.c +++ b/src/plugins/influxdb-reader.c @@ -29,6 +29,7 @@ struct metrics_list { struct series_t serie; json_object *metricsJ; + AFB_ApiT api; }; static void fill_n_send_values(void *c, json_object *valuesJ) @@ -44,7 +45,7 @@ static void fill_n_send_values(void *c, json_object *valuesJ) else { if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) { if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) { - AFB_ERROR("No tags nor fields fits."); + AFB_ApiError(m_list->api, "No tags nor fields fits."); } j++; } @@ -85,7 +86,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ) "name", &m_list->serie.name, "columns", &columnsJ, "values", &valuesJ)) { - AFB_ERROR("Unpacking metric goes wrong"); + AFB_ApiError(m_list->api, "Unpacking metric goes wrong"); return; } @@ -93,7 +94,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ) wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list); } -static json_object *unpack_series(json_object *seriesJ) +static json_object *unpack_series(AFB_ApiT apiHandle, json_object *seriesJ) { struct metrics_list m_list = { .serie = { @@ -104,7 +105,8 @@ static json_object *unpack_series(json_object *seriesJ) }, .timestamp = 0 }, - .metricsJ = json_object_new_array() + .metricsJ = json_object_new_array(), + .api = apiHandle }; wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list); @@ -112,7 +114,7 @@ static json_object *unpack_series(json_object *seriesJ) return m_list.metricsJ; } -static void forward_to_garner(const char *result, size_t size) +static void forward_to_garner(AFB_ApiT apiHandle, const char *result, size_t size) { int id = 0; json_object *resultsJ = NULL, @@ -131,39 +133,42 @@ static void forward_to_garner(const char *result, size_t size) } if(seriesJ) { - metrics2send = unpack_series(seriesJ); + metrics2send = unpack_series(apiHandle, seriesJ); if(json_object_array_length(metrics2send)) { - if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) { - AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ)); + if(AFB_ServiceSync(apiHandle, "garner", "write", metrics2send, &call_resultJ)) { + AFB_ApiError(apiHandle, "Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ)); } } } else { - AFB_ERROR("Empty response. Request results was:\n%s", result); + AFB_ApiError(apiHandle, "Empty response. Request results was:\n%s", result); } } static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) { + if(!closure) + return; + AFB_ApiT apiHandle = (AFB_ApiT)closure; long rep_code = curl_wrap_response_code_get(curl); switch(rep_code) { case 200: - AFB_DEBUG("Read correctly done"); - forward_to_garner(result, size); + AFB_ApiDebug(apiHandle, "Read correctly done"); + forward_to_garner(apiHandle, result, size); break; case 400: - AFB_ERROR("Unacceptable request. %s", result); + AFB_ApiError(apiHandle, "Unacceptable request. %s", result); break; case 401: - AFB_ERROR("Invalid authentication. %s", result); + AFB_ApiError(apiHandle, "Invalid authentication. %s", result); break; default: - AFB_ERROR("Unexptected behavior. %s", result); + AFB_ApiError(apiHandle, "Unexptected behavior. %s", result); break; } } -static CURL *make_curl_query_get(const char *url) +static CURL *make_curl_query_get(AFB_ApiT apiHandle, const char *url) { CURL *curl; char *args[5]; @@ -179,7 +184,7 @@ static CURL *make_curl_query_get(const char *url) args[4] = NULL; length_now = asprintf(&now, "%lu", get_ts()); - int rootdir_fd = afb_daemon_rootdir_get_fd(); + int rootdir_fd = AFB_RootDirGetFD(apiHandle); int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU); if (fd_last_read < 0) return NULL; @@ -188,7 +193,7 @@ static CURL *make_curl_query_get(const char *url) else write the last timestamp */ if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) { if(write(fd_last_read, now, length_now) != length_now) - AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno )); + AFB_ApiError(apiHandle, "Error writing last_db_read file: %s\n", strerror( errno )); } else { strcat(query, " WHERE time >= "); @@ -196,7 +201,7 @@ static CURL *make_curl_query_get(const char *url) close(fd_last_read); fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR); if (write(fd_last_read, now, length_now) != length_now) - AFB_ERROR("Error writing last_db_read file: %s", strerror( errno )); + AFB_ApiError(apiHandle, "Error writing last_db_read file: %s", strerror( errno )); } args[3] = query; @@ -208,33 +213,37 @@ static CURL *make_curl_query_get(const char *url) static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) { CURL *curl; - struct reader_args *a = NULL; - if(userdata) - a = (struct reader_args*)userdata; - else + struct reader_args *r_args = NULL; + CtlSourceT *source = NULL; + if(userdata) { + source = (CtlSourceT*)userdata; + r_args = (struct reader_args*)source->context; + } + else { return ERROR; + } char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ - make_url(url, sizeof(url), a->host, a->port, "query"); - curl = make_curl_query_get(url); - curl_wrap_do(curl, influxdb_read_curl_cb, NULL); + make_url(url, sizeof(url), r_args->host, r_args->port, "query"); + curl = make_curl_query_get(source->api, url); + curl_wrap_do(curl, influxdb_read_curl_cb, (void*)source->api); /* Reschedule next run */ - sd_event_source_set_time(s, usec + a->delay); + sd_event_source_set_time(s, usec + r_args->delay); return 0; } -int influxdb_reader(void *args) +CTLP_CAPI(read_from_influxdb, source, argsJ, eventJ) { int err = 0; uint64_t usec; struct sd_event_source *evtSource = NULL; /* Set a cyclic cb call each 1s to call the read callback */ - sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec); - err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args); + sd_event_now(AFB_GetEventLoop(source->api), CLOCK_MONOTONIC, &usec); + err = sd_event_add_time(AFB_GetEventLoop(source->api), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, (void*)source); if(!err) err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON); -- cgit 1.2.3-korg