diff options
Diffstat (limited to 'src/libcloudproxy.cpp')
-rw-r--r-- | src/libcloudproxy.cpp | 94 |
1 files changed, 72 insertions, 22 deletions
diff --git a/src/libcloudproxy.cpp b/src/libcloudproxy.cpp index 7b0f605..3638e46 100644 --- a/src/libcloudproxy.cpp +++ b/src/libcloudproxy.cpp @@ -16,24 +16,30 @@ #include <thread> #include <algorithm> +#include <condition_variable> + +#include <string.h> #include "libcloudproxy.h" #include "hmi-debug.h" - static const char API_name[] = "cloudproxy"; const std::vector<std::string> CloudProxyClient::m_api_list { - std::string("ping"), - std::string("createConnection"), - std::string("sendMessage"), - std::string("destroyConnection") - }; + std::string("ping"), + std::string("sendMessage") +}; const std::vector<std::string> CloudProxyClient::m_event_list { - std::string("sendMessageConfirmation"), - std::string("receivedMessage") - }; + std::string("sendMessageConfirmation"), + std::string("receivedMessage") +}; + + +bool CloudType::isSupported(const char* type) +{ + return (type && (strcmp(Azure, type) == 0 || strcmp(Aws, type) == 0)); +} static void event_loop_run(sd_event* loop) { @@ -134,9 +140,11 @@ int CloudProxyClient::init(const int port, const std::string& token) } -int CloudProxyClient::call(const std::string& verb, json_object* arg) +int CloudProxyClient::call_sync(const std::string& verb, json_object* arg) { - int ret; + std::lock_guard<std::mutex> l(m_mutex); + + int ret{-1}; if(!m_websock) { return -1; @@ -148,8 +156,49 @@ int CloudProxyClient::call(const std::string& verb, json_object* arg) return -1; } - ret = afb_wsj1_call_j(m_websock, API_name, verb.c_str(), arg, ::on_reply, this); - if (ret < 0) + std::condition_variable cv; + std::mutex cv_m; + bool signal{false}; + + std::function<void(afb_wsj1_msg *msg)> local_on_reply = [&ret, &cv, &cv_m, &signal](afb_wsj1_msg *msg)->void + { + std::unique_lock<std::mutex> lock(cv_m); + + if (msg && afb_wsj1_msg_is_reply_ok(msg)) + ret = 0; + else + ret = -1; + + signal = true; + cv.notify_all(); + }; + + int call_ret = afb_wsj1_call_j(m_websock, API_name, verb.c_str(), arg, + [](void *closure, afb_wsj1_msg *msg)->void + { + auto *cb = reinterpret_cast<std::function<void(afb_wsj1_msg *msg)> *>(closure); + if (!cb || !(*cb)) + { + HMI_ERROR("cloudproxyclient", "Can't process reply: invalid callback"); + return; + } + (*cb)(msg); + }, + &local_on_reply); + + if (call_ret == 0) + { + // Wait a signal if not received yet: + std::unique_lock<std::mutex> lock(cv_m); + if (!signal) + cv.wait(lock); + } + else + { + ret = call_ret; + } + + if (ret != 0) { HMI_ERROR("cloudproxyclient", "Failed to call verb:%s", verb.c_str()); } @@ -158,15 +207,16 @@ int CloudProxyClient::call(const std::string& verb, json_object* arg) } -int CloudProxyClient::sendMessage(const std::string& data) +int CloudProxyClient::sendMessage(const std::string& cloud_type, const std::string& data) { - if(!m_websock) + if(!m_websock || !CloudType::isSupported(cloud_type.c_str())) return -1; json_object* j_obj = json_object_new_object(); + json_object_object_add(j_obj, "cloud_type", json_object_new_string(cloud_type.c_str())); json_object_object_add(j_obj, "data", json_object_new_string(data.c_str())); - return this->call("sendMessage", j_obj); + return this->call_sync("sendMessage", j_obj); } void CloudProxyClient::set_event_handler(enum EventType et, handler_func f) @@ -185,7 +235,7 @@ void CloudProxyClient::set_event_handler(enum EventType et, handler_func f) break; } - this->handlers[et] = std::move(f); + this->m_handlers[et] = std::move(f); } } @@ -214,7 +264,7 @@ int CloudProxyClient::unsubscribe(const std::string& event_name) int ret = afb_wsj1_call_j(m_websock, API_name, "unsubscribe", j_obj, ::on_reply, this); if (ret < 0) - HMI_ERROR("cloudproxyclient", "unsubscribe filed for '%s'", event_name.c_str()); + HMI_ERROR("cloudproxyclient", "unsubscribe failed for '%s'", event_name.c_str()); return ret; } @@ -248,14 +298,14 @@ void CloudProxyClient::on_event(void *closure, const char *event, afb_wsj1_msg * const std::string et{event_type}; if (CloudProxyClient::m_event_list[0] == et) { - auto i = this->handlers.find(Event_SendMessageConfirmation); - if (i != this->handlers.end()) + auto i = this->m_handlers.find(Event_SendMessageConfirmation); + if (i != this->m_handlers.end()) i->second(json_data); } else if (CloudProxyClient::m_event_list[1] == et) { - auto i = this->handlers.find(Event_ReceivedMessage); - if (i != this->handlers.end()) + auto i = this->m_handlers.find(Event_ReceivedMessage); + if (i != this->m_handlers.end()) i->second(json_data); } } |