summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2017-09-22 09:23:11 +0200
committerRomain Forlot <romain.forlot@iot.bzh>2017-12-14 11:00:25 +0100
commite4258ef6e45009b5625f85ec7e4f8946805e1c4a (patch)
tree90cede1867442e0437aa240d6acc8e5ca40ef5fd
parent960d051d0a20b7146617880d30ad2496afa1f5e5 (diff)
Subscribe
Change-Id: I7bbe972254d60f89cb26c98ea8519af087d8ae90 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--signal-composer-binding/signal-composer-binding.cpp53
-rw-r--r--signal-composer-binding/signal-composer.cpp24
-rw-r--r--signal-composer-binding/signal-composer.hpp8
3 files changed, 66 insertions, 19 deletions
diff --git a/signal-composer-binding/signal-composer-binding.cpp b/signal-composer-binding/signal-composer-binding.cpp
index 6c793a5..9a32989 100644
--- a/signal-composer-binding/signal-composer-binding.cpp
+++ b/signal-composer-binding/signal-composer-binding.cpp
@@ -48,31 +48,60 @@ void onEvent(const char *event, json_object *object)
static int one_subscribe_unsubscribe(struct afb_req request,
bool subscribe,
const std::string& event,
- json_object* args)
+ json_object* args,
+ clientAppCtxT* cContext)
{
- return 0;
+ int err = 0;
+ bool set = false;
+ std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(event);
+
+ // Clean up already subscribed signals to avoid duplicata
+ for (std::vector<std::shared_ptr<Signal>>::const_iterator sig = signals.begin();
+ sig != signals.end(); ++sig)
+ {
+ for (auto& ctxSig: cContext->subscribedSignals)
+ {if(*sig == ctxSig) {set = true;}}
+ if (set) {signals.erase(sig);}
+ }
+
+ cContext->subscribedSignals.insert(cContext->subscribedSignals.end(), signals.begin(), signals.end());
+ cContext->event = afb_event_is_valid(cContext->event) ?
+ cContext->event : afb_daemon_make_event("Signal-Composer");
+ if(subscribe)
+ {
+ if(!afb_req_subscribe(request, cContext->event))
+ {Composer::instance().addSubscription(cContext);}
+ }
+ else
+ {
+ if(!afb_req_unsubscribe(request, cContext->event))
+ {Composer::instance().removeSubscription(cContext);}
+ }
+
+ return err;
}
static int subscribe_unsubscribe(struct afb_req request,
bool subscribe,
- json_object* args)
+ json_object* args,
+ clientAppCtxT* cContext)
{
int rc = 0;
json_object *event = nullptr;
if (args == NULL || !json_object_object_get_ex(args, "event", &event))
{
- rc = one_subscribe_unsubscribe(request, subscribe, "*", args);
+ rc = one_subscribe_unsubscribe(request, subscribe, "*", args, cContext);
}
else if (json_object_get_type(event) == json_type_string)
{
- rc = one_subscribe_unsubscribe(request, subscribe, json_object_get_string(event), args);
+ rc = one_subscribe_unsubscribe(request, subscribe, json_object_get_string(event), args, cContext);
}
else if (json_object_get_type(event) == json_type_array)
{
for (int i = 0 ; i < json_object_array_length(event) ; i++)
{
json_object *x = json_object_array_get_idx(event, i);
- rc += one_subscribe_unsubscribe(request, subscribe, json_object_get_string(x), args);
+ rc += one_subscribe_unsubscribe(request, subscribe, json_object_get_string(x), args, cContext);
}
}
else {rc = -1;}
@@ -81,7 +110,7 @@ static int subscribe_unsubscribe(struct afb_req request,
}
/// @brief entry point for client subscription request.
-static void do_subscribe_unsubscribe(afb_req request, bool subscribe)
+static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtxT* cContext)
{
int rc = 0;
json_object *oneArg = nullptr, *args = afb_req_json(request);
@@ -90,12 +119,12 @@ static void do_subscribe_unsubscribe(afb_req request, bool subscribe)
for (int i = 0 ; i < json_object_array_length(args); i++)
{
oneArg = json_object_array_get_idx(args, i);
- rc += subscribe_unsubscribe(request, subscribe, oneArg);
+ rc += subscribe_unsubscribe(request, subscribe, oneArg, cContext);
}
}
else
{
- rc = subscribe_unsubscribe(request, subscribe, args);
+ rc = subscribe_unsubscribe(request, subscribe, args, cContext);
}
if(rc >= 0)
@@ -109,7 +138,7 @@ void subscribe(afb_req request)
{
clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr);
- //do_subscribe_unsubscribe(request, true);
+ do_subscribe_unsubscribe(request, true, clientAppCtx);
}
/// @brief entry point for client un-subscription request.
@@ -117,7 +146,7 @@ void unsubscribe(afb_req request)
{
clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr);
- //do_subscribe_unsubscribe(request, false);
+ do_subscribe_unsubscribe(request, false, clientAppCtx);
}
/// @brief verb that loads JSON configuration (old SigComp.json file now)
@@ -216,7 +245,7 @@ int execConf()
}
}
- composer.execSubscription();
+ composer.execSignalsSubscription();
AFB_DEBUG("Signal Composer Control configuration Done.\n signals=%d", (int)sigCount);
diff --git a/signal-composer-binding/signal-composer.cpp b/signal-composer-binding/signal-composer.cpp
index c6ffdd4..ba1b406 100644
--- a/signal-composer-binding/signal-composer.cpp
+++ b/signal-composer-binding/signal-composer.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+#include <uuid.h>
#include <string.h>
#include <fnmatch.h>
@@ -389,12 +390,12 @@ Composer& Composer::instance()
void* Composer::createContext(void* ctx)
{
uuid_t x;
+ char cuid[38];
uuid_generate(x);
ctx = (clientAppCtxT*)calloc(1, sizeof(clientAppCtxT));
clientAppCtxT* ret = (clientAppCtxT*) ctx;
- uuid_copy(ret->uid, x);
- ret->subscribedSignals = std::vector<std::shared_ptr<Signal>>();
- ret->event = afb_daemon_make_event("evt");
+ uuid_unparse(x, cuid);
+ ret->event = afb_daemon_make_event(cuid);
return (void*)ret;
}
@@ -543,7 +544,7 @@ json_object* Composer::getSignalValue(const std::string& sig, json_object* optio
return response;
}
-int Composer::execSubscription()
+int Composer::execSignalsSubscription()
{
int err = 0;
for(SourceAPI& srcAPI: sourcesListV_)
@@ -555,3 +556,18 @@ int Composer::execSubscription()
}
return err;
}
+
+void Composer::addSubscription(clientAppCtxT* ctx)
+{
+ subscriptions_.push_back(ctx);
+}
+
+void Composer::removeSubscription(clientAppCtxT* ctx)
+{
+ for(std::vector<clientAppCtxT*>::const_iterator i = subscriptions_.begin();
+ i != subscriptions_.end(); ++i)
+ {
+ if(ctx == *i)
+ {subscriptions_.erase(i);}
+ }
+}
diff --git a/signal-composer-binding/signal-composer.hpp b/signal-composer-binding/signal-composer.hpp
index afd987b..95cbc68 100644
--- a/signal-composer-binding/signal-composer.hpp
+++ b/signal-composer-binding/signal-composer.hpp
@@ -16,7 +16,6 @@
*/
#pragma once
-#include <uuid.h>
#include <memory>
#include <vector>
#include <string>
@@ -25,7 +24,6 @@
typedef struct clientAppCtxS
{
- uuid_t uid;
std::vector<std::shared_ptr<Signal>> subscribedSignals;
struct afb_event event;
} clientAppCtxT;
@@ -37,6 +35,7 @@ private:
static CtlSectionT ctlSections_[]; ///< Config Section definition (note: controls section index should match handle retrieval in)
std::vector<SourceAPI> sourcesListV_;
+ std::vector<clientAppCtxT*> subscriptions_;
explicit Composer(const std::string& filepath);
Composer();
@@ -66,7 +65,10 @@ public:
std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& aName);
json_object* getSignalValue(const std::string& sig, json_object* options);
- int execSubscription();
+ int execSignalsSubscription();
+
+ void addSubscription(clientAppCtxT* ctx);
+ void removeSubscription(clientAppCtxT* ctx);
};
struct pluginCBT