aboutsummaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorScott Murray <scott.murray@konsulko.com>2019-04-22 21:38:18 -0400
committerScott Murray <scott.murray@konsulko.com>2019-04-22 22:42:06 -0400
commitb523b0c8cc37a989bfb84a5ebefe1ec1f2b0cbfb (patch)
tree482f3fee892fc0a0a327d256bd25134fb6996e15 /app
A simple telematics demo application for AGL. It reads vehicle and engine speed from the CAN low-level binding and publishes them via MQTT. Change-Id: Ib85904e87919053cad1215b3f53cee81db25c94a Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Diffstat (limited to 'app')
-rw-r--r--app/CMakeLists.txt57
-rw-r--r--app/afbclient.cpp236
-rw-r--r--app/afbclient.h63
-rw-r--r--app/configuration.cpp151
-rw-r--r--app/configuration.h71
-rw-r--r--app/event.cpp86
-rw-r--r--app/event.h32
-rw-r--r--app/gps.cpp89
-rw-r--r--app/gps.h33
-rw-r--r--app/main.cpp158
-rw-r--r--app/mqttclient.cpp68
-rw-r--r--app/mqttclient.h36
-rw-r--r--app/network.cpp119
-rw-r--r--app/network.h27
14 files changed, 1226 insertions, 0 deletions
diff --git a/app/CMakeLists.txt b/app/CMakeLists.txt
new file mode 100644
index 0000000..73470d5
--- /dev/null
+++ b/app/CMakeLists.txt
@@ -0,0 +1,57 @@
+###########################################################################
+# Copyright 2019 Konsulko Group
+#
+# Author: Scott Murray <scott.murray@konsulko.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###########################################################################
+
+set(CMAKE_INCLUDE_CURRENT_DIR ON)
+set(CMAKE_AUTOMOC ON)
+set(CMAKE_CXX_STANDARD 14)
+
+find_package(PkgConfig REQUIRED)
+
+PROJECT_TARGET_ADD(telematics-recorder)
+
+add_executable(${TARGET_NAME}
+ main.cpp
+ configuration.cpp
+ afbclient.cpp
+ mqttclient.cpp
+ network.cpp
+ event.cpp
+ gps.cpp
+ ${RESOURCES}
+)
+
+pkg_check_modules(LIBAFBWSC REQUIRED libafbwsc)
+
+include_directories(
+ "${LIBAFBWSC_INCLUDE_DIRS}"
+)
+
+set_target_properties(${TARGET_NAME} PROPERTIES
+ LABELS "EXECUTABLE"
+ PREFIX ""
+ COMPILE_FLAGS "${EXTRAS_CFLAGS} -DFOR_AFB_BINDING"
+ LINK_FLAGS "${BINDINGS_LINK_FLAG}"
+ LINK_LIBRARIES "${EXTRAS_LIBRARIES}"
+ OUTPUT_NAME "${TARGET_NAME}"
+)
+
+target_link_libraries(${TARGET_NAME}
+ ${LIBAFBWSC_LIBRARIES}
+ -lmosquitto
+ -lpthread
+)
diff --git a/app/afbclient.cpp b/app/afbclient.cpp
new file mode 100644
index 0000000..a40f6c9
--- /dev/null
+++ b/app/afbclient.cpp
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "afbclient.h"
+#include <string>
+#include <cstring>
+#include <iostream>
+#include <mutex>
+#include <condition_variable>
+
+#undef DEBUG
+//#define DEBUG
+
+struct call_data
+{
+ bool sync;
+ std::mutex mutex;
+ std::condition_variable cv;
+ bool ready;
+ std::function<void(json_object*)> cb;
+ json_object *resp;
+};
+
+static void on_hangup_cb(void *closure, struct afb_wsj1 *wsj)
+{
+}
+
+static void on_call_cb(void *closure, const char *api, const char *verb, struct afb_wsj1_msg *msg)
+{
+}
+
+static void on_reply_cb(void *closure, struct afb_wsj1_msg *msg)
+{
+ call_data *data = (call_data*) closure;
+ struct json_object* reply;
+
+ if(!data)
+ goto reply_done;
+
+ reply = afb_wsj1_msg_object_j(msg);
+ if(reply) {
+#ifdef DEBUG
+ std::cerr << __FUNCTION__ << ": reply = " << \
+ json_object_to_json_string_ext(reply, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+#endif
+ if(data->sync) {
+ data->resp = reply;
+
+ // Increase reference count since we are going to use
+ // reply after this callback returns, caller must do a
+ // put.
+ json_object_get(reply);
+ } else if(data->cb != nullptr) {
+ data->cb(reply);
+ }
+ }
+reply_done:
+ if(data->sync) {
+ // Signal reply is done
+ {
+ std::lock_guard<std::mutex> lk(data->mutex);
+ data->ready = true;
+ }
+ data->cv.notify_one();
+ }
+}
+
+//
+// on_event_cb is inline in afbclient.h
+//
+
+static void *afb_loop_thread(struct sd_event* loop)
+{
+ for(;;)
+ sd_event_run(loop, 30000000);
+}
+
+AfbClient::AfbClient(const int port, const std::string &token)
+{
+ std::string uri;
+
+ if(sd_event_new(&m_afb_loop) < 0) {
+ std::cerr << __FUNCTION__ << ": Failed to create event loop" << std::endl;
+ return;
+ }
+
+ // Initialize interface for websocket
+ m_itf.on_hangup = on_hangup_cb;
+ m_itf.on_call = on_call_cb;
+ m_itf.on_event = on_event_cb;
+
+ uri = "ws://localhost:" + std::to_string(port) + "/api?token=" + token;
+#ifdef DEBUG
+ std::cerr << "Using URI: " << uri << std::endl;
+#endif
+ m_ws = afb_ws_client_connect_wsj1(m_afb_loop, uri.c_str(), &m_itf, this);
+ if(m_ws) {
+ m_afb_thread = std::thread(afb_loop_thread, m_afb_loop);
+ } else {
+ std::cerr << __FUNCTION__ << ": Failed to create websocket connection" << std::endl;
+ goto error;
+ }
+
+ m_valid = true;
+ return;
+error:
+ if(m_afb_loop) {
+ sd_event_unref(m_afb_loop);
+ m_afb_loop = nullptr;
+ }
+ return;
+}
+
+AfbClient::~AfbClient(void)
+{
+ sd_event_unref(m_afb_loop);
+ afb_wsj1_unref(m_ws);
+}
+
+int AfbClient::call(const std::string &api, const std::string &verb, struct json_object *arg, callback_fn cb)
+{
+ if(!m_valid)
+ return -1;
+
+ call_data data;
+ data.sync = false;
+ data.cb = cb;
+ int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, (void*) &data);
+ if(rc < 0) {
+ std::cerr << __FUNCTION__ << \
+ ": Failed to call " << \
+ api.c_str() << \
+ "/" << \
+ verb.c_str() << \
+ std::endl;
+ }
+ return rc;
+}
+
+int AfbClient::call_sync(const std::string &api, const std::string &verb, struct json_object *arg, struct json_object **resp)
+{
+ if(!m_valid)
+ return -1;
+
+ call_data data;
+ data.sync = true;
+ data.ready = false;
+ data.cb = nullptr;
+ int rc = afb_wsj1_call_j(m_ws, api.c_str(), verb.c_str(), arg, on_reply_cb, (void*) &data);
+ if(rc >= 0) {
+ // Wait for response
+ std::unique_lock<std::mutex> lk(data.mutex);
+ data.cv.wait(lk, [&]{ return data.ready; });
+
+ if(resp && data.resp)
+ *resp = data.resp;
+ } else {
+ std::cerr << __FUNCTION__ << \
+ ": Failed to call " << \
+ api.c_str() << \
+ "/" << \
+ verb.c_str() << \
+ std::endl;
+ }
+ return rc;
+}
+
+int AfbClient::subscribe(const std::string &api, const std::string &event, const std::string &eventString, callback_fn cb, const std::string &eventValueString)
+{
+ if(!m_valid)
+ return -1;
+
+ // For now, simply let the user over-write the callback if they
+ // specify the same eventString. Avoiding that and keeping track
+ // of the duplicate eventStrings/callbacks will complicate things
+ // quite a bit.
+
+ struct json_object *j_obj = json_object_new_object();
+ json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(event.c_str()));
+ int rc = call_sync(api, std::string("subscribe"), j_obj);
+ if(rc >= 0 && cb != nullptr) {
+ m_event_handlers[eventString] = cb;
+ }
+ return rc;
+}
+
+int AfbClient::unsubscribe(const std::string &api, const std::string &eventString, const std::string &eventValueString)
+{
+ if(!m_valid)
+ return -1;
+
+ if(m_event_handlers.find(eventString) == m_event_handlers.end())
+ return -1;
+
+ struct json_object *j_obj = json_object_new_object();
+ json_object_object_add(j_obj, eventValueString.c_str(), json_object_new_string(eventString.c_str()));
+ int rc = call_sync(api, std::string("unsubscribe"), j_obj);
+ if(rc >= 0) {
+ m_event_handlers.erase(eventString);
+ }
+ return rc;
+}
+
+void AfbClient::on_event(const char* event, struct afb_wsj1_msg *msg)
+{
+#if 0
+ std::cerr << __FUNCTION__ << ": event = " << event << std::endl;
+#endif
+ struct json_object *contents = afb_wsj1_msg_object_j(msg);
+#ifdef DEBUG
+ std::cerr << __FUNCTION__ << ": contents = " << \
+ json_object_to_json_string_ext(contents, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+#endif
+ struct json_object *data;
+ if(json_object_object_get_ex(contents, "data", &data)) {
+ auto i = m_event_handlers.find(std::string(event));
+ if (i != m_event_handlers.end() && i->second != nullptr) {
+ i->second(data);
+ }
+ }
+}
diff --git a/app/afbclient.h b/app/afbclient.h
new file mode 100644
index 0000000..9a36581
--- /dev/null
+++ b/app/afbclient.h
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef AFBCLIENT_H
+#define AFBCLIENT_H
+
+#include <string>
+#include <thread>
+#include <map>
+#include <functional>
+#include <systemd/sd-event.h>
+#include <json-c/json.h>
+
+extern "C"
+{
+#include <afb/afb-wsj1.h>
+#include <afb/afb-ws-client.h>
+}
+
+class AfbClient
+{
+public:
+ AfbClient(int port, const std::string &token);
+ ~AfbClient();
+
+ using callback_fn = std::function<void(json_object*)>;
+
+ int call(const std::string &api, const std::string &verb, struct json_object* arg, callback_fn cb = nullptr);
+ int call_sync(const std::string &api, const std::string &verb, struct json_object* arg, struct json_object **resp = NULL);
+ int subscribe(const std::string &api, const std::string &event, const std::string &eventString, callback_fn cb, const std::string &eventValueString = "event");
+ int unsubscribe(const std::string &api, const std::string &eventString, const std::string &eventValueString = "event");
+
+ static void on_event_cb(void *closure, const char* event, struct afb_wsj1_msg *msg) {
+ if(closure)
+ static_cast<AfbClient*>(closure)->on_event(event, msg);
+ }
+
+private:
+ struct afb_wsj1 *m_ws = nullptr;
+ struct afb_wsj1_itf m_itf;
+ std::thread m_afb_thread;
+ sd_event *m_afb_loop = nullptr;
+ bool m_valid = false;
+
+ std::map<const std::string, callback_fn> m_event_handlers;
+
+ void on_event(const char* event, struct afb_wsj1_msg *msg);
+};
+
+#endif // AFBCLIENT_H
diff --git a/app/configuration.cpp b/app/configuration.cpp
new file mode 100644
index 0000000..0762a4a
--- /dev/null
+++ b/app/configuration.cpp
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "configuration.h"
+#include <glib.h>
+#include <strings.h>
+
+Configuration::Configuration(const std::string &filename, const std::string &device_uuid):
+ m_filename(filename),
+ m_device_uuid(device_uuid)
+{
+ read();
+}
+
+void Configuration::read(void)
+{
+ GKeyFile* conf_file;
+ GError *error = NULL;
+ char *value_str;
+ int n;
+
+ // Load settings from configuration file if it exists
+ conf_file = g_key_file_new();
+ if(!conf_file) {
+ return;
+ }
+
+ if(g_key_file_load_from_dirs(conf_file,
+ m_filename.c_str(),
+ (const gchar**) g_get_system_config_dirs(),
+ NULL,
+ G_KEY_FILE_KEEP_COMMENTS,
+ NULL) != TRUE) {
+ g_key_file_free(conf_file);
+ return;
+ }
+
+ //
+ // General
+ //
+
+ // Set log level if it is specified
+ error = NULL;
+ n = g_key_file_get_integer(conf_file, "General", "log_level", &error);
+ if(!error && n > 0) {
+ m_log_level = n;
+ }
+
+ value_str = g_key_file_get_string(conf_file, "General", "cellular_enabled", NULL);
+ if(value_str) {
+ if(!strcasecmp(value_str, "true")) {
+ m_cellular_enabled = true;
+ } else if(!strcasecmp(value_str, "false")) {
+ m_cellular_enabled = false;
+ }
+ }
+
+ value_str = g_key_file_get_string(conf_file, "General", "gps_enabled", NULL);
+ if(value_str) {
+ if(!strcasecmp(value_str, "true")) {
+ m_gps_enabled = true;
+ } else if(!strcasecmp(value_str, "false")) {
+ m_gps_enabled = false;
+ }
+ }
+
+ value_str = g_key_file_get_string(conf_file, "General", "check_online", NULL);
+ if(value_str) {
+ if(!strcasecmp(value_str, "true")) {
+ m_check_online = true;
+ } else if(!strcasecmp(value_str, "false")) {
+ m_check_online = false;
+ }
+ }
+
+ value_str = g_key_file_get_string(conf_file, "General", "device_uuid", NULL);
+ if(value_str) {
+ m_device_uuid = value_str;
+ }
+
+ //
+ // MQTT
+ //
+
+ value_str = g_key_file_get_string(conf_file, "MQTT", "broker", NULL);
+ if(value_str) {
+ m_mqtt_broker = value_str;
+ }
+
+ error = NULL;
+ n = g_key_file_get_integer(conf_file, "MQTT", "port", &error);
+ if(!error && n > 0) {
+ m_mqtt_port = n;
+ }
+
+ error = NULL;
+ n = g_key_file_get_integer(conf_file, "MQTT", "keepalive", &error);
+ if(!error && n >= 0) {
+ m_mqtt_keepalive = n;
+ }
+
+ error = NULL;
+ n = g_key_file_get_integer(conf_file, "MQTT", "qos", &error);
+ if(!error && n >= 0 && n < 3) {
+ m_mqtt_qos = n;
+ }
+
+ value_str = g_key_file_get_string(conf_file, "MQTT", "retain", NULL);
+ if(value_str) {
+ if(!strcasecmp(value_str, "true")) {
+ m_mqtt_retain = true;
+ } else if(!strcasecmp(value_str, "false")) {
+ m_mqtt_retain = false;
+ }
+ }
+
+ value_str = g_key_file_get_string(conf_file, "MQTT", "username", NULL);
+ if(value_str) {
+ m_mqtt_username = value_str;
+ }
+
+ value_str = g_key_file_get_string(conf_file, "MQTT", "password", NULL);
+ if(value_str) {
+ m_mqtt_password = value_str;
+ }
+
+ //
+ // Event
+ //
+
+ error = NULL;
+ n = g_key_file_get_integer(conf_file, "Event", "update_period", &error);
+ if(!error && n >= 0) {
+ m_update_period = n;
+ }
+
+ g_key_file_free(conf_file);
+}
diff --git a/app/configuration.h b/app/configuration.h
new file mode 100644
index 0000000..0e7a780
--- /dev/null
+++ b/app/configuration.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CONFIGURATION_H
+#define CONFIGURATION_H
+
+#include <string>
+
+class Configuration
+{
+public:
+ Configuration(const std::string &file, const std::string &device_uuid = "");
+ ~Configuration() {};
+
+ uint32_t getLogLevel() { return m_log_level; }
+ bool isCellularEnabled() { return m_cellular_enabled; }
+ bool isGpsEnabled() { return m_gps_enabled; }
+ bool getCheckOnline() { return m_check_online; }
+ std::string getDeviceUUID() { return std::string(m_device_uuid); }
+
+ std::string getMqttClientId() { return std::string(m_mqtt_client_id); }
+ std::string getMqttBroker() { return std::string(m_mqtt_broker); }
+ uint32_t getMqttPort() { return m_mqtt_port; }
+ uint32_t getMqttKeepalive() { return m_mqtt_keepalive; }
+ uint32_t getMqttQos() { return m_mqtt_qos; }
+ bool getMqttRetain() { return m_mqtt_retain; }
+ std::string getMqttUsername() { return std::string(m_mqtt_username); }
+ std::string getMqttPassword() { return std::string(m_mqtt_password); }
+
+ uint32_t getUpdatePeriod() { return m_update_period; }
+
+private:
+ void read();
+
+ std::string m_filename;
+
+ // General
+ uint32_t m_log_level = 0;
+ bool m_cellular_enabled = false;
+ bool m_gps_enabled = false;
+ bool m_check_online = false;
+ std::string m_device_uuid;
+
+ // MQTT broker
+ std::string m_mqtt_client_id = "";
+ std::string m_mqtt_broker = "iot.eclipse.org";
+ uint32_t m_mqtt_port = 1883;
+ uint32_t m_mqtt_keepalive = 60;
+ uint32_t m_mqtt_qos = 0;
+ bool m_mqtt_retain = true;
+ std::string m_mqtt_username = "";
+ std::string m_mqtt_password = "";
+
+ // Event
+ uint32_t m_update_period = 10;
+};
+
+#endif // CONFIGURATION_H
diff --git a/app/event.cpp b/app/event.cpp
new file mode 100644
index 0000000..8b2d5b6
--- /dev/null
+++ b/app/event.cpp
@@ -0,0 +1,86 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "event.h"
+#include "gps.h"
+#include "network.h"
+#include <string>
+#include <iostream>
+#include <json-c/json.h>
+#include <time.h>
+
+static uint64_t last_vehicle_speed_update_usecs;
+static uint64_t last_engine_speed_update_usecs;
+
+void process_event(Configuration &config, AfbClient &afbclient, MqttClient &mqttclient, event_data *event)
+{
+ std::string topic("agl-telematics-demo/");
+ topic += event->name;
+ struct json_object *j_obj = json_object_new_object();
+ struct json_object *j_device = json_object_new_string(config.getDeviceUUID().c_str());
+ json_object_object_add(j_obj, "device", j_device);
+ struct json_object *j_val = json_object_new_int(event->value);
+ json_object_object_add(j_obj, "value", j_val);
+ struct json_object *j_ts = json_object_new_int64(event->timestamp);
+ json_object_object_add(j_obj, "timestamp", j_ts);
+
+ location_data location;
+ struct json_object *j_location = json_object_new_object();
+ if(config.isGpsEnabled() && get_location(config, afbclient, &location)) {
+ struct json_object *j_latitude = json_object_new_double(location.latitude);
+ json_object_object_add(j_location, "latitude", j_latitude);
+ struct json_object *j_longitude = json_object_new_double(location.longitude);
+ json_object_object_add(j_location, "longitude", j_longitude);
+ struct json_object *j_speed = json_object_new_int(location.speed);
+ json_object_object_add(j_location, "speed", j_speed);
+ struct json_object *j_track = json_object_new_int(location.track);
+ json_object_object_add(j_location, "track", j_track);
+ struct json_object *j_timestamp = json_object_new_string(location.timestamp.c_str());
+ json_object_object_add(j_location, "timestamp", j_timestamp);
+ }
+ json_object_object_add(j_obj, "location", j_location);
+
+ std::string msg(json_object_to_json_string_ext(j_obj, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY));
+ if(config.getLogLevel() > 1) {
+ std::cerr << __FUNCTION__ << ": topic = " << topic << ", msg = " << msg << std::endl;
+ }
+
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC_RAW, &now);
+ uint64_t now_usecs = now.tv_sec * 1000000 + now.tv_nsec / 1000;
+ uint64_t *past_usecs = NULL;
+ if(event->name == "vehicle.speed")
+ past_usecs = &last_vehicle_speed_update_usecs;
+ else if(event->name == "engine.speed")
+ past_usecs = &last_engine_speed_update_usecs;
+ if(!config.getUpdatePeriod() ||
+ past_usecs &&
+ (!*past_usecs ||
+ ((now_usecs - *past_usecs) > (config.getUpdatePeriod() * 1000000)))) {
+ int rc = -1;
+ if(config.getCheckOnline() && check_online(afbclient)) {
+ goto skip_update;
+ }
+ rc = mqttclient.publish(topic, msg, config.getMqttQos(), config.getMqttRetain());
+ if(rc != MOSQ_ERR_SUCCESS) {
+ std::cerr << __FUNCTION__ << ": MQTT publish failed, rc = " << rc << std::endl;
+ } else if(config.getLogLevel() > 0) {
+ std::cerr << __FUNCTION__ << ": MQTT publish succeeded" << std::endl;
+ }
+skip_update:
+ *past_usecs = now_usecs;
+ }
+}
diff --git a/app/event.h b/app/event.h
new file mode 100644
index 0000000..e3d8fec
--- /dev/null
+++ b/app/event.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef EVENT_H
+#define EVENT_H
+
+#include "configuration.h"
+#include "afbclient.h"
+#include "mqttclient.h"
+
+struct event_data {
+ std::string name;
+ int32_t value;
+ uint64_t timestamp;
+};
+
+void process_event(Configuration &config, AfbClient &afbclient, MqttClient &mqttclient, event_data *event);
+
+#endif // EVENT_H
diff --git a/app/gps.cpp b/app/gps.cpp
new file mode 100644
index 0000000..79e97b4
--- /dev/null
+++ b/app/gps.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "gps.h"
+#include <string>
+#include <iostream>
+#include <json-c/json.h>
+
+bool get_location(Configuration &config, AfbClient &afbclient, location_data *location)
+{
+ bool rc;
+ struct json_object *j_resp = NULL;
+ std::string status;
+
+ if(!location)
+ return false;
+
+ if(afbclient.call_sync(std::string("gps"), std::string("location"), NULL, &j_resp) < 0)
+ return false;
+
+ if(config.getLogLevel() > 1) {
+ std::cerr << __FUNCTION__ << ": j_resp = " << \
+ json_object_to_json_string_ext(j_resp, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+ }
+
+ // Check status
+ rc = false;
+ struct json_object *j_request;
+ if(!json_object_object_get_ex(j_resp, "request", &j_request))
+ goto location_error;
+
+ struct json_object *j_status;
+ if(!json_object_object_get_ex(j_request, "status", &j_status))
+ goto location_error;
+
+ status = json_object_get_string(j_status);
+ if(status == "failed")
+ goto location_error;
+
+ // Get location data
+ struct json_object *j_response;
+ if(!json_object_object_get_ex(j_resp, "response", &j_response))
+ goto location_error;
+
+ struct json_object *j_latitude;
+ if(!json_object_object_get_ex(j_response, "latitude", &j_latitude))
+ goto location_error;
+
+ struct json_object *j_longitude;
+ if(!json_object_object_get_ex(j_response, "longitude", &j_longitude))
+ goto location_error;
+
+ struct json_object *j_speed;
+ if(!json_object_object_get_ex(j_response, "speed", &j_speed))
+ goto location_error;
+
+ struct json_object *j_track;
+ if(!json_object_object_get_ex(j_response, "track", &j_track))
+ goto location_error;
+
+ struct json_object *j_timestamp;
+ if(!json_object_object_get_ex(j_response, "timestamp", &j_timestamp))
+ goto location_error;
+
+ location->latitude = json_object_get_double(j_latitude);
+ location->longitude = json_object_get_double(j_longitude);
+ location->speed = json_object_get_int(j_speed);
+ location->track = json_object_get_int(j_track);
+ location->timestamp = json_object_get_string(j_timestamp);
+ rc = true;
+
+location_error:
+ json_object_put(j_resp);
+ return rc;
+}
diff --git a/app/gps.h b/app/gps.h
new file mode 100644
index 0000000..4d1badd
--- /dev/null
+++ b/app/gps.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GPS_H
+#define GPS_H
+
+#include "configuration.h"
+#include "afbclient.h"
+
+struct location_data {
+ double latitude;
+ double longitude;
+ int32_t speed;
+ int32_t track;
+ std::string timestamp;
+};
+
+bool get_location(Configuration &config, AfbClient &afbclient, location_data *location);
+
+#endif // GPS_H
diff --git a/app/main.cpp b/app/main.cpp
new file mode 100644
index 0000000..2b07cc1
--- /dev/null
+++ b/app/main.cpp
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <iostream>
+#include <cstring>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <json-c/json.h>
+#include "configuration.h"
+#include "afbclient.h"
+#include "mqttclient.h"
+#include "network.h"
+#include "event.h"
+
+#define CONFIGURATION_FILE "telematics-recorder.conf"
+#define DEVICE_UUID "e4bbc0a8-f435-4326-9769-d4a2c9f3c18d"
+
+static std::deque<event_data*> g_event_queue;
+static std::mutex g_event_queue_mutex;
+static bool g_event_queue_ready = false;
+static std::condition_variable g_event_queue_cv;
+
+static Configuration g_config(CONFIGURATION_FILE, DEVICE_UUID);
+
+void diagnostic_message_cb(json_object *data)
+{
+ if(!data)
+ return;
+
+ if(g_config.getLogLevel() > 2) {
+ std::cerr << __FUNCTION__ << ": data = " << \
+ json_object_to_json_string_ext(data, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+ }
+
+ struct json_object *j_name;
+ if(!json_object_object_get_ex(data, "name", &j_name))
+ return;
+ struct json_object *j_value;
+ if(!json_object_object_get_ex(data, "value", &j_value))
+ return;
+ struct json_object *j_timestamp;
+ if(!json_object_object_get_ex(data, "timestamp", &j_timestamp))
+ return;
+ uint64_t timestamp = json_object_get_int64(j_timestamp);
+
+ std::string name(json_object_get_string(j_name));
+ int32_t value = json_object_get_int(j_value);
+ if(name == "diagnostic_messages.vehicle.speed" ||
+ name == "diagnostic_messages.engine.speed") {
+ if(g_config.getLogLevel() > 1) {
+ std::cerr << __FUNCTION__ << ": " << name << \
+ ", value = " << value << \
+ ", timestamp = " << timestamp << \
+ std::endl;
+ }
+
+ name.erase(0, 20);
+ event_data* event = new event_data{ name, value, timestamp };
+ {
+ std::lock_guard<std::mutex> lk(g_event_queue_mutex);
+ g_event_queue.push_back(event);
+ g_event_queue_ready = true;
+ }
+ g_event_queue_cv.notify_one();
+ }
+}
+
+int main(int argc, char *argv[])
+{
+ int port = 0;
+ std::string token;
+
+ try {
+ port = std::stol(argv[1]);
+ token = argv[2];
+ } catch (const std::invalid_argument& e) {
+ std::cerr << "Invalid argument" << std::endl;
+ exit(1);
+ } catch (const std::out_of_range& e) {
+ std::cerr << "Port out of range" << std::endl;
+ exit(1);
+ }
+
+ AfbClient afbclient(port, token);
+
+ if(g_config.isCellularEnabled()) {
+ // Wait for modem to appear, and enable it if not already
+ enable_modem(g_config, afbclient);
+ }
+
+ std::string client_id = g_config.getMqttClientId();
+ if(client_id.empty())
+ client_id = std::string("AGL-telematics-recorder") + DEVICE_UUID;
+ MqttClient mqttclient(client_id,
+ g_config.getMqttBroker(),
+ g_config.getMqttPort(),
+ g_config.getMqttKeepalive(),
+ g_config.getMqttUsername(),
+ g_config.getMqttPassword());
+
+ afbclient.subscribe(std::string("low-can"),
+ std::string("diagnostic_messages.vehicle.speed"),
+ std::string("low-can/diagnostic_messages"),
+ diagnostic_message_cb);
+ afbclient.subscribe(std::string("low-can"),
+ std::string("diagnostic_messages.engine.speed"),
+ std::string("low-can/diagnostic_messages"),
+ diagnostic_message_cb);
+
+ std::deque<event_data*> event_queue;
+ while(true) {
+ // Wait until event callback sends data
+ std::unique_lock<std::mutex> lk(g_event_queue_mutex);
+ g_event_queue_cv.wait(lk, []{ return g_event_queue_ready; });
+ if(!g_event_queue.empty()) {
+ // copy out the events
+ event_queue = g_event_queue;
+ g_event_queue.clear();
+ }
+ g_event_queue_ready = false;
+ lk.unlock();
+
+ for(event_data *event : event_queue) {
+ if(!event)
+ continue;
+
+ if(g_config.getLogLevel() > 0) {
+ std::cerr << __FUNCTION__ << ": " << \
+ event->name << ", value = " << event->value << \
+ ", timestamp = " << event->timestamp << \
+ std::endl;
+ }
+
+ process_event(g_config, afbclient, mqttclient, event);
+
+ delete event;
+ }
+ // Clear out processed events
+ event_queue.clear();
+ }
+ return 0;
+}
diff --git a/app/mqttclient.cpp b/app/mqttclient.cpp
new file mode 100644
index 0000000..f072013
--- /dev/null
+++ b/app/mqttclient.cpp
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mqttclient.h"
+#include <iostream>
+
+#undef DEBUG
+
+#ifdef DEBUG
+static void on_connect(struct mosquitto *mosq, void *obj, int rc)
+{
+ std::cerr << " MQTT Connected, rc = " << rc << std::endl;
+}
+
+static void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
+{
+ std::cerr << " MQTT Disconnected, rc = " << rc << std::endl;
+}
+#endif
+
+MqttClient::MqttClient(const std::string &id, const std::string &host, const int port, const int keepalive, const std::string &username, const std::string &password)
+{
+ mosquitto_lib_init();
+ m_mosq = mosquitto_new(id.c_str(), true, NULL);
+
+#ifdef DEBUG
+ mosquitto_connect_callback_set(m_mosq, on_connect);
+ mosquitto_disconnect_callback_set(m_mosq, on_disconnect);
+#endif
+
+ if(username.length())
+ mosquitto_username_pw_set(m_mosq, username.c_str(), password.c_str());
+
+ if(mosquitto_connect_async(m_mosq, host.c_str(), port, keepalive)) {
+ std::cerr << __FUNCTION__ << ": Unable to connect to " << host << std::endl;
+ }
+
+ int loop = mosquitto_loop_start(m_mosq);
+ if(loop != MOSQ_ERR_SUCCESS){
+ std::cerr << __FUNCTION__ << ": Unable to start loop, error = " << loop << std::endl;
+ }
+}
+
+MqttClient::~MqttClient(void)
+{
+ mosquitto_disconnect(m_mosq);
+ mosquitto_loop_stop(m_mosq, true);
+ mosquitto_destroy(m_mosq);
+ mosquitto_lib_cleanup();
+}
+
+int MqttClient::publish(const std::string &topic, const std::string &msg, const int qos, const bool retain)
+{
+ return mosquitto_publish(m_mosq, NULL, topic.c_str(), msg.length(), msg.c_str(), qos, retain);
+}
diff --git a/app/mqttclient.h b/app/mqttclient.h
new file mode 100644
index 0000000..b7b893e
--- /dev/null
+++ b/app/mqttclient.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MQTTCLIENT_H
+#define MQTTCLIENT_H
+
+#include <string>
+#include <mosquitto.h>
+#include <json-c/json.h>
+
+class MqttClient
+{
+public:
+ MqttClient(const std::string &id, const std::string &host, const int port, const int keepalive = 60, const std::string &username = "", const std::string &password = "");
+ ~MqttClient();
+
+ int publish(const std::string &topic, const std::string &msg, const int qos, const bool retain);
+
+private:
+ mosquitto *m_mosq;
+};
+
+#endif // MQTTCLIENT_H
diff --git a/app/network.cpp b/app/network.cpp
new file mode 100644
index 0000000..d163ef5
--- /dev/null
+++ b/app/network.cpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "network.h"
+#include <string>
+#include <iostream>
+#include <unistd.h>
+#include <json-c/json.h>
+
+int enable_modem(Configuration &config, AfbClient &afbclient)
+{
+ int rc;
+ struct json_object *j_resp = NULL;
+
+ bool cellular_enabled = false;
+ bool cellular_found = false;
+ while(!cellular_found) {
+ // Check current state
+ rc = afbclient.call_sync(std::string("network-manager"), std::string("technologies"), NULL, &j_resp);
+ if(rc < 0 || !j_resp)
+ continue;
+ if(config.getLogLevel() > 1) {
+ std::cerr << __FUNCTION__ << ": j_resp = " << \
+ json_object_to_json_string_ext(j_resp, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY) << \
+ std::endl;
+ }
+
+ struct json_object *j_response;
+ if(!json_object_object_get_ex(j_resp, "response", &j_response))
+ continue;
+
+ struct json_object *j_values;
+ if(!json_object_object_get_ex(j_response, "values", &j_values))
+ continue;
+
+ for(int i = 0; i < json_object_array_length(j_values); i++) {
+ struct json_object *j_value = json_object_array_get_idx(j_values, i);
+ if(!j_value)
+ break;
+
+ struct json_object *j_technology;
+ if(!json_object_object_get_ex(j_value, "technology", &j_technology))
+ break;
+
+ std::string technology(json_object_get_string(j_technology));
+ if(technology == "cellular") {
+ struct json_object *j_properties;
+ if(!json_object_object_get_ex(j_value, "properties", &j_properties))
+ break;
+
+ struct json_object *j_powered;
+ if(!json_object_object_get_ex(j_properties, "powered", &j_powered))
+ break;
+
+ if(json_object_get_boolean(j_powered)) {
+ if(config.getLogLevel() > 0) {
+ std::cerr << __FUNCTION__ << ": cellular enabled!" << std::endl;
+ cellular_enabled = true;
+ }
+ }
+
+ std::cerr << __FUNCTION__ << ": cellular found!" << std::endl;
+ cellular_found = true;
+ rc = 0;
+ break;
+ }
+
+ }
+ json_object_put(j_resp);
+
+ sleep(1);
+ }
+
+ if(!cellular_enabled) {
+ if(config.getLogLevel() > 0) {
+ std::cerr << __FUNCTION__ << ": enabling cellular" << std::endl;
+ }
+ struct json_object *j_obj = json_object_new_object();
+ struct json_object *j_val = json_object_new_string("cellular");
+ json_object_object_add(j_obj, "technology", j_val);
+ rc = afbclient.call_sync(std::string("network-manager"), std::string("enable_technology"), j_obj);
+ }
+ return rc;
+}
+
+bool check_online(AfbClient &afbclient)
+{
+ int rc;
+ struct json_object *j_resp = NULL;
+
+ // Check current state
+ rc = afbclient.call_sync(std::string("network-manager"), std::string("state"), NULL, &j_resp);
+ if(rc < 0 || !j_resp)
+ return false;
+
+ struct json_object *j_response;
+ if(!json_object_object_get_ex(j_resp, "response", &j_response))
+ return false;
+
+ bool online = false;
+ std::string response(json_object_get_string(j_response));
+ if(response == "online")
+ online = true;
+ json_object_put(j_resp);
+ return online;
+}
diff --git a/app/network.h b/app/network.h
new file mode 100644
index 0000000..65d3330
--- /dev/null
+++ b/app/network.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2019 Konsulko Group
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NETWORK_H
+#define NETWORK_H
+
+#include "configuration.h"
+#include "afbclient.h"
+
+int enable_modem(Configuration &config, AfbClient &afbclient);
+
+bool check_online(AfbClient &afbclient);
+
+#endif // NETWORK_H