diff options
author | Julian Bouzas <julian.bouzas@collabora.com> | 2021-06-28 11:21:51 -0400 |
---|---|---|
committer | George Kiagiadakis <george.kiagiadakis@collabora.com> | 2021-07-28 13:19:02 +0300 |
commit | 03fc350641c4a508dadf1a3d7477a48c5f4cda42 (patch) | |
tree | c4ec031c744cbff9aef10d03f1625865b0774257 /lib/sender.c | |
parent | 8745cd5d0cba0da5729bb75187aefb01c6b5dc83 (diff) |
wpipc: sender: disconnect and clear all pending tasks if connection was lost
If we don't remove the socket from epoll when the connection is lost, we risk
having the lost callback triggered more than once by epoll, creating race
condition in the unit test because more than 1 event happened. The epoll thread
and socket will be re-created again when re-connecting.
Note that we cannot destroy the epoll thread before triggering the lost
connection callback because we are running the callback in the epoll thread,
which is why removing the socket from epoll is the only way to make sure the
lost connection callback is only triggered once.
Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
Diffstat (limited to 'lib/sender.c')
-rw-r--r-- | lib/sender.c | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/lib/sender.c b/lib/sender.c index 7d2cd04..e795bc4 100644 --- a/lib/sender.c +++ b/lib/sender.c @@ -11,6 +11,7 @@ #include <unistd.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/epoll.h> #include <string.h> #include <errno.h> #include <assert.h> @@ -37,6 +38,7 @@ struct icipc_sender { icipc_sender_lost_conn_func_t lost_func; void *lost_data; + bool lost_connection; struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; @@ -70,6 +72,7 @@ push_sync_task (struct icipc_sender *self, static void pop_sync_task (struct icipc_sender *self, bool trigger, + bool all, const uint8_t *buffer, size_t size) { @@ -80,7 +83,8 @@ pop_sync_task (struct icipc_sender *self, if (trigger) task->func (self, buffer, size, task->data); task->func = NULL; - return; + if (!all) + return; } } } @@ -92,19 +96,23 @@ socket_event_received (struct epoll_thread *t, int fd, void *data) /* receiver sends a reply, read it trigger corresponding task */ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); - if (size < 0) { - icipc_log_error ("sender: could not read reply: %s", strerror(errno)); - return; - } - - if (size == 0) { + if (size <= 0) { + if (size < 0) + icipc_log_error ("sender: could not read reply: %s", strerror(errno)); + /* receiver disconnected */ + epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + self->lost_connection = true; if (self->lost_func) self->lost_func (self, fd, self->lost_data); + /* clear queue */ + pop_sync_task (self, true, true, NULL, 0); return; } /* trigger async task */ - pop_sync_task (self, true, self->buffer_read, size); + pop_sync_task (self, true, false, self->buffer_read, size); return; } @@ -154,6 +162,7 @@ icipc_sender_new (const char *path, self->lost_func = lost_func; self->lost_data = lost_data; + self->lost_connection = false; if (user_size > 0) self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); @@ -185,6 +194,23 @@ icipc_sender_connect (struct icipc_sender *self) if (icipc_sender_is_connected (self)) return true; + /* if connection was lost, re-init epoll thread with new socket */ + if (self->lost_connection) { + icipc_epoll_thread_stop (&self->epoll_thread); + icipc_epoll_thread_destroy (&self->epoll_thread); + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + return false; + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) { + close (self->socket_fd); + return false; + } + self->lost_connection = false; + } + + /* connect */ if (connect(self->socket_fd, (struct sockaddr *)&self->addr, sizeof(self->addr)) == 0 && icipc_epoll_thread_start (&self->epoll_thread)) { |