summaryrefslogtreecommitdiffstats
path: root/CAN-binder/low-can-binding/binding
diff options
context:
space:
mode:
authorRomain Forlot <romain.forlot@iot.bzh>2017-06-02 19:45:01 +0200
committerRomain Forlot <romain.forlot@iot.bzh>2017-06-02 19:45:01 +0200
commit4ab2164774bdb9a29b2f180a9013c26c0155628d (patch)
treeeb2e92eb8270c6556f58c1fa21d0159a76d4f8b9 /CAN-binder/low-can-binding/binding
parent7929a62962cff8bbb456bd0c3761dc68afc3d766 (diff)
Get diag manager's socket into subscription obj
Subscription index map is the socket ID which implies the following: - All diagnostic messages subscriptions are stored into a vector holding diagnostic_message and there is 1 socket for all like the diag manager did. - Reworked workflow to open a BCM socket and adding an RX filter more flexible. - Separated methods to handle on_no_clients event. - Cleaning diagnostic manager code to remove all unneeded stuff now. - Embed diagnostic response in VehicleMessage decoded message to be able transmits the PID in event push thread. This is needed by to correctly handle case when there is no clients subscribed to an AFB event. Else we can't find the diagnostic message to remove from low_can_subscription vector. Change-Id: Iab13fd556cda3c69827bcd67f3a23a03cb6a2cf2 Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
Diffstat (limited to 'CAN-binder/low-can-binding/binding')
-rw-r--r--CAN-binder/low-can-binding/binding/low-can-cb.cpp310
-rw-r--r--CAN-binder/low-can-binding/binding/low-can-cb.hpp14
-rw-r--r--CAN-binder/low-can-binding/binding/low-can-hat.hpp6
3 files changed, 226 insertions, 104 deletions
diff --git a/CAN-binder/low-can-binding/binding/low-can-cb.cpp b/CAN-binder/low-can-binding/binding/low-can-cb.cpp
index d4cc5a1..5b7561f 100644
--- a/CAN-binder/low-can-binding/binding/low-can-cb.cpp
+++ b/CAN-binder/low-can-binding/binding/low-can-cb.cpp
@@ -19,7 +19,6 @@
#include "low-can-hat.hpp"
#include <map>
-#include <memory>
#include <queue>
#include <mutex>
#include <vector>
@@ -59,12 +58,6 @@ low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filte
: event_filter_{event_filter}
{}
-low_can_subscription_t::low_can_subscription_t(struct event_filter_t event_filter, std::shared_ptr<diagnostic_message_t> diagnostic_message)
- : diagnostic_message_{diagnostic_message}, event_filter_{event_filter}
-{
- index_ = diagnostic_message->get_pid();
-}
-
low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s)
: index_{s.index_},
event_filter_{s.event_filter_},
@@ -77,9 +70,14 @@ low_can_subscription_t::low_can_subscription_t( low_can_subscription_t&& s)
return *this;
}
+low_can_subscription_t::~low_can_subscription_t()
+{
+ socket_.close();
+}
+
low_can_subscription_t::operator bool() const
{
- return ((can_signal_ != nullptr || diagnostic_message_ != nullptr) && afb_event_is_valid(event_));
+ return ((can_signal_ != nullptr || ! diagnostic_message_.empty()) && afb_event_is_valid(event_));
}
struct afb_event& low_can_subscription_t::get_event()
@@ -97,17 +95,47 @@ const std::shared_ptr<can_signal_t> low_can_subscription_t::get_can_signal() con
return can_signal_;
}
-const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message() const
+const std::vector<std::shared_ptr<diagnostic_message_t> > low_can_subscription_t::get_diagnostic_message() const
{
return diagnostic_message_;
}
+const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message(uint32_t pid) const
+{
+ for(const auto& diag: diagnostic_message_)
+ {
+ if(diag->get_pid() == pid)
+ {
+ return diag;
+ }
+ }
+ return nullptr;
+}
+
+const std::shared_ptr<diagnostic_message_t> low_can_subscription_t::get_diagnostic_message(const std::string& name) const
+{
+ for(const auto& diag: diagnostic_message_)
+ {
+ if(diag->get_name() == name)
+ {
+ return diag;
+ }
+ }
+ return nullptr;
+}
+
const std::string low_can_subscription_t::get_name() const
{
if (can_signal_ != nullptr)
return can_signal_->get_name();
- if (diagnostic_message_ != nullptr)
- return diagnostic_message_->get_name() ;
+
+ return "";
+}
+
+const std::string low_can_subscription_t::get_name(uint32_t pid) const
+{
+ if (!diagnostic_message_.empty())
+ return get_diagnostic_message(pid)->get_name() ;
return "";
}
@@ -152,47 +180,75 @@ void low_can_subscription_t::set_max(float max)
event_filter_.max = max;
}
+int low_can_subscription_t::open_socket()
+{
+ int ret = 0;
+ if(! socket_)
+ {
+ if( can_signal_ != nullptr)
+ {ret = socket_.open(can_signal_->get_message()->get_bus_device_name());}
+ else if (! diagnostic_message_ .empty())
+ {ret = socket_.open(application_t::instance().get_diagnostic_manager().get_bus_device_name());}
+ index_ = (int)socket_.socket();
+ }
+ return ret;
+}
+
+struct utils::simple_bcm_msg low_can_subscription_t::make_bcm_head(uint32_t can_id, uint32_t flags, const struct timeval& timeout, const struct timeval& frequency_thinning) const
+{
+ struct utils::simple_bcm_msg bcm_msg;
+
+ memset(&bcm_msg, 0, sizeof(bcm_msg));
+
+ bcm_msg.msg_head.opcode = RX_SETUP;
+ bcm_msg.msg_head.can_id = can_id;
+ bcm_msg.msg_head.flags = flags;
+ bcm_msg.msg_head.ival1.tv_sec = timeout.tv_sec ;
+ bcm_msg.msg_head.ival1.tv_usec = timeout.tv_usec;
+ bcm_msg.msg_head.ival2.tv_sec = frequency_thinning.tv_sec ;
+ bcm_msg.msg_head.ival2.tv_usec = frequency_thinning.tv_usec;
+
+ return bcm_msg;
+}
+
+void low_can_subscription_t::add_bcm_frame(const struct can_frame& cfd, struct utils::simple_bcm_msg& bcm_msg) const
+{
+ for(int i=0; i < CAN_MAX_DLEN; i++)
+ {
+ if(cfd.data[i] != 0)
+ {
+ bcm_msg.msg_head.nframes = 1;
+ bcm_msg.frames = cfd;
+ return;
+ }
+ }
+}
+
/// @brief Create a RX_SETUP receive job used by the BCM socket.
///
/// @return 0 if ok else -1
-int low_can_subscription_t::create_rx_filter(std::shared_ptr<can_signal_t> sig)
+int low_can_subscription_t::create_rx_filter()
{
- can_signal_= sig;
- return create_rx_filter();
+ int ret = -1;
+ if ( can_signal_ != nullptr)
+ {ret = create_rx_filter(can_signal_);}
+ else if (! diagnostic_message_ .empty())
+ {ret = create_rx_filter(diagnostic_message_.front());}
+
+ return ret;
}
/// @brief Create a RX_SETUP receive job used by the BCM socket.
///
/// @return 0 if ok else -1
-int low_can_subscription_t::create_rx_filter()
+int low_can_subscription_t::create_rx_filter(std::shared_ptr<can_signal_t> sig)
{
- if (can_signal_ == nullptr)
- {return -1;}
-
- // Make sure that socket has been opened.
- if(! socket_)
- {
- socket_.open(can_signal_->get_message()->get_bus_device_name());
- index_ = (int)socket_.socket();
- }
+ can_signal_= sig;
- struct utils::simple_bcm_msg bcm_msg;
struct can_frame cfd;
-
memset(&cfd, 0, sizeof(cfd));
- memset(&bcm_msg.msg_head, 0, sizeof(bcm_msg.msg_head));
- float val = (float)(1 << can_signal_->get_bit_size()) - 1;
- struct timeval freq;
- frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency);
- freq = f.get_timeval_from_period();
-
- bcm_msg.msg_head.opcode = RX_SETUP;
- bcm_msg.msg_head.can_id = can_signal_->get_message()->get_id();
- bcm_msg.msg_head.flags = SETTIMER|RX_NO_AUTOTIMER;
- bcm_msg.msg_head.ival2.tv_sec = freq.tv_sec ;
- bcm_msg.msg_head.ival2.tv_usec = freq.tv_usec;
- bcm_msg.msg_head.nframes = 1;
+ float val = (float)(1 << can_signal_->get_bit_size()) - 1;
bitfield_encode_float(val,
can_signal_->get_bit_position(),
can_signal_->get_bit_size(),
@@ -201,11 +257,61 @@ int low_can_subscription_t::create_rx_filter()
cfd.data,
CAN_MAX_DLEN);
- bcm_msg.frames = cfd;
+ struct timeval freq, timeout = {0, 0};
+ frequency_clock_t f = std::isnan(event_filter_.frequency) ? can_signal_->get_frequency() : frequency_clock_t(event_filter_.frequency);
+ freq = f.get_timeval_from_period();
+
+ utils::simple_bcm_msg bcm_msg = make_bcm_head(can_signal_->get_message()->get_id(), SETTIMER|RX_NO_AUTOTIMER, timeout, freq);
+ add_bcm_frame(cfd, bcm_msg);
+
+ return create_rx_filter(bcm_msg);
+}
+
+/// @brief Create a RX_SETUP receive job used by the BCM socket.
+///
+/// @return 0 if ok else -1
+int low_can_subscription_t::create_rx_filter(std::shared_ptr<diagnostic_message_t> sig)
+{
+ diagnostic_message_.push_back(sig);
+
+ struct timeval freq = frequency_clock_t(event_filter_.frequency).get_timeval_from_period();
+ //struct timeval timeout = frequency_clock_t(10).get_timeval_from_period();
+ struct timeval timeout = {0,0};
+
+ utils::simple_bcm_msg bcm_msg = make_bcm_head(OBD2_FUNCTIONAL_BROADCAST_ID, SETTIMER|RX_NO_AUTOTIMER|RX_FILTER_ID, timeout, freq);
+ return create_rx_filter(bcm_msg);
+}
+
+/// @brief Create a RX_SETUP receive job used by the BCM socket.
+///
+/// @return 0 if ok else -1
+int low_can_subscription_t::create_rx_filter(utils::simple_bcm_msg& bcm_msg)
+{
+ // Make sure that socket has been opened.
+ if(open_socket() < 0)
+ {return -1;}
- if(socket_ << bcm_msg)
- return 0;
- return -1;
+ // If it isn't an OBD2 CAN ID then just add a simple RX_SETUP job
+ // else monitor all standard 8 CAN OBD2 ID response.
+ if(bcm_msg.msg_head.can_id != OBD2_FUNCTIONAL_BROADCAST_ID)
+ {
+ socket_ << bcm_msg;
+ if(! socket_)
+ return -1;
+ }
+ else
+ {
+ for(uint8_t i = 0; i < 8; i++)
+ {
+ bcm_msg.msg_head.can_id = OBD2_FUNCTIONAL_RESPONSE_START + i;
+
+ socket_ << bcm_msg;
+ if(! socket_)
+ return -1;
+ }
+ }
+
+ return 0;
}
///******************************************************************************
@@ -214,66 +320,56 @@ int low_can_subscription_t::create_rx_filter()
///
///*******************************************************************************/
-void on_no_clients(const std::string& message)
+void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, uint32_t pid)
{
- DiagnosticRequest* diag_req = application_t::instance().get_request_from_diagnostic_message(message);
- if(diag_req != nullptr)
+ if( ! can_subscription->get_diagnostic_message().empty() && can_subscription->get_diagnostic_message(pid) != nullptr)
{
+ DiagnosticRequest diag_req = can_subscription->get_diagnostic_message(pid)->build_diagnostic_request();
active_diagnostic_request_t* adr = application_t::instance().get_diagnostic_manager().find_recurring_request(diag_req);
if( adr != nullptr)
application_t::instance().get_diagnostic_manager().cleanup_request(adr, true);
}
- delete diag_req;
- diag_req = nullptr;
+
+ on_no_clients(can_subscription);
+}
+
+void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription)
+{
+ utils::signals_manager_t& sm = utils::signals_manager_t::instance();
+ std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
+ std::map<int, std::shared_ptr<low_can_subscription_t> >& s = sm.get_subscribed_signals();
+ auto it = s.find(can_subscription->get_index());
+ s.erase(it);
}
static void push_n_notify(const can_message_t& cm)
{
can_bus_t& cbm = application_t::instance().get_can_bus_manager();
- std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
- { cbm.push_new_can_message(cm); }
+ {
+ std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
+ cbm.push_new_can_message(cm);
+ }
cbm.get_new_can_message_cv().notify_one();
}
-int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+int read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata)
{
- can_message_t cm;
- low_can_subscription_t* can_subscription;
- diagnostic_manager_t& diag_m = application_t::instance().get_diagnostic_manager();
-
- if(userdata != nullptr)
+ low_can_subscription_t* can_subscription = (low_can_subscription_t*)userdata;
+ if ((revents & EPOLLIN) != 0)
{
- can_subscription = (low_can_subscription_t*)userdata;
+ can_message_t cm;
utils::socketcan_bcm_t& s = can_subscription->get_socket();
s >> cm;
- }
- else
- {
- utils::socketcan_bcm_t& s = diag_m.get_socket();
- s >> cm;
- }
- push_n_notify(cm);
+ push_n_notify(cm);
+ }
/* check if error or hangup */
if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
{
- sd_event_source_unref(s);
- if(userdata != nullptr)
- {
- can_subscription->get_socket().close();
- can_subscription->create_rx_filter();
- NOTICE(binder_interface, "%s: Recreated RX_SETUP BCM job for can_subscription: %s", __FUNCTION__, can_subscription->get_name().c_str());
- }
- else
- {
- diag_m.get_socket().close();
- diag_m.cleanup_active_requests(true);
- ERROR(binder_interface, "%s: Error on diagnostic manager socket, cancelling active requests.", __FUNCTION__);
- }
- return -1;
+ sd_event_source_unref(event_source);
+ can_subscription->get_socket().close();
}
-
return 0;
}
@@ -338,7 +434,18 @@ static int subscribe_unsubscribe_signal(struct afb_req request, bool subscribe,
return make_subscription_unsubscription(request, can_subscription, s, subscribe);
}
-int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_messages, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
+static int add_to_event_loop(std::shared_ptr<low_can_subscription_t>& can_subscription)
+{
+ struct sd_event_source* event_source = nullptr;
+ return ( sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon),
+ &event_source,
+ can_subscription->get_socket().socket(),
+ EPOLLIN,
+ read_message,
+ can_subscription.get()));
+}
+
+static int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_messages, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
{
int rets = 0;
application_t& app = application_t::instance();
@@ -346,52 +453,55 @@ int subscribe_unsubscribe_diagnostic_messages(struct afb_req request, bool subsc
for(const auto& sig : diagnostic_messages)
{
- DiagnosticRequest* diag_req = app.get_request_from_diagnostic_message(sig->get_name());
- float frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency;
+ DiagnosticRequest* diag_req = new DiagnosticRequest(sig->build_diagnostic_request());
+ event_filter.frequency = std::isnan(event_filter.frequency) ? sig->get_frequency() : event_filter.frequency;
std::shared_ptr<low_can_subscription_t> can_subscription;
+ auto it = std::find_if(s.begin(), s.end(), [&sig](std::pair<int, std::shared_ptr<low_can_subscription_t> > sub){ return (! sub.second->get_diagnostic_message().empty());});
+ can_subscription = it != s.end() ?
+ it->second :
+ std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter));
// If the requested diagnostic message isn't supported by the car then unsubcribe it
// no matter what we want, worse case will be a fail unsubscription but at least we don't
// poll a PID for nothing.
//TODO: Adding callback requesting ignition status: diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
if(sig->get_supported() && subscribe)
{
- if (s.find(sig->get_pid()) != s.end())
- {
- can_subscription = s[sig->get_pid()];
- DEBUG(binder_interface, "%s: Signal: %s already subscribed. Adding a new subscription", __FUNCTION__, sig->get_name().c_str());
- }
- else
+ diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), event_filter.frequency);
+ if(can_subscription->create_rx_filter(sig) < 0)
+ {return -1;}
+ DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str());
+ if(it == s.end() && add_to_event_loop(can_subscription) < 0)
{
- diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), frequency);
- can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter, sig));
- DEBUG(binder_interface, "%s: Signal: %s subscribed", __FUNCTION__, sig->get_name().c_str());
+ diag_m.cleanup_request(
+ diag_m.find_recurring_request(*diag_req), true);
+ WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str());
+ return -1;
}
}
else
{
diag_m.cleanup_request(
- diag_m.find_recurring_request(diag_req), true);
- delete diag_req;
- diag_req = nullptr;
+ diag_m.find_recurring_request(*diag_req), true);
if(sig->get_supported())
+ {DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str());}
+ else
{
- DEBUG(binder_interface, "%s: %s cancelled due to unsubscribe", __FUNCTION__, sig->get_name().c_str());
- return 0;
+ WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str());
+ return -1;
}
- WARNING(binder_interface, "%s: signal: %s isn't supported. Canceling operation.", __FUNCTION__, sig->get_name().c_str());
- return -1;
}
int ret = subscribe_unsubscribe_signal(request, subscribe, can_subscription, s);
if(ret < 0)
return ret;
+
rets++;
}
return rets;
}
-int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<can_signal_t> > can_signals, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
+static int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, std::vector<std::shared_ptr<can_signal_t> > can_signals, struct event_filter_t& event_filter, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
{
int rets = 0;
for(const auto& sig: can_signals)
@@ -412,8 +522,8 @@ int subscribe_unsubscribe_can_signals(struct afb_req request, bool subscribe, st
if(subscribe_unsubscribe_signal(request, subscribe, can_subscription, s) < 0)
{return -1;}
- struct sd_event_source* e_source;
- sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, can_subscription->get_socket().socket(), EPOLLIN, read_message, can_subscription.get());
+ if(add_to_event_loop(can_subscription) < 0)
+ {return -1;}
rets++;
DEBUG(binder_interface, "%s: signal: %s subscribed", __FUNCTION__, sig->get_name().c_str());
}
diff --git a/CAN-binder/low-can-binding/binding/low-can-cb.hpp b/CAN-binder/low-can-binding/binding/low-can-cb.hpp
index 3975c67..0efde63 100644
--- a/CAN-binder/low-can-binding/binding/low-can-cb.hpp
+++ b/CAN-binder/low-can-binding/binding/low-can-cb.hpp
@@ -41,7 +41,7 @@ private:
/// Signal part
std::shared_ptr<can_signal_t> can_signal_;
- std::shared_ptr<diagnostic_message_t> diagnostic_message_;
+ std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_message_;
/// Filtering part
struct event_filter_t event_filter_;
@@ -50,9 +50,9 @@ private:
public:
low_can_subscription_t();
low_can_subscription_t(struct event_filter_t event_filter);
- low_can_subscription_t(struct event_filter_t event_filter, std::shared_ptr<diagnostic_message_t> diagnostic_message);
low_can_subscription_t(const low_can_subscription_t& s) = delete;
low_can_subscription_t(low_can_subscription_t&& s);
+ ~low_can_subscription_t();
low_can_subscription_t& operator=(const low_can_subscription_t& s);
explicit operator bool() const;
@@ -60,8 +60,11 @@ public:
int get_index() const;
struct afb_event& get_event();
const std::shared_ptr<can_signal_t> get_can_signal() const;
- const std::shared_ptr<diagnostic_message_t> get_diagnostic_message() const;
+ const std::shared_ptr<diagnostic_message_t> get_diagnostic_message(uint32_t pid) const;
+ const std::vector<std::shared_ptr<diagnostic_message_t> > get_diagnostic_message() const;
+ const std::shared_ptr<diagnostic_message_t> get_diagnostic_message(const std::string& name) const;
const std::string get_name() const;
+ const std::string get_name(uint32_t pid) const;
float get_frequency() const;
float get_min() const;
float get_max() const;
@@ -72,6 +75,11 @@ public:
void set_min(float min);
void set_max(float max);
+ struct utils::simple_bcm_msg make_bcm_head(uint32_t can_id, uint32_t flags, const struct timeval& timeout, const struct timeval& frequency_thinning) const;
+ void add_bcm_frame(const struct can_frame& cfd, struct utils::simple_bcm_msg& bcm_msg) const;
+ int open_socket();
int create_rx_filter();
int create_rx_filter(std::shared_ptr<can_signal_t> sig);
+ int create_rx_filter(std::shared_ptr<diagnostic_message_t> sig);
+ int create_rx_filter(utils::simple_bcm_msg& bcm_msg);
};
diff --git a/CAN-binder/low-can-binding/binding/low-can-hat.hpp b/CAN-binder/low-can-binding/binding/low-can-hat.hpp
index a1c8dad..632dbc9 100644
--- a/CAN-binder/low-can-binding/binding/low-can-hat.hpp
+++ b/CAN-binder/low-can-binding/binding/low-can-hat.hpp
@@ -20,6 +20,7 @@
#include <cstddef>
#include <string>
+#include <memory>
#include <systemd/sd-event.h>
extern "C"
@@ -31,7 +32,10 @@ extern "C" struct afb_binding_interface;
extern const struct afb_binding_interface *binder_interface;
-void on_no_clients(const std::string& message);
+class low_can_subscription_t;
+
+void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription);
+void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, uint32_t pid);
int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata);
void subscribe(struct afb_req request);