/*
 * Copyright (C) 2019 Konsulko Group
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "afbclient.h"

// NOTE(review): the standard header names below were chosen from the
// facilities this file uses (std::cerr, std::string, std::thread, std::mutex,
// std::condition_variable, std::function, std::unique_ptr) -- confirm against
// the project's original include list.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#undef DEBUG
//#define DEBUG

// Per-request state handed to on_reply_cb() through the websocket "closure"
// pointer.
//
// Synchronous requests (sync == true) keep this object on the stack of
// AfbClient::call_sync() and use mutex/cv/ready to wait for the reply.
// Asynchronous requests (sync == false) allocate it on the heap in
// AfbClient::call(); on_reply_cb() then owns and frees it.
struct call_data {
    bool sync = false;
    std::mutex mutex;
    std::condition_variable cv;
    bool ready = false;
    // Invoked with the reply object for asynchronous requests.
    // NOTE(review): signature taken from how the callback is invoked in this
    // file (a single json_object* argument) -- confirm it matches callback_fn.
    std::function<void(struct json_object*)> cb;
    // Reply object for synchronous requests; holds one extra reference that
    // call_sync() either hands to its caller or drops.
    json_object *resp = nullptr;
};

// Websocket hang-up notification; intentionally a no-op.
static void on_hangup_cb(void *closure, struct afb_wsj1 *wsj)
{
}

// Incoming call notification; intentionally a no-op.
static void on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg)
{
}

// Reply handler registered with afb_wsj1_call_j().
//
// closure is the call_data supplied by AfbClient::call() or
// AfbClient::call_sync(). For asynchronous requests the user callback is run
// (when a reply object exists) and the call_data is freed. For synchronous
// requests the reply is stored and the waiting thread is woken up.
static void on_reply_cb(void *closure, struct afb_wsj1_msg *msg)
{
    call_data *data = static_cast<call_data*>(closure);
    if(!data)
        return; // nothing to deliver to and nobody to wake up

    struct json_object *reply = afb_wsj1_msg_object_j(msg);
#ifdef DEBUG
    if(reply) {
        std::cerr << __FUNCTION__ << ": reply = " <<
            json_object_to_json_string_ext(reply, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) <<
            std::endl;
    }
#endif

    if(!data->sync) {
        // Asynchronous request: AfbClient::call() released ownership of the
        // heap-allocated call_data to this callback, so free it on exit.
        std::unique_ptr<call_data> owner(data);
        if(reply && owner->cb != nullptr)
            owner->cb(reply);
        return;
    }

    if(reply) {
        data->resp = reply;
        // Increase reference count since we are going to use
        // reply after this callback returns, caller must do a
        // put.
        json_object_get(reply);
    }

    // Signal reply is done. Notify while still holding the mutex: the
    // call_data lives on call_sync()'s stack, so the waiter must not be able
    // to return (and destroy the condition variable) before notify_one() has
    // finished.
    std::lock_guard<std::mutex> lk(data->mutex);
    data->ready = true;
    data->cv.notify_one();
}

//
// on_event_cb is inline in afbclient.h
//

// Thread body: runs the sd_event loop forever. 30000000 is passed as the
// sd_event_run() timeout (microseconds per sd_event_run(3), i.e. 30 s).
// The loop never terminates, so the function never returns a value.
static void *afb_loop_thread(struct sd_event* loop)
{
    for(;;)
        sd_event_run(loop, 30000000);
}

// Connects to ws://localhost:<port>/api?token=<token> and starts the thread
// that services the event loop.
//
// On any failure an error is printed to std::cerr and the object is left
// with m_valid == false, which makes every request method return -1.
AfbClient::AfbClient(const int port, const std::string &token)
{
    // Put the members that the failure paths and the destructor look at into
    // a known state before anything can fail.
    m_valid = false;
    m_ws = nullptr;
    m_afb_loop = nullptr;

    if(sd_event_new(&m_afb_loop) < 0) {
        std::cerr << __FUNCTION__ << ": Failed to create event loop" << std::endl;
        return;
    }

    // Initialize interface for websocket
    m_itf.on_hangup = on_hangup_cb;
    m_itf.on_call = on_call_cb;
    m_itf.on_event = on_event_cb;

    const std::string uri = "ws://localhost:" + std::to_string(port) + "/api?token=" + token;
#ifdef DEBUG
    std::cerr << "Using URI: " << uri << std::endl;
#endif

    m_ws = afb_ws_client_connect_wsj1(m_afb_loop, uri.c_str(), &m_itf, this);
    if(!m_ws) {
        std::cerr << __FUNCTION__ << ": Failed to create websocket connection" << std::endl;
        // Drop the event loop created above; it is of no use without a socket.
        if(m_afb_loop) {
            sd_event_unref(m_afb_loop);
            m_afb_loop = nullptr;
        }
        return;
    }

    m_afb_thread = std::thread(afb_loop_thread, m_afb_loop);
    m_valid = true;
}

// Drops the references held on the event loop and the websocket.
//
// NOTE(review): the thread started by the constructor is never stopped or
// joined here. If m_afb_thread is a std::thread that is still joinable, its
// destructor will call std::terminate(); fixing that needs a stop mechanism
// that is not visible from this file.
AfbClient::~AfbClient(void)
{
    sd_event_unref(m_afb_loop);
    if(m_ws)
        afb_wsj1_unref(m_ws);
}

// Sends an asynchronous request for api/verb with argument arg.
//
// cb, when not null, is invoked from on_reply_cb() with the reply object once
// a reply carrying one arrives. Returns the result of afb_wsj1_call_j(), or
// -1 when the client is not valid.
int AfbClient::call(const std::string &api, const std::string &verb, struct json_object *arg, callback_fn cb)
{
    if(!m_valid)
        return -1;

    // The reply arrives after this function has returned, so the per-call
    // state must outlive this stack frame: allocate it on the heap and let
    // on_reply_cb() free it.
    std::unique_ptr<call_data> data(new call_data);
    data->sync = false;
    data->cb = cb;

    int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, static_cast<void*>(data.get()));
    if(rc < 0) {
        std::cerr << __FUNCTION__ <<
            ": Failed to call " <<
            api.c_str() <<
            "/" <<
            verb.c_str() <<
            std::endl;
        // NOTE(review): assumes the reply callback is never invoked for a
        // call that failed to be sent, so the state can be freed here (by
        // the unique_ptr) -- confirm against libafbwsc.
        return rc;
    }

    // Ownership now belongs to on_reply_cb(). release() does not touch the
    // object, so this is safe even if the reply has already been handled.
    data.release();
    return rc;
}

// Sends a request for api/verb with argument arg and blocks until
// on_reply_cb() reports that the reply has been handled.
//
// When resp is not null and a reply object was received, *resp receives it
// together with one reference that the caller must release with
// json_object_put(); otherwise *resp is left untouched. Returns the result of
// afb_wsj1_call_j(), or -1 when the client is not valid.
//
// NOTE(review): presumably must not be called from the event loop thread,
// since that thread is what is expected to deliver the reply -- confirm.
int AfbClient::call_sync(const std::string &api, const std::string &verb, struct json_object *arg, struct json_object **resp)
{
    if(!m_valid)
        return -1;

    call_data data;
    data.sync = true;
    data.ready = false;
    data.cb = nullptr;
    data.resp = nullptr;

    int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, static_cast<void*>(&data));
    if(rc < 0) {
        std::cerr << __FUNCTION__ <<
            ": Failed to call " <<
            api.c_str() <<
            "/" <<
            verb.c_str() <<
            std::endl;
        return rc;
    }

    // Wait for response
    {
        std::unique_lock<std::mutex> lk(data.mutex);
        data.cv.wait(lk, [&]{ return data.ready; });
    }

    if(data.resp) {
        if(resp)
            *resp = data.resp;
        else
            json_object_put(data.resp); // nobody wants it: drop our reference
    }
    return rc;
}

// Issues a synchronous "subscribe" request to api with the argument
// { eventValueString: event } and, on success, registers cb as the handler
// for events named eventString.
//
// Returns the result of call_sync(), or -1 when the client is not valid.
int AfbClient::subscribe(const std::string &api,
                         const std::string &event,
                         const std::string &eventString,
                         callback_fn cb,
                         const std::string &eventValueString)
{
    if(!m_valid)
        return -1;

    // For now, simply let the user over-write the callback if they
    // specify the same eventString. Avoiding that and keeping track
    // of the duplicate eventStrings/callbacks will complicate things
    // quite a bit.
    struct json_object *j_obj = json_object_new_object();
    json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(event.c_str()));

    int rc = call_sync(api, std::string("subscribe"), j_obj);
    if(rc >= 0 && cb != nullptr) {
        m_event_handlers[eventString] = cb;
    }
    return rc;
}

// Issues a synchronous "unsubscribe" request to api with the argument
// { eventValueString: eventString } and, on success, removes the handler
// registered for eventString.
//
// Returns -1 when the client is not valid or no handler is registered for
// eventString; otherwise the result of call_sync().
int AfbClient::unsubscribe(const std::string &api,
                           const std::string &eventString,
                           const std::string &eventValueString)
{
    if(!m_valid)
        return -1;

    if(m_event_handlers.find(eventString) == m_event_handlers.end())
        return -1;

    struct json_object *j_obj = json_object_new_object();
    json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(eventString.c_str()));

    int rc = call_sync(api, std::string("unsubscribe"), j_obj);
    if(rc >= 0) {
        m_event_handlers.erase(eventString);
    }
    return rc;
}

// Dispatches an incoming event: looks up the handler registered for the
// event name and, when the message has a "data" member, passes that member
// to the handler.
void AfbClient::on_event(const char* event, struct afb_wsj1_msg *msg)
{
    if(!event)
        return; // std::string cannot be built from a null pointer

#if 0
    std::cerr << __FUNCTION__ << ": event = " << event << std::endl;
#endif
    struct json_object *contents = afb_wsj1_msg_object_j(msg);
#ifdef DEBUG
    std::cerr << __FUNCTION__ << ": contents = " <<
        json_object_to_json_string_ext(contents, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) <<
        std::endl;
#endif

    struct json_object *data;
    if(json_object_object_get_ex(contents, "data", &data)) {
        auto i = m_event_handlers.find(std::string(event));
        if (i != m_event_handlers.end() && i->second != nullptr) {
            i->second(data);
        }
    }
}