aboutsummaryrefslogtreecommitdiffstats
path: root/lib/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/receiver.c')
-rw-r--r--lib/receiver.c213
1 files changed, 213 insertions, 0 deletions
diff --git a/lib/receiver.c b/lib/receiver.c
new file mode 100644
index 0000000..f6c46c5
--- /dev/null
+++ b/lib/receiver.c
@@ -0,0 +1,213 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "receiver.h"
+
+#include "icipc.h"
+
+#define MAX_SENDERS 128
+
+struct icipc_receiver {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool thread_running;
+
+ const struct icipc_receiver_events *events;
+ void *events_data;
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static bool
+reply_message (struct icipc_receiver *self,
+ int sender_fd,
+ uint8_t *buffer,
+ size_t size)
+{
+ return self->events && self->events->handle_message ?
+ self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
+ icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ /* sender wants to connect, accept connection */
+ struct icipc_receiver *self = data;
+ socklen_t addr_size = sizeof(self->addr);
+ int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
+ SOCK_CLOEXEC | SOCK_NONBLOCK);
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = sender_fd;
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, sender_fd,
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
+}
+
+static void
+other_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_receiver *self = data;
+
+ /* sender sends a message, read it and reply */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("receiver: could not read message: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ /* client disconnected */
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ close (fd);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, fd,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
+ return;
+ }
+
+ /* reply */
+ if (!reply_message (self, fd, self->buffer_read, size))
+ icipc_log_error ("receiver: could not reply message: %s", strerror(errno));
+
+ return;
+}
+
+/* API */
+
+struct icipc_receiver *
+icipc_receiver_new (const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size)
+{
+ struct icipc_receiver *self;
+ int name_size;
+
+ /* check params */
+ if (path == NULL || buffer_size == 0)
+ return NULL;
+
+ unlink (path);
+
+ self = calloc (1, sizeof (struct icipc_receiver) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* bind socket */
+ if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) != 0)
+ goto error;
+
+ /* listen socket */
+ if (listen (self->socket_fd, MAX_SENDERS) != 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, other_event_received, self))
+ goto error;
+
+ self->events = events;
+ self->events_data = events_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_receiver_free (struct icipc_receiver *self)
+{
+ icipc_receiver_stop (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_receiver_start (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self))
+ return true;
+
+ self->thread_running = icipc_epoll_thread_start (&self->epoll_thread);
+ return self->thread_running;
+}
+
+void
+icipc_receiver_stop (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ self->thread_running = false;
+ }
+}
+
+bool
+icipc_receiver_is_running (struct icipc_receiver *self)
+{
+ return self->thread_running;
+}
+
+void *
+icipc_receiver_get_user_data (struct icipc_receiver *self)
+{
+ return self->user_data;
+}