aboutsummaryrefslogtreecommitdiffstats
path: root/lib/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sender.c')
-rw-r--r--lib/sender.c42
1 files changed, 34 insertions, 8 deletions
diff --git a/lib/sender.c b/lib/sender.c
index 7d2cd04..e795bc4 100644
--- a/lib/sender.c
+++ b/lib/sender.c
@@ -11,6 +11,7 @@
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
@@ -37,6 +38,7 @@ struct icipc_sender {
icipc_sender_lost_conn_func_t lost_func;
void *lost_data;
+ bool lost_connection;
struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS];
@@ -70,6 +72,7 @@ push_sync_task (struct icipc_sender *self,
static void
pop_sync_task (struct icipc_sender *self,
bool trigger,
+ bool all,
const uint8_t *buffer,
size_t size)
{
@@ -80,7 +83,8 @@ pop_sync_task (struct icipc_sender *self,
if (trigger)
task->func (self, buffer, size, task->data);
task->func = NULL;
- return;
+ if (!all)
+ return;
}
}
}
@@ -92,19 +96,23 @@ socket_event_received (struct epoll_thread *t, int fd, void *data)
/* receiver sends a reply, read it trigger corresponding task */
ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
- if (size < 0) {
- icipc_log_error ("sender: could not read reply: %s", strerror(errno));
- return;
- }
-
- if (size == 0) {
+ if (size <= 0) {
+ if (size < 0)
+ icipc_log_error ("sender: could not read reply: %s", strerror(errno));
+ /* receiver disconnected */
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ shutdown(self->socket_fd, SHUT_RDWR);
+ self->is_connected = false;
+ self->lost_connection = true;
if (self->lost_func)
self->lost_func (self, fd, self->lost_data);
+ /* clear queue */
+ pop_sync_task (self, true, true, NULL, 0);
return;
}
/* trigger async task */
- pop_sync_task (self, true, self->buffer_read, size);
+ pop_sync_task (self, true, false, self->buffer_read, size);
return;
}
@@ -154,6 +162,7 @@ icipc_sender_new (const char *path,
self->lost_func = lost_func;
self->lost_data = lost_data;
+ self->lost_connection = false;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender));
@@ -185,6 +194,23 @@ icipc_sender_connect (struct icipc_sender *self)
if (icipc_sender_is_connected (self))
return true;
+ /* if connection was lost, re-init epoll thread with new socket */
+ if (self->lost_connection) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ return false;
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, NULL, self)) {
+ close (self->socket_fd);
+ return false;
+ }
+ self->lost_connection = false;
+ }
+
+ /* connect */
if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) == 0 &&
icipc_epoll_thread_start (&self->epoll_thread)) {