From fef008b549c9c6d6ee1e564312787c3d3f14f0c0 Mon Sep 17 00:00:00 2001 From: Romain Forlot Date: Wed, 16 May 2018 00:58:28 +0200 Subject: Convert the binding to use the controller Ease Time series DB abstraction layer by using Dyn API that implemente the API defined by the JSON schema. Change-Id: I67de4fbca10048201fdd2da683732a5f4f5b5368 Signed-off-by: Romain Forlot --- afb-helpers | 2 +- app-controller-submodule | 2 +- conf.d/CMakeLists.txt | 20 ++++ conf.d/cmake/00-debian-osconfig.cmake | 1 + conf.d/cmake/00-default-osconfig.cmake | 1 + conf.d/cmake/config.cmake | 8 +- conf.d/project/CMakeLists.txt | 20 ++++ conf.d/project/etc/CMakeLists.txt | 31 ++++++ conf.d/project/etc/harvester-config.json | 36 +++++++ src/CMakeLists.txt | 9 +- src/harvester-binding.c | 156 +++++++++++++++++++++++++++++++ src/harvester-binding.h | 32 +++++++ src/harvester.c | 97 ------------------- src/harvester.h | 33 ------- src/plugins/CMakeLists.txt | 44 +++++++++ src/plugins/influxdb-reader.c | 67 +++++++------ src/plugins/influxdb-writer.c | 93 +++++++++++------- src/plugins/influxdb.c | 56 +++++++++-- src/plugins/influxdb.h | 3 +- src/plugins/tsdb.c | 25 ----- src/plugins/tsdb.h | 11 +-- 21 files changed, 504 insertions(+), 243 deletions(-) create mode 100644 conf.d/CMakeLists.txt create mode 100644 conf.d/cmake/00-debian-osconfig.cmake create mode 100644 conf.d/cmake/00-default-osconfig.cmake create mode 100644 conf.d/project/CMakeLists.txt create mode 100644 conf.d/project/etc/CMakeLists.txt create mode 100644 conf.d/project/etc/harvester-config.json create mode 100644 src/harvester-binding.c create mode 100644 src/harvester-binding.h delete mode 100644 src/harvester.c delete mode 100644 src/harvester.h create mode 100644 src/plugins/CMakeLists.txt diff --git a/afb-helpers b/afb-helpers index 560f9a9..bb22525 160000 --- a/afb-helpers +++ b/afb-helpers @@ -1 +1 @@ -Subproject commit 560f9a944f489c5b4c2e5baf74b7b51b3e352ade +Subproject commit bb225251d33fff921cc74df049864c0e4df4a7b1 diff --git a/app-controller-submodule b/app-controller-submodule index e8b0b3c..8684200 160000 --- a/app-controller-submodule +++ b/app-controller-submodule @@ -1 +1 @@ -Subproject commit e8b0b3c3cc9b0453b88d1315822fda5963754ae7 +Subproject commit 868420021d3a955ae6f01fed414edbd6d4c0ff9e diff --git a/conf.d/CMakeLists.txt b/conf.d/CMakeLists.txt new file mode 100644 index 0000000..3beb009 --- /dev/null +++ b/conf.d/CMakeLists.txt @@ -0,0 +1,20 @@ +########################################################################### +# Copyright 2015, 2016, 2017 IoT.bzh +# +# author: Fulup Ar Foll +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################### + +# This component should be included as a submodule and wont compile as standalone +project_subdirs_add() diff --git a/conf.d/cmake/00-debian-osconfig.cmake b/conf.d/cmake/00-debian-osconfig.cmake new file mode 100644 index 0000000..2ce0ad3 --- /dev/null +++ b/conf.d/cmake/00-debian-osconfig.cmake @@ -0,0 +1 @@ +list(APPEND PKG_REQUIRED_LIST lua-5.3>=5.3) diff --git a/conf.d/cmake/00-default-osconfig.cmake b/conf.d/cmake/00-default-osconfig.cmake new file mode 100644 index 0000000..a2b9325 --- /dev/null +++ b/conf.d/cmake/00-default-osconfig.cmake @@ -0,0 +1 @@ +list(APPEND PKG_REQUIRED_LIST lua>=5.3) diff --git a/conf.d/cmake/config.cmake b/conf.d/cmake/config.cmake index fbce7a2..41fa6f0 100644 --- a/conf.d/cmake/config.cmake +++ b/conf.d/cmake/config.cmake @@ -77,7 +77,7 @@ set (PKG_REQUIRED_LIST # Prefix path where will be installed the files # Default: /usr/local (need root permission to write in) # ------------------------------------------------------ -#set(CMAKE_INSTALL_PREFIX $ENV{HOME}/opt) +set(CMAKE_INSTALL_PREFIX $ENV{HOME}/opt) # Customize link option # ----------------------------- @@ -125,6 +125,12 @@ set (PKG_REQUIRED_LIST # -O2 # CACHE STRING "Compilation flags for RELEASE build type.") +set(CONTROL_SUPPORT_LUA 1) +add_definitions(-DCONTROL_PLUGIN_PATH="${CMAKE_BINARY_DIR}/package/lib/plugins:${CMAKE_BINARY_DIR}/package/var:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/lib/plugins") +add_definitions(-DCONTROL_CONFIG_PATH="${CMAKE_BINARY_DIR}/package/etc:${CMAKE_INSTALL_PREFIX}/${PROJECT_NAME}/etc") +add_definitions(-DCTL_PLUGIN_MAGIC=1286576532) +add_definitions(-DUSE_API_DYN=1) + # (BUG!!!) as PKG_CONFIG_PATH does not work [should be an env variable] # --------------------------------------------------------------------- set(CMAKE_PREFIX_PATH ${CMAKE_INSTALL_PREFIX}/lib64/pkgconfig ${CMAKE_INSTALL_PREFIX}/lib/pkgconfig) diff --git a/conf.d/project/CMakeLists.txt b/conf.d/project/CMakeLists.txt new file mode 100644 index 0000000..3beb009 --- /dev/null +++ b/conf.d/project/CMakeLists.txt @@ -0,0 +1,20 @@ +########################################################################### +# Copyright 2015, 2016, 2017 IoT.bzh +# +# author: Fulup Ar Foll +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################### + +# This component should be included as a submodule and wont compile as standalone +project_subdirs_add() diff --git a/conf.d/project/etc/CMakeLists.txt b/conf.d/project/etc/CMakeLists.txt new file mode 100644 index 0000000..d378d71 --- /dev/null +++ b/conf.d/project/etc/CMakeLists.txt @@ -0,0 +1,31 @@ +########################################################################### +# Copyright 2017 IoT.bzh +# +# author: Fulup Ar Foll +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################### + +################################################## +# Control Policy Config file +################################################## +PROJECT_TARGET_ADD(harvester-config) + + file(GLOB CONF_FILES "*.json") + + add_input_files("${CONF_FILES}") + + SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES + LABELS "BINDING-CONFIG" + OUTPUT_NAME ${TARGET_NAME} + ) diff --git a/conf.d/project/etc/harvester-config.json b/conf.d/project/etc/harvester-config.json new file mode 100644 index 0000000..d2f7858 --- /dev/null +++ b/conf.d/project/etc/harvester-config.json @@ -0,0 +1,36 @@ +{ + "$schema": "http://iot.bzh/download/public/schema/json/ctl-schema.json", + "metadata": { + "uid": "Harvester", + "version": "1.0", + "api": "harvester", + "info": "Data collection binding" + }, + "plugins": [ + { + "uid": "influxdb", + "info": "Plugins that handle influxdb read and write", + "spath": "lib/plugins", + "libs": "influxdb.ctlso" + } + ], + + "onload": [ + { + "uid": "init_db", + "info": "Ensure that InfluxDB is up", + "action": "plugin://influxdb#influxdb_ping" + } + ], + + "controls": [ + { + "uid": "write", + "action": "plugin://influxdb#write_to_influxdb" + }, + { + "uid": "read", + "action": "plugin://influxdb#read_from_influxdb" + } + ] +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7c8c2b4..a016c14 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,12 +22,7 @@ PROJECT_TARGET_ADD(harvester) # Define project Targets add_library(${TARGET_NAME} MODULE - ${TARGET_NAME}.c - plugins/tsdb.c - plugins/influxdb.c - plugins/influxdb-writer.c - plugins/influxdb-reader.c - utils/list.c + ${TARGET_NAME}-binding.c ) set(OPENAPI_DEF "harvester-apidef" CACHE STRING "name and path to the JSON API definition without extension") @@ -43,5 +38,7 @@ PROJECT_TARGET_ADD(harvester) # Library dependencies (include updates automatically) TARGET_LINK_LIBRARIES(${TARGET_NAME} afb-helpers + ctl-utilities ${link_libraries}) +add_subdirectory("plugins") diff --git a/src/harvester-binding.c b/src/harvester-binding.c new file mode 100644 index 0000000..c5d0602 --- /dev/null +++ b/src/harvester-binding.c @@ -0,0 +1,156 @@ +/* +* Copyright (C) 2016 "IoT.bzh" +* Author Fulup Ar Foll +* Author Romain Forlot +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#define _GNU_SOURCE +#include +#include +#include + +#include "harvester-binding.h" + +// default api to print log when apihandle not avaliable +afb_dynapi *AFB_default; + +// Config Section definition (note: controls section index should match handle +// retrieval in HalConfigExec) +static CtlSectionT ctrlSections[] = { + {.key = "plugins" , .loadCB = PluginConfig}, + {.key = "onload" , .loadCB = OnloadConfig}, + {.key = "controls", .loadCB = ControlConfig}, + {.key = "events" , .loadCB = EventConfig}, + {.key = NULL} +}; + +static void ctrlapi_ping(AFB_ReqT request) { + static int count = 0; + + count++; + AFB_ReqNotice(request, "Controller:ping count=%d", count); + AFB_ReqSucess(request, json_object_new_int(count), NULL); + + return; +} + +void ctrlapi_auth(AFB_ReqT request) +{ + AFB_ReqSetLOA(request, 1); + AFB_ReqSucess(request, NULL, NULL); +} + +static AFB_ApiVerbs CtrlApiVerbs[] = { + /* VERB'S NAME FUNCTION TO CALL SHORT DESCRIPTION */ + {.verb = "ping", .callback = ctrlapi_ping, .info = "ping test for API"}, + {.verb = "auth", .callback = ctrlapi_auth, .info = "Authenticate session to raise Level Of Assurance of the session"}, + {.verb = NULL} /* marker for end of the array */ +}; + +static int CtrlLoadStaticVerbs(afb_dynapi *apiHandle, AFB_ApiVerbs *verbs) { + int errcount = 0; + + for (int idx = 0; verbs[idx].verb; idx++) { + errcount += afb_dynapi_add_verb( + apiHandle, CtrlApiVerbs[idx].verb, NULL, CtrlApiVerbs[idx].callback, + (void *)&CtrlApiVerbs[idx], CtrlApiVerbs[idx].auth, 0); + } + + return errcount; +}; + +static int CtrlInitOneApi(AFB_ApiT apiHandle) { + int err = 0; + AFB_default = apiHandle; // hugely hack to make all V2 AFB_DEBUG to work in fileutils + + // retrieve section config from api handle + CtlConfigT *ctrlConfig = (CtlConfigT *)afb_dynapi_get_userdata(apiHandle); + err = CtlConfigExec(apiHandle, ctrlConfig); + if(err) { + AFB_ApiError(apiHandle, "Error at CtlConfigExec step"); + return err; + } + + return err; +} + +// next generation dynamic API-V3 mode +#include + +static int CtrlLoadOneApi(void *cbdata, AFB_ApiT apiHandle) { + CtlConfigT *ctrlConfig = (CtlConfigT *)cbdata; + + // save closure as api's data context + afb_dynapi_set_userdata(apiHandle, ctrlConfig); + + // add static controls verbs + int err = CtrlLoadStaticVerbs(apiHandle, CtrlApiVerbs); + if (err) { + AFB_ApiError(apiHandle, "CtrlLoadSection fail to register static V2 verbs"); + return ERROR; + } + + // load section for corresponding API + err = CtlLoadSections(apiHandle, ctrlConfig, ctrlSections); + + // declare an event event manager for this API; + afb_dynapi_on_event(apiHandle, CtrlDispatchApiEvent); + + // init API function (does not receive user closure ??? + afb_dynapi_on_init(apiHandle, CtrlInitOneApi); + + afb_dynapi_seal(apiHandle); + return err; +} + +int afbBindingVdyn(afb_dynapi *apiHandle) { + + AFB_default = apiHandle; + AFB_ApiNotice(apiHandle, "Controller in afbBindingVdyn"); + + const char *dirList = getenv("CONTROL_CONFIG_PATH"); + if (!dirList) + dirList = CONTROL_CONFIG_PATH; + + const char *configPath = CtlConfigSearch(apiHandle, dirList, ""); + if (!configPath) { + AFB_ApiError(apiHandle, "CtlPreInit: No %s* config found in %s ", GetBinderName(), dirList); + return ERROR; + } + + // load config file and create API + CtlConfigT *ctrlConfig = CtlLoadMetaData(apiHandle, configPath); + if (!ctrlConfig) { + AFB_ApiError(apiHandle, + "CtrlBindingDyn No valid control config file in:\n-- %s", + configPath); + return ERROR; + } + + if (!ctrlConfig->api) { + AFB_ApiError(apiHandle, + "CtrlBindingDyn API Missing from metadata in:\n-- %s", + configPath); + return ERROR; + } + + AFB_ApiNotice(apiHandle, "Controller API='%s' info='%s'", ctrlConfig->api, + ctrlConfig->info); + + // create one API per config file (Pre-V3 return code ToBeChanged) + int status = afb_dynapi_new_api(apiHandle, ctrlConfig->api, ctrlConfig->info, 1, CtrlLoadOneApi, ctrlConfig); + + return status; +} diff --git a/src/harvester-binding.h b/src/harvester-binding.h new file mode 100644 index 0000000..7ac00a0 --- /dev/null +++ b/src/harvester-binding.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2016 "IoT.bzh" + * Author Fulup Ar Foll + * Author Romain Forlot + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef _CTL_BINDING_INCLUDE_ +#define _CTL_BINDING_INCLUDE_ + +#include +#include +#include +#include + +#ifndef ERROR + #define ERROR -1 +#endif + +#endif /* _CTL_BINDING_INCLUDE_ */ diff --git a/src/harvester.c b/src/harvester.c deleted file mode 100644 index 1d4d9a6..0000000 --- a/src/harvester.c +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2018 "IoT.bzh" - * Author "Romain Forlot" - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define _GNU_SOURCE -#include "harvester.h" -#include "harvester-apidef.h" - -#include "plugins/tsdb.h" -#include "curl-wrap.h" -#include "wrap-json.h" - -CURL* (*tsdb_write)(const char* host, const char *port, json_object *metric); -void (*write_curl_cb)(void *closure, int status, CURL *curl, const char *result, size_t size); - -struct reader_args r_args = {NULL, NULL, 1000000}; - -int do_write(struct afb_req req, const char* host, const char *port, json_object *metric) -{ - CURL *curl_request; - - curl_request = tsdb_write(host, port, metric); - curl_wrap_do(curl_request, write_curl_cb, &req); - - return 0; -} - -void afv_write(struct afb_req req) -{ - const char *port = NULL; - const char *host = NULL; - - json_object *req_args = afb_req_json(req), - *portJ = NULL, - *metric = NULL; - - if(wrap_json_unpack(req_args, "{s?s,s?o,so!}", - "host", &host, - "port", &portJ, - "metric", &metric) || ! metric) - afb_req_fail(req, "Failed", "Error processing arguments. Miss metric\ -JSON object or malformed"); - else { - port = json_object_is_type(portJ, json_type_null) ? - NULL : json_object_to_json_string(portJ); - if(do_write(req, host, port, metric)) - afb_req_fail(req, "Failed", "Error processing metric JSON object.\ -Malformed !"); - } -} - -void afv_auth(struct afb_req request) -{ - afb_req_session_set_LOA(request, 1); - afb_req_success(request, NULL, NULL); -} - -int init() -{ - int tsdb_available = 0; - - if(curl_global_init(CURL_GLOBAL_DEFAULT) != 0) { - AFB_ERROR("Something went wrong initiliazing libcurl. Abort"); - return ERROR; - } - - tsdb_available = db_ping(); - switch (tsdb_available) { - case INFLUX: - tsdb_write = influxdb_write; - write_curl_cb = influxdb_write_curl_cb; - /*if(influxdb_reader(&r_args) != 0) { - AFB_ERROR("Problem initiating reader timer. Abort"); - return ERROR; - }*/ - break; - default: - AFB_ERROR("No Time Series Database found. Abort"); - return ERROR; - break; - } - /* TODO:*/ - return 0; -} diff --git a/src/harvester.h b/src/harvester.h deleted file mode 100644 index b11fffe..0000000 --- a/src/harvester.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2018 "IoT.bzh" - * Author "Romain Forlot" - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef _HARVESTER_H_ -#define _HARVESTER_H_ - -#define AFB_BINDING_VERSION 2 -#include - -enum metric_type {b = 0, i, d, str} type; -union metric_value { - int b_value; - int i_value; - double d_value; - char *str_value; -}; - -int init(); - -#endif diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt new file mode 100644 index 0000000..e366685 --- /dev/null +++ b/src/plugins/CMakeLists.txt @@ -0,0 +1,44 @@ +########################################################################### +# Copyright 2015, 2016, 2017 IoT.bzh +# +# author: Romain Forlot +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################### + +PROJECT_TARGET_ADD(influxdb) + + # Define targets + ADD_LIBRARY(${TARGET_NAME} MODULE + ${TARGET_NAME}.c + ${TARGET_NAME}-writer.c + ${TARGET_NAME}-reader.c + tsdb.c + ../utils/list.c) + + # Alsa Plugin properties + SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES + LABELS "PLUGIN" + PREFIX "" + SUFFIX ".ctlso" + OUTPUT_NAME ${TARGET_NAME} + ) + + # Library dependencies (include updates automatically) + TARGET_LINK_LIBRARIES(${TARGET_NAME} + afb-helpers + ${link_libraries} + ) + + target_include_directories(${TARGET_NAME} + PRIVATE "${CMAKE_SOURCE_DIR}/app-controller-submodule/ctl-lib") diff --git a/src/plugins/influxdb-reader.c b/src/plugins/influxdb-reader.c index a1fa3a0..052a3da 100644 --- a/src/plugins/influxdb-reader.c +++ b/src/plugins/influxdb-reader.c @@ -29,6 +29,7 @@ struct metrics_list { struct series_t serie; json_object *metricsJ; + AFB_ApiT api; }; static void fill_n_send_values(void *c, json_object *valuesJ) @@ -44,7 +45,7 @@ static void fill_n_send_values(void *c, json_object *valuesJ) else { if(set_value(m_list->serie.serie_columns.tags, valuesJ, i)) { if(set_value(m_list->serie.serie_columns.fields, valuesJ, j)) { - AFB_ERROR("No tags nor fields fits."); + AFB_ApiError(m_list->api, "No tags nor fields fits."); } j++; } @@ -85,7 +86,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ) "name", &m_list->serie.name, "columns", &columnsJ, "values", &valuesJ)) { - AFB_ERROR("Unpacking metric goes wrong"); + AFB_ApiError(m_list->api, "Unpacking metric goes wrong"); return; } @@ -93,7 +94,7 @@ static void unpack_metric_from_db(void *ml, json_object *metricJ) wrap_json_array_for_all(valuesJ, fill_n_send_values, m_list); } -static json_object *unpack_series(json_object *seriesJ) +static json_object *unpack_series(AFB_ApiT apiHandle, json_object *seriesJ) { struct metrics_list m_list = { .serie = { @@ -104,7 +105,8 @@ static json_object *unpack_series(json_object *seriesJ) }, .timestamp = 0 }, - .metricsJ = json_object_new_array() + .metricsJ = json_object_new_array(), + .api = apiHandle }; wrap_json_array_for_all(seriesJ, unpack_metric_from_db, (void*)&m_list); @@ -112,7 +114,7 @@ static json_object *unpack_series(json_object *seriesJ) return m_list.metricsJ; } -static void forward_to_garner(const char *result, size_t size) +static void forward_to_garner(AFB_ApiT apiHandle, const char *result, size_t size) { int id = 0; json_object *resultsJ = NULL, @@ -131,39 +133,42 @@ static void forward_to_garner(const char *result, size_t size) } if(seriesJ) { - metrics2send = unpack_series(seriesJ); + metrics2send = unpack_series(apiHandle, seriesJ); if(json_object_array_length(metrics2send)) { - if(afb_service_call_sync("garner", "write", metrics2send, &call_resultJ)) { - AFB_ERROR("Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ)); + if(AFB_ServiceSync(apiHandle, "garner", "write", metrics2send, &call_resultJ)) { + AFB_ApiError(apiHandle, "Metrics were sent but not done, an error happens. Details: %s", json_object_to_json_string(call_resultJ)); } } } else { - AFB_ERROR("Empty response. Request results was:\n%s", result); + AFB_ApiError(apiHandle, "Empty response. Request results was:\n%s", result); } } static void influxdb_read_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) { + if(!closure) + return; + AFB_ApiT apiHandle = (AFB_ApiT)closure; long rep_code = curl_wrap_response_code_get(curl); switch(rep_code) { case 200: - AFB_DEBUG("Read correctly done"); - forward_to_garner(result, size); + AFB_ApiDebug(apiHandle, "Read correctly done"); + forward_to_garner(apiHandle, result, size); break; case 400: - AFB_ERROR("Unacceptable request. %s", result); + AFB_ApiError(apiHandle, "Unacceptable request. %s", result); break; case 401: - AFB_ERROR("Invalid authentication. %s", result); + AFB_ApiError(apiHandle, "Invalid authentication. %s", result); break; default: - AFB_ERROR("Unexptected behavior. %s", result); + AFB_ApiError(apiHandle, "Unexptected behavior. %s", result); break; } } -static CURL *make_curl_query_get(const char *url) +static CURL *make_curl_query_get(AFB_ApiT apiHandle, const char *url) { CURL *curl; char *args[5]; @@ -179,7 +184,7 @@ static CURL *make_curl_query_get(const char *url) args[4] = NULL; length_now = asprintf(&now, "%lu", get_ts()); - int rootdir_fd = afb_daemon_rootdir_get_fd(); + int rootdir_fd = AFB_RootDirGetFD(apiHandle); int fd_last_read = openat(rootdir_fd, "last_db_read", O_CREAT | O_RDWR, S_IRWXU); if (fd_last_read < 0) return NULL; @@ -188,7 +193,7 @@ static CURL *make_curl_query_get(const char *url) else write the last timestamp */ if(read(fd_last_read, last_ts, sizeof(last_ts)) == 0) { if(write(fd_last_read, now, length_now) != length_now) - AFB_ERROR("Error writing last_db_read file: %s\n", strerror( errno )); + AFB_ApiError(apiHandle, "Error writing last_db_read file: %s\n", strerror( errno )); } else { strcat(query, " WHERE time >= "); @@ -196,7 +201,7 @@ static CURL *make_curl_query_get(const char *url) close(fd_last_read); fd_last_read = openat(rootdir_fd, "last_db_read", O_TRUNC | O_RDWR); if (write(fd_last_read, now, length_now) != length_now) - AFB_ERROR("Error writing last_db_read file: %s", strerror( errno )); + AFB_ApiError(apiHandle, "Error writing last_db_read file: %s", strerror( errno )); } args[3] = query; @@ -208,33 +213,37 @@ static CURL *make_curl_query_get(const char *url) static int influxdb_read(sd_event_source *s, uint64_t usec, void *userdata) { CURL *curl; - struct reader_args *a = NULL; - if(userdata) - a = (struct reader_args*)userdata; - else + struct reader_args *r_args = NULL; + CtlSourceT *source = NULL; + if(userdata) { + source = (CtlSourceT*)userdata; + r_args = (struct reader_args*)source->context; + } + else { return ERROR; + } char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ - make_url(url, sizeof(url), a->host, a->port, "query"); - curl = make_curl_query_get(url); - curl_wrap_do(curl, influxdb_read_curl_cb, NULL); + make_url(url, sizeof(url), r_args->host, r_args->port, "query"); + curl = make_curl_query_get(source->api, url); + curl_wrap_do(curl, influxdb_read_curl_cb, (void*)source->api); /* Reschedule next run */ - sd_event_source_set_time(s, usec + a->delay); + sd_event_source_set_time(s, usec + r_args->delay); return 0; } -int influxdb_reader(void *args) +CTLP_CAPI(read_from_influxdb, source, argsJ, eventJ) { int err = 0; uint64_t usec; struct sd_event_source *evtSource = NULL; /* Set a cyclic cb call each 1s to call the read callback */ - sd_event_now(afb_daemon_get_event_loop(), CLOCK_MONOTONIC, &usec); - err = sd_event_add_time(afb_daemon_get_event_loop(), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, args); + sd_event_now(AFB_GetEventLoop(source->api), CLOCK_MONOTONIC, &usec); + err = sd_event_add_time(AFB_GetEventLoop(source->api), &evtSource, CLOCK_MONOTONIC, usec+1000000, 250, influxdb_read, (void*)source); if(!err) err = sd_event_source_set_enabled(evtSource, SD_EVENT_ON); diff --git a/src/plugins/influxdb-writer.c b/src/plugins/influxdb-writer.c index c21f4fd..677bfd0 100644 --- a/src/plugins/influxdb-writer.c +++ b/src/plugins/influxdb-writer.c @@ -21,6 +21,35 @@ #include "influxdb.h" +void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) +{ + AFB_ReqT request = (AFB_ReqT)closure; + long rep_code = curl_wrap_response_code_get(curl); + switch(rep_code) { + case 204: + AFB_ReqDebug(request, "Request correctly written"); + AFB_ReqSucess(request, NULL, "Request has been successfully writen"); + break; + case 400: + AFB_ReqFail(request, "Bad request", result); + break; + case 401: + AFB_ReqFail(request, "Unauthorized access", result); + break; + case 404: + AFB_ReqFail(request, "Not found", result); + AFB_ReqNotice(request, "Attempt to create the DB '"DEFAULT_DB"'"); + create_database(request); + break; + case 500: + AFB_ReqFailF(request, "Timeout", "Overloaded server: %s", result); + break; + default: + AFB_ReqFail(request, "Failure", "Unexpected behavior."); + break; + } +} + static size_t format_write_args(char *query, struct series_t *serie) { char *ts; @@ -59,36 +88,7 @@ static size_t format_write_args(char *query, struct series_t *serie) return strlen(query); } -void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size) -{ - struct afb_req *req = (struct afb_req*)closure; - long rep_code = curl_wrap_response_code_get(curl); - switch(rep_code) { - case 204: - AFB_DEBUG("Request correctly written"); - afb_req_success(*req, NULL, "Request has been successfully writen"); - break; - case 400: - afb_req_fail(*req, "Bad request", result); - break; - case 401: - afb_req_fail(*req, "Unauthorized access", result); - break; - case 404: - afb_req_fail(*req, "Not found", result); - AFB_NOTICE("Attempt to create the DB '"DEFAULT_DB"'"); - create_database(); - break; - case 500: - afb_req_fail_f(*req, "Timeout", "Overloaded server: %s", result); - break; - default: - afb_req_fail(*req, "Failure", "Unexpected behavior."); - break; - } -} - -CURL *make_curl_write_post(const char *url, json_object *metricsJ) +CURL *make_curl_write_post(AFB_ApiT apiHandle, const char *url, json_object *metricsJ) { CURL *curl = NULL; size_t lpd = 0, len_write = 0, i = 0; @@ -114,7 +114,7 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ) memset(serie, 0, sizeof(struct series_t)); if(unpack_metric_from_api(json_object_array_get_idx(metricsArrayJ, i), serie)) { - AFB_ERROR("ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); + AFB_ApiError(apiHandle, "ERROR unpacking metric. %s", json_object_to_json_string(metricsArrayJ)); break; } else { @@ -144,9 +144,36 @@ CURL *make_curl_write_post(const char *url, json_object *metricsJ) return curl; } -CURL *influxdb_write(const char* host, const char *port, json_object *metricJ) +CURL *influxdb_write(AFB_ApiT apiHandle, const char* host, const char *port, json_object *metricJ) { char url[URL_MAXIMUM_LENGTH]; /* Safe limit for most popular web browser */ make_url(url, sizeof(url), host, port, "write"); - return make_curl_write_post(url, metricJ); + return make_curl_write_post(apiHandle, url, metricJ); +} + +CTLP_CAPI(write_to_influxdb, source, argsJ, eventJ) +{ + AFB_ReqT request = source->request; + const char *port = NULL; + const char *host = NULL; + CURL *curl_request; + + json_object *req_args = AFB_ReqJson(request), + *portJ = NULL, + *metric = NULL; + + if(wrap_json_unpack(req_args, "{s?s,s?o,so!}", + "host", &host, + "port", &portJ, + "metric", &metric) || ! metric) + AFB_ReqFail(request, "Failed", "Error processing arguments. Miss metric\ +JSON object or malformed"); + else + port = json_object_is_type(portJ, json_type_null) ? + NULL : json_object_to_json_string(portJ); + + curl_request = influxdb_write(source->api, host, port, metric); + curl_wrap_do(curl_request, influxdb_write_curl_cb, request); + + return 0; } diff --git a/src/plugins/influxdb.c b/src/plugins/influxdb.c index a98bd2b..3c94a1b 100644 --- a/src/plugins/influxdb.c +++ b/src/plugins/influxdb.c @@ -26,6 +26,50 @@ #include "wrap-json.h" #include "../utils/list.h" +CTLP_CAPI_REGISTER("influxdb"); + +CTLP_ONLOAD(plugin, ret) +{ + int err = 0; + char *result; + size_t result_size; + CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL); + + struct reader_args r_args = {NULL, NULL, 1000000}; + plugin->context = (void*)&r_args; + + curl_wrap_perform(request, &result, &result_size); + + if(curl_wrap_response_code_get(request) != 204) { + AFB_ApiError(plugin->api, "InfluxDB not reachable, please start it"); + err = ERROR; + } + + curl_easy_cleanup(request); + return err; +} + +CTLP_CAPI(influxdb_ping, source, argsJ, eventJ) +{ + int ret = 0; + char *result; + size_t result_size; + + CURL *curl_req = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL); + + curl_wrap_perform(curl_req, &result, &result_size); + + if(curl_wrap_response_code_get(curl_req) != 204) { + AFB_ApiError(source->api, "InfluxDB is offline."); + ret = ERROR; + } + + curl_easy_cleanup(curl_req); + + AFB_ApiNotice(source->api, "InfluxDB is up and running."); + return ret; +} + void concatenate(char* dest, const char* source, const char *sep) { strncat(dest, sep, strlen(sep)); @@ -51,7 +95,7 @@ size_t make_url(char *url, size_t l_url, const char *host, const char *port, con } -int create_database() +int create_database(AFB_ReqT request) { int ret = 0; char *result; @@ -62,18 +106,18 @@ int create_database() post_data[0] = "q=CREATE DATABASE \""DEFAULT_DB"\""; post_data[1] = NULL; - CURL *request = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data); - curl_wrap_perform(request, &result, &result_size); + CURL *curl_req = curl_wrap_prepare_post_unescaped("localhost:"DEFAULT_DBPORT"/query",NULL, " ", post_data); + curl_wrap_perform(curl_req, &result, &result_size); if(curl_wrap_response_code_get(request) != 200) { - AFB_ERROR("Can't create database."); + AFB_ReqError(request, "Can't create database."); ret = ERROR; } - curl_easy_cleanup(request); + curl_easy_cleanup(curl_req); if(ret == 0) - AFB_NOTICE("Database '"DEFAULT_DB"' created"); + AFB_ReqNotice(request, "Database '"DEFAULT_DB"' created"); return ret; } diff --git a/src/plugins/influxdb.h b/src/plugins/influxdb.h index ae40e77..ed05f97 100644 --- a/src/plugins/influxdb.h +++ b/src/plugins/influxdb.h @@ -19,6 +19,7 @@ #define _INFLUXDB_H_ #define _GNU_SOURCE +#include "ctl-plugin.h" #include "wrap-json.h" #include "tsdb.h" #include "../utils/list.h" @@ -34,7 +35,7 @@ struct series_t { uint64_t timestamp; }; -int create_database(); +int create_database(AFB_ReqT request); int unpack_metric_from_api(json_object *m, struct series_t *serie); diff --git a/src/plugins/tsdb.c b/src/plugins/tsdb.c index 7e10a74..27ae067 100644 --- a/src/plugins/tsdb.c +++ b/src/plugins/tsdb.c @@ -18,31 +18,6 @@ #include #include "tsdb.h" -int influxdb_ping() -{ - int ret = 0; - char *result; - size_t result_size; - CURL *request = curl_wrap_prepare_get("localhost:"DEFAULT_DBPORT"/ping",NULL, NULL); - curl_wrap_perform(request, &result, &result_size); - - if(curl_wrap_response_code_get(request) != 204) { - AFB_ERROR("TimeSeries DB not reachable"); - ret = ERROR; - } - - curl_easy_cleanup(request); - return ret; -} - -int db_ping() -{ - int ret = 0; - if(influxdb_ping() == 0) ret = INFLUX; - - return ret; -} - uint64_t get_ts() { struct timespec ts; diff --git a/src/plugins/tsdb.h b/src/plugins/tsdb.h index 437c5dc..8469ac0 100644 --- a/src/plugins/tsdb.h +++ b/src/plugins/tsdb.h @@ -18,9 +18,6 @@ #ifndef _TSDB_H #define _TSDB_H -#define AFB_BINDING_VERSION 2 -#include - #include #include "curl-wrap.h" @@ -32,6 +29,7 @@ #define URL_MAXIMUM_LENGTH 2047 enum db_available { + NODB = 0, INFLUX = 1, GRAPHITE = 2, OPENTSDB = 4 @@ -43,13 +41,6 @@ struct reader_args { u_int32_t delay; }; -CURL *influxdb_write(const char* host, const char *port, json_object *metric); -void influxdb_write_curl_cb(void *closure, int status, CURL *curl, const char *result, size_t size); - -int influxdb_reader(void *args); - -int db_ping(); - u_int64_t get_ts(); #endif -- cgit 1.2.3-korg