summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorJulian Bouzas <julian.bouzas@collabora.com>2021-06-28 11:21:51 -0400
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2021-07-28 13:19:02 +0300
commit03fc350641c4a508dadf1a3d7477a48c5f4cda42 (patch)
treec4ec031c744cbff9aef10d03f1625865b0774257 /lib
parent8745cd5d0cba0da5729bb75187aefb01c6b5dc83 (diff)
wpipc: sender: disconnect and clear all pending tasks if connection was lost
If we don't remove the socket from epoll when the connection is lost, we risk having the lost callback triggered more than once by epoll, creating race condition in the unit test because more than 1 event happened. The epoll thread and socket will be re-created again when re-connecting. Note that we cannot destroy the epoll thread before triggering the lost connection callback because we are running the callback in the epoll thread, which is why removing the socket from epoll is the only way to make sure the lost connection callback is only triggered once. Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/sender.c42
1 files changed, 34 insertions, 8 deletions
diff --git a/lib/sender.c b/lib/sender.c
index 7d2cd04..e795bc4 100644
--- a/lib/sender.c
+++ b/lib/sender.c
@@ -11,6 +11,7 @@
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
@@ -37,6 +38,7 @@ struct icipc_sender {
icipc_sender_lost_conn_func_t lost_func;
void *lost_data;
+ bool lost_connection;
struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS];
@@ -70,6 +72,7 @@ push_sync_task (struct icipc_sender *self,
static void
pop_sync_task (struct icipc_sender *self,
bool trigger,
+ bool all,
const uint8_t *buffer,
size_t size)
{
@@ -80,7 +83,8 @@ pop_sync_task (struct icipc_sender *self,
if (trigger)
task->func (self, buffer, size, task->data);
task->func = NULL;
- return;
+ if (!all)
+ return;
}
}
}
@@ -92,19 +96,23 @@ socket_event_received (struct epoll_thread *t, int fd, void *data)
/* receiver sends a reply, read it trigger corresponding task */
ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
- if (size < 0) {
- icipc_log_error ("sender: could not read reply: %s", strerror(errno));
- return;
- }
-
- if (size == 0) {
+ if (size <= 0) {
+ if (size < 0)
+ icipc_log_error ("sender: could not read reply: %s", strerror(errno));
+ /* receiver disconnected */
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ shutdown(self->socket_fd, SHUT_RDWR);
+ self->is_connected = false;
+ self->lost_connection = true;
if (self->lost_func)
self->lost_func (self, fd, self->lost_data);
+ /* clear queue */
+ pop_sync_task (self, true, true, NULL, 0);
return;
}
/* trigger async task */
- pop_sync_task (self, true, self->buffer_read, size);
+ pop_sync_task (self, true, false, self->buffer_read, size);
return;
}
@@ -154,6 +162,7 @@ icipc_sender_new (const char *path,
self->lost_func = lost_func;
self->lost_data = lost_data;
+ self->lost_connection = false;
if (user_size > 0)
self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender));
@@ -185,6 +194,23 @@ icipc_sender_connect (struct icipc_sender *self)
if (icipc_sender_is_connected (self))
return true;
+ /* if connection was lost, re-init epoll thread with new socket */
+ if (self->lost_connection) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ return false;
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, NULL, self)) {
+ close (self->socket_fd);
+ return false;
+ }
+ self->lost_connection = false;
+ }
+
+ /* connect */
if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
sizeof(self->addr)) == 0 &&
icipc_epoll_thread_start (&self->epoll_thread)) {