From 4ab2164774bdb9a29b2f180a9013c26c0155628d Mon Sep 17 00:00:00 2001 From: Romain Forlot Date: Fri, 2 Jun 2017 19:45:01 +0200 Subject: Get diag manager's socket into subscription obj Subscription index map is the socket ID which implies the following: - All diagnostic messages subscriptions are stored into a vector holding diagnostic_message and there is 1 socket for all like the diag manager did. - Reworked workflow to open a BCM socket and adding an RX filter more flexible. - Separated methods to handle on_no_clients event. - Cleaning diagnostic manager code to remove all unneeded stuff now. - Embed diagnostic response in VehicleMessage decoded message to be able transmits the PID in event push thread. This is needed by to correctly handle case when there is no clients subscribed to an AFB event. Else we can't find the diagnostic message to remove from low_can_subscription vector. Change-Id: Iab13fd556cda3c69827bcd67f3a23a03cb6a2cf2 Signed-off-by: Romain Forlot --- CAN-binder/low-can-binding/binding/low-can-cb.cpp | 310 ++++++++++++++------- CAN-binder/low-can-binding/binding/low-can-cb.hpp | 14 +- CAN-binder/low-can-binding/binding/low-can-hat.hpp | 6 +- 3 files changed, 226 insertions(+), 104 deletions(-) (limited to 'CAN-binder/low-can-binding/binding') diff --git a/CAN-binder/low-can-binding/binding/low-can-cb.cpp b/CAN-binder/low-can-binding/binding/low-can-cb.cpp index d4cc5a11..5b7561f8 100644 --- a/CAN-binder/low-can-binding/binding/low-can-cb.cpp +++ b/CAN-binder/low-can-binding/binding/low-can-cb.cpp @@ -19,7 +19,6 @@ #include "low-can-hat.hpp" #include -#include #include #include #include @@ -59,12 +58,6 @@ low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filte : event_filter_{event_filter} {} -low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filter, std::shared_ptr diagnostic_message) - : diagnostic_message_{diagnostic_message}, event_filter_{event_filter} -{ - index_ = diagnostic_message->get_pid(); -} - low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s) : index_{s.index_}, event_filter_{s.event_filter_}, @@ -77,9 +70,14 @@ low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s) return *this; } +low_can_subscription_t::~low_can_subscription_t() +{ + socket_.close(); +} + low_can_subscription_t::operator bool() const { - return ((can_signal_ != nullptr || diagnostic_message_ != nullptr) && afb_event_is_valid(event_)); + return ((can_signal_ != nullptr || ! diagnostic_message_.empty()) && afb_event_is_valid(event_)); } struct afb_event& low_can_subscription_t::get_event() @@ -97,17 +95,47 @@ const std::shared_ptr low_can_subscription_t::get_can_signal() con return can_signal_; } -const std::shared_ptr low_can_subscription_t::get_diagnostic_message() const +const std::vector > low_can_subscription_t::get_diagnostic_message() const { return diagnostic_message_; } +const std::shared_ptr low_can_subscription_t::get_diagnostic_message(uint32_t pid) const +{ + for(const auto& diag: diagnostic_message_) + { + if(diag->get_pid() == pid) + { + return diag; + } + } + return nullptr; +} + +const std::shared_ptr low_can_subscription_t::get_diagnostic_message(const std::string& name) const +{ + for(const auto& diag: diagnostic_message_) + { + if(diag->get_name() == name) + { + return diag; + } + } + return nullptr; +} + const std::string low_can_subscription_t::get_name() const { if (can_signal_ != nullptr) return can_signal_->get_name(); - if (diagnostic_message_ != nullptr) - return diagnostic_message_->get_name() ; + + return ""; +} + +const std::string low_can_subscription_t::get_name(uint32_t pid) const +{ + if (!diagnostic_message_.empty()) + return get_diagnostic_message(pid)->get_name() ; return ""; } @@ -152,47 +180,75 @@ void low_can_subscription_t::set_max(float max) event_filter_.max = max; } +int low_can_subscription_t::open_socket() +{ + int ret = 0; + if(! socket_) + { + if( can_signal_ != nullptr) + {ret = socket_.open(can_signal_->get_message()->get_bus_device_name());} + else if (! diagnostic_message_ .empty()) + {ret = socket_.open(application_t::instance().get_diagnostic_manager().get_bus_device_name());} + index_ = (int)socket_.socket(); + } + return ret; +} + +struct utils::simple_bcm_msg low_can_subscription_t::make_bcm_head(uint32_t can_id, uint32_t flags, const struct timeval& timeout, const struct timeval& frequency_thinning) const +{ + struct utils::simple_bcm_msg bcm_msg; + + memset(&bcm_msg, 0, sizeof(bcm_msg)); + + bcm_msg.msg_head.opcode = RX_SETUP; + bcm_msg.msg_head.can_id = can_id; + bcm_msg.msg_head.flags = flags; + bcm_msg.msg_head.ival1.tv_sec = timeout.tv_sec ; + bcm_msg.msg_head.ival1.tv_usec = timeout.tv_usec; + bcm_msg.msg_head.ival2.tv_sec = frequency_thinning.tv_sec ; + bcm_msg.msg_head.ival2.tv_usec = frequency_thinning.tv_usec; + + return bcm_msg; +} + +void low_can_subscription_t::add_bcm_frame(const struct can_frame& cfd, struct utils::simple_bcm_msg& bcm_msg) const +{ + for(int i=0; i < CAN_MAX_DLEN; i++) + { + if(cfd.data[i] != 0) + { + bcm_msg.msg_head.nframes = 1; + bcm_msg.frames = cfd; + return; + } + } +} + /// @brief Create a RX_SETUP receive job used by the BCM socket. /// /// @return 0 if ok else -1 -int low_can_subscription_t::create_rx_filter(std::shared_ptr sig) +int low_can_subscription_t::create_rx_filter() { - can_signal_= sig; - return create_rx_filter(); + int ret = -1; + if ( can_signal_ != nullptr) + {ret = create_rx_filter(can_signal_);} + else if (! diagnostic_message_ .empty()) + {ret = create_rx_filter(diagnostic_message_.front());} + + return ret; } /// @brief Create a RX_SETUP receive job used by the BCM socket. /// /// @return 0 if ok else -1 -int low_can_subscription_t::create_rx_filter() +int low_can_subscription_t::create_rx_filter(std::shared_ptr sig) { - if (can_signal_ == nullptr) - {return -1;} - - // Make sure that socket has been opened. - if(! socket_) - { - socket_.open(can_signal_->get_message()->get_bus_device_name()); - index_ = (int)socket_.socket(); - } + can_signal_= sig; - struct utils::simple_bcm_msg bcm_msg; struct can_frame cfd; - memset(&cfd, 0, sizeof(cfd)); - memset(&bcm_msg.msg_head, 0, sizeof(bcm_msg.msg_head)); - float val = (float)(1 << can_signal_->get_bit_size()) - 1; - struct timeval freq; - frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency); - freq = f.get_timeval_from_period(); - - bcm_msg.msg_head.opcode = RX_SETUP; - bcm_msg.msg_head.can_id = can_signal_->get_message()->get_id(); - bcm_msg.msg_head.flags = SETTIMER|RX_NO_AUTOTIMER; - bcm_msg.msg_head.ival2.tv_sec = freq.tv_sec ; - bcm_msg.msg_head.ival2.tv_usec = freq.tv_usec; - bcm_msg.msg_head.nframes = 1; + float val = (float)(1 << can_signal_->get_bit_size()) - 1; bitfield_encode_float(val, can_signal_->get_bit_position(), can_signal_->get_bit_size(), @@ -201,11 +257,61 @@ int low_can_subscription_t::create_rx_filter() cfd.data, CAN_MAX_DLEN); - bcm_msg.frames = cfd; + struct timeval freq, timeout = {0, 0}; + frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency); + freq = f.get_timeval_from_period(); + + utils::simple_bcm_msg bcm_msg = make_bcm_head(can_signal_->get_message()->get_id(), SETTIMER|RX_NO_AUTOTIMER, timeout, freq); + add_bcm_frame(cfd, bcm_msg); + + return create_rx_filter(bcm_msg); +} + +/// @brief Create a RX_SETUP receive job used by the BCM socket. +/// +/// @return 0 if ok else -1 +int low_can_subscription_t::create_rx_filter(std::shared_ptr sig) +{ + diagnostic_message_.push_back(sig); + + struct timeval freq = frequency_clock_t(event_filter_.frequency).get_timeval_from_period(); + //struct timeval timeout = frequency_clock_t(10).get_timeval_from_period(); + struct timeval timeout = {0,0}; + + utils::simple_bcm_msg bcm_msg = make_bcm_head(OBD2_FUNCTIONAL_BROADCAST_ID, SETTIMER|RX_NO_AUTOTIMER|RX_FILTER_ID, timeout, freq); + return create_rx_filter(bcm_msg); +} + +/// @brief Create a RX_SETUP receive job used by the BCM socket. +/// +/// @return 0 if ok else -1 +int low_can_subscription_t::create_rx_filter(utils::simple_bcm_msg& bcm_msg) +{ + // Make sure that socket has been opened. + if(open_socket() < 0) + {return -1;} - if(socket_ << bcm_msg) - return 0; - return -1; + // If it isn't an OBD2 CAN ID then just add a simple RX_SETUP job + // else monitor all standard 8 CAN OBD2 ID response. + if(bcm_msg.msg_head.can_id != OBD2_FUNCTIONAL_BROADCAST_ID) + { + socket_ << bcm_msg; + if(! socket_) + return -1; + } + else + { + for(uint8_t i = 0; i < 8; i++) + { + bcm_msg.msg_head.can_id = OBD2_FUNCTIONAL_RESPONSE_START + i; + + socket_ << bcm_msg; + if(! socket_) + return -1; + } + } + + return 0; } ///****************************************************************************** @@ -214,66 +320,56 @@ int low_can_subscription_t::create_rx_filter() /// ///*******************************************************************************/ -void on_no_clients(const std::string& message) +void on_no_clients(std::shared_ptr can_subscription, uint32_t pid) { - DiagnosticRequest* diag_req = application_t::instance().get_request_from_diagnostic_message(message); - if(diag_req != nullptr) + if( ! can_subscription->get_diagnostic_message().empty() && can_subscription->get_diagnostic_message(pid) != nullptr) { + DiagnosticRequest diag_req = can_subscription->get_diagnostic_message(pid)->build_diagnostic_request(); active_diagnostic_request_t* adr = application_t::instance().get_diagnostic_manager().find_recurring_request(diag_req); if( adr != nullptr) application_t::instance().get_diagnostic_manager().cleanup_request(adr, true); } - delete diag_req; - diag_req = nullptr; + + on_no_clients(can_subscription); +} + +void on_no_clients(std::shared_ptr can_subscription) +{ + utils::signals_manager_t& sm = utils::signals_manager_t::instance(); + std::lock_guard subscribed_signals_lock(sm.get_subscribed_signals_mutex()); + std::map >& s = sm.get_subscribed_signals(); + auto it = s.find(can_subscription->get_index()); + s.erase(it); } static void push_n_notify(const can_message_t& cm) { can_bus_t& cbm = application_t::instance().get_can_bus_manager(); - std::lock_guard can_message_lock(cbm.get_can_message_mutex()); - { cbm.push_new_can_message(cm); } + { + std::lock_guard can_message_lock(cbm.get_can_message_mutex()); + cbm.push_new_can_message(cm); + } cbm.get_new_can_message_cv().notify_one(); } -int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata) +int read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata) { - can_message_t cm; - low_can_subscription_t* can_subscription; - diagnostic_manager_t& diag_m = application_t::instance().get_diagnostic_manager(); - - if(userdata != nullptr) + low_can_subscription_t* can_subscription = (low_can_subscription_t*)userdata; + if ((revents & EPOLLIN) != 0) { - can_subscription = (low_can_subscription_t*)userdata; + can_message_t cm; utils::socketcan_bcm_t& s = can_subscription->get_socket(); s >> cm; - } - else - { - utils::socketcan_bcm_t& s = diag_m.get_socket(); - s >> cm; - } - push_n_notify(cm); + push_n_notify(cm); + } /* check if error or hangup */ if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0) { - sd_event_source_unref(s); - if(userdata != nullptr) - { - can_subscription->get_socket().close(); - can_subscription->create_rx_filter(); - NOTICE(binder_interface, "%s: Recreated RX_SETUP BCM job for can_subscription: %s", __FUNCTION__, can_subscription->get_name().c_str()); - } - else - { - diag_m.get_socket().close(); - diag_m.cleanup_active_requests(true); - ERROR(binder_interface, "%s: Error on diagnostic manager socket, cancelling active requests.", __FUNCTION__); - } - return -1; + sd_event_source_unref(event_source); + can_subscription->get_socket().close(); } - return 0; } @@ -338,7 +434,18 @@ static int subscribe_unsubscribe_signal(struct afb_req request, bool subscribe, return make_subscription_unsubscription(request, can_subscription, s, subscribe); } -int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector > diagnostic_messages, struct event_filter_t& event_filter, std::map >& s) +static int add_to_event_loop(std::shared_ptr& can_subscription) +{ + struct sd_event_source* event_source = nullptr; + return ( sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), + &event_source, + can_subscription->get_socket().socket(), + EPOLLIN, + read_message, + can_subscription.get())); +} + +static int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector > diagnostic_messages, struct event_filter_t& event_filter, std::map >& s) { int rets = 0; application_t& app = application_t::instance(); @@ -346,52 +453,55 @@ int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subsc for(const auto& sig : diagnostic_messages) { - DiagnosticRequest* diag_req = app.get_request_from_diagnostic_message(sig->get_name()); - float frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency; + DiagnosticRequest* diag_req = new DiagnosticRequest(sig->build_diagnostic_request()); + event_filter.frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency; std::shared_ptr can_subscription; + auto it = std::find_if(s.begin(), s.end(), [&sig](std::pair > sub){ return (! sub.second->get_diagnostic_message().empty());}); + can_subscription = it != s.end() ? + it->second : + std::make_shared(low_can_subscription_t(event_filter)); // If the requested diagnostic message isn't supported by the car then unsubcribe it // no matter what we want, worse case will be a fail unsubscription but at least we don't // poll a PID for nothing. //TODO: Adding callback requesting ignition status: diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency); if(sig->get_supported() && subscribe) { - if (s.find(sig->get_pid()) != s.end()) - { - can_subscription = s[sig->get_pid()]; - DEBUG(binder_interface, "%s: Signal: %s already subscribed. Adding a new subscription", __FUNCTION__, sig->get_name().c_str()); - } - else + diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), event_filter.frequency); + if(can_subscription->create_rx_filter(sig) < 0) + {return -1;} + DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); + if(it == s.end() && add_to_event_loop(can_subscription) < 0) { - diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), frequency); - can_subscription = std::make_shared(low_can_subscription_t(event_filter, sig)); - DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); + diag_m.cleanup_request( + diag_m.find_recurring_request(*diag_req), true); + WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); + return -1; } } else { diag_m.cleanup_request( - diag_m.find_recurring_request(diag_req), true); - delete diag_req; - diag_req = nullptr; + diag_m.find_recurring_request(*diag_req), true); if(sig->get_supported()) + {DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str());} + else { - DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str()); - return 0; + WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); + return -1; } - WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); - return -1; } int ret = subscribe_unsubscribe_signal(request, subscribe, can_subscription, s); if(ret < 0) return ret; + rets++; } return rets; } -int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector > can_signals, struct event_filter_t& event_filter, std::map >& s) +static int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector > can_signals, struct event_filter_t& event_filter, std::map >& s) { int rets = 0; for(const auto& sig: can_signals) @@ -412,8 +522,8 @@ int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, st if(subscribe_unsubscribe_signal(request, subscribe, can_subscription, s) < 0) {return -1;} - struct sd_event_source* e_source; - sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, can_subscription->get_socket().socket(), EPOLLIN, read_message, can_subscription.get()); + if(add_to_event_loop(can_subscription) < 0) + {return -1;} rets++; DEBUG(binder_interface, "%s: signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); } diff --git a/CAN-binder/low-can-binding/binding/low-can-cb.hpp b/CAN-binder/low-can-binding/binding/low-can-cb.hpp index 3975c679..0efde63a 100644 --- a/CAN-binder/low-can-binding/binding/low-can-cb.hpp +++ b/CAN-binder/low-can-binding/binding/low-can-cb.hpp @@ -41,7 +41,7 @@ private: /// Signal part std::shared_ptr can_signal_; - std::shared_ptr diagnostic_message_; + std::vector > diagnostic_message_; /// Filtering part struct event_filter_t event_filter_; @@ -50,9 +50,9 @@ private: public: low_can_subscription_t(); low_can_subscription_t(struct event_filter_t event_filter); - low_can_subscription_t(struct event_filter_t event_filter, std::shared_ptr diagnostic_message); low_can_subscription_t(const low_can_subscription_t& s) = delete; low_can_subscription_t(low_can_subscription_t&& s); + ~low_can_subscription_t(); low_can_subscription_t& operator=(const low_can_subscription_t& s); explicit operator bool() const; @@ -60,8 +60,11 @@ public: int get_index() const; struct afb_event& get_event(); const std::shared_ptr get_can_signal() const; - const std::shared_ptr get_diagnostic_message() const; + const std::shared_ptr get_diagnostic_message(uint32_t pid) const; + const std::vector > get_diagnostic_message() const; + const std::shared_ptr get_diagnostic_message(const std::string& name) const; const std::string get_name() const; + const std::string get_name(uint32_t pid) const; float get_frequency() const; float get_min() const; float get_max() const; @@ -72,6 +75,11 @@ public: void set_min(float min); void set_max(float max); + struct utils::simple_bcm_msg make_bcm_head(uint32_t can_id, uint32_t flags, const struct timeval& timeout, const struct timeval& frequency_thinning) const; + void add_bcm_frame(const struct can_frame& cfd, struct utils::simple_bcm_msg& bcm_msg) const; + int open_socket(); int create_rx_filter(); int create_rx_filter(std::shared_ptr sig); + int create_rx_filter(std::shared_ptr sig); + int create_rx_filter(utils::simple_bcm_msg& bcm_msg); }; diff --git a/CAN-binder/low-can-binding/binding/low-can-hat.hpp b/CAN-binder/low-can-binding/binding/low-can-hat.hpp index a1c8dad1..632dbc90 100644 --- a/CAN-binder/low-can-binding/binding/low-can-hat.hpp +++ b/CAN-binder/low-can-binding/binding/low-can-hat.hpp @@ -20,6 +20,7 @@ #include #include +#include #include extern "C" @@ -31,7 +32,10 @@ extern "C" struct afb_binding_interface; extern const struct afb_binding_interface *binder_interface; -void on_no_clients(const std::string& message); +class low_can_subscription_t; + +void on_no_clients(std::shared_ptr can_subscription); +void on_no_clients(std::shared_ptr can_subscription, uint32_t pid); int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata); void subscribe(struct afb_req request); -- cgit 1.2.3-korg