diff options
Diffstat (limited to 'lib/sender.c')
-rw-r--r-- | lib/sender.c | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/lib/sender.c b/lib/sender.c index 7d2cd04..e795bc4 100644 --- a/lib/sender.c +++ b/lib/sender.c @@ -11,6 +11,7 @@ #include <unistd.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/epoll.h> #include <string.h> #include <errno.h> #include <assert.h> @@ -37,6 +38,7 @@ struct icipc_sender { icipc_sender_lost_conn_func_t lost_func; void *lost_data; + bool lost_connection; struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; @@ -70,6 +72,7 @@ push_sync_task (struct icipc_sender *self, static void pop_sync_task (struct icipc_sender *self, bool trigger, + bool all, const uint8_t *buffer, size_t size) { @@ -80,7 +83,8 @@ pop_sync_task (struct icipc_sender *self, if (trigger) task->func (self, buffer, size, task->data); task->func = NULL; - return; + if (!all) + return; } } } @@ -92,19 +96,23 @@ socket_event_received (struct epoll_thread *t, int fd, void *data) /* receiver sends a reply, read it trigger corresponding task */ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); - if (size < 0) { - icipc_log_error ("sender: could not read reply: %s", strerror(errno)); - return; - } - - if (size == 0) { + if (size <= 0) { + if (size < 0) + icipc_log_error ("sender: could not read reply: %s", strerror(errno)); + /* receiver disconnected */ + epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + self->lost_connection = true; if (self->lost_func) self->lost_func (self, fd, self->lost_data); + /* clear queue */ + pop_sync_task (self, true, true, NULL, 0); return; } /* trigger async task */ - pop_sync_task (self, true, self->buffer_read, size); + pop_sync_task (self, true, false, self->buffer_read, size); return; } @@ -154,6 +162,7 @@ icipc_sender_new (const char *path, self->lost_func = lost_func; self->lost_data = lost_data; + self->lost_connection = false; if (user_size > 0) self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); @@ -185,6 +194,23 @@ icipc_sender_connect (struct icipc_sender *self) if (icipc_sender_is_connected (self)) return true; + /* if connection was lost, re-init epoll thread with new socket */ + if (self->lost_connection) { + icipc_epoll_thread_stop (&self->epoll_thread); + icipc_epoll_thread_destroy (&self->epoll_thread); + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + return false; + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) { + close (self->socket_fd); + return false; + } + self->lost_connection = false; + } + + /* connect */ if (connect(self->socket_fd, (struct sockaddr *)&self->addr, sizeof(self->addr)) == 0 && icipc_epoll_thread_start (&self->epoll_thread)) { |