aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2018-05-16 00:58:28 +0200
committerSebastien Douheret <sebastien.douheret@iot.bzh>2018-07-10 23:41:14 +0200
commitfef008b549c9c6d6ee1e564312787c3d3f14f0c0 (patch)
tree43ab4879055ef9e4198705be7cca6fe30177593f
parentdaf148db51e3abe2d7cfbeb7224124f64b8fc4e3 (diff)
Convert the binding to use the controller
Ease Time series DB abstraction layer by using Dyn API that implemente the API defined by the JSON schema. Change-Id: I67de4fbca10048201fdd2da683732a5f4f5b5368 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
m---------afb-helpers0
m---------app-controller-submodule0
-rw-r--r--conf.d/CMakeLists.txt20
-rw-r--r--conf.d/cmake/00-debian-osconfig.cmake1
-rw-r--r--conf.d/cmake/00-default-osconfig.cmake1
-rw-r--r--conf.d/cmake/config.cmake8
-rw-r--r--conf.d/project/CMakeLists.txt20
-rw-r--r--conf.d/project/etc/CMakeLists.txt31
-rw-r--r--conf.d/project/etc/harvester-config.json36
-rw-r--r--src/CMakeLists.txt9
-rw-r--r--src/harvester-binding.c156
-rw-r--r--src/harvester-binding.h (renamed from src/harvester.h)29
-rw-r--r--src/harvester.c97
-rw-r--r--src/plugins/CMakeLists.txt44
-rw-r--r--src/plugins/influxdb-reader.c67
-rw-r--r--src/plugins/influxdb-writer.c93
-rw-r--r--src/plugins/influxdb.c56
-rw-r--r--src/plugins/influxdb.h3
-rw-r--r--src/plugins/tsdb.c25
-rw-r--r--src/plugins/tsdb.h11
20 files changed, 484 insertions, 223 deletions
diff --git a/afb-helpers b/afb-helpers
-Subproject 560f9a944f489c5b4c2e5baf74b7b51b3e352ad
+Subproject bb225251d33fff921cc74df049864c0e4df4a7b
diff --git a/app-controller-submodule b/app-controller-submodule
-Subproject e8b0b3c3cc9b0453b88d1315822fda5963754ae
+Subproject 868420021d3a955ae6f01fed414edbd6d4c0ff9
diff --git a/conf.d/CMakeLists.txt b/conf.d/CMakeLists.txt
new file mode 100644
index 0000000..3beb009
--- /dev/null
+++ b/conf.d/CMakeLists.txt
@@ -0,0 +1,20 @@
+###########################################################################
+# Copyright 2015, 2016, 2017 IoT.bzh
+#
+# author: Fulup Ar Foll <rfulup@iot.bzh>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+# This component should be included as a submodule and wont compile as standalone
+project_subdirs_add()
diff --git a/conf.d/cmake/00-debian-osconfig.cmake b/conf.d/cmake/00-debian-osconfig.cmake
new file mode 100644
index 0000000..2ce0ad3
--- /dev/null
+++ b/conf.d/cmake/00-debian-osconfig.cmake
@@ -0,0 +1 @@
+list(APPEND PKG_REQUIRED_LIST lua-5.3>=5.3)
diff --git a/conf.d/cmake/00-default-osconfig.cmake b/conf.d/cmake/00-default-osconfig.cmake
new file mode 100644
index 0000000..a2b9325
--- /dev/null
+++ b/conf.d/cmake/00-default-osconfig.cmake
@@ -0,0 +1 @@
+list(APPEND PKG_REQUIRED_LIST lua>=5.3)
diff --git a/conf.d/cmake/config.cmake b/conf.d/cmake/config.cmake
index fbce7a2..41fa6f0 100644
--- a/conf.d/cmake/config.cmake
+++ b/conf.d/cmake/config.cmake
@@ -77,7 +77,7 @@ set (PKG_REQUIRED_LIST
# Prefix path where will be installed the files
# Default: /usr/local (need root permission to write in)
# ------------------------------------------------------
-#set(CMAKE_INSTALL_PREFIX $ENV{HOME}/opt)
+set(CMAKE_INSTALL_PREFIX $ENV{HOME}/opt)
# Customize link option
# -----------------------------
@@ -125,6 +125,12 @@ set (PKG_REQUIRED_LIST
# -O2
# CACHE STRING "Compilation flags for RELEASE build type.")
+set(CONTROL_SUPPORT_LUA 1)
+add_definitions(-DCONTROL_PLUGIN_PATH="${CMAKE_BINARY_DIR}/package/lib/plugins:${CMAKE_BINARY_DIR}/package/var:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/lib/plugins")
+add_definitions(-DCONTROL_CONFIG_PATH="${CMAKE_BINARY_DIR}/package/etc:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/etc")
+add_definitions(-DCTL_PLUGIN_MAGIC=1286576532)
+add_definitions(-DUSE_API_DYN=1)
+
# (BUG!!!) as PKG_CONFIG_PATH does not work [should be an env variable]
# ---------------------------------------------------------------------
set(CMAKE_PREFIX_PATH ${CMAKE_INSTALL_PREFIX}/lib64/pkgconfig ${CMAKE_INSTALL_PREFIX}/lib/pkgconfig)
diff --git a/conf.d/project/CMakeLists.txt b/conf.d/project/CMakeLists.txt
new file mode 100644
index 0000000..3beb009
--- /dev/null
+++ b/conf.d/project/CMakeLists.txt
@@ -0,0 +1,20 @@
+###########################################################################
+# Copyright 2015, 2016, 2017 IoT.bzh
+#
+# author: Fulup Ar Foll <rfulup@iot.bzh>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+# This component should be included as a submodule and wont compile as standalone
+project_subdirs_add()
diff --git a/conf.d/project/etc/CMakeLists.txt b/conf.d/project/etc/CMakeLists.txt
new file mode 100644
index 0000000..d378d71
--- /dev/null
+++ b/conf.d/project/etc/CMakeLists.txt
@@ -0,0 +1,31 @@
+###########################################################################
+# Copyright 2017 IoT.bzh
+#
+# author: Fulup Ar Foll <fulup@iot.bzh>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+##################################################
+# Control Policy Config file
+##################################################
+PROJECT_TARGET_ADD(harvester-config)
+
+ file(GLOB CONF_FILES "*.json")
+
+ add_input_files("${CONF_FILES}")
+
+ SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES
+ LABELS "BINDING-CONFIG"
+ OUTPUT_NAME ${TARGET_NAME}
+ )
diff --git a/conf.d/project/etc/harvester-config.json b/conf.d/project/etc/harvester-config.json
new file mode 100644
index 0000000..d2f7858
--- /dev/null
+++ b/conf.d/project/etc/harvester-config.json
@@ -0,0 +1,36 @@
+{
+ "$schema": "http://iot.bzh/download/public/schema/json/ctl-schema.json",
+ "metadata": {
+ "uid": "Harvester",
+ "version": "1.0",
+ "api": "harvester",
+ "info": "Data collection binding"
+ },
+ "plugins": [
+ {
+ "uid": "influxdb",
+ "info": "Plugins that handle influxdb read and write",
+ "spath": "lib/plugins",
+ "libs": "influxdb.ctlso"
+ }
+ ],
+
+ "onload": [
+ {
+ "uid": "init_db",
+ "info": "Ensure that InfluxDB is up",
+ "action": "plugin://influxdb#influxdb_ping"
+ }
+ ],
+
+ "controls": [
+ {
+ "uid": "write",
+ "action": "plugin://influxdb#write_to_influxdb"
+ },
+ {
+ "uid": "read",
+ "action": "plugin://influxdb#read_from_influxdb"
+ }
+ ]
+}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7c8c2b4..a016c14 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -22,12 +22,7 @@ PROJECT_TARGET_ADD(harvester)
# Define project Targets
add_library(${TARGET_NAME} MODULE
- ${TARGET_NAME}.c
- plugins/tsdb.c
- plugins/influxdb.c
- plugins/influxdb-writer.c
- plugins/influxdb-reader.c
- utils/list.c
+ ${TARGET_NAME}-binding.c
)
set(OPENAPI_DEF "harvester-apidef" CACHE STRING "name and path to the JSON API definition without extension")
@@ -43,5 +38,7 @@ PROJECT_TARGET_ADD(harvester)
# Library dependencies (include updates automatically)
TARGET_LINK_LIBRARIES(${TARGET_NAME}
afb-helpers
+ ctl-utilities
${link_libraries})
+add_subdirectory("plugins")
diff --git a/src/harvester-binding.c b/src/harvester-binding.c
new file mode 100644
index 0000000..c5d0602
--- /dev/null
+++ b/src/harvester-binding.c
@@ -0,0 +1,156 @@
+/*
+* Copyright (C) 2016 "IoT.bzh"
+* Author Fulup Ar Foll <fulup@iot.bzh>
+* Author Romain Forlot <romain@iot.bzh>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+
+#include "harvester-binding.h"
+
+// default api to print log when apihandle not avaliable
+afb_dynapi *AFB_default;
+
+// Config Section definition (note: controls section index should match handle
+// retrieval in HalConfigExec)
+static CtlSectionT ctrlSections[] = {
+ {.key = "plugins" , .loadCB = PluginConfig},
+ {.key = "onload" , .loadCB = OnloadConfig},
+ {.key = "controls", .loadCB = ControlConfig},
+ {.key = "events" , .loadCB = EventConfig},
+ {.key = NULL}
+};
+
+static void ctrlapi_ping(AFB_ReqT request) {
+ static int count = 0;
+
+ count++;
+ AFB_ReqNotice(request, "Controller:ping count=%d", count);
+ AFB_ReqSucess(request, json_object_new_int(count), NULL);
+
+ return;
+}
+
+void ctrlapi_auth(AFB_ReqT request)
+{
+ AFB_ReqSetLOA(request, 1);
+ AFB_ReqSucess(request, NULL, NULL);
+}
+
+static AFB_ApiVerbs CtrlApiVerbs[] = {
+ /* VERB'S NAME FUNCTION TO CALL SHORT DESCRIPTION */
+ {.verb = "ping", .callback = ctrlapi_ping, .info = "ping test for API"},
+ {.verb = "auth", .callback = ctrlapi_auth, .info = "Authenticate session to raise Level Of Assurance of the session"},
+ {.verb = NULL} /* marker for end of the array */
+};
+
+static int CtrlLoadStaticVerbs(afb_dynapi *apiHandle, AFB_ApiVerbs *verbs) {
+ int errcount = 0;
+
+ for (int idx = 0; verbs[idx].verb; idx++) {
+ errcount += afb_dynapi_add_verb(
+ apiHandle, CtrlApiVerbs[idx].verb, NULL, CtrlApiVerbs[idx].callback,
+ (void *)&CtrlApiVerbs[idx], CtrlApiVerbs[idx].auth, 0);
+ }
+
+ return errcount;
+};
+
+static int CtrlInitOneApi(AFB_ApiT apiHandle) {
+ int err = 0;
+ AFB_default = apiHandle; // hugely hack to make all V2 AFB_DEBUG to work in fileutils
+
+ // retrieve section config from api handle
+ CtlConfigT *ctrlConfig = (CtlConfigT *)afb_dynapi_get_userdata(apiHandle);
+ err = CtlConfigExec(apiHandle, ctrlConfig);
+ if(err) {
+ AFB_ApiError(apiHandle, "Error at CtlConfigExec step");
+ return err;
+ }
+
+ return err;
+}
+
+// next generation dynamic API-V3 mode
+#include <signal.h>
+
+static int CtrlLoadOneApi(void *cbdata, AFB_ApiT apiHandle) {
+ CtlConfigT *ctrlConfig = (CtlConfigT *)cbdata;
+
+ // save closure as api's data context
+ afb_dynapi_set_userdata(apiHandle, ctrlConfig);
+
+ // add static controls verbs
+ int err = CtrlLoadStaticVerbs(apiHandle, CtrlApiVerbs);
+ if (err) {
+ AFB_ApiError(apiHandle, "CtrlLoadSection fail to register static V2 verbs");
+ return ERROR;
+ }
+
+ // load section for corresponding API
+ err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections);
+
+ // declare an event event manager for this API;
+ afb_dynapi_on_event(apiHandle, CtrlDispatchApiEvent);
+
+ // init API function (does not receive user closure ???
+ afb_dynapi_on_init(apiHandle, CtrlInitOneApi);
+
+ afb_dynapi_seal(apiHandle);
+ return err;
+}
+
+int afbBindingVdyn(afb_dynapi *apiHandle) {
+
+ AFB_default = apiHandle;
+ AFB_ApiNotice(apiHandle, "Controller in afbBindingVdyn");
+
+ const char *dirList = getenv("CONTROL_CONFIG_PATH");
+ if (!dirList)
+ dirList = CONTROL_CONFIG_PATH;
+
+ const char *configPath = CtlConfigSearch(apiHandle, dirList, "");
+ if (!configPath) {
+ AFB_ApiError(apiHandle, "CtlPreInit: No %s* config found in %s ", GetBinderName(), dirList);
+ return ERROR;
+ }
+
+ // load config file and create API
+ CtlConfigT *ctrlConfig = CtlLoadMetaData(apiHandle, configPath);
+ if (!ctrlConfig) {
+ AFB_ApiError(apiHandle,
+ "CtrlBindingDyn No valid control config file in:\n-- %s",
+ configPath);
+ return ERROR;
+ }
+
+ if (!ctrlConfig->api) {
+ AFB_ApiError(apiHandle,
+ "CtrlBindingDyn API Missing from metadata in:\n-- %s",
+ configPath);
+ return ERROR;
+ }
+
+ AFB_ApiNotice(apiHandle, "Controller API='%s' info='%s'", ctrlConfig->api,
+ ctrlConfig->info);
+
+ // create one API per config file (Pre-V3 return code ToBeChanged)
+ int status = afb_dynapi_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 1, CtrlLoadOneApi, ctrlConfig);
+
+ return status;
+}
diff --git a/src/harvester.h b/src/harvester-binding.h
index b11fffe..7ac00a0 100644
--- a/src/harvester.h
+++ b/src/harvester-binding.h
@@ -1,12 +1,13 @@
/*
- * Copyright (C) 2018 "IoT.bzh"
- * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ * Copyright (C) 2016 "IoT.bzh"
+ * Author Fulup Ar Foll <fulup@iot.bzh>
+ * Author Romain Forlot <romain@iot.bzh>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,20 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef _HARVESTER_H_
-#define _HARVESTER_H_
-#define AFB_BINDING_VERSION 2
-#include <afb/afb-binding.h>
-enum metric_type {b = 0, i, d, str} type;
-union metric_value {
- int b_value;
- int i_value;
- double d_value;
- char *str_value;
-};
+#ifndef _CTL_BINDING_INCLUDE_
+#define _CTL_BINDING_INCLUDE_
-int init();
+#include <stdio.h>
+#include <ctl-config.h>
+#include <filescan-utils.h>
+#include <wrap-json.h>
+#ifndef ERROR
+ #define ERROR -1
#endif
+
+#endif /* _CTL_BINDING_INCLUDE_ */
diff --git a/src/harvester.c b/src/harvester.c
deleted file mode 100644
index 1d4d9a6..0000000
--- a/src/harvester.c
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2018 "IoT.bzh"
- * Author "Romain Forlot" <romain.forlot@iot.bzh>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define _GNU_SOURCE
-#include "harvester.h"
-#include "harvester-apidef.h"
-
-#include "plugins/tsdb.h"
-#include "curl-wrap.h"
-#include "wrap-json.h"
-
-CURL* (*tsdb_write)(const char* host, const char *port, json_object *metric);
-void (*write_curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size);
-
-struct reader_args r_args = {NULL, NULL, 1000000};
-
-int do_write(struct afb_req req, const char* host, const char *port, json_object *metric)
-{
- CURL *curl_request;
-
- curl_request = tsdb_write(host, port, metric);
- curl_wrap_do(curl_request, write_curl_cb, &req);
-
- return 0;
-}
-
-void afv_write(struct afb_req req)
-{
- const char *port = NULL;
- const char *host = NULL;
-
- json_object *req_args = afb_req_json(req),
- *portJ = NULL,
- *metric = NULL;
-
- if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
- "host", &host,
- "port", &portJ,
- "metric", &metric) || ! metric)
- afb_req_fail(req, "Failed", "Error processing arguments. Miss metric\
-JSON object or malformed");
- else {
- port = json_object_is_type(portJ, json_type_null) ?
- NULL : json_object_to_json_string(portJ);
- if(do_write(req, host, port, metric))
- afb_req_fail(req, "Failed", "Error processing metric JSON object.\
-Malformed !");
- }
-}
-
-void afv_auth(struct afb_req request)
-{
- afb_req_session_set_LOA(request, 1);
- afb_req_success(request, NULL, NULL);
-}
-
-int init()
-{
- int tsdb_available = 0;
-
- if(curl_global_init(CURL_GLOBAL_DEFAULT) != 0) {
- AFB_ERROR("Something went wrong initiliazing libcurl. Abort");
- return ERROR;
- }
-
- tsdb_available = db_ping();
- switch (tsdb_available) {
- case INFLUX:
- tsdb_write = influxdb_write;
- write_curl_cb = influxdb_write_curl_cb;
- /*if(influxdb_reader(&r_args) != 0) {
- AFB_ERROR("Problem initiating reader timer. Abort");
- return ERROR;
- }*/
- break;
- default:
- AFB_ERROR("No Time Series Database found. Abort");
- return ERROR;
- break;
- }
- /* TODO:*/
- return 0;
-}
diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt
new file mode 100644
index 0000000..e366685
--- /dev/null
+++ b/src/plugins/CMakeLists.txt
@@ -0,0 +1,44 @@
+###########################################################################
+# Copyright 2015, 2016, 2017 IoT.bzh
+#
+# author: Romain Forlot <romain.forlot@iot.bzh>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+PROJECT_TARGET_ADD(influxdb)
+
+ # Define targets
+ ADD_LIBRARY(${TARGET_NAME} MODULE
+ ${TARGET_NAME}.c
+ ${TARGET_NAME}-writer.c
+ ${TARGET_NAME}-reader.c
+ tsdb.c
+ ../utils/list.c)
+
+ # Alsa Plugin properties
+ SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES
+ LABELS "PLUGIN"
+ PREFIX ""
+ SUFFIX ".ctlso"
+ OUTPUT_NAME ${TARGET_NAME}
+ )
+
+ # Library dependencies (include updates automatically)
+ TARGET_LINK_LIBRARIES(${TARGET_NAME}
+ afb-helpers
+ ${link_libraries}
+ )
+
+ target_include_directories(${TARGET_NAME}
+ PRIVATE "${CMAKE_SOURCE_DIR}/app-controller-submodule/ctl-lib")
diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c
index a1fa3a0..052a3da 100644
--- a/src/plugins/influxdb-reader.c
+++ b/src/plugins/influxdb-reader.c
@@ -29,6 +29,7 @@
struct metrics_list {
struct series_t serie;
json_object *metricsJ;
+ AFB_ApiT api;
};
static void fill_n_send_values(void *c, json_object *valuesJ)
@@ -44,7 +45,7 @@ static void fill_n_send_values(void *c, json_object *valuesJ)
else {
if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) {
if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) {
- AFB_ERROR("No tags nor fields fits.");
+ AFB_ApiError(m_list->api, "No tags nor fields fits.");
}
j++;
}
@@ -85,7 +86,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
"name", &m_list->serie.name,
"columns", &columnsJ,
"values", &valuesJ)) {
- AFB_ERROR("Unpacking metric goes wrong");
+ AFB_ApiError(m_list->api, "Unpacking metric goes wrong");
return;
}
@@ -93,7 +94,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ)
wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list);
}
-static json_object *unpack_series(json_object *seriesJ)
+static json_object *unpack_series(AFB_ApiT apiHandle, json_object *seriesJ)
{
struct metrics_list m_list = {
.serie = {
@@ -104,7 +105,8 @@ static json_object *unpack_series(json_object *seriesJ)
},
.timestamp = 0
},
- .metricsJ = json_object_new_array()
+ .metricsJ = json_object_new_array(),
+ .api = apiHandle
};
wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list);
@@ -112,7 +114,7 @@ static json_object *unpack_series(json_object *seriesJ)
return m_list.metricsJ;
}
-static void forward_to_garner(const char *result, size_t size)
+static void forward_to_garner(AFB_ApiT apiHandle, const char *result, size_t size)
{
int id = 0;
json_object *resultsJ = NULL,
@@ -131,39 +133,42 @@ static void forward_to_garner(const char *result, size_t size)
}
if(seriesJ) {
- metrics2send = unpack_series(seriesJ);
+ metrics2send = unpack_series(apiHandle, seriesJ);
if(json_object_array_length(metrics2send)) {
- if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) {
- AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
+ if(AFB_ServiceSync(apiHandle, "garner", "write", metrics2send, &call_resultJ)) {
+ AFB_ApiError(apiHandle, "Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ));
}
}
}
else {
- AFB_ERROR("Empty response. Request results was:\n%s", result);
+ AFB_ApiError(apiHandle, "Empty response. Request results was:\n%s", result);
}
}
static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
{
+ if(!closure)
+ return;
+ AFB_ApiT apiHandle = (AFB_ApiT)closure;
long rep_code = curl_wrap_response_code_get(curl);
switch(rep_code) {
case 200:
- AFB_DEBUG("Read correctly done");
- forward_to_garner(result, size);
+ AFB_ApiDebug(apiHandle, "Read correctly done");
+ forward_to_garner(apiHandle, result, size);
break;
case 400:
- AFB_ERROR("Unacceptable request. %s", result);
+ AFB_ApiError(apiHandle, "Unacceptable request. %s", result);
break;
case 401:
- AFB_ERROR("Invalid authentication. %s", result);
+ AFB_ApiError(apiHandle, "Invalid authentication. %s", result);
break;
default:
- AFB_ERROR("Unexptected behavior. %s", result);
+ AFB_ApiError(apiHandle, "Unexptected behavior. %s", result);
break;
}
}
-static CURL *make_curl_query_get(const char *url)
+static CURL *make_curl_query_get(AFB_ApiT apiHandle, const char *url)
{
CURL *curl;
char *args[5];
@@ -179,7 +184,7 @@ static CURL *make_curl_query_get(const char *url)
args[4] = NULL;
length_now = asprintf(&now, "%lu", get_ts());
- int rootdir_fd = afb_daemon_rootdir_get_fd();
+ int rootdir_fd = AFB_RootDirGetFD(apiHandle);
int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU);
if (fd_last_read < 0)
return NULL;
@@ -188,7 +193,7 @@ static CURL *make_curl_query_get(const char *url)
else write the last timestamp */
if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) {
if(write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s\n", strerror( errno ));
}
else {
strcat(query, " WHERE time >= ");
@@ -196,7 +201,7 @@ static CURL *make_curl_query_get(const char *url)
close(fd_last_read);
fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR);
if (write(fd_last_read, now, length_now) != length_now)
- AFB_ERROR("Error writing last_db_read file: %s", strerror( errno ));
+ AFB_ApiError(apiHandle, "Error writing last_db_read file: %s", strerror( errno ));
}
args[3] = query;
@@ -208,33 +213,37 @@ static CURL *make_curl_query_get(const char *url)
static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata)
{
CURL *curl;
- struct reader_args *a = NULL;
- if(userdata)
- a = (struct reader_args*)userdata;
- else
+ struct reader_args *r_args = NULL;
+ CtlSourceT *source = NULL;
+ if(userdata) {
+ source = (CtlSourceT*)userdata;
+ r_args = (struct reader_args*)source->context;
+ }
+ else {
return ERROR;
+ }
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
- make_url(url, sizeof(url), a->host, a->port, "query");
- curl = make_curl_query_get(url);
- curl_wrap_do(curl, influxdb_read_curl_cb, NULL);
+ make_url(url, sizeof(url), r_args->host, r_args->port, "query");
+ curl = make_curl_query_get(source->api, url);
+ curl_wrap_do(curl, influxdb_read_curl_cb, (void*)source->api);
/* Reschedule next run */
- sd_event_source_set_time(s, usec + a->delay);
+ sd_event_source_set_time(s, usec + r_args->delay);
return 0;
}
-int influxdb_reader(void *args)
+CTLP_CAPI(read_from_influxdb, source, argsJ, eventJ)
{
int err = 0;
uint64_t usec;
struct sd_event_source *evtSource = NULL;
/* Set a cyclic cb call each 1s to call the read callback */
- sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec);
- err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args);
+ sd_event_now(AFB_GetEventLoop(source->api), CLOCK_MONOTONIC, &usec);
+ err = sd_event_add_time(AFB_GetEventLoop(source->api), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, (void*)source);
if(!err)
err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON);
diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c
index c21f4fd..677bfd0 100644
--- a/src/plugins/influxdb-writer.c
+++ b/src/plugins/influxdb-writer.c
@@ -21,6 +21,35 @@
#include "influxdb.h"
+void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
+{
+ AFB_ReqT request = (AFB_ReqT)closure;
+ long rep_code = curl_wrap_response_code_get(curl);
+ switch(rep_code) {
+ case 204:
+ AFB_ReqDebug(request, "Request correctly written");
+ AFB_ReqSucess(request, NULL, "Request has been successfully writen");
+ break;
+ case 400:
+ AFB_ReqFail(request, "Bad request", result);
+ break;
+ case 401:
+ AFB_ReqFail(request, "Unauthorized access", result);
+ break;
+ case 404:
+ AFB_ReqFail(request, "Not found", result);
+ AFB_ReqNotice(request, "Attempt to create the DB '"DEFAULT_DB"'");
+ create_database(request);
+ break;
+ case 500:
+ AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result);
+ break;
+ default:
+ AFB_ReqFail(request, "Failure", "Unexpected behavior.");
+ break;
+ }
+}
+
static size_t format_write_args(char *query, struct series_t *serie)
{
char *ts;
@@ -59,36 +88,7 @@ static size_t format_write_args(char *query, struct series_t *serie)
return strlen(query);
}
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size)
-{
- struct afb_req *req = (struct afb_req*)closure;
- long rep_code = curl_wrap_response_code_get(curl);
- switch(rep_code) {
- case 204:
- AFB_DEBUG("Request correctly written");
- afb_req_success(*req, NULL, "Request has been successfully writen");
- break;
- case 400:
- afb_req_fail(*req, "Bad request", result);
- break;
- case 401:
- afb_req_fail(*req, "Unauthorized access", result);
- break;
- case 404:
- afb_req_fail(*req, "Not found", result);
- AFB_NOTICE("Attempt to create the DB '"DEFAULT_DB"'");
- create_database();
- break;
- case 500:
- afb_req_fail_f(*req, "Timeout", "Overloaded server: %s", result);
- break;
- default:
- afb_req_fail(*req, "Failure", "Unexpected behavior.");
- break;
- }
-}
-
-CURL *make_curl_write_post(const char *url, json_object *metricsJ)
+CURL *make_curl_write_post(AFB_ApiT apiHandle, const char *url, json_object *metricsJ)
{
CURL *curl = NULL;
size_t lpd = 0, len_write = 0, i = 0;
@@ -114,7 +114,7 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ)
memset(serie, 0, sizeof(struct series_t));
if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) {
- AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
+ AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ));
break;
}
else {
@@ -144,9 +144,36 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ)
return curl;
}
-CURL *influxdb_write(const char* host, const char *port, json_object *metricJ)
+CURL *influxdb_write(AFB_ApiT apiHandle, const char* host, const char *port, json_object *metricJ)
{
char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */
make_url(url, sizeof(url), host, port, "write");
- return make_curl_write_post(url, metricJ);
+ return make_curl_write_post(apiHandle, url, metricJ);
+}
+
+CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ)
+{
+ AFB_ReqT request = source->request;
+ const char *port = NULL;
+ const char *host = NULL;
+ CURL *curl_request;
+
+ json_object *req_args = AFB_ReqJson(request),
+ *portJ = NULL,
+ *metric = NULL;
+
+ if(wrap_json_unpack(req_args, "{s?s,s?o,so!}",
+ "host", &host,
+ "port", &portJ,
+ "metric", &metric) || ! metric)
+ AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\
+JSON object or malformed");
+ else
+ port = json_object_is_type(portJ, json_type_null) ?
+ NULL : json_object_to_json_string(portJ);
+
+ curl_request = influxdb_write(source->api, host, port, metric);
+ curl_wrap_do(curl_request, influxdb_write_curl_cb, request);
+
+ return 0;
}
diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c
index a98bd2b..3c94a1b 100644
--- a/src/plugins/influxdb.c
+++ b/src/plugins/influxdb.c
@@ -26,6 +26,50 @@
#include "wrap-json.h"
#include "../utils/list.h"
+CTLP_CAPI_REGISTER("influxdb");
+
+CTLP_ONLOAD(plugin, ret)
+{
+ int err = 0;
+ char *result;
+ size_t result_size;
+ CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+
+ struct reader_args r_args = {NULL, NULL, 1000000};
+ plugin->context = (void*)&r_args;
+
+ curl_wrap_perform(request, &result, &result_size);
+
+ if(curl_wrap_response_code_get(request) != 204) {
+ AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it");
+ err = ERROR;
+ }
+
+ curl_easy_cleanup(request);
+ return err;
+}
+
+CTLP_CAPI(influxdb_ping, source, argsJ, eventJ)
+{
+ int ret = 0;
+ char *result;
+ size_t result_size;
+
+ CURL *curl_req = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
+
+ curl_wrap_perform(curl_req, &result, &result_size);
+
+ if(curl_wrap_response_code_get(curl_req) != 204) {
+ AFB_ApiError(source->api, "InfluxDB is offline.");
+ ret = ERROR;
+ }
+
+ curl_easy_cleanup(curl_req);
+
+ AFB_ApiNotice(source->api, "InfluxDB is up and running.");
+ return ret;
+}
+
void concatenate(char* dest, const char* source, const char *sep)
{
strncat(dest, sep, strlen(sep));
@@ -51,7 +95,7 @@ size_t make_url(char *url, size_t l_url, const char *host, const char *port, con
}
-int create_database()
+int create_database(AFB_ReqT request)
{
int ret = 0;
char *result;
@@ -62,18 +106,18 @@ int create_database()
post_data[0] = "q=CREATE DATABASE \""DEFAULT_DB"\"";
post_data[1] = NULL;
- CURL *request = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data);
- curl_wrap_perform(request, &result, &result_size);
+ CURL *curl_req = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data);
+ curl_wrap_perform(curl_req, &result, &result_size);
if(curl_wrap_response_code_get(request) != 200) {
- AFB_ERROR("Can't create database.");
+ AFB_ReqError(request, "Can't create database.");
ret = ERROR;
}
- curl_easy_cleanup(request);
+ curl_easy_cleanup(curl_req);
if(ret == 0)
- AFB_NOTICE("Database '"DEFAULT_DB"' created");
+ AFB_ReqNotice(request, "Database '"DEFAULT_DB"' created");
return ret;
}
diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h
index ae40e77..ed05f97 100644
--- a/src/plugins/influxdb.h
+++ b/src/plugins/influxdb.h
@@ -19,6 +19,7 @@
#define _INFLUXDB_H_
#define _GNU_SOURCE
+#include "ctl-plugin.h"
#include "wrap-json.h"
#include "tsdb.h"
#include "../utils/list.h"
@@ -34,7 +35,7 @@ struct series_t {
uint64_t timestamp;
};
-int create_database();
+int create_database(AFB_ReqT request);
int unpack_metric_from_api(json_object *m, struct series_t *serie);
diff --git a/src/plugins/tsdb.c b/src/plugins/tsdb.c
index 7e10a74..27ae067 100644
--- a/src/plugins/tsdb.c
+++ b/src/plugins/tsdb.c
@@ -18,31 +18,6 @@
#include <time.h>
#include "tsdb.h"
-int influxdb_ping()
-{
- int ret = 0;
- char *result;
- size_t result_size;
- CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL);
- curl_wrap_perform(request, &result, &result_size);
-
- if(curl_wrap_response_code_get(request) != 204) {
- AFB_ERROR("TimeSeries DB not reachable");
- ret = ERROR;
- }
-
- curl_easy_cleanup(request);
- return ret;
-}
-
-int db_ping()
-{
- int ret = 0;
- if(influxdb_ping() == 0) ret = INFLUX;
-
- return ret;
-}
-
uint64_t get_ts()
{
struct timespec ts;
diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h
index 437c5dc..8469ac0 100644
--- a/src/plugins/tsdb.h
+++ b/src/plugins/tsdb.h
@@ -18,9 +18,6 @@
#ifndef _TSDB_H
#define _TSDB_H
-#define AFB_BINDING_VERSION 2
-#include <afb/afb-binding.h>
-
#include <json-c/json.h>
#include "curl-wrap.h"
@@ -32,6 +29,7 @@
#define URL_MAXIMUM_LENGTH 2047
enum db_available {
+ NODB = 0,
INFLUX = 1,
GRAPHITE = 2,
OPENTSDB = 4
@@ -43,13 +41,6 @@ struct reader_args {
u_int32_t delay;
};
-CURL *influxdb_write(const char* host, const char *port, json_object *metric);
-void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size);
-
-int influxdb_reader(void *args);
-
-int db_ping();
-
u_int64_t get_ts();
#endif