diff options
author | Romain Forlot <romain.forlot@iot.bzh> | 2017-06-02 19:45:01 +0200 |
---|---|---|
committer | Romain Forlot <romain.forlot@iot.bzh> | 2017-06-02 19:45:01 +0200 |
commit | 4ab2164774bdb9a29b2f180a9013c26c0155628d (patch) | |
tree | eb2e92eb8270c6556f58c1fa21d0159a76d4f8b9 /CAN-binder/low-can-binding/binding/low-can-cb.cpp | |
parent | 7929a62962cff8bbb456bd0c3761dc68afc3d766 (diff) |
Get diag manager's socket into subscription obj
Subscription index map is the socket ID which implies the following:
- All diagnostic messages subscriptions are stored into a vector
holding diagnostic_message and there is 1 socket for all like the
diag manager did.
- Reworked workflow to open a BCM socket and adding an RX filter more flexible.
- Separated methods to handle on_no_clients event.
- Cleaning diagnostic manager code to remove all unneeded stuff now.
- Embed diagnostic response in VehicleMessage decoded message to be able
transmits the PID in event push thread. This is needed by to correctly handle case
when there is no clients subscribed to an AFB event. Else we can't find
the diagnostic message to remove from low_can_subscription vector.
Change-Id: Iab13fd556cda3c69827bcd67f3a23a03cb6a2cf2
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'CAN-binder/low-can-binding/binding/low-can-cb.cpp')
-rw-r--r-- | CAN-binder/low-can-binding/binding/low-can-cb.cpp | 310 |
1 files changed, 210 insertions, 100 deletions
diff --git a/CAN-binder/low-can-binding/binding/low-can-cb.cpp b/CAN-binder/low-can-binding/binding/low-can-cb.cpp index d4cc5a11..5b7561f8 100644 --- a/CAN-binder/low-can-binding/binding/low-can-cb.cpp +++ b/CAN-binder/low-can-binding/binding/low-can-cb.cpp @@ -19,7 +19,6 @@ #include "low-can-hat.hpp" #include <map> -#include <memory> #include <queue> #include <mutex> #include <vector> @@ -59,12 +58,6 @@ low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filte : event_filter_{event_filter} {} -low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filter, std::shared_ptr<diagnostic_message_t> diagnostic_message) - : diagnostic_message_{diagnostic_message}, event_filter_{event_filter} -{ - index_ = diagnostic_message->get_pid(); -} - low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s) : index_{s.index_}, event_filter_{s.event_filter_}, @@ -77,9 +70,14 @@ low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s) return *this; } +low_can_subscription_t::~low_can_subscription_t() +{ + socket_.close(); +} + low_can_subscription_t::operator bool() const { - return ((can_signal_ != nullptr || diagnostic_message_ != nullptr) && afb_event_is_valid(event_)); + return ((can_signal_ != nullptr || ! diagnostic_message_.empty()) && afb_event_is_valid(event_)); } struct afb_event& low_can_subscription_t::get_event() @@ -97,17 +95,47 @@ const std::shared_ptr<can_signal_t> low_can_subscription_t::get_can_signal() con return can_signal_; } -const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message() const +const std::vector<std::shared_ptr<diagnostic_message_t> > low_can_subscription_t::get_diagnostic_message() const { return diagnostic_message_; } +const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message(uint32_t pid) const +{ + for(const auto& diag: diagnostic_message_) + { + if(diag->get_pid() == pid) + { + return diag; + } + } + return nullptr; +} + +const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message(const std::string& name) const +{ + for(const auto& diag: diagnostic_message_) + { + if(diag->get_name() == name) + { + return diag; + } + } + return nullptr; +} + const std::string low_can_subscription_t::get_name() const { if (can_signal_ != nullptr) return can_signal_->get_name(); - if (diagnostic_message_ != nullptr) - return diagnostic_message_->get_name() ; + + return ""; +} + +const std::string low_can_subscription_t::get_name(uint32_t pid) const +{ + if (!diagnostic_message_.empty()) + return get_diagnostic_message(pid)->get_name() ; return ""; } @@ -152,47 +180,75 @@ void low_can_subscription_t::set_max(float max) event_filter_.max = max; } +int low_can_subscription_t::open_socket() +{ + int ret = 0; + if(! socket_) + { + if( can_signal_ != nullptr) + {ret = socket_.open(can_signal_->get_message()->get_bus_device_name());} + else if (! diagnostic_message_ .empty()) + {ret = socket_.open(application_t::instance().get_diagnostic_manager().get_bus_device_name());} + index_ = (int)socket_.socket(); + } + return ret; +} + +struct utils::simple_bcm_msg low_can_subscription_t::make_bcm_head(uint32_t can_id, uint32_t flags, const struct timeval& timeout, const struct timeval& frequency_thinning) const +{ + struct utils::simple_bcm_msg bcm_msg; + + memset(&bcm_msg, 0, sizeof(bcm_msg)); + + bcm_msg.msg_head.opcode = RX_SETUP; + bcm_msg.msg_head.can_id = can_id; + bcm_msg.msg_head.flags = flags; + bcm_msg.msg_head.ival1.tv_sec = timeout.tv_sec ; + bcm_msg.msg_head.ival1.tv_usec = timeout.tv_usec; + bcm_msg.msg_head.ival2.tv_sec = frequency_thinning.tv_sec ; + bcm_msg.msg_head.ival2.tv_usec = frequency_thinning.tv_usec; + + return bcm_msg; +} + +void low_can_subscription_t::add_bcm_frame(const struct can_frame& cfd, struct utils::simple_bcm_msg& bcm_msg) const +{ + for(int i=0; i < CAN_MAX_DLEN; i++) + { + if(cfd.data[i] != 0) + { + bcm_msg.msg_head.nframes = 1; + bcm_msg.frames = cfd; + return; + } + } +} + /// @brief Create a RX_SETUP receive job used by the BCM socket. /// /// @return 0 if ok else -1 -int low_can_subscription_t::create_rx_filter(std::shared_ptr<can_signal_t> sig) +int low_can_subscription_t::create_rx_filter() { - can_signal_= sig; - return create_rx_filter(); + int ret = -1; + if ( can_signal_ != nullptr) + {ret = create_rx_filter(can_signal_);} + else if (! diagnostic_message_ .empty()) + {ret = create_rx_filter(diagnostic_message_.front());} + + return ret; } /// @brief Create a RX_SETUP receive job used by the BCM socket. /// /// @return 0 if ok else -1 -int low_can_subscription_t::create_rx_filter() +int low_can_subscription_t::create_rx_filter(std::shared_ptr<can_signal_t> sig) { - if (can_signal_ == nullptr) - {return -1;} - - // Make sure that socket has been opened. - if(! socket_) - { - socket_.open(can_signal_->get_message()->get_bus_device_name()); - index_ = (int)socket_.socket(); - } + can_signal_= sig; - struct utils::simple_bcm_msg bcm_msg; struct can_frame cfd; - memset(&cfd, 0, sizeof(cfd)); - memset(&bcm_msg.msg_head, 0, sizeof(bcm_msg.msg_head)); - float val = (float)(1 << can_signal_->get_bit_size()) - 1; - struct timeval freq; - frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency); - freq = f.get_timeval_from_period(); - - bcm_msg.msg_head.opcode = RX_SETUP; - bcm_msg.msg_head.can_id = can_signal_->get_message()->get_id(); - bcm_msg.msg_head.flags = SETTIMER|RX_NO_AUTOTIMER; - bcm_msg.msg_head.ival2.tv_sec = freq.tv_sec ; - bcm_msg.msg_head.ival2.tv_usec = freq.tv_usec; - bcm_msg.msg_head.nframes = 1; + float val = (float)(1 << can_signal_->get_bit_size()) - 1; bitfield_encode_float(val, can_signal_->get_bit_position(), can_signal_->get_bit_size(), @@ -201,11 +257,61 @@ int low_can_subscription_t::create_rx_filter() cfd.data, CAN_MAX_DLEN); - bcm_msg.frames = cfd; + struct timeval freq, timeout = {0, 0}; + frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency); + freq = f.get_timeval_from_period(); + + utils::simple_bcm_msg bcm_msg = make_bcm_head(can_signal_->get_message()->get_id(), SETTIMER|RX_NO_AUTOTIMER, timeout, freq); + add_bcm_frame(cfd, bcm_msg); + + return create_rx_filter(bcm_msg); +} + +/// @brief Create a RX_SETUP receive job used by the BCM socket. +/// +/// @return 0 if ok else -1 +int low_can_subscription_t::create_rx_filter(std::shared_ptr<diagnostic_message_t> sig) +{ + diagnostic_message_.push_back(sig); + + struct timeval freq = frequency_clock_t(event_filter_.frequency).get_timeval_from_period(); + //struct timeval timeout = frequency_clock_t(10).get_timeval_from_period(); + struct timeval timeout = {0,0}; + + utils::simple_bcm_msg bcm_msg = make_bcm_head(OBD2_FUNCTIONAL_BROADCAST_ID, SETTIMER|RX_NO_AUTOTIMER|RX_FILTER_ID, timeout, freq); + return create_rx_filter(bcm_msg); +} + +/// @brief Create a RX_SETUP receive job used by the BCM socket. +/// +/// @return 0 if ok else -1 +int low_can_subscription_t::create_rx_filter(utils::simple_bcm_msg& bcm_msg) +{ + // Make sure that socket has been opened. + if(open_socket() < 0) + {return -1;} - if(socket_ << bcm_msg) - return 0; - return -1; + // If it isn't an OBD2 CAN ID then just add a simple RX_SETUP job + // else monitor all standard 8 CAN OBD2 ID response. + if(bcm_msg.msg_head.can_id != OBD2_FUNCTIONAL_BROADCAST_ID) + { + socket_ << bcm_msg; + if(! socket_) + return -1; + } + else + { + for(uint8_t i = 0; i < 8; i++) + { + bcm_msg.msg_head.can_id = OBD2_FUNCTIONAL_RESPONSE_START + i; + + socket_ << bcm_msg; + if(! socket_) + return -1; + } + } + + return 0; } ///****************************************************************************** @@ -214,66 +320,56 @@ int low_can_subscription_t::create_rx_filter() /// ///*******************************************************************************/ -void on_no_clients(const std::string& message) +void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, uint32_t pid) { - DiagnosticRequest* diag_req = application_t::instance().get_request_from_diagnostic_message(message); - if(diag_req != nullptr) + if( ! can_subscription->get_diagnostic_message().empty() && can_subscription->get_diagnostic_message(pid) != nullptr) { + DiagnosticRequest diag_req = can_subscription->get_diagnostic_message(pid)->build_diagnostic_request(); active_diagnostic_request_t* adr = application_t::instance().get_diagnostic_manager().find_recurring_request(diag_req); if( adr != nullptr) application_t::instance().get_diagnostic_manager().cleanup_request(adr, true); } - delete diag_req; - diag_req = nullptr; + + on_no_clients(can_subscription); +} + +void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription) +{ + utils::signals_manager_t& sm = utils::signals_manager_t::instance(); + std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex()); + std::map<int, std::shared_ptr<low_can_subscription_t> >& s = sm.get_subscribed_signals(); + auto it = s.find(can_subscription->get_index()); + s.erase(it); } static void push_n_notify(const can_message_t& cm) { can_bus_t& cbm = application_t::instance().get_can_bus_manager(); - std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex()); - { cbm.push_new_can_message(cm); } + { + std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex()); + cbm.push_new_can_message(cm); + } cbm.get_new_can_message_cv().notify_one(); } -int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata) +int read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata) { - can_message_t cm; - low_can_subscription_t* can_subscription; - diagnostic_manager_t& diag_m = application_t::instance().get_diagnostic_manager(); - - if(userdata != nullptr) + low_can_subscription_t* can_subscription = (low_can_subscription_t*)userdata; + if ((revents & EPOLLIN) != 0) { - can_subscription = (low_can_subscription_t*)userdata; + can_message_t cm; utils::socketcan_bcm_t& s = can_subscription->get_socket(); s >> cm; - } - else - { - utils::socketcan_bcm_t& s = diag_m.get_socket(); - s >> cm; - } - push_n_notify(cm); + push_n_notify(cm); + } /* check if error or hangup */ if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0) { - sd_event_source_unref(s); - if(userdata != nullptr) - { - can_subscription->get_socket().close(); - can_subscription->create_rx_filter(); - NOTICE(binder_interface, "%s: Recreated RX_SETUP BCM job for can_subscription: %s", __FUNCTION__, can_subscription->get_name().c_str()); - } - else - { - diag_m.get_socket().close(); - diag_m.cleanup_active_requests(true); - ERROR(binder_interface, "%s: Error on diagnostic manager socket, cancelling active requests.", __FUNCTION__); - } - return -1; + sd_event_source_unref(event_source); + can_subscription->get_socket().close(); } - return 0; } @@ -338,7 +434,18 @@ static int subscribe_unsubscribe_signal(struct afb_req request, bool subscribe, return make_subscription_unsubscription(request, can_subscription, s, subscribe); } -int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_messages, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s) +static int add_to_event_loop(std::shared_ptr<low_can_subscription_t>& can_subscription) +{ + struct sd_event_source* event_source = nullptr; + return ( sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), + &event_source, + can_subscription->get_socket().socket(), + EPOLLIN, + read_message, + can_subscription.get())); +} + +static int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_messages, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s) { int rets = 0; application_t& app = application_t::instance(); @@ -346,52 +453,55 @@ int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subsc for(const auto& sig : diagnostic_messages) { - DiagnosticRequest* diag_req = app.get_request_from_diagnostic_message(sig->get_name()); - float frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency; + DiagnosticRequest* diag_req = new DiagnosticRequest(sig->build_diagnostic_request()); + event_filter.frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency; std::shared_ptr<low_can_subscription_t> can_subscription; + auto it = std::find_if(s.begin(), s.end(), [&sig](std::pair<int, std::shared_ptr<low_can_subscription_t> > sub){ return (! sub.second->get_diagnostic_message().empty());}); + can_subscription = it != s.end() ? + it->second : + std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter)); // If the requested diagnostic message isn't supported by the car then unsubcribe it // no matter what we want, worse case will be a fail unsubscription but at least we don't // poll a PID for nothing. //TODO: Adding callback requesting ignition status: diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency); if(sig->get_supported() && subscribe) { - if (s.find(sig->get_pid()) != s.end()) - { - can_subscription = s[sig->get_pid()]; - DEBUG(binder_interface, "%s: Signal: %s already subscribed. Adding a new subscription", __FUNCTION__, sig->get_name().c_str()); - } - else + diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), event_filter.frequency); + if(can_subscription->create_rx_filter(sig) < 0) + {return -1;} + DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); + if(it == s.end() && add_to_event_loop(can_subscription) < 0) { - diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), frequency); - can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter, sig)); - DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); + diag_m.cleanup_request( + diag_m.find_recurring_request(*diag_req), true); + WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); + return -1; } } else { diag_m.cleanup_request( - diag_m.find_recurring_request(diag_req), true); - delete diag_req; - diag_req = nullptr; + diag_m.find_recurring_request(*diag_req), true); if(sig->get_supported()) + {DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str());} + else { - DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str()); - return 0; + WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); + return -1; } - WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str()); - return -1; } int ret = subscribe_unsubscribe_signal(request, subscribe, can_subscription, s); if(ret < 0) return ret; + rets++; } return rets; } -int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<can_signal_t> > can_signals, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s) +static int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<can_signal_t> > can_signals, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s) { int rets = 0; for(const auto& sig: can_signals) @@ -412,8 +522,8 @@ int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, st if(subscribe_unsubscribe_signal(request, subscribe, can_subscription, s) < 0) {return -1;} - struct sd_event_source* e_source; - sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, can_subscription->get_socket().socket(), EPOLLIN, read_message, can_subscription.get()); + if(add_to_event_loop(can_subscription) < 0) + {return -1;} rets++; DEBUG(binder_interface, "%s: signal: %s subscribed", __FUNCTION__, sig->get_name().c_str()); } |