summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2017-09-22 18:16:57 +0200
committerRomain Forlot <romain.forlot@iot.bzh>2017-12-14 11:00:25 +0100
commit51ee5299a7db41e52da2cf52dd9cd9c05b76740a (patch)
treeecdc922c7c7573ddb0dc091c9cbc3b111a0cc981
parenta758f4a632adc7fff4769d97379264de6c68685d (diff)
Finalize subscription to be a simple relay for now
Adding new object holding signals subscribed and afb event that observes Signals using Reactive response observer design pattern Change-Id: I96647d36e0d27c25a399c1b3789621a803a845b6 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
-rw-r--r--plugins/low-can.cpp2
-rw-r--r--signal-composer-binding/CMakeLists.txt2
-rw-r--r--signal-composer-binding/clientApp.cpp65
-rw-r--r--signal-composer-binding/clientApp.hpp34
-rw-r--r--signal-composer-binding/signal-composer-binding.cpp54
-rw-r--r--signal-composer-binding/signal-composer.cpp41
-rw-r--r--signal-composer-binding/signal-composer.hpp18
-rw-r--r--signal-composer-binding/source.cpp10
-rw-r--r--signal-composer-binding/source.hpp8
9 files changed, 144 insertions, 90 deletions
diff --git a/plugins/low-can.cpp b/plugins/low-can.cpp
index c1fbd60..62f10f3 100644
--- a/plugins/low-can.cpp
+++ b/plugins/low-can.cpp
@@ -129,7 +129,7 @@ CTLP_CAPI (subscribeToLow, source, argsJ, eventJ, context) {
CTLP_CAPI (isOpen, source, argsJ, eventJ, context) {
const char *eventName = nullptr;
int eventStatus;
- double timestamp;
+ uint64_t timestamp;
lowCANCtxT *pluginCtx=(lowCANCtxT*)source->context;
int err = wrap_json_unpack(eventJ, "{ss,sb,s?F}",
diff --git a/signal-composer-binding/CMakeLists.txt b/signal-composer-binding/CMakeLists.txt
index 3d7130b..5255e21 100644
--- a/signal-composer-binding/CMakeLists.txt
+++ b/signal-composer-binding/CMakeLists.txt
@@ -21,7 +21,7 @@
PROJECT_TARGET_ADD(signal-composer)
# Define project Targets
- add_library(${TARGET_NAME} MODULE ${TARGET_NAME}-binding.cpp ${TARGET_NAME}.cpp source.cpp signal.cpp)
+ add_library(${TARGET_NAME} MODULE ${TARGET_NAME}-binding.cpp ${TARGET_NAME}.cpp source.cpp signal.cpp clientApp.cpp)
# Binder exposes a unique public entry point
SET_TARGET_PROPERTIES(${TARGET_NAME} PROPERTIES
diff --git a/signal-composer-binding/clientApp.cpp b/signal-composer-binding/clientApp.cpp
new file mode 100644
index 0000000..ce30163
--- /dev/null
+++ b/signal-composer-binding/clientApp.cpp
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2015, 2016 "IoT.bzh"
+ * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "clientApp.hpp"
+
+clientAppCtx::clientAppCtx(const char* uuid)
+: uuid_(uuid)
+{}
+
+void clientAppCtx::update(Signal* sig)
+{
+ json_object* sigJ = sig->toJSON();
+ if(afb_event_push(event_, sigJ) == 0)
+ {sig->delObserver(this);}
+ return;
+}
+
+void clientAppCtx::appendSignals(std::vector<Signal*>& sigV)
+{
+ bool set = false;
+ // Clean up already subscribed signals to avoid duplicata
+ for (std::vector<Signal*>::const_iterator it = sigV.begin();
+ it != sigV.end(); ++it)
+ {
+ for (auto& ctxSig: subscribedSignals_)
+ {if(*it == ctxSig) {set = true;}}
+ if (set)
+ {
+ set = false;
+ sigV.erase(it);
+ continue;
+ }
+ Signal* sig = *it;
+ sig->addObserver(this);
+ }
+
+ subscribedSignals_.insert(subscribedSignals_.end(), sigV.begin(), sigV.end());
+}
+
+int clientAppCtx::makeSubscription(struct afb_req request)
+{
+ event_ = afb_event_is_valid(event_) ?
+ event_ : afb_daemon_make_event(uuid_.c_str());
+ return afb_req_subscribe(request, event_);
+}
+
+int clientAppCtx::makeUnsubscription(struct afb_req request)
+{
+ return afb_event_is_valid(event_) ?
+ afb_req_unsubscribe(request, event_) : -1;
+}
diff --git a/signal-composer-binding/clientApp.hpp b/signal-composer-binding/clientApp.hpp
new file mode 100644
index 0000000..7e1bd9d
--- /dev/null
+++ b/signal-composer-binding/clientApp.hpp
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2015, 2016 "IoT.bzh"
+ * Author "Romain Forlot" <romain.forlot@iot.bzh>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+#pragma once
+
+#include "signal-composer.hpp"
+
+class clientAppCtx: public Observer<Signal>
+{
+private:
+ std::string uuid_;
+ std::vector<Signal*> subscribedSignals_;
+ struct afb_event event_;
+public:
+ explicit clientAppCtx(const char* uuid);
+
+ void update(Signal* sig);
+ void appendSignals(std::vector<Signal*>& sigV);
+ int makeSubscription(struct afb_req request);
+ int makeUnsubscription(struct afb_req request);
+};
diff --git a/signal-composer-binding/signal-composer-binding.cpp b/signal-composer-binding/signal-composer-binding.cpp
index 9a32989..d4236a0 100644
--- a/signal-composer-binding/signal-composer-binding.cpp
+++ b/signal-composer-binding/signal-composer-binding.cpp
@@ -23,7 +23,7 @@
#include "signal-composer-binding.hpp"
#include "signal-composer-apidef.h"
-#include "signal-composer.hpp"
+#include "clientApp.hpp"
/// @brief callback for receiving message from low bindings. This will callback
/// an action defined in the configuration files depending on the event received
@@ -35,7 +35,7 @@ void onEvent(const char *event, json_object *object)
AFB_DEBUG("Received event json: %s", json_object_to_json_string(object));
Composer& composer = Composer::instance();
- std::vector<std::shared_ptr<Signal>> signals = composer.searchSignals(event);
+ std::vector<Signal*> signals = composer.searchSignals(event);
if(!signals.empty())
{
for(auto& sig: signals)
@@ -49,34 +49,16 @@ static int one_subscribe_unsubscribe(struct afb_req request,
bool subscribe,
const std::string& event,
json_object* args,
- clientAppCtxT* cContext)
+ clientAppCtx* cContext)
{
int err = 0;
- bool set = false;
- std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(event);
+ std::vector<Signal*> signals = Composer::instance().searchSignals(event);
- // Clean up already subscribed signals to avoid duplicata
- for (std::vector<std::shared_ptr<Signal>>::const_iterator sig = signals.begin();
- sig != signals.end(); ++sig)
- {
- for (auto& ctxSig: cContext->subscribedSignals)
- {if(*sig == ctxSig) {set = true;}}
- if (set) {signals.erase(sig);}
- }
-
- cContext->subscribedSignals.insert(cContext->subscribedSignals.end(), signals.begin(), signals.end());
- cContext->event = afb_event_is_valid(cContext->event) ?
- cContext->event : afb_daemon_make_event("Signal-Composer");
+ cContext->appendSignals(signals);
if(subscribe)
- {
- if(!afb_req_subscribe(request, cContext->event))
- {Composer::instance().addSubscription(cContext);}
- }
+ {err = cContext->makeSubscription(request);}
else
- {
- if(!afb_req_unsubscribe(request, cContext->event))
- {Composer::instance().removeSubscription(cContext);}
- }
+ {err = cContext->makeUnsubscription(request);}
return err;
}
@@ -84,7 +66,7 @@ static int one_subscribe_unsubscribe(struct afb_req request,
static int subscribe_unsubscribe(struct afb_req request,
bool subscribe,
json_object* args,
- clientAppCtxT* cContext)
+ clientAppCtx* cContext)
{
int rc = 0;
json_object *event = nullptr;
@@ -110,7 +92,7 @@ static int subscribe_unsubscribe(struct afb_req request,
}
/// @brief entry point for client subscription request.
-static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtxT* cContext)
+static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppCtx* cContext)
{
int rc = 0;
json_object *oneArg = nullptr, *args = afb_req_json(request);
@@ -136,17 +118,17 @@ static void do_subscribe_unsubscribe(afb_req request, bool subscribe, clientAppC
/// @brief entry point for client un-subscription request.
void subscribe(afb_req request)
{
- clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr);
+ clientAppCtx *cContext = reinterpret_cast<clientAppCtx*>(afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr));
- do_subscribe_unsubscribe(request, true, clientAppCtx);
+ do_subscribe_unsubscribe(request, true, cContext);
}
/// @brief entry point for client un-subscription request.
void unsubscribe(afb_req request)
{
- clientAppCtxT *clientAppCtx = (clientAppCtxT*)afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr);
+ clientAppCtx *cContext = reinterpret_cast<clientAppCtx*>(afb_req_context_make(request, 0, Composer::createContext, Composer::destroyContext, nullptr));
- do_subscribe_unsubscribe(request, false, clientAppCtx);
+ do_subscribe_unsubscribe(request, false, cContext);
}
/// @brief verb that loads JSON configuration (old SigComp.json file now)
@@ -172,7 +154,7 @@ void list(afb_req request)
{
struct json_object *allSignalsJ = json_object_new_array();
- std::vector<std::shared_ptr<Signal>> allSignals = Composer::instance().getAllSignals();
+ std::vector<Signal*> allSignals = Composer::instance().getAllSignals();
for(auto& sig: allSignals)
{json_object_array_add(allSignalsJ, sig->toJSON());}
@@ -229,13 +211,13 @@ int execConf()
Composer& composer = Composer::instance();
int err = 0;
CtlConfigExec(composer.ctlConfig());
- std::vector<std::shared_ptr<Signal>> allSignals = composer.getAllSignals();
+ std::vector<Signal*> allSignals = composer.getAllSignals();
ssize_t sigCount = allSignals.size();
- for( std::shared_ptr<Signal>& sig: allSignals)
+ for( Signal*& sig: allSignals)
{
sig->attachToSourceSignals(composer);
}
-
+/*
for(auto& sig: allSignals)
{
if( (err += sig->recursionCheck()) )
@@ -244,7 +226,7 @@ int execConf()
return err;
}
}
-
+*/
composer.execSignalsSubscription();
AFB_DEBUG("Signal Composer Control configuration Done.\n signals=%d", (int)sigCount);
diff --git a/signal-composer-binding/signal-composer.cpp b/signal-composer-binding/signal-composer.cpp
index ba1b406..db35e76 100644
--- a/signal-composer-binding/signal-composer.cpp
+++ b/signal-composer-binding/signal-composer.cpp
@@ -19,11 +19,11 @@
#include <string.h>
#include <fnmatch.h>
-#include "signal-composer.hpp"
+#include "clientApp.hpp"
extern "C" void setSignalValueHandle(const char* aName, long long int timestamp, struct SignalValue value)
{
- std::vector<std::shared_ptr<Signal>> signals = Composer::instance().searchSignals(aName);
+ std::vector<Signal*> signals = Composer::instance().searchSignals(aName);
if(!signals.empty())
{
for(auto& sig: signals)
@@ -328,7 +328,7 @@ int Composer::loadSignals(CtlSectionT* section, json_object *signalsJ)
return err;
}
-void Composer::processOptions(const char** opts, std::shared_ptr<Signal> sig, json_object* response) const
+void Composer::processOptions(const char** opts, Signal* sig, json_object* response) const
{
for(int idx=0; idx < sizeof(opts); idx++)
{
@@ -392,10 +392,8 @@ void* Composer::createContext(void* ctx)
uuid_t x;
char cuid[38];
uuid_generate(x);
- ctx = (clientAppCtxT*)calloc(1, sizeof(clientAppCtxT));
- clientAppCtxT* ret = (clientAppCtxT*) ctx;
uuid_unparse(x, cuid);
- ret->event = afb_daemon_make_event(cuid);
+ clientAppCtx* ret = new clientAppCtx(cuid);
return (void*)ret;
}
@@ -453,12 +451,12 @@ int Composer::initSourcesAPI()
return err;
}
-std::vector<std::shared_ptr<Signal>> Composer::getAllSignals()
+std::vector<Signal*> Composer::getAllSignals()
{
- std::vector<std::shared_ptr<Signal>> allSignals;
+ std::vector<Signal*> allSignals;
for( auto& source : sourcesListV_)
{
- std::vector<std::shared_ptr<Signal>> srcSignals = source.getSignals();
+ std::vector<Signal*> srcSignals = source.getSignals();
allSignals.insert(allSignals.end(), srcSignals.begin(), srcSignals.end());
}
@@ -475,10 +473,10 @@ SourceAPI* Composer::getSourceAPI(const std::string& api)
return nullptr;
}
-std::vector<std::shared_ptr<Signal>> Composer::searchSignals(const std::string& aName)
+std::vector<Signal*> Composer::searchSignals(const std::string& aName)
{
std::string api;
- std::vector<std::shared_ptr<Signal>> signals;
+ std::vector<Signal*> signals;
size_t sep = aName.find_first_of("/");
if(sep != std::string::npos)
{
@@ -488,8 +486,8 @@ std::vector<std::shared_ptr<Signal>> Composer::searchSignals(const std::string&
}
else
{
- std::vector<std::shared_ptr<Signal>> allSignals = getAllSignals();
- for (std::shared_ptr<Signal>& sig : allSignals)
+ std::vector<Signal*> allSignals = getAllSignals();
+ for (Signal*& sig : allSignals)
{
if(*sig == aName)
{signals.emplace_back(sig);}
@@ -509,7 +507,7 @@ json_object* Composer::getSignalValue(const std::string& sig, json_object* optio
&opts[2],
&opts[3]);
- std::vector<std::shared_ptr<Signal>> sigP = searchSignals(sig);
+ std::vector<Signal*> sigP = searchSignals(sig);
if(!sigP.empty())
{
for(auto& sig: sigP)
@@ -556,18 +554,3 @@ int Composer::execSignalsSubscription()
}
return err;
}
-
-void Composer::addSubscription(clientAppCtxT* ctx)
-{
- subscriptions_.push_back(ctx);
-}
-
-void Composer::removeSubscription(clientAppCtxT* ctx)
-{
- for(std::vector<clientAppCtxT*>::const_iterator i = subscriptions_.begin();
- i != subscriptions_.end(); ++i)
- {
- if(ctx == *i)
- {subscriptions_.erase(i);}
- }
-}
diff --git a/signal-composer-binding/signal-composer.hpp b/signal-composer-binding/signal-composer.hpp
index 95cbc68..2fd471c 100644
--- a/signal-composer-binding/signal-composer.hpp
+++ b/signal-composer-binding/signal-composer.hpp
@@ -16,18 +16,12 @@
*/
#pragma once
-#include <memory>
+
#include <vector>
#include <string>
#include "source.hpp"
-typedef struct clientAppCtxS
-{
- std::vector<std::shared_ptr<Signal>> subscribedSignals;
- struct afb_event event;
-} clientAppCtxT;
-
class Composer
{
private:
@@ -35,7 +29,6 @@ private:
static CtlSectionT ctlSections_[]; ///< Config Section definition (note: controls section index should match handle retrieval in)
std::vector<SourceAPI> sourcesListV_;
- std::vector<clientAppCtxT*> subscriptions_;
explicit Composer(const std::string& filepath);
Composer();
@@ -49,7 +42,7 @@ private:
int loadOneSignal(json_object* signalsJ);
static int loadSignals(CtlSectionT* section, json_object *signalsJ);
- void processOptions(const char** opts, std::shared_ptr<Signal> sig, json_object* response) const;
+ void processOptions(const char** opts, Signal* sig, json_object* response) const;
public:
static Composer& instance();
static void* createContext(void* ctx);
@@ -60,15 +53,12 @@ public:
CtlConfigT* ctlConfig();
int initSourcesAPI();
- std::vector<std::shared_ptr<Signal>> getAllSignals();
+ std::vector<Signal*> getAllSignals();
SourceAPI* getSourceAPI(const std::string& api);
- std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& aName);
+ std::vector<Signal*> searchSignals(const std::string& aName);
json_object* getSignalValue(const std::string& sig, json_object* options);
int execSignalsSubscription();
-
- void addSubscription(clientAppCtxT* ctx);
- void removeSubscription(clientAppCtxT* ctx);
};
struct pluginCBT
diff --git a/signal-composer-binding/source.cpp b/signal-composer-binding/source.cpp
index d777515..f7dee11 100644
--- a/signal-composer-binding/source.cpp
+++ b/signal-composer-binding/source.cpp
@@ -44,14 +44,14 @@ std::string SourceAPI::api() const
void SourceAPI::addSignal(const std::string& id, const std::string& event, std::vector<std::string>& depends, const std::string& sClass, const std::string& unit, double frequency, CtlActionT* onReceived, json_object* getSignalsArgs)
{
- std::shared_ptr<Signal> sig = std::make_shared<Signal>(id, event, depends, unit, frequency, onReceived, getSignalsArgs);
+ Signal* sig = new Signal(id, event, depends, unit, frequency, onReceived, getSignalsArgs);
signalsMap_[sig] = false;
}
-std::vector<std::shared_ptr<Signal>> SourceAPI::getSignals() const
+std::vector<Signal*> SourceAPI::getSignals() const
{
- std::vector<std::shared_ptr<Signal>> signals;
+ std::vector<Signal*> signals;
for (auto& sig: signalsMap_)
{
signals.push_back(sig.first);
@@ -59,9 +59,9 @@ std::vector<std::shared_ptr<Signal>> SourceAPI::getSignals() const
return signals;
}
-std::vector<std::shared_ptr<Signal>> SourceAPI::searchSignals(const std::string& name) const
+std::vector<Signal*> SourceAPI::searchSignals(const std::string& name) const
{
- std::vector<std::shared_ptr<Signal>> signals;
+ std::vector<Signal*> signals;
for (auto& sig: signalsMap_)
{
if(*sig.first == name)
diff --git a/signal-composer-binding/source.hpp b/signal-composer-binding/source.hpp
index 0885bf0..8b06b89 100644
--- a/signal-composer-binding/source.hpp
+++ b/signal-composer-binding/source.hpp
@@ -17,7 +17,7 @@
#pragma once
-#include <memory>
+
#include "signal.hpp"
@@ -28,7 +28,7 @@ private:
CtlActionT* init_;
CtlActionT* getSignals_;
- std::map<std::shared_ptr<Signal>, bool> signalsMap_;
+ std::map<Signal*, bool> signalsMap_;
public:
SourceAPI();
@@ -38,8 +38,8 @@ public:
std::string api() const;
void addSignal(const std::string& id, const std::string& event, std::vector<std::string>& sources, const std::string& sClass, const std::string& unit, double frequency, CtlActionT* onReceived, json_object* getSignalsArgs);
- std::vector<std::shared_ptr<Signal>> getSignals() const;
- std::vector<std::shared_ptr<Signal>> searchSignals(const std::string& name) const;
+ std::vector<Signal*> getSignals() const;
+ std::vector<Signal*> searchSignals(const std::string& name) const;
int makeSubscription();
};