summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-05-16 00:58:28 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commitfef008b549c9c6d6ee1e564312787c3d3f14f0c0 (patch)
tree43ab4879055ef9e4198705be7cca6fe30177593f /src
parentdaf148db51e3abe2d7cfbeb7224124f64b8fc4e3 (diff)
Convert the binding to use the controller
Ease Time series DB abstraction layer by using Dyn API that implemente the API defined by the JSON schema. Change-Id: I67de4fbca10048201fdd2da683732a5f4f5b5368 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt9
-rw-r--r--src/harvester-binding.c156
-rw-r--r--src/harvester-binding.h (renamed from src/harvester.h)29
-rw-r--r--src/harvester.c97
-rw-r--r--src/plugins/CMakeLists.txt44
-rw-r--r--src/plugins/influxdb-reader.c67
-rw-r--r--src/plugins/influxdb-writer.c93
-rw-r--r--src/plugins/influxdb.c56
-rw-r--r--src/plugins/influxdb.h3
-rw-r--r--src/plugins/tsdb.c25
-rw-r--r--src/plugins/tsdb.h11
11 files changed, 368 insertions, 222 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7c8c2b4..a016c14 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -22,12 +22,7 @@ PROJECT_TARGET_ADD(harvester)
# Define project Targets
add_library(${TARGET_NAME} MODULE
- ${TARGET_NAME}.c
- plugins/tsdb.c
- plugins/influxdb.c
- plugins/influxdb-writer.c
- plugins/influxdb-reader.c
- utils/list.c
+ ${TARGET_NAME}-binding.c
)
set(OPENAPI_DEF "harvester-apidef" CACHE STRING "name and path to the JSON API definition without extension")
@@ -43,5 +38,7 @@ PROJECT_TARGET_ADD(harvester)
# Library dependencies (include updates automatically)
TARGET_LINK_LIBRARIES(${TARGET_NAME}
afb-helpers
+ ctl-utilities
${link_libraries})
+add_subdirectory("plugins")
diff --git a/src/harvester-binding.c b/src/harvester-binding.c
new file mode 100644
index 0000000..c5d0602
--- /dev/null
+++ b/src/harvester-binding.c
@@ -0,0 +1,156 @@
+/*
+* Copyright (C) 2016 "IoT.bzh"
+* Author Fulup Ar Foll <fulup@iot.bzh>
+* Author Romain Forlot <romain@iot.bzh>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+
+#include "harvester-binding.h"
+
+// default api to print log when apihandle not avaliable
+afb_dynapi *AFB_default;
+
+// Config Section definition (note: controls section index should match handle
+// retrieval in HalConfigExec)
+static CtlSectionT ctrlSections[] = {
+ {.key = "plugins" , .loadCB = PluginConfig},
+ {.key = "onload" , .loadCB = OnloadConfig},
+ {.key = "controls", .loadCB = ControlConfig},
+ {.key = "events" , .loadCB = EventConfig},
+ {.key = NULL}
+};
+
+static void ctrlapi_ping(AFB_ReqT request) {
+ static int count = 0;
+
+ count++;
+ AFB_ReqNotice(request, "Controller:ping count=%d", count);
+ AFB_ReqSucess(request, json_object_new_int(count), NULL);
+
+ return;
+}
+
+void ctrlapi_auth(AFB_ReqT request)
+{
+ AFB_ReqSetLOA(request, 1);
+ AFB_ReqSucess(request, NULL, NULL);
+}
+
+static AFB_ApiVerbs CtrlApiVerbs[] = {
+ /* VERB'S NAME FUNCTION TO CALL SHORT DESCRIPTION */
+ {.verb = "ping", .callback = ctrlapi_ping, .info = "ping test for API"},
+ {.verb = "auth", .callback = ctrlapi_auth, .info = "Authenticate session to raise Level Of Assurance of the session"},
+ {.verb = NULL} /* marker for end of the array */
+};
+
+static int CtrlLoadStaticVerbs(afb_dynapi *apiHandle, AFB_ApiVerbs *verbs) {
+ int errcount = 0;
+
+ for (int idx = 0; verbs[idx].verb; idx++) {
+ errcount += afb_dynapi_add_verb(
+ apiHandle, CtrlApiVerbs[idx].verb, NULL, CtrlApiVerbs[idx].callback,
+ (void *)&CtrlApiVerbs[idx], CtrlApiVerbs[idx].auth, 0);
+ }
+
+ return errcount;
+};
+
+static int CtrlInitOneApi(AFB_ApiT apiHandle) {
+ int err = 0;
+ AFB_default = apiHandle; // hugely hack to make all V2 AFB_DEBUG to work in fileutils
+
+ // retrieve section config from api handle
+ CtlConfigT *ctrlConfig = (CtlConfigT *)afb_dynapi_get_userdata(apiHandle);
+ err = CtlConfigExec(apiHandle, ctrlConfig);
+ if(err) {
+ AFB_ApiError(apiHandle, "Error at CtlConfigExec step");
+ return err;
+ }
+
+ return err;
+}
+
+// next generation dynamic API-V3 mode
+#include <signal.h>
+
+static int CtrlLoadOneApi(void *cbdata, AFB_ApiT apiHandle) {
+ CtlConfigT *ctrlConfig = (CtlConfigT *)cbdata;
+
+ // save closure as api's data context
+ afb_dynapi_set_userdata(apiHandle, ctrlConfig);
+
+ // add static controls verbs
+ int err = CtrlLoadStaticVerbs(apiHandle, CtrlApiVerbs);
+ if (err) {
+ AFB_ApiError(apiHandle, "CtrlLoadSection fail to register static V2 verbs");
+ return ERROR;
+ }
+
+ // load section for corresponding API
+ err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections);
+
+ // declare an event event manager for this API;
+ afb_dynapi_on_event(apiHandle, CtrlDispatchApiEvent);
+
+ // init API function (does not receive user closure ???
+ afb_dynapi_on_init(apiHandle, CtrlInitOneApi);
+
+ afb_dynapi_seal(apiHandle);
+ return err;
+}
+
+int afbBindingVdyn(afb_dynapi *apiHandle) {
+
+ AFB_default = apiHandle;
+ AFB_ApiNotice(apiHandle, "Controller in afbBindingVdyn");
+
+ const char *dirList = getenv("CONTROL_CONFIG_PATH");
+ if (!dirList)
+ dirList = CONTROL_CONFIG_PATH;
+
+ const char *configPath = CtlConfigSearch(apiHandle, dirList, "");
+ if (!configPath) {
+ AFB_ApiError(apiHandle, "CtlPreInit: No %s* config found in %s ", GetBinderName(), dirList);
+ return ERROR;
+ }
+
+ // load config file and create API
+ CtlConfigT *ctrlConfig = CtlLoadMetaData(apiHandle, configPath);
+ if (!ctrlConfig) {
+ AFB_ApiError(apiHandle,
+ "CtrlBindingDyn No valid control config file in:\n-- %s",
+ configPath);
+ return ERROR;
+ }
+
+ if (!ctrlConfig->api) {
+ AFB_ApiError(apiHandle,
+ "CtrlBindingDyn API Missing from metadata in:\n-- %s",
+ configPath);
+ return ERROR;
+ }
+
+ AFB_ApiNotice(apiHandle, "Controller API='%s' info='%s'", ctrlConfig->api,
+ ctrlConfig->info);
+
+ // create one API per config file (Pre-V3 return code ToBeChanged)
+ int status = afb_dynapi_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 1, CtrlLoadOneApi, ctrlConfig);
+
+ return status;
+}
diff --git a/src/harvester.h b/src/harvester-binding.h
index b11fffe..7ac00a0 100644
--- a/src/harvester.h
+++ b/src/harvester-binding.h
@@ -1,12 +1,13 @@
/*
- * Copyright (C) 2018 "IoT.bzh"
- * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author Fulup Ar Foll <fulup@iot.bzh>
+ * Author Romain Forlot <romain@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,20 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef _HARVESTER_H_
-#define _HARVESTER_H_
-#define AFB_BINDING_VERSION 2
-#include <afb/afb-binding.h>
-enum metric_type {b = 0, i, d, str} type;
-union metric_value {
- int b_value;
- int i_value;
- double d_value;
- char *str_value;
-};
+#ifndef _CTL_BINDING_INCLUDE_
+#define _CTL_BINDING_INCLUDE_
-int init();
+#include <stdio.h>
+#include <ctl-config.h>
+#include <filescan-utils.h>
+#include <wrap-json.h>
+#ifndef ERROR
+ #define ERROR -1
#endif
+
+#endif /* _CTL_BINDING_INCLUDE_ */
diff --git a/src/harvester.c b/src/harvester.c
deleted file mode 100644
index 1d4d9a6..0000000
--- a/src/harvester.c
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2018 "IoT.bzh"
- * Author "Romain Forlot" <romain.forlot@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-#include "harvester.h"
-#include "harvester-apidef.h"
-
-#include "plugins/tsdb.h"
-#include "curl-wrap.h"
-#include "wrap-json.h"
-
-CURL* (*tsdb_write)(const char* host, const char *port, json_object *metric);
-void (*write_curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
-
-struct reader_args r_args = {NULL, NULL, 1000000};
-
-int do_write(struct afb_req req, const char* host, const char *port, json_object *metric)
-{
- CURL *curl_request;
-
- curl_request = tsdb_write(host, port, metric);
- curl_wrap_do(curl_request, write_curl_cb, &req);
-
- return 0;
-}
-
-void afv_write(struct afb_req req)
-{
- const char *port = NULL;
- const char *host = NULL;
-
- json_object *req_args = afb_req_json(req),
- *portJ = NULL,
- *metric = NULL;
-
- if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
- "host", &host,
- "port", &portJ,
- "metric", &metric) || ! metric)
- afb_req_fail(req, "Failed", "Error processing arguments. Miss metric\
-JSON object or malformed");
- else {
- port = json_object_is_type(portJ, json_type_null) ?
- NULL : json_object_to_json_string(portJ);
- if(do_write(req, host, port, metric))
- afb_req_fail(req, "Failed", "Error processing metric JSON object.\
-Malformed !");
- }
-}
-
-void afv_auth(struct afb_req request)
-{
- afb_req_session_set_LOA(request, 1);
- afb_req_success(request, NULL, NULL);
-}
-
-int init()
-{
- int tsdb_available = 0;
-
- if(curl_global_init(CURL_GLOBAL_DEFAULT) != 0) {
- AFB_ERROR("Something went wrong initiliazing libcurl. Abort");
- return ERROR;
- }
-
- tsdb_available = db_ping();
- switch (tsdb_available) {
- case INFLUX:
- tsdb_write = influxdb_write;
- write_curl_cb = influxdb_write_curl_cb;
- /*if(influxdb_reader(&r_args) != 0) {
- AFB_ERROR("Problem initiating reader timer. Abort");
- return ERROR;
- }*/
- break;
- default:
- AFB_ERROR("No Time Series Database found. Abort");
- return ERROR;
- break;
- }
- /* TODO:*/
- return 0;
-}
diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt
new file mode 100644
index 0000000..e366685
--- /dev/null
+++ b/src/plugins/CMakeLists.txt
@@ -0,0 +1,44 @@
+###########################################################################
+# Copyright 2015, 2016, 2017 IoT.bzh
+#
+# author: Romain Forlot <romain.forlot@iot.bzh>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+PROJECT_TARGET_ADD(influxdb)
+
+ # Define targets
+ ADD_LIBRARY(${TARGET_NAME} MODULE
+ ${TARGET_NAME}.c
+ ${TARGET_NAME}-writer.c
+ ${TARGET_NAME}-reader.c
+ tsdb.c
+ ../utils/list.c)
+
+ # Alsa Plugin properties
+ SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES
+ LABELS "PLUGIN"
+ PREFIX ""
+ SUFFIX ".ctlso"
+ OUTPUT_NAME ${TARGET_NAME}
+ )
+
+ # Library dependencies (include updates automatically)
+ TARGET_LINK_LIBRARIES(${TARGET_NAME}
+ afb-helpers
+ ${link_libraries}
+ )
+
+ target_include_directories(${TARGET_NAME}
+ PRIVATE "${CMAKE_SOURCE_DIR}/app-controller-submodule/ctl-lib")
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index a1fa3a0..052a3da 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -29,6 +29,7 @@
struct metrics_list {
struct series_t serie;
json_object *metricsJ;
+ AFB_ApiT api;
};
static void fill_n_send_values(void *c, json_object *valuesJ)
@@ -44,7 +45,7 @@ static void fill_n_send_values(void *c, json_object *valuesJ)
else {
if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) {
if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) {
- AFB_ERROR("No tags nor fields fits.");
+ AFB_ApiError(m_list->api, "No tags nor fields fits.");
}
j++;
}
@@ -85,7 +86,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
"name", &m_list->serie.name,
"columns", &columnsJ,
"values", &valuesJ)) {
- AFB_ERROR("Unpacking metric goes wrong");
+ AFB_ApiError(m_list->api, "Unpacking metric goes wrong");
return;
}
@@ -93,7 +94,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list);
}
-static json_object *unpack_series(json_object *seriesJ)
+static json_object *unpack_series(AFB_ApiT apiHandle, json_object *seriesJ)
{
struct metrics_list m_list = {
.serie = {
@@ -104,7 +105,8 @@ static json_object *unpack_series(json_object *seriesJ)
},
.timestamp = 0
},
- .metricsJ = json_object_new_array()
+ .metricsJ = json_object_new_array(),
+ .api = apiHandle
};
wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list);
@@ -112,7 +114,7 @@ static json_object *unpack_series(json_object *seriesJ)
return m_list.metricsJ;
}
-static void forward_to_garner(const char *result, size_t size)
+static void forward_to_garner(AFB_ApiT apiHandle, const char *result, size_t size)
{
int id = 0;
json_object *resultsJ = NULL,
@@ -131,39 +133,42 @@ static void forward_to_garner(const char *result, size_t size)
}
if(seriesJ) {
- metrics2send = unpack_series(seriesJ);
+ metrics2send = unpack_series(apiHandle, seriesJ);
if(json_object_array_length(metrics2send)) {
- if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) {
- AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
+ if(AFB_ServiceSync(apiHandle, "garner", "write", metrics2send, &call_resultJ)) {
+ AFB_ApiError(apiHandle, "Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
}
}
}
else {
- AFB_ERROR("Empty response. Request results was:\n%s", result);
+ AFB_ApiError(apiHandle, "Empty response. Request results was:\n%s", result);
}
}
static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
+ if(!closure)
+ return;
+ AFB_ApiT apiHandle = (AFB_ApiT)closure;
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
case 200:
- AFB_DEBUG("Read correctly done");
- forward_to_garner(result, size);
+ AFB_ApiDebug(apiHandle, "Read correctly done");
+ forward_to_garner(apiHandle, result, size);
break;
case 400:
- AFB_ERROR("Unacceptable request. %s", result);
+ AFB_ApiError(apiHandle, "Unacceptable request. %s", result);
break;
case 401:
- AFB_ERROR("Invalid authentication. %s", result);
+ AFB_ApiError(apiHandle, "Invalid authentication. %s", result);
break;
default:
- AFB_ERROR("Unexptected behavior. %s", result);
+ AFB_ApiError(apiHandle, "Unexptected behavior. %s", result);
break;
}
}
-static CURL *make_curl_query_get(const char *url)
+static CURL *make_curl_query_get(AFB_ApiT apiHandle, const char *url)
{
CURL *curl;
char *args[5];
@@ -179,7 +184,7 @@ static CURL *make_curl_query_get(const char *url)
args[4] = NULL;
length_now = asprintf(&now, "%lu", get_ts());
- int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int rootdir_fd = AFB_RootDirGetFD(apiHandle);
int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
if (fd_last_read < 0)
return NULL;
@@ -188,7 +193,7 @@ static CURL *make_curl_query_get(const char *url)
else write the last timestamp */
if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
if(write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s\n", strerror( errno ));
}
else {
strcat(query, " WHERE time >= ");
@@ -196,7 +201,7 @@ static CURL *make_curl_query_get(const char *url)
close(fd_last_read);
fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
if (write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s", strerror( errno ));
}
args[3] = query;
@@ -208,33 +213,37 @@ static CURL *make_curl_query_get(const char *url)
static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
- struct reader_args *a = NULL;
- if(userdata)
- a = (struct reader_args*)userdata;
- else
+ struct reader_args *r_args = NULL;
+ CtlSourceT *source = NULL;
+ if(userdata) {
+ source = (CtlSourceT*)userdata;
+ r_args = (struct reader_args*)source->context;
+ }
+ else {
return ERROR;
+ }
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
- make_url(url, sizeof(url), a->host, a->port, "query");
- curl = make_curl_query_get(url);
- curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ make_url(url, sizeof(url), r_args->host, r_args->port, "query");
+ curl = make_curl_query_get(source->api, url);
+ curl_wrap_do(curl, influxdb_read_curl_cb, (void*)source->api);
/* Reschedule next run */
- sd_event_source_set_time(s, usec + a->delay);
+ sd_event_source_set_time(s, usec + r_args->delay);
return 0;
}
-int influxdb_reader(void *args)
+CTLP_CAPI(read_from_influxdb, source, argsJ, eventJ)
{
int err = 0;
uint64_t usec;
struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
- sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ sd_event_now(AFB_GetEventLoop(source->api), CLOCK_MONOTONIC, &usec);
+ err = sd_event_add_time(AFB_GetEventLoop(source->api), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, (void*)source);
if(!err)
err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index c21f4fd..677bfd0 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -21,6 +21,35 @@
#include "influxdb.h"
+void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+{
+ AFB_ReqT request = (AFB_ReqT)closure;
+ long rep_code = curl_wrap_response_code_get(curl);
+ switch(rep_code) {
+ case 204:
+ AFB_ReqDebug(request, "Request correctly written");
+ AFB_ReqSucess(request, NULL, "Request has been successfully writen");
+ break;
+ case 400:
+ AFB_ReqFail(request, "Bad request", result);
+ break;
+ case 401:
+ AFB_ReqFail(request, "Unauthorized access", result);
+ break;
+ case 404:
+ AFB_ReqFail(request, "Not found", result);
+ AFB_ReqNotice(request, "Attempt to create the DB '"DEFAULT_DB"'");
+ create_database(request);
+ break;
+ case 500:
+ AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result);
+ break;
+ default:
+ AFB_ReqFail(request, "Failure", "Unexpected behavior.");
+ break;
+ }
+}
+
static size_t format_write_args(char *query, struct series_t *serie)
{
char *ts;
@@ -59,36 +88,7 @@ static size_t format_write_args(char *query, struct series_t *serie)
return strlen(query);
}
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
-{
- struct afb_req *req = (struct afb_req*)closure;
- long rep_code = curl_wrap_response_code_get(curl);
- switch(rep_code) {
- case 204:
- AFB_DEBUG("Request correctly written");
- afb_req_success(*req, NULL, "Request has been successfully writen");
- break;
- case 400:
- afb_req_fail(*req, "Bad request", result);
- break;
- case 401:
- afb_req_fail(*req, "Unauthorized access", result);
- break;
- case 404:
- afb_req_fail(*req, "Not found", result);
- AFB_NOTICE("Attempt to create the DB '"DEFAULT_DB"'");
- create_database();
- break;
- case 500:
- afb_req_fail_f(*req, "Timeout", "Overloaded server: %s", result);
- break;
- default:
- afb_req_fail(*req, "Failure", "Unexpected behavior.");
- break;
- }
-}
-
-CURL *make_curl_write_post(const char *url, json_object *metricsJ)
+CURL *make_curl_write_post(AFB_ApiT apiHandle, const char *url, json_object *metricsJ)
{
CURL *curl = NULL;
size_t lpd = 0, len_write = 0, i = 0;
@@ -114,7 +114,7 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ)
memset(serie, 0, sizeof(struct series_t));
if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
+ AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
break;
}
else {
@@ -144,9 +144,36 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ)
return curl;
}
-CURL *influxdb_write(const char* host, const char *port, json_object *metricJ)
+CURL *influxdb_write(AFB_ApiT apiHandle, const char* host, const char *port, json_object *metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url(url, sizeof(url), host, port, "write");
- return make_curl_write_post(url, metricJ);
+ return make_curl_write_post(apiHandle, url, metricJ);
+}
+
+CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ)
+{
+ AFB_ReqT request = source->request;
+ const char *port = NULL;
+ const char *host = NULL;
+ CURL *curl_request;
+
+ json_object *req_args = AFB_ReqJson(request),
+ *portJ = NULL,
+ *metric = NULL;
+
+ if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
+ "host", &host,
+ "port", &portJ,
+ "metric", &metric) || ! metric)
+ AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\
+JSON object or malformed");
+ else
+ port = json_object_is_type(portJ, json_type_null) ?
+ NULL : json_object_to_json_string(portJ);
+
+ curl_request = influxdb_write(source->api, host, port, metric);
+ curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
+
+ return 0;
}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index a98bd2b..3c94a1b 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -26,6 +26,50 @@
#include "wrap-json.h"
#include "../utils/list.h"
+CTLP_CAPI_REGISTER("influxdb");
+
+CTLP_ONLOAD(plugin, ret)
+{
+ int err = 0;
+ char *result;
+ size_t result_size;
+ CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+
+ struct reader_args r_args = {NULL, NULL, 1000000};
+ plugin->context = (void*)&r_args;
+
+ curl_wrap_perform(request, &result, &result_size);
+
+ if(curl_wrap_response_code_get(request) != 204) {
+ AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it");
+ err = ERROR;
+ }
+
+ curl_easy_cleanup(request);
+ return err;
+}
+
+CTLP_CAPI(influxdb_ping, source, argsJ, eventJ)
+{
+ int ret = 0;
+ char *result;
+ size_t result_size;
+
+ CURL *curl_req = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+
+ curl_wrap_perform(curl_req, &result, &result_size);
+
+ if(curl_wrap_response_code_get(curl_req) != 204) {
+ AFB_ApiError(source->api, "InfluxDB is offline.");
+ ret = ERROR;
+ }
+
+ curl_easy_cleanup(curl_req);
+
+ AFB_ApiNotice(source->api, "InfluxDB is up and running.");
+ return ret;
+}
+
void concatenate(char* dest, const char* source, const char *sep)
{
strncat(dest, sep, strlen(sep));
@@ -51,7 +95,7 @@ size_t make_url(char *url, size_t l_url, const char *host, const char *port, con
}
-int create_database()
+int create_database(AFB_ReqT request)
{
int ret = 0;
char *result;
@@ -62,18 +106,18 @@ int create_database()
post_data[0] = "q=CREATE DATABASE \""DEFAULT_DB"\"";
post_data[1] = NULL;
- CURL *request = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data);
- curl_wrap_perform(request, &result, &result_size);
+ CURL *curl_req = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data);
+ curl_wrap_perform(curl_req, &result, &result_size);
if(curl_wrap_response_code_get(request) != 200) {
- AFB_ERROR("Can't create database.");
+ AFB_ReqError(request, "Can't create database.");
ret = ERROR;
}
- curl_easy_cleanup(request);
+ curl_easy_cleanup(curl_req);
if(ret == 0)
- AFB_NOTICE("Database '"DEFAULT_DB"' created");
+ AFB_ReqNotice(request, "Database '"DEFAULT_DB"' created");
return ret;
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index ae40e77..ed05f97 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -19,6 +19,7 @@
#define _INFLUXDB_H_
#define _GNU_SOURCE
+#include "ctl-plugin.h"
#include "wrap-json.h"
#include "tsdb.h"
#include "../utils/list.h"
@@ -34,7 +35,7 @@ struct series_t {
uint64_t timestamp;
};
-int create_database();
+int create_database(AFB_ReqT request);
int unpack_metric_from_api(json_object *m, struct series_t *serie);
diff --git a/src/plugins/tsdb.c b/src/plugins/tsdb.c
index 7e10a74..27ae067 100644
--- a/src/plugins/tsdb.c
+++ b/src/plugins/tsdb.c
@@ -18,31 +18,6 @@
#include <time.h>
#include "tsdb.h"
-int influxdb_ping()
-{
- int ret = 0;
- char *result;
- size_t result_size;
- CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
- curl_wrap_perform(request, &result, &result_size);
-
- if(curl_wrap_response_code_get(request) != 204) {
- AFB_ERROR("TimeSeries DB not reachable");
- ret = ERROR;
- }
-
- curl_easy_cleanup(request);
- return ret;
-}
-
-int db_ping()
-{
- int ret = 0;
- if(influxdb_ping() == 0) ret = INFLUX;
-
- return ret;
-}
-
uint64_t get_ts()
{
struct timespec ts;
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 437c5dc..8469ac0 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -18,9 +18,6 @@
#ifndef _TSDB_H
#define _TSDB_H
-#define AFB_BINDING_VERSION 2
-#include <afb/afb-binding.h>
-
#include <json-c/json.h>
#include "curl-wrap.h"
@@ -32,6 +29,7 @@
#define URL_MAXIMUM_LENGTH 2047
enum db_available {
+ NODB = 0,
INFLUX = 1,
GRAPHITE = 2,
OPENTSDB = 4
@@ -43,13 +41,6 @@ struct reader_args {
u_int32_t delay;
};
-CURL *influxdb_write(const char* host, const char *port, json_object *metric);
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size);
-
-int influxdb_reader(void *args);
-
-int db_ping();
-
u_int64_t get_ts();
#endif