From 9a8dcb99c3efb5b00b0a014e4699655d479d9925 Mon Sep 17 00:00:00 2001 From: Romain Forlot Date: Fri, 24 Feb 2017 14:14:48 +0000 Subject: Improve mutex lock logic. - Use of bracket instead of unlock method - Change some mutex lock scope. - Added subscribed_signals map object mutex to manipulate it safely. Change-Id: I770c0b5701db6b1151511f7360ec31ae6dcc1de9 Signed-off-by: Romain Forlot --- src/can-signals.hpp | 9 +++++++++ src/can-utils.cpp | 19 ++++++++++++++++++- src/can-utils.hpp | 33 ++++++++++++++++++++++++++++++++- src/can_decode_message.cpp | 39 +++++++++++++++++++++------------------ src/can_event_push.cpp | 15 ++++++++------- src/can_event_push.hpp | 6 ------ src/can_reader.cpp | 5 +++-- src/low-can-binding.cpp | 20 +++++--------------- src/low-can-binding.hpp | 7 +++++-- 9 files changed, 101 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/can-signals.hpp b/src/can-signals.hpp index af110d0d..fb77df5c 100644 --- a/src/can-signals.hpp +++ b/src/can-signals.hpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include "timer.hpp" #include "openxc.pb.h" @@ -46,6 +48,13 @@ extern "C" */ static std::map subscribed_signals; +/** +* @brief Mutex allowing safe manipulation on subscribed_signals map. +* @desc To ensure that the map object isn't modified when we read it, you +* have to set this mutex before use subscribed_signals map object. +*/ +extern std::mutex subscribed_signals_mutex; + /** Public: Return the currently active CAN configuration. */ CanMessageSet* getActiveMessageSet(); diff --git a/src/can-utils.cpp b/src/can-utils.cpp index c6f2d575..0d52bbe4 100644 --- a/src/can-utils.cpp +++ b/src/can-utils.cpp @@ -162,16 +162,33 @@ canfd_frame can_message_t::convert_to_canfd_frame() *********************************************************************************/ can_bus_t::can_bus_t(int& conf_file) - : conf_file_{conf_file} + : conf_file_{conf_file} { } void can_bus_t::start_threads() { th_decoding_ = std::thread(can_decode_message, std::ref(*this)); + is_decoding_ = true; th_pushing_ = std::thread(can_event_push, std::ref(*this)); + is_pushing_ = true; } +void can_bus_t::stop_threads() +{ + is_decoding_ = false; + is_pushing_ = false; +} + +bool can_bus_t::is_decoding() +{ + return is_decoding_; +} + +bool can_bus_t::is_pushing() +{ + return is_pushing_; +} int can_bus_t::init_can_dev() { diff --git a/src/can-utils.hpp b/src/can-utils.hpp index 11e19f35..76dc75b3 100644 --- a/src/can-utils.hpp +++ b/src/can-utils.hpp @@ -221,7 +221,9 @@ class can_bus_t { int conf_file_; /*!< conf_file_ - configuration file handle used to initialize can_bus_dev_t objects.*/ std::thread th_decoding_; /*!< thread that'll handle decoding a can frame */ + bool is_decoding_; /*!< boolean member controling thread while loop*/ std::thread th_pushing_; /*!< thread that'll handle pushing decoded can frame to subscribers */ + bool is_pushing_; /*!< boolean member controling thread while loop*/ bool has_can_message_; /*!< boolean members that control whether or not there is can_message into the queue */ std::queue can_message_q_; /*!< queue that'll store can_message_t to decoded */ @@ -258,6 +260,32 @@ class can_bus_t { */ void start_threads(); + /** + * @brief Will stop all threads holded by can_bus_t object + * which are decoding and pushing threads. + */ + void stop_threads(); + + /** + * @brief Telling if the decoding thread is running. + * This is the boolean value on which the while loop + * take its condition. Set it to false will stop the + * according thread. + * + * @return true if decoding thread is running, false if not. + */ + bool is_decoding(); + + /** + * @brief Telling if the pushing thread is running + * This is the boolean value on which the while loop + * take its condition. Set it to false will stop the + * according thread. + * + * @return true if pushing thread is running, false if not. + */ + bool is_pushing(); + /** * @brief Return first can_message_t on the queue * @@ -341,8 +369,11 @@ class can_bus_dev_t { /** * @brief Telling if the reading thread is running + * This is the boolean value on which the while loop + * take its condition. Set it to false will stop the + * according thread. * - * @return true if read is running, false if not. + * @return true if reading thread is running, false if not. */ bool is_running(); diff --git a/src/can_decode_message.cpp b/src/can_decode_message.cpp index 8d81f8b8..3c1825d1 100644 --- a/src/can_decode_message.cpp +++ b/src/can_decode_message.cpp @@ -31,41 +31,44 @@ void can_decode_message(can_bus_t &can_bus) std::vector signals; std::vector ::iterator signals_i; openxc_VehicleMessage vehicle_message; - openxc_DynamicField search_key, ret; + openxc_DynamicField search_key, decoded_message; bool send = true; decoder_t decoder; - while(can_bus.has_can_message()) + while(can_bus.is_decoding()) { - std::unique_lock can_message_lock(can_message_mutex); - new_can_message.wait(can_message_lock); + { + std::unique_lock can_message_lock(can_message_mutex); + new_can_message.wait(can_message_lock); can_message = can_bus.next_can_message(); - can_message_mutex.unlock(); + } - std::lock_guard decoded_can_message_lock(decoded_can_message_mutex); - /* First we have to found which CanSignal is */ - search_key = build_DynamicField((double)can_message.get_id()); - signals = find_can_signals(search_key); - - /* Decoding the message ! Don't kill the messenger ! */ - for(const auto& sig : signals) - { + /* First we have to found which CanSignal it is */ + search_key = build_DynamicField((double)can_message.get_id()); + signals = find_can_signals(search_key); + + /* Decoding the message ! Don't kill the messenger ! */ + for(const auto& sig : signals) + { + { + std::lock_guard subscribed_signals_lock(subscribed_signals_mutex); std::map subscribed_signals = get_subscribed_signals(); const auto& it_event = subscribed_signals.find(sig.genericName); if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second)) { - ret = decoder.decodeSignal(sig, can_message, getSignals(), &send); - - openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, ret); + decoded_message = decoder.decodeSignal(sig, can_message, getSignals(), &send); + openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, decoded_message); vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message); + + std::lock_guard decoded_can_message_lock(decoded_can_message_mutex); can_bus.push_new_vehicle_message(vehicle_message); } + new_decoded_can_message.notify_one(); } - decoded_can_message_mutex.unlock(); - new_decoded_can_message.notify_one(); + } } } diff --git a/src/can_event_push.cpp b/src/can_event_push.cpp index bfe8c803..fc403ee1 100644 --- a/src/can_event_push.cpp +++ b/src/can_event_push.cpp @@ -30,16 +30,18 @@ void can_event_push(can_bus_t& can_bus) openxc_SimpleMessage s_message; json_object* jo; - while(can_bus.has_vehicle_message()) + while(can_bus.is_pushing()) { - std::unique_lock decoded_can_message_lock(decoded_can_message_mutex); - new_decoded_can_message.wait(decoded_can_message_lock); + { + std::unique_lock decoded_can_message_lock(decoded_can_message_mutex); + new_decoded_can_message.wait(decoded_can_message_lock); v_message = can_bus.next_vehicle_message(); - decoded_can_message_mutex.unlock(); + } s_message = get_simple_message(v_message); - std::lock_guard push_signal_lock(subscribed_signals_mutex); + { + std::lock_guard subscribed_signals_lock(subscribed_signals_mutex); std::map subscribed_signals = get_subscribed_signals(); const auto& it_event = subscribed_signals.find(s_message.name); if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second)) @@ -48,7 +50,6 @@ void can_event_push(can_bus_t& can_bus) jsonify_simple(s_message, jo); afb_event_push(it_event->second, jo); } - subscribed_signals_mutex.unlock(); - update_subscrided_signals.notify_one(); + } } } \ No newline at end of file diff --git a/src/can_event_push.hpp b/src/can_event_push.hpp index 00401448..e5972416 100644 --- a/src/can_event_push.hpp +++ b/src/can_event_push.hpp @@ -17,9 +17,3 @@ */ #pragma once - -#include -#include - -std::condition_variable update_subscrided_signals; -std::mutex subscribed_signals_mutex; \ No newline at end of file diff --git a/src/can_reader.cpp b/src/can_reader.cpp index 018c7cba..7ae981b8 100644 --- a/src/can_reader.cpp +++ b/src/can_reader.cpp @@ -29,9 +29,10 @@ void can_reader(can_bus_dev_t &can_bus_dev, can_bus_t& can_bus) { can_message.convert_from_canfd_frame(can_bus_dev.read()); - std::lock_guard can_message_lock(can_message_mutex); + { + std::lock_guard can_message_lock(can_message_mutex); can_bus.push_new_can_message(can_message); - can_message_mutex.unlock(); + } new_can_message.notify_one(); } } \ No newline at end of file diff --git a/src/low-can-binding.cpp b/src/low-can-binding.cpp index 21517903..be2d3737 100644 --- a/src/low-can-binding.cpp +++ b/src/low-can-binding.cpp @@ -19,6 +19,7 @@ #include "low-can-binding.hpp" #include +#include #include #include #include @@ -34,7 +35,6 @@ extern "C" { - #include #include }; @@ -43,15 +43,6 @@ extern "C" */ const struct afb_binding_interface *binder_interface; -/* - * CAN bus handler pointer. This is the object that will be use to - * initialize each CAN devices specified into the configuration file - * - * It is used by the reading thread also because of its can_message_q_ queue - * that store CAN messages read from the socket. - */ -can_bus_t *can_bus_handler; - /******************************************************************************** * * Subscription and unsubscription @@ -86,7 +77,7 @@ static int subscribe_unsubscribe_signal(struct afb_req request, bool subscribe, { int ret; - // TODO: lock the subscribed_signals when insert/remove + std::lock_guard subscribed_signals_lock(subscribed_signals_mutex); auto ss_i = subscribed_signals.find(sig.genericName); if (ss_i != subscribed_signals.end() && !afb_event_is_valid(ss_i->second)) { @@ -243,13 +234,12 @@ extern "C" fd_conf = afb_daemon_rootdir_open_locale(binder_interface->daemon, "can_bus.json", O_RDONLY, NULL); /* Initialize the CAN bus handler */ - can_bus_t cbh(fd_conf); - can_bus_handler = &cbh; + can_bus_t can_bus_handler(fd_conf); /* Open CAN socket */ - if(can_bus_handler->init_can_dev() == 0) + if(can_bus_handler.init_can_dev() == 0) { - can_bus_handler->start_threads(); + can_bus_handler.start_threads(); return 0; } ERROR(binder_interface, "There was something wrong with CAN device Initialization. Check your config file maybe"); diff --git a/src/low-can-binding.hpp b/src/low-can-binding.hpp index b2b544f4..2c84989e 100644 --- a/src/low-can-binding.hpp +++ b/src/low-can-binding.hpp @@ -18,8 +18,11 @@ #pragma once -#include -#include +#include +extern "C" +{ + #include +}; extern "C" struct afb_binding_interface; -- cgit 1.2.3-korg