summaryrefslogtreecommitdiffstats
path: root/app/afbclient.cpp
diff options
context:
space:
mode:
authorScott Murray <scott.murray@konsulko.com>2019-04-22 21:38:18 -0400
committerScott Murray <scott.murray@konsulko.com>2019-04-22 22:42:06 -0400
commitb523b0c8cc37a989bfb84a5ebefe1ec1f2b0cbfb (patch)
tree482f3fee892fc0a0a327d256bd25134fb6996e15 /app/afbclient.cpp
A simple telematics demo application for AGL. It reads vehicle and engine speed from the CAN low-level binding and publishes them via MQTT. Change-Id: Ib85904e87919053cad1215b3f53cee81db25c94a Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Diffstat (limited to 'app/afbclient.cpp')
-rw-r--r--app/afbclient.cpp236
1 files changed, 236 insertions, 0 deletions
diff --git a/app/afbclient.cpp b/app/afbclient.cpp
new file mode 100644
index 0000000..a40f6c9
--- /dev/null
+++ b/app/afbclient.cpp
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "afbclient.h"
+#include <string>
+#include <cstring>
+#include <iostream>
+#include <mutex>
+#include <condition_variable>
+
+#undef DEBUG
+//#define DEBUG
+
+struct call_data
+{
+ bool sync;
+ std::mutex mutex;
+ std::condition_variable cv;
+ bool ready;
+ std::function<void(json_object*)> cb;
+ json_object *resp;
+};
+
+static void on_hangup_cb(void *closure, struct afb_wsj1 *wsj)
+{
+}
+
+static void on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg)
+{
+}
+
+static void on_reply_cb(void *closure, struct afb_wsj1_msg *msg)
+{
+ call_data *data = (call_data*) closure;
+ struct json_object* reply;
+
+ if(!data)
+ goto reply_done;
+
+ reply = afb_wsj1_msg_object_j(msg);
+ if(reply) {
+#ifdef DEBUG
+ std::cerr << __FUNCTION__ << ": reply = " << \
+ json_object_to_json_string_ext(reply, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+#endif
+ if(data->sync) {
+ data->resp = reply;
+
+ // Increase reference count since we are going to use
+ // reply after this callback returns, caller must do a
+ // put.
+ json_object_get(reply);
+ } else if(data->cb != nullptr) {
+ data->cb(reply);
+ }
+ }
+reply_done:
+ if(data->sync) {
+ // Signal reply is done
+ {
+ std::lock_guard<std::mutex> lk(data->mutex);
+ data->ready = true;
+ }
+ data->cv.notify_one();
+ }
+}
+
+//
+// on_event_cb is inline in afbclient.h
+//
+
+static void *afb_loop_thread(struct sd_event* loop)
+{
+ for(;;)
+ sd_event_run(loop, 30000000);
+}
+
+AfbClient::AfbClient(const int port, const std::string &token)
+{
+ std::string uri;
+
+ if(sd_event_new(&m_afb_loop) < 0) {
+ std::cerr << __FUNCTION__ << ": Failed to create event loop" << std::endl;
+ return;
+ }
+
+ // Initialize interface for websocket
+ m_itf.on_hangup = on_hangup_cb;
+ m_itf.on_call = on_call_cb;
+ m_itf.on_event = on_event_cb;
+
+ uri = "ws://localhost:" + std::to_string(port) + "/api?token=" + token;
+#ifdef DEBUG
+ std::cerr << "Using URI: " << uri << std::endl;
+#endif
+ m_ws = afb_ws_client_connect_wsj1(m_afb_loop, uri.c_str(), &m_itf, this);
+ if(m_ws) {
+ m_afb_thread = std::thread(afb_loop_thread, m_afb_loop);
+ } else {
+ std::cerr << __FUNCTION__ << ": Failed to create websocket connection" << std::endl;
+ goto error;
+ }
+
+ m_valid = true;
+ return;
+error:
+ if(m_afb_loop) {
+ sd_event_unref(m_afb_loop);
+ m_afb_loop = nullptr;
+ }
+ return;
+}
+
+AfbClient::~AfbClient(void)
+{
+ sd_event_unref(m_afb_loop);
+ afb_wsj1_unref(m_ws);
+}
+
+int AfbClient::call(const std::string &api, const std::string &verb, struct json_object *arg, callback_fn cb)
+{
+ if(!m_valid)
+ return -1;
+
+ call_data data;
+ data.sync = false;
+ data.cb = cb;
+ int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, (void*) &data);
+ if(rc < 0) {
+ std::cerr << __FUNCTION__ << \
+ ": Failed to call " << \
+ api.c_str() << \
+ "/" << \
+ verb.c_str() << \
+ std::endl;
+ }
+ return rc;
+}
+
+int AfbClient::call_sync(const std::string &api, const std::string &verb, struct json_object *arg, struct json_object **resp)
+{
+ if(!m_valid)
+ return -1;
+
+ call_data data;
+ data.sync = true;
+ data.ready = false;
+ data.cb = nullptr;
+ int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, (void*) &data);
+ if(rc >= 0) {
+ // Wait for response
+ std::unique_lock<std::mutex> lk(data.mutex);
+ data.cv.wait(lk, [&]{ return data.ready; });
+
+ if(resp && data.resp)
+ *resp = data.resp;
+ } else {
+ std::cerr << __FUNCTION__ << \
+ ": Failed to call " << \
+ api.c_str() << \
+ "/" << \
+ verb.c_str() << \
+ std::endl;
+ }
+ return rc;
+}
+
+int AfbClient::subscribe(const std::string &api, const std::string &event, const std::string &eventString, callback_fn cb, const std::string &eventValueString)
+{
+ if(!m_valid)
+ return -1;
+
+ // For now, simply let the user over-write the callback if they
+ // specify the same eventString. Avoiding that and keeping track
+ // of the duplicate eventStrings/callbacks will complicate things
+ // quite a bit.
+
+ struct json_object *j_obj = json_object_new_object();
+ json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(event.c_str()));
+ int rc = call_sync(api, std::string("subscribe"), j_obj);
+ if(rc >= 0 && cb != nullptr) {
+ m_event_handlers[eventString] = cb;
+ }
+ return rc;
+}
+
+int AfbClient::unsubscribe(const std::string &api, const std::string &eventString, const std::string &eventValueString)
+{
+ if(!m_valid)
+ return -1;
+
+ if(m_event_handlers.find(eventString) == m_event_handlers.end())
+ return -1;
+
+ struct json_object *j_obj = json_object_new_object();
+ json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(eventString.c_str()));
+ int rc = call_sync(api, std::string("unsubscribe"), j_obj);
+ if(rc >= 0) {
+ m_event_handlers.erase(eventString);
+ }
+ return rc;
+}
+
+void AfbClient::on_event(const char* event, struct afb_wsj1_msg *msg)
+{
+#if 0
+ std::cerr << __FUNCTION__ << ": event = " << event << std::endl;
+#endif
+ struct json_object *contents = afb_wsj1_msg_object_j(msg);
+#ifdef DEBUG
+ std::cerr << __FUNCTION__ << ": contents = " << \
+ json_object_to_json_string_ext(contents, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+#endif
+ struct json_object *data;
+ if(json_object_object_get_ex(contents, "data", &data)) {
+ auto i = m_event_handlers.find(std::string(event));
+ if (i != m_event_handlers.end() && i->second != nullptr) {
+ i->second(data);
+ }
+ }
+}