diff options
author | Romain Forlot <romain.forlot@iot.bzh> | 2017-09-22 18:16:57 +0200 |
---|---|---|
committer | Romain Forlot <romain.forlot@iot.bzh> | 2017-12-14 11:00:25 +0100 |
commit | 51ee5299a7db41e52da2cf52dd9cd9c05b76740a (patch) | |
tree | ecdc922c7c7573ddb0dc091c9cbc3b111a0cc981 | |
parent | a758f4a632adc7fff4769d97379264de6c68685d (diff) |
Finalize subscription to be a simple relay for now
Adding new object holding signals subscribed and afb event
that observes Signals using Reactive response observer design
pattern
Change-Id: I96647d36e0d27c25a399c1b3789621a803a845b6
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r-- | plugins/low-can.cpp | 2 | ||||
-rw-r--r-- | signal-composer-binding/CMakeLists.txt | 2 | ||||
-rw-r--r-- | signal-composer-binding/clientApp.cpp | 65 | ||||
-rw-r--r-- | signal-composer-binding/clientApp.hpp | 34 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer-binding.cpp | 54 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.cpp | 41 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.hpp | 18 | ||||
-rw-r--r-- | signal-composer-binding/source.cpp | 10 | ||||
-rw-r--r-- | signal-composer-binding/source.hpp | 8 |
9 files changed, 144 insertions, 90 deletions
diff --git a/plugins/low-can.cpp b/plugins/low-can.cpp index c1fbd60..62f10f3 100644 --- a/plugins/low-can.cpp +++ b/plugins/low-can.cpp @@ -129,7 +129,7 @@ CTLP_CAPI (subscribeToLow, source, argsJ, eventJ, context) { CTLP_CAPI (isOpen, source, argsJ, eventJ, context) { const char *eventName = nullptr; int eventStatus; - double timestamp; + uint64_t timestamp; lowCANCtxT *pluginCtx=(lowCANCtxT*)source->context; int err = wrap_json_unpack(eventJ, "{ss,sb,s?F}", diff --git a/signal-composer-binding/CMakeLists.txt b/signal-composer-binding/CMakeLists.txt index 3d7130b..5255e21 100644 --- a/signal-composer-binding/CMakeLists.txt +++ b/signal-composer-binding/CMakeLists.txt @@ -21,7 +21,7 @@ PROJECT_TARGET_ADD(signal-composer) # Define project Targets - add_library(${TARGET_NAME} MODULE ${TARGET_NAME}-binding.cpp ${TARGET_NAME}.cpp source.cpp signal.cpp) + add_library(${TARGET_NAME} MODULE ${TARGET_NAME}-binding.cpp ${TARGET_NAME}.cpp source.cpp signal.cpp clientApp.cpp) # Binder exposes a unique public entry point SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES diff --git a/signal-composer-binding/clientApp.cpp b/signal-composer-binding/clientApp.cpp new file mode 100644 index 0000000..ce30163 --- /dev/null +++ b/signal-composer-binding/clientApp.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2015, 2016 "IoT.bzh" + * Author "Romain Forlot" <romain.forlot@iot.bzh> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +#include "clientApp.hpp" + +clientAppCtx::clientAppCtx(const char* uuid) +: uuid_(uuid) +{} + +void clientAppCtx::update(Signal* sig) +{ + json_object* sigJ = sig->toJSON(); + if(afb_event_push(event_, sigJ) == 0) + {sig->delObserver(this);} + return; +} + +void clientAppCtx::appendSignals(std::vector<Signal*>& sigV) +{ + bool set = false; + // Clean up already subscribed signals to avoid duplicata + for (std::vector<Signal*>::const_iterator it = sigV.begin(); + it != sigV.end(); ++it) + { + for (auto& ctxSig: subscribedSignals_) + {if(*it == ctxSig) {set = true;}} + if (set) + { + set = false; + sigV.erase(it); + continue; + } + Signal* sig = *it; + sig->addObserver(this); + } + + subscribedSignals_.insert(subscribedSignals_.end(), sigV.begin(), sigV.end()); +} + +int clientAppCtx::makeSubscription(struct afb_req request) +{ + event_ = afb_event_is_valid(event_) ? + event_ : afb_daemon_make_event(uuid_.c_str()); + return afb_req_subscribe(request, event_); +} + +int clientAppCtx::makeUnsubscription(struct afb_req request) +{ + return afb_event_is_valid(event_) ? + afb_req_unsubscribe(request, event_) : -1; +} diff --git a/signal-composer-binding/clientApp.hpp b/signal-composer-binding/clientApp.hpp new file mode 100644 index 0000000..7e1bd9d --- /dev/null +++ b/signal-composer-binding/clientApp.hpp @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2015, 2016 "IoT.bzh" + * Author "Romain Forlot" <romain.forlot@iot.bzh> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +#pragma once + +#include "signal-composer.hpp" + +class clientAppCtx: public Observer<Signal> +{ +private: + std::string uuid_; + std::vector<Signal*> subscribedSignals_; + struct afb_event event_; +public: + explicit clientAppCtx(const char* uuid); + + void update(Signal* sig); + void appendSignals(std::vector<Signal*>& sigV); + int makeSubscription(struct afb_req request); + int makeUnsubscription(struct afb_req request); +}; diff --git a/signal-composer-binding/signal-composer-binding.cpp b/signal-composer-binding/signal-composer-binding.cpp index 9a32989..d4236a0 100644 --- a/signal-composer-binding/signal-composer-binding.cpp +++ b/signal-composer-binding/signal-composer-binding.cpp @@ -23,7 +23,7 @@ #include "signal-composer-binding.hpp" #include "signal-composer-apidef.h" -#include "signal-composer.hpp" +#include "clientApp.hpp" /// @brief callback for receiving message from low bindings. This will callback /// an action defined in the configuration files depending on the event received @@ -35,7 +35,7 @@ void onEvent(const char *event, json_object *object) AFB_DEBUG("Received event json: %s", json_object_to_json_string(object)); Composer& composer = Composer::instance(); - std::vector<std::shared_ptr<Signal>> signals = composer.searchSignals(event); + std::vector<Signal*> signals = composer.searchSignals(event); if(!signals.empty()) { for(auto& sig: signals) @@ -49,34 +49,16 @@ static int one_subscribe_unsubscribe(struct afb_req request, bool subscribe, const std::string& event, json_object* args, - clientAppCtxT* cContext) + clientAppCtx* cContext) { int err = 0; - bool set = false; - std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(event); + std::vector<Signal*> signals = Composer::instance().searchSignals(event); - // Clean up already subscribed signals to avoid duplicata - for (std::vector<std::shared_ptr<Signal>>::const_iterator sig = signals.begin(); - sig != signals.end(); ++sig) - { - for (auto& ctxSig: cContext->subscribedSignals) - {if(*sig == ctxSig) {set = true;}} - if (set) {signals.erase(sig);} - } - - cContext->subscribedSignals.insert(cContext->subscribedSignals.end(), signals.begin(), signals.end()); - cContext->event = afb_event_is_valid(cContext->event) ? - cContext->event : afb_daemon_make_event("Signal-Composer"); + cContext->appendSignals(signals); if(subscribe) - { - if(!afb_req_subscribe(request, cContext->event)) - {Composer::instance().addSubscription(cContext);} - } + {err = cContext->makeSubscription(request);} else - { - if(!afb_req_unsubscribe(request, cContext->event)) - {Composer::instance().removeSubscription(cContext);} - } + {err = cContext->makeUnsubscription(request);} return err; } @@ -84,7 +66,7 @@ static int one_subscribe_unsubscribe(struct afb_req request, static int subscribe_unsubscribe(struct afb_req request, bool subscribe, json_object* args, - clientAppCtxT* cContext) + clientAppCtx* cContext) { int rc = 0; json_object *event = nullptr; @@ -110,7 +92,7 @@ static int subscribe_unsubscribe(struct afb_req request, } /// @brief entry point for client subscription request. -static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtxT* cContext) +static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtx* cContext) { int rc = 0; json_object *oneArg = nullptr, *args = afb_req_json(request); @@ -136,17 +118,17 @@ static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppC /// @brief entry point for client un-subscription request. void subscribe(afb_req request) { - clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr); + clientAppCtx *cContext = reinterpret_cast<clientAppCtx*>(afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr)); - do_subscribe_unsubscribe(request, true, clientAppCtx); + do_subscribe_unsubscribe(request, true, cContext); } /// @brief entry point for client un-subscription request. void unsubscribe(afb_req request) { - clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr); + clientAppCtx *cContext = reinterpret_cast<clientAppCtx*>(afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr)); - do_subscribe_unsubscribe(request, false, clientAppCtx); + do_subscribe_unsubscribe(request, false, cContext); } /// @brief verb that loads JSON configuration (old SigComp.json file now) @@ -172,7 +154,7 @@ void list(afb_req request) { struct json_object *allSignalsJ = json_object_new_array(); - std::vector<std::shared_ptr<Signal>> allSignals = Composer::instance().getAllSignals(); + std::vector<Signal*> allSignals = Composer::instance().getAllSignals(); for(auto& sig: allSignals) {json_object_array_add(allSignalsJ, sig->toJSON());} @@ -229,13 +211,13 @@ int execConf() Composer& composer = Composer::instance(); int err = 0; CtlConfigExec(composer.ctlConfig()); - std::vector<std::shared_ptr<Signal>> allSignals = composer.getAllSignals(); + std::vector<Signal*> allSignals = composer.getAllSignals(); ssize_t sigCount = allSignals.size(); - for( std::shared_ptr<Signal>& sig: allSignals) + for( Signal*& sig: allSignals) { sig->attachToSourceSignals(composer); } - +/* for(auto& sig: allSignals) { if( (err += sig->recursionCheck()) ) @@ -244,7 +226,7 @@ int execConf() return err; } } - +*/ composer.execSignalsSubscription(); AFB_DEBUG("Signal Composer Control configuration Done.\n signals=%d", (int)sigCount); diff --git a/signal-composer-binding/signal-composer.cpp b/signal-composer-binding/signal-composer.cpp index ba1b406..db35e76 100644 --- a/signal-composer-binding/signal-composer.cpp +++ b/signal-composer-binding/signal-composer.cpp @@ -19,11 +19,11 @@ #include <string.h> #include <fnmatch.h> -#include "signal-composer.hpp" +#include "clientApp.hpp" extern "C" void setSignalValueHandle(const char* aName, long long int timestamp, struct SignalValue value) { - std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(aName); + std::vector<Signal*> signals = Composer::instance().searchSignals(aName); if(!signals.empty()) { for(auto& sig: signals) @@ -328,7 +328,7 @@ int Composer::loadSignals(CtlSectionT* section, json_object *signalsJ) return err; } -void Composer::processOptions(const char** opts, std::shared_ptr<Signal> sig, json_object* response) const +void Composer::processOptions(const char** opts, Signal* sig, json_object* response) const { for(int idx=0; idx < sizeof(opts); idx++) { @@ -392,10 +392,8 @@ void* Composer::createContext(void* ctx) uuid_t x; char cuid[38]; uuid_generate(x); - ctx = (clientAppCtxT*)calloc(1, sizeof(clientAppCtxT)); - clientAppCtxT* ret = (clientAppCtxT*) ctx; uuid_unparse(x, cuid); - ret->event = afb_daemon_make_event(cuid); + clientAppCtx* ret = new clientAppCtx(cuid); return (void*)ret; } @@ -453,12 +451,12 @@ int Composer::initSourcesAPI() return err; } -std::vector<std::shared_ptr<Signal>> Composer::getAllSignals() +std::vector<Signal*> Composer::getAllSignals() { - std::vector<std::shared_ptr<Signal>> allSignals; + std::vector<Signal*> allSignals; for( auto& source : sourcesListV_) { - std::vector<std::shared_ptr<Signal>> srcSignals = source.getSignals(); + std::vector<Signal*> srcSignals = source.getSignals(); allSignals.insert(allSignals.end(), srcSignals.begin(), srcSignals.end()); } @@ -475,10 +473,10 @@ SourceAPI* Composer::getSourceAPI(const std::string& api) return nullptr; } -std::vector<std::shared_ptr<Signal>> Composer::searchSignals(const std::string& aName) +std::vector<Signal*> Composer::searchSignals(const std::string& aName) { std::string api; - std::vector<std::shared_ptr<Signal>> signals; + std::vector<Signal*> signals; size_t sep = aName.find_first_of("/"); if(sep != std::string::npos) { @@ -488,8 +486,8 @@ std::vector<std::shared_ptr<Signal>> Composer::searchSignals(const std::string& } else { - std::vector<std::shared_ptr<Signal>> allSignals = getAllSignals(); - for (std::shared_ptr<Signal>& sig : allSignals) + std::vector<Signal*> allSignals = getAllSignals(); + for (Signal*& sig : allSignals) { if(*sig == aName) {signals.emplace_back(sig);} @@ -509,7 +507,7 @@ json_object* Composer::getSignalValue(const std::string& sig, json_object* optio &opts[2], &opts[3]); - std::vector<std::shared_ptr<Signal>> sigP = searchSignals(sig); + std::vector<Signal*> sigP = searchSignals(sig); if(!sigP.empty()) { for(auto& sig: sigP) @@ -556,18 +554,3 @@ int Composer::execSignalsSubscription() } return err; } - -void Composer::addSubscription(clientAppCtxT* ctx) -{ - subscriptions_.push_back(ctx); -} - -void Composer::removeSubscription(clientAppCtxT* ctx) -{ - for(std::vector<clientAppCtxT*>::const_iterator i = subscriptions_.begin(); - i != subscriptions_.end(); ++i) - { - if(ctx == *i) - {subscriptions_.erase(i);} - } -} diff --git a/signal-composer-binding/signal-composer.hpp b/signal-composer-binding/signal-composer.hpp index 95cbc68..2fd471c 100644 --- a/signal-composer-binding/signal-composer.hpp +++ b/signal-composer-binding/signal-composer.hpp @@ -16,18 +16,12 @@ */ #pragma once -#include <memory> + #include <vector> #include <string> #include "source.hpp" -typedef struct clientAppCtxS -{ - std::vector<std::shared_ptr<Signal>> subscribedSignals; - struct afb_event event; -} clientAppCtxT; - class Composer { private: @@ -35,7 +29,6 @@ private: static CtlSectionT ctlSections_[]; ///< Config Section definition (note: controls section index should match handle retrieval in) std::vector<SourceAPI> sourcesListV_; - std::vector<clientAppCtxT*> subscriptions_; explicit Composer(const std::string& filepath); Composer(); @@ -49,7 +42,7 @@ private: int loadOneSignal(json_object* signalsJ); static int loadSignals(CtlSectionT* section, json_object *signalsJ); - void processOptions(const char** opts, std::shared_ptr<Signal> sig, json_object* response) const; + void processOptions(const char** opts, Signal* sig, json_object* response) const; public: static Composer& instance(); static void* createContext(void* ctx); @@ -60,15 +53,12 @@ public: CtlConfigT* ctlConfig(); int initSourcesAPI(); - std::vector<std::shared_ptr<Signal>> getAllSignals(); + std::vector<Signal*> getAllSignals(); SourceAPI* getSourceAPI(const std::string& api); - std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& aName); + std::vector<Signal*> searchSignals(const std::string& aName); json_object* getSignalValue(const std::string& sig, json_object* options); int execSignalsSubscription(); - - void addSubscription(clientAppCtxT* ctx); - void removeSubscription(clientAppCtxT* ctx); }; struct pluginCBT diff --git a/signal-composer-binding/source.cpp b/signal-composer-binding/source.cpp index d777515..f7dee11 100644 --- a/signal-composer-binding/source.cpp +++ b/signal-composer-binding/source.cpp @@ -44,14 +44,14 @@ std::string SourceAPI::api() const void SourceAPI::addSignal(const std::string& id, const std::string& event, std::vector<std::string>& depends, const std::string& sClass, const std::string& unit, double frequency, CtlActionT* onReceived, json_object* getSignalsArgs) { - std::shared_ptr<Signal> sig = std::make_shared<Signal>(id, event, depends, unit, frequency, onReceived, getSignalsArgs); + Signal* sig = new Signal(id, event, depends, unit, frequency, onReceived, getSignalsArgs); signalsMap_[sig] = false; } -std::vector<std::shared_ptr<Signal>> SourceAPI::getSignals() const +std::vector<Signal*> SourceAPI::getSignals() const { - std::vector<std::shared_ptr<Signal>> signals; + std::vector<Signal*> signals; for (auto& sig: signalsMap_) { signals.push_back(sig.first); @@ -59,9 +59,9 @@ std::vector<std::shared_ptr<Signal>> SourceAPI::getSignals() const return signals; } -std::vector<std::shared_ptr<Signal>> SourceAPI::searchSignals(const std::string& name) const +std::vector<Signal*> SourceAPI::searchSignals(const std::string& name) const { - std::vector<std::shared_ptr<Signal>> signals; + std::vector<Signal*> signals; for (auto& sig: signalsMap_) { if(*sig.first == name) diff --git a/signal-composer-binding/source.hpp b/signal-composer-binding/source.hpp index 0885bf0..8b06b89 100644 --- a/signal-composer-binding/source.hpp +++ b/signal-composer-binding/source.hpp @@ -17,7 +17,7 @@ #pragma once -#include <memory> + #include "signal.hpp" @@ -28,7 +28,7 @@ private: CtlActionT* init_; CtlActionT* getSignals_; - std::map<std::shared_ptr<Signal>, bool> signalsMap_; + std::map<Signal*, bool> signalsMap_; public: SourceAPI(); @@ -38,8 +38,8 @@ public: std::string api() const; void addSignal(const std::string& id, const std::string& event, std::vector<std::string>& sources, const std::string& sClass, const std::string& unit, double frequency, CtlActionT* onReceived, json_object* getSignalsArgs); - std::vector<std::shared_ptr<Signal>> getSignals() const; - std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& name) const; + std::vector<Signal*> getSignals() const; + std::vector<Signal*> searchSignals(const std::string& name) const; int makeSubscription(); }; |