diff options
author | Romain Forlot <romain.forlot@iot.bzh> | 2017-09-22 09:23:11 +0200 |
---|---|---|
committer | Romain Forlot <romain.forlot@iot.bzh> | 2017-12-14 11:00:25 +0100 |
commit | e4258ef6e45009b5625f85ec7e4f8946805e1c4a (patch) | |
tree | 90cede1867442e0437aa240d6acc8e5ca40ef5fd | |
parent | 960d051d0a20b7146617880d30ad2496afa1f5e5 (diff) |
Subscribe
Change-Id: I7bbe972254d60f89cb26c98ea8519af087d8ae90
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r-- | signal-composer-binding/signal-composer-binding.cpp | 53 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.cpp | 24 | ||||
-rw-r--r-- | signal-composer-binding/signal-composer.hpp | 8 |
3 files changed, 66 insertions, 19 deletions
diff --git a/signal-composer-binding/signal-composer-binding.cpp b/signal-composer-binding/signal-composer-binding.cpp index 6c793a5..9a32989 100644 --- a/signal-composer-binding/signal-composer-binding.cpp +++ b/signal-composer-binding/signal-composer-binding.cpp @@ -48,31 +48,60 @@ void onEvent(const char *event, json_object *object) static int one_subscribe_unsubscribe(struct afb_req request, bool subscribe, const std::string& event, - json_object* args) + json_object* args, + clientAppCtxT* cContext) { - return 0; + int err = 0; + bool set = false; + std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(event); + + // Clean up already subscribed signals to avoid duplicata + for (std::vector<std::shared_ptr<Signal>>::const_iterator sig = signals.begin(); + sig != signals.end(); ++sig) + { + for (auto& ctxSig: cContext->subscribedSignals) + {if(*sig == ctxSig) {set = true;}} + if (set) {signals.erase(sig);} + } + + cContext->subscribedSignals.insert(cContext->subscribedSignals.end(), signals.begin(), signals.end()); + cContext->event = afb_event_is_valid(cContext->event) ? + cContext->event : afb_daemon_make_event("Signal-Composer"); + if(subscribe) + { + if(!afb_req_subscribe(request, cContext->event)) + {Composer::instance().addSubscription(cContext);} + } + else + { + if(!afb_req_unsubscribe(request, cContext->event)) + {Composer::instance().removeSubscription(cContext);} + } + + return err; } static int subscribe_unsubscribe(struct afb_req request, bool subscribe, - json_object* args) + json_object* args, + clientAppCtxT* cContext) { int rc = 0; json_object *event = nullptr; if (args == NULL || !json_object_object_get_ex(args, "event", &event)) { - rc = one_subscribe_unsubscribe(request, subscribe, "*", args); + rc = one_subscribe_unsubscribe(request, subscribe, "*", args, cContext); } else if (json_object_get_type(event) == json_type_string) { - rc = one_subscribe_unsubscribe(request, subscribe, json_object_get_string(event), args); + rc = one_subscribe_unsubscribe(request, subscribe, json_object_get_string(event), args, cContext); } else if (json_object_get_type(event) == json_type_array) { for (int i = 0 ; i < json_object_array_length(event) ; i++) { json_object *x = json_object_array_get_idx(event, i); - rc += one_subscribe_unsubscribe(request, subscribe, json_object_get_string(x), args); + rc += one_subscribe_unsubscribe(request, subscribe, json_object_get_string(x), args, cContext); } } else {rc = -1;} @@ -81,7 +110,7 @@ static int subscribe_unsubscribe(struct afb_req request, } /// @brief entry point for client subscription request. -static void do_subscribe_unsubscribe(afb_req request, bool subscribe) +static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtxT* cContext) { int rc = 0; json_object *oneArg = nullptr, *args = afb_req_json(request); @@ -90,12 +119,12 @@ static void do_subscribe_unsubscribe(afb_req request, bool subscribe) for (int i = 0 ; i < json_object_array_length(args); i++) { oneArg = json_object_array_get_idx(args, i); - rc += subscribe_unsubscribe(request, subscribe, oneArg); + rc += subscribe_unsubscribe(request, subscribe, oneArg, cContext); } } else { - rc = subscribe_unsubscribe(request, subscribe, args); + rc = subscribe_unsubscribe(request, subscribe, args, cContext); } if(rc >= 0) @@ -109,7 +138,7 @@ void subscribe(afb_req request) { clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr); - //do_subscribe_unsubscribe(request, true); + do_subscribe_unsubscribe(request, true, clientAppCtx); } /// @brief entry point for client un-subscription request. @@ -117,7 +146,7 @@ void unsubscribe(afb_req request) { clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr); - //do_subscribe_unsubscribe(request, false); + do_subscribe_unsubscribe(request, false, clientAppCtx); } /// @brief verb that loads JSON configuration (old SigComp.json file now) @@ -216,7 +245,7 @@ int execConf() } } - composer.execSubscription(); + composer.execSignalsSubscription(); AFB_DEBUG("Signal Composer Control configuration Done.\n signals=%d", (int)sigCount); diff --git a/signal-composer-binding/signal-composer.cpp b/signal-composer-binding/signal-composer.cpp index c6ffdd4..ba1b406 100644 --- a/signal-composer-binding/signal-composer.cpp +++ b/signal-composer-binding/signal-composer.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ +#include <uuid.h> #include <string.h> #include <fnmatch.h> @@ -389,12 +390,12 @@ Composer& Composer::instance() void* Composer::createContext(void* ctx) { uuid_t x; + char cuid[38]; uuid_generate(x); ctx = (clientAppCtxT*)calloc(1, sizeof(clientAppCtxT)); clientAppCtxT* ret = (clientAppCtxT*) ctx; - uuid_copy(ret->uid, x); - ret->subscribedSignals = std::vector<std::shared_ptr<Signal>>(); - ret->event = afb_daemon_make_event("evt"); + uuid_unparse(x, cuid); + ret->event = afb_daemon_make_event(cuid); return (void*)ret; } @@ -543,7 +544,7 @@ json_object* Composer::getSignalValue(const std::string& sig, json_object* optio return response; } -int Composer::execSubscription() +int Composer::execSignalsSubscription() { int err = 0; for(SourceAPI& srcAPI: sourcesListV_) @@ -555,3 +556,18 @@ int Composer::execSubscription() } return err; } + +void Composer::addSubscription(clientAppCtxT* ctx) +{ + subscriptions_.push_back(ctx); +} + +void Composer::removeSubscription(clientAppCtxT* ctx) +{ + for(std::vector<clientAppCtxT*>::const_iterator i = subscriptions_.begin(); + i != subscriptions_.end(); ++i) + { + if(ctx == *i) + {subscriptions_.erase(i);} + } +} diff --git a/signal-composer-binding/signal-composer.hpp b/signal-composer-binding/signal-composer.hpp index afd987b..95cbc68 100644 --- a/signal-composer-binding/signal-composer.hpp +++ b/signal-composer-binding/signal-composer.hpp @@ -16,7 +16,6 @@ */ #pragma once -#include <uuid.h> #include <memory> #include <vector> #include <string> @@ -25,7 +24,6 @@ typedef struct clientAppCtxS { - uuid_t uid; std::vector<std::shared_ptr<Signal>> subscribedSignals; struct afb_event event; } clientAppCtxT; @@ -37,6 +35,7 @@ private: static CtlSectionT ctlSections_[]; ///< Config Section definition (note: controls section index should match handle retrieval in) std::vector<SourceAPI> sourcesListV_; + std::vector<clientAppCtxT*> subscriptions_; explicit Composer(const std::string& filepath); Composer(); @@ -66,7 +65,10 @@ public: std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& aName); json_object* getSignalValue(const std::string& sig, json_object* options); - int execSubscription(); + int execSignalsSubscription(); + + void addSubscription(clientAppCtxT* ctx); + void removeSubscription(clientAppCtxT* ctx); }; struct pluginCBT |