diff options
-rw-r--r-- | plugins/low-can.cpp | 11 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer-binding.cpp | 50 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.cpp | 32 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.hpp | 6 | ||||
-rw-r--r-- | signal-composer-binding/source.cpp | 71 | ||||
-rw-r--r-- | signal-composer-binding/source.hpp | 4 |
6 files changed, 122 insertions, 52 deletions
diff --git a/plugins/low-can.cpp b/plugins/low-can.cpp index 57c389e..011b9a9 100644 --- a/plugins/low-can.cpp +++ b/plugins/low-can.cpp @@ -65,7 +65,7 @@ CTLP_ONLOAD(plugin, composerHandle) memset(&pluginCtx->allDoorsCtx, 0, sizeof(allDoorsCtxT)); struct signalCBT* handle = (struct signalCBT*)composerHandle; - handle->pluginCtx = pluginCtx; + handle->pluginCtx = (void*)pluginCtx; return (void*)handle; } @@ -117,10 +117,15 @@ CTLP_CAPI (subscribeToLow, source, argsJ, eventJ) { } else { - AFB_DEBUG("Calling subscribe with %s", json_object_to_json_string_ext(pluginCtx->subscriptionBatch, JSON_C_TO_STRING_PRETTY)); + json_object_get(pluginCtx->subscriptionBatch); + AFB_DEBUG("Calling subscribe with %s", json_object_to_json_string(pluginCtx->subscriptionBatch)); err = afb_service_call_sync("low-can", "subscribe", pluginCtx->subscriptionBatch, &responseJ); if(err) - {AFB_ERROR("Subscribe to '%s' responseJ:%s", json_object_to_json_string_ext(pluginCtx->subscriptionBatch, JSON_C_TO_STRING_PRETTY), json_object_to_json_string(responseJ));} + {AFB_ERROR("Subscribe to '%s' responseJ:%s", json_object_to_json_string(pluginCtx->subscriptionBatch), json_object_to_json_string(responseJ));} + + // Renew subscription json object for the next time we need it + json_object_put(pluginCtx->subscriptionBatch); + pluginCtx->subscriptionBatch = json_object_new_array(); } return err; diff --git a/signal-composer-binding/signal-composer-binding.cpp b/signal-composer-binding/signal-composer-binding.cpp index bf40b2f..9a113d5 100644 --- a/signal-composer-binding/signal-composer-binding.cpp +++ b/signal-composer-binding/signal-composer-binding.cpp @@ -165,19 +165,30 @@ void unsubscribe(afb_req request) /// @brief verb that loads JSON configuration (old SigComp.json file now) void loadConf(afb_req request) { - json_object* args = afb_req_json(request), *fileJ; - const char* filepath; + Composer& composer = Composer::instance(); + json_object *sourcesJ = nullptr, + *signalsJ = nullptr; + const char* filepath = afb_req_value(request, "filepath"); - wrap_json_unpack(args, "{s:s}", "filepath", &filepath); - fileJ = json_object_from_file(filepath); + json_object *fileJ = json_object_from_file(filepath); - if(Composer::instance().loadSignals(fileJ)) - {afb_req_fail_f(request, "Loading configuration or subscription error", "Error code: -1");} - else - { + json_object_object_get_ex(fileJ, "sources", &sourcesJ); + json_object_object_get_ex(fileJ, "signals", &signalsJ); - afb_req_success(request, NULL, NULL); + if( sourcesJ && composer.loadSources(sourcesJ)) + { + afb_req_fail_f(request, "Loading 'sources' configuration or subscription error", "Error code: -1"); + return; } + if(signalsJ && composer.loadSignals(signalsJ)) + { + afb_req_fail_f(request, "Loading 'signals' configuration or subscription error", "Error code: -1"); + return; + } + else + {composer.initSignals();} + + afb_req_success(request, NULL, NULL); } /// @brief entry point to list available signals @@ -189,7 +200,7 @@ void list(afb_req request) for(auto& sig: allSignals) {json_object_array_add(allSignalsJ, sig->toJSON());} - if(json_object_array_length(allSignalsJ) && !execConf()) + if(json_object_array_length(allSignalsJ)) {afb_req_success(request, allSignalsJ, NULL);} else {afb_req_fail(request, "error", "No Signals recorded so far");} @@ -238,25 +249,10 @@ int execConf() Composer& composer = Composer::instance(); int err = 0; CtlConfigExec(nullptr, composer.ctlConfig()); - std::vector<std::shared_ptr<Signal>> allSignals = composer.getAllSignals(); - ssize_t sigCount = allSignals.size(); - for( std::shared_ptr<Signal>& sig: allSignals) - { - sig->attachToSourceSignals(composer); - } - - for(auto& sig: allSignals) - { - if( (err += sig->initialRecursionCheck()) ) - { - AFB_ERROR("There is an infinite recursion loop in your signals definition. Root coming from signal: %s", sig->id().c_str()); - return err; - } - } - composer.execSignalsSubscription(); + composer.initSignals(); - AFB_DEBUG("Signal Composer Control configuration Done.\n signals=%d", (int)sigCount); + AFB_DEBUG("Signal Composer Control configuration Done."); return err; } diff --git a/signal-composer-binding/signal-composer.cpp b/signal-composer-binding/signal-composer.cpp index 3ad2d4e..8461748 100644 --- a/signal-composer-binding/signal-composer.cpp +++ b/signal-composer-binding/signal-composer.cpp @@ -634,6 +634,18 @@ int Composer::loadConfig(const std::string& filepath) // return -1; } +int Composer::loadSources(json_object* sourcesJ) +{ + int err = loadSourcesAPI(nullptr, nullptr, sourcesJ); + if(err) + { + AFB_ERROR("Loading sources failed. JSON: %s", json_object_to_json_string(sourcesJ)); + return err; + } + initSourcesAPI(); + return err; +} + int Composer::loadSignals(json_object* signalsJ) { return loadSignals(nullptr, nullptr, signalsJ); @@ -646,10 +658,23 @@ CtlConfigT* Composer::ctlConfig() void Composer::initSourcesAPI() { - for(auto& src: sourcesListV_) + for(int i=0; i < newSourcesListV_.size(); i++) { + std::shared_ptr<SourceAPI> src = newSourcesListV_.back(); + newSourcesListV_.pop_back(); src->init(); + sourcesListV_.push_back(src); + } +} + +void Composer::initSignals() +{ + for(int i=0; i < sourcesListV_.size(); i++) + { + std::shared_ptr<SourceAPI> src = sourcesListV_[i]; + src->initSignals(); } + execSignalsSubscription(); } std::shared_ptr<SourceAPI> Composer::getSourceAPI(const std::string& api) @@ -659,6 +684,11 @@ std::shared_ptr<SourceAPI> Composer::getSourceAPI(const std::string& api) if (source->api() == api) {return source;} } + for(auto& source: newSourcesListV_) + { + if (source->api() == api) + {return source;} + } return nullptr; } diff --git a/signal-composer-binding/signal-composer.hpp b/signal-composer-binding/signal-composer.hpp index 6064bf2..8ae41e9 100644 --- a/signal-composer-binding/signal-composer.hpp +++ b/signal-composer-binding/signal-composer.hpp @@ -26,6 +26,7 @@ private: CtlConfigT* ctlConfig_; static CtlSectionT ctlSections_[]; ///< Config Section definition (note: controls section index should match handle retrieval in) + std::vector<std::shared_ptr<SourceAPI>> newSourcesListV_; std::vector<std::shared_ptr<SourceAPI>> sourcesListV_; explicit Composer(const std::string& filepath); @@ -45,6 +46,7 @@ private: static int loadSignals(AFB_ApiT apihandle, CtlSectionT* section, json_object *signalsJ); void initSourcesAPI(); + void execSignalsSubscription(); std::shared_ptr<SourceAPI> getSourceAPI(const std::string& api); void processOptions(const std::map<std::string, int>& opts, std::shared_ptr<Signal> sig, json_object* response) const; public: @@ -53,12 +55,12 @@ public: static void destroyContext(void* ctx); static std::vector<std::string> parseURI(const std::string& uri); int loadConfig(const std::string& filepath); + int loadSources(json_object* sourcesJ); int loadSignals(json_object* signalsJ); + void initSignals(); CtlConfigT* ctlConfig(); std::vector<std::shared_ptr<Signal>> getAllSignals(); std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& aName); json_object* getsignalValue(const std::string& sig, json_object* options); - - void execSignalsSubscription(); }; diff --git a/signal-composer-binding/source.cpp b/signal-composer-binding/source.cpp index 3180513..f616e51 100644 --- a/signal-composer-binding/source.cpp +++ b/signal-composer-binding/source.cpp @@ -57,16 +57,41 @@ void SourceAPI::addSignal(const std::string& id, const std::string& event, std:: { std::shared_ptr<Signal> sig = std::make_shared<Signal>(id, event, depends, unit, retention, frequency, onReceived, getSignalsArgs); - signalsMap_[id] = sig; + newSignalsM_[id] = sig; +} + +void SourceAPI::initSignals() +{ + Composer& composer = Composer::instance(); + int err = 0; + for(auto& i: newSignalsM_) + {i.second->attachToSourceSignals(composer);} + + for(auto i = newSignalsM_.begin(); i != newSignalsM_.end();) + { + if (err += i->second->initialRecursionCheck()) + { + AFB_ERROR("There is an infinite recursion loop in your signals definition. Root coming from signal: %s. Ignoring it.", i->second->id().c_str()); + ++i; + continue; + } + signalsM_[i->first] = i->second; + i = newSignalsM_.erase(i); + } } std::vector<std::shared_ptr<Signal>> SourceAPI::getSignals() const { std::vector<std::shared_ptr<Signal>> signals; - for (auto& sig: signalsMap_) + for (auto& sig: signalsM_) { signals.push_back(sig.second); } + for (auto& sig: newSignalsM_) + { + signals.push_back(sig.second); + } + return signals; } @@ -81,11 +106,18 @@ std::vector<std::shared_ptr<Signal>> SourceAPI::searchSignals(const std::string& { std::vector<std::shared_ptr<Signal>> signals; - if(signalsMap_.count(name)) - {signals.emplace_back(signalsMap_[name]);} + if(signalsM_.count(name)) + {signals.emplace_back(signalsM_[name]);} + if(newSignalsM_.count(name)) + {signals.emplace_back(signalsM_[name]);} else { - for (auto& sig: signalsMap_) + for (auto& sig: signalsM_) + { + if(*sig.second == name) + {signals.emplace_back(sig.second);} + } + for (auto& sig: newSignalsM_) { if(*sig.second == name) {signals.emplace_back(sig.second);} @@ -104,22 +136,25 @@ void SourceAPI::makeSubscription() source.api = nullptr; // We use binding v2, no dynamic API. source.request = {nullptr, nullptr}; - for(auto& sig: signalsMap_) + for(auto& sig: signalsM_) { - json_object* signalJ = sig.second->toJSON(); - if(!signalJ) + if(!sig.second->subscribed_) { - AFB_ERROR("Error building JSON query object to subscribe to for signal %s", sig.second->id().c_str()); - break; + json_object* signalJ = sig.second->toJSON(); + if(!signalJ) + { + AFB_ERROR("Error building JSON query object to subscribe to for signal %s", sig.second->id().c_str()); + break; + } + source.uid = sig.first.c_str(); + source.context = getSignals_->type == CTL_TYPE_CB ? + getSignals_->exec.cb.plugin->context: + nullptr; + ActionExecOne(&source, getSignals_, signalJ); + // Considerate signal subscribed no matter what + sig.second->subscribed_ = true; + json_object_put(signalJ); } - source.uid = sig.first.c_str(); - source.context = getSignals_->type == CTL_TYPE_CB ? - getSignals_->exec.cb.plugin->context: - nullptr; - ActionExecOne(&source, getSignals_, signalJ); - // Considerate signal subscribed no matter what - sig.second->subscribed_ = true; - json_object_put(signalJ); } source.uid = ""; ActionExecOne(&source, getSignals_, nullptr); diff --git a/signal-composer-binding/source.hpp b/signal-composer-binding/source.hpp index 5c1e074..3d44922 100644 --- a/signal-composer-binding/source.hpp +++ b/signal-composer-binding/source.hpp @@ -33,7 +33,8 @@ private: // Parameters inherited by source's signals if none defined for it struct signalsDefault signalsDefault_; - std::map<std::string, std::shared_ptr<Signal>> signalsMap_; + std::map<std::string, std::shared_ptr<Signal>> newSignalsM_; + std::map<std::string, std::shared_ptr<Signal>> signalsM_; public: SourceAPI(); @@ -44,6 +45,7 @@ public: const struct signalsDefault& signalsDefault() const; void addSignal(const std::string& id, const std::string& event, std::vector<std::string>& sources, int retention, const std::string& unit, double frequency, CtlActionT* onReceived, json_object* getSignalsArgs); + void initSignals(); std::vector<std::shared_ptr<Signal>> getSignals() const; std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& name); |