summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAndrey Shamanin <andrei.shamanin@orioninc.com>2020-10-16 16:08:27 +0300
committerAndrey Shamanin <andrei.shamanin@orioninc.com>2020-10-19 15:41:37 +0300
commit2d66933b9bf25af66696b097283109c8a6e87151 (patch)
tree92a0bc61822540d5175a87d643fcca24d4acb216 /src
parentaee2ff16cc87a939e68aa2900e23d6705dda5408 (diff)
Bug-AGL: SPEC-3645 Signed-off-by: Andrey Shamanin <andrei.shamanin@orioninc.com> Change-Id: Ie165a50e7975a8e6f25b259be1679a9689414148
Diffstat (limited to 'src')
-rw-r--r--src/libcloudproxy.cpp94
1 files changed, 72 insertions, 22 deletions
diff --git a/src/libcloudproxy.cpp b/src/libcloudproxy.cpp
index 7b0f605..3638e46 100644
--- a/src/libcloudproxy.cpp
+++ b/src/libcloudproxy.cpp
@@ -16,24 +16,30 @@
#include <thread>
#include <algorithm>
+#include <condition_variable>
+
+#include <string.h>
#include "libcloudproxy.h"
#include "hmi-debug.h"
-
static const char API_name[] = "cloudproxy";
const std::vector<std::string> CloudProxyClient::m_api_list {
- std::string("ping"),
- std::string("createConnection"),
- std::string("sendMessage"),
- std::string("destroyConnection")
- };
+ std::string("ping"),
+ std::string("sendMessage")
+};
const std::vector<std::string> CloudProxyClient::m_event_list {
- std::string("sendMessageConfirmation"),
- std::string("receivedMessage")
- };
+ std::string("sendMessageConfirmation"),
+ std::string("receivedMessage")
+};
+
+
+bool CloudType::isSupported(const char* type)
+{
+ return (type && (strcmp(Azure, type) == 0 || strcmp(Aws, type) == 0));
+}
static void event_loop_run(sd_event* loop)
{
@@ -134,9 +140,11 @@ int CloudProxyClient::init(const int port, const std::string& token)
}
-int CloudProxyClient::call(const std::string& verb, json_object* arg)
+int CloudProxyClient::call_sync(const std::string& verb, json_object* arg)
{
- int ret;
+ std::lock_guard<std::mutex> l(m_mutex);
+
+ int ret{-1};
if(!m_websock)
{
return -1;
@@ -148,8 +156,49 @@ int CloudProxyClient::call(const std::string& verb, json_object* arg)
return -1;
}
- ret = afb_wsj1_call_j(m_websock, API_name, verb.c_str(), arg, ::on_reply, this);
- if (ret < 0)
+ std::condition_variable cv;
+ std::mutex cv_m;
+ bool signal{false};
+
+ std::function<void(afb_wsj1_msg *msg)> local_on_reply = [&ret, &cv, &cv_m, &signal](afb_wsj1_msg *msg)->void
+ {
+ std::unique_lock<std::mutex> lock(cv_m);
+
+ if (msg && afb_wsj1_msg_is_reply_ok(msg))
+ ret = 0;
+ else
+ ret = -1;
+
+ signal = true;
+ cv.notify_all();
+ };
+
+ int call_ret = afb_wsj1_call_j(m_websock, API_name, verb.c_str(), arg,
+ [](void *closure, afb_wsj1_msg *msg)->void
+ {
+ auto *cb = reinterpret_cast<std::function<void(afb_wsj1_msg *msg)> *>(closure);
+ if (!cb || !(*cb))
+ {
+ HMI_ERROR("cloudproxyclient", "Can't process reply: invalid callback");
+ return;
+ }
+ (*cb)(msg);
+ },
+ &local_on_reply);
+
+ if (call_ret == 0)
+ {
+ // Wait a signal if not received yet:
+ std::unique_lock<std::mutex> lock(cv_m);
+ if (!signal)
+ cv.wait(lock);
+ }
+ else
+ {
+ ret = call_ret;
+ }
+
+ if (ret != 0)
{
HMI_ERROR("cloudproxyclient", "Failed to call verb:%s", verb.c_str());
}
@@ -158,15 +207,16 @@ int CloudProxyClient::call(const std::string& verb, json_object* arg)
}
-int CloudProxyClient::sendMessage(const std::string& data)
+int CloudProxyClient::sendMessage(const std::string& cloud_type, const std::string& data)
{
- if(!m_websock)
+ if(!m_websock || !CloudType::isSupported(cloud_type.c_str()))
return -1;
json_object* j_obj = json_object_new_object();
+ json_object_object_add(j_obj, "cloud_type", json_object_new_string(cloud_type.c_str()));
json_object_object_add(j_obj, "data", json_object_new_string(data.c_str()));
- return this->call("sendMessage", j_obj);
+ return this->call_sync("sendMessage", j_obj);
}
void CloudProxyClient::set_event_handler(enum EventType et, handler_func f)
@@ -185,7 +235,7 @@ void CloudProxyClient::set_event_handler(enum EventType et, handler_func f)
break;
}
- this->handlers[et] = std::move(f);
+ this->m_handlers[et] = std::move(f);
}
}
@@ -214,7 +264,7 @@ int CloudProxyClient::unsubscribe(const std::string& event_name)
int ret = afb_wsj1_call_j(m_websock, API_name, "unsubscribe", j_obj, ::on_reply, this);
if (ret < 0)
- HMI_ERROR("cloudproxyclient", "unsubscribe filed for '%s'", event_name.c_str());
+ HMI_ERROR("cloudproxyclient", "unsubscribe failed for '%s'", event_name.c_str());
return ret;
}
@@ -248,14 +298,14 @@ void CloudProxyClient::on_event(void *closure, const char *event, afb_wsj1_msg *
const std::string et{event_type};
if (CloudProxyClient::m_event_list[0] == et)
{
- auto i = this->handlers.find(Event_SendMessageConfirmation);
- if (i != this->handlers.end())
+ auto i = this->m_handlers.find(Event_SendMessageConfirmation);
+ if (i != this->m_handlers.end())
i->second(json_data);
}
else if (CloudProxyClient::m_event_list[1] == et)
{
- auto i = this->handlers.find(Event_ReceivedMessage);
- if (i != this->handlers.end())
+ auto i = this->m_handlers.find(Event_ReceivedMessage);
+ if (i != this->m_handlers.end())
i->second(json_data);
}
}