diff options
Diffstat (limited to 'lib/receiver.c')
-rw-r--r-- | lib/receiver.c | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/lib/receiver.c b/lib/receiver.c new file mode 100644 index 0000000..f6c46c5 --- /dev/null +++ b/lib/receiver.c @@ -0,0 +1,213 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/epoll.h> +#include <string.h> +#include <errno.h> +#include <assert.h> + +#include "private.h" +#include "receiver.h" + +#include "icipc.h" + +#define MAX_SENDERS 128 + +struct icipc_receiver { + struct sockaddr_un addr; + int socket_fd; + + uint8_t *buffer_read; + size_t buffer_size; + + struct epoll_thread epoll_thread; + bool thread_running; + + const struct icipc_receiver_events *events; + void *events_data; + + /* for subclasses */ + void *user_data; +}; + +static bool +reply_message (struct icipc_receiver *self, + int sender_fd, + uint8_t *buffer, + size_t size) +{ + return self->events && self->events->handle_message ? + self->events->handle_message (self, sender_fd, buffer, size, self->events_data) : + icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size; +} + +static void +socket_event_received (struct epoll_thread *t, int fd, void *data) +{ + /* sender wants to connect, accept connection */ + struct icipc_receiver *self = data; + socklen_t addr_size = sizeof(self->addr); + int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size, + SOCK_CLOEXEC | SOCK_NONBLOCK); + struct epoll_event event; + event.events = EPOLLIN; + event.data.fd = sender_fd; + epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event); + if (self->events && self->events->sender_state) + self->events->sender_state (self, sender_fd, + ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data); +} + +static void +other_event_received (struct epoll_thread *t, int fd, void *data) +{ + struct icipc_receiver *self = data; + + /* sender sends a message, read it and reply */ + ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); + if (size < 0) { + icipc_log_error ("receiver: could not read message: %s", strerror(errno)); + return; + } + + if (size == 0) { + /* client disconnected */ + epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + close (fd); + if (self->events && self->events->sender_state) + self->events->sender_state (self, fd, + ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data); + return; + } + + /* reply */ + if (!reply_message (self, fd, self->buffer_read, size)) + icipc_log_error ("receiver: could not reply message: %s", strerror(errno)); + + return; +} + +/* API */ + +struct icipc_receiver * +icipc_receiver_new (const char *path, + size_t buffer_size, + const struct icipc_receiver_events *events, + void *events_data, + size_t user_size) +{ + struct icipc_receiver *self; + int name_size; + + /* check params */ + if (path == NULL || buffer_size == 0) + return NULL; + + unlink (path); + + self = calloc (1, sizeof (struct icipc_receiver) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s", + path) + 1; + if (name_size > (int) sizeof(self->addr.sun_path)) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* bind socket */ + if (bind (self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) != 0) + goto error; + + /* listen socket */ + if (listen (self->socket_fd, MAX_SENDERS) != 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, other_event_received, self)) + goto error; + + self->events = events; + self->events_data = events_data; + if (user_size > 0) + self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver)); + + return self; + +error: + if (self->buffer_read) + free (self->buffer_read); + if (self->socket_fd != -1) + close (self->socket_fd); + free (self); + return NULL; +} + +void +icipc_receiver_free (struct icipc_receiver *self) +{ + icipc_receiver_stop (self); + + icipc_epoll_thread_destroy (&self->epoll_thread); + free (self->buffer_read); + close (self->socket_fd); + free (self); +} + +bool +icipc_receiver_start (struct icipc_receiver *self) +{ + if (icipc_receiver_is_running (self)) + return true; + + self->thread_running = icipc_epoll_thread_start (&self->epoll_thread); + return self->thread_running; +} + +void +icipc_receiver_stop (struct icipc_receiver *self) +{ + if (icipc_receiver_is_running (self)) { + icipc_epoll_thread_stop (&self->epoll_thread); + self->thread_running = false; + } +} + +bool +icipc_receiver_is_running (struct icipc_receiver *self) +{ + return self->thread_running; +} + +void * +icipc_receiver_get_user_data (struct icipc_receiver *self) +{ + return self->user_data; +} |