diff options
Diffstat (limited to 'lib/sender.c')
-rw-r--r-- | lib/sender.c | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/lib/sender.c b/lib/sender.c new file mode 100644 index 0000000..7b5b0c9 --- /dev/null +++ b/lib/sender.c @@ -0,0 +1,251 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <string.h> +#include <errno.h> +#include <assert.h> + +#include "private.h" +#include "sender.h" + +#define MAX_ASYNC_TASKS 128 + +struct icipc_sender_task { + icipc_sender_reply_func_t func; + void *data; +}; + +struct icipc_sender { + struct sockaddr_un addr; + int socket_fd; + + uint8_t *buffer_read; + size_t buffer_size; + + struct epoll_thread epoll_thread; + bool is_connected; + + icipc_sender_lost_conn_func_t lost_func; + void *lost_data; + + struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; + + /* for subclasses */ + void *user_data; +}; + +static int +push_sync_task (struct icipc_sender *self, + icipc_sender_reply_func_t func, + void *data) +{ + size_t i; + for (i = MAX_ASYNC_TASKS; i > 1; i--) { + struct icipc_sender_task *curr = self->async_tasks + i - 1; + struct icipc_sender_task *next = self->async_tasks + i - 2; + if (next->func != NULL && curr->func == NULL) { + curr->func = func; + curr->data = data; + return i - 1; + } else if (i - 2 == 0 && next->func == NULL) { + /* empty queue */ + next->func = func; + next->data = data; + return 0; + } + } + return -1; +} + +static void +pop_sync_task (struct icipc_sender *self, + bool trigger, + const uint8_t *buffer, + size_t size) +{ + size_t i; + for (i = 0; i < MAX_ASYNC_TASKS; i++) { + struct icipc_sender_task *task = self->async_tasks + i; + if (task->func != NULL) { + if (trigger) + task->func (self, buffer, size, task->data); + task->func = NULL; + return; + } + } +} + +static void +socket_event_received (struct epoll_thread *t, int fd, void *data) +{ + struct icipc_sender *self = data; + + /* receiver sends a reply, read it trigger corresponding task */ + ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); + if (size < 0) { + icipc_log_error ("sender: could not read reply: %s", strerror(errno)); + return; + } + + if (size == 0) { + if (self->lost_func) + self->lost_func (self, fd, self->lost_data); + return; + } + + /* trigger async task */ + pop_sync_task (self, true, self->buffer_read, size); + return; +} + +/* API */ + +struct icipc_sender * +icipc_sender_new (const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size) +{ + struct icipc_sender *self; + int name_size; + + if (path == NULL) + return NULL; + + self = calloc (1, sizeof (struct icipc_sender) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s", + path) + 1; + if (name_size > (int) sizeof(self->addr.sun_path)) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) + goto error; + + self->lost_func = lost_func; + self->lost_data = lost_data; + if (user_size > 0) + self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); + + return self; + +error: + if (self->buffer_read) + free (self->buffer_read); + if (self->socket_fd != -1) + close (self->socket_fd); + free (self); + return NULL; +} + +void +icipc_sender_free (struct icipc_sender *self) +{ + icipc_sender_disconnect (self); + + icipc_epoll_thread_destroy (&self->epoll_thread); + free (self->buffer_read); + close (self->socket_fd); + free (self); +} + +bool +icipc_sender_connect (struct icipc_sender *self) +{ + if (icipc_sender_is_connected (self)) + return true; + + if (connect(self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) == 0 && + icipc_epoll_thread_start (&self->epoll_thread)) { + self->is_connected = true; + return true; + } + + return false; +} + +void +icipc_sender_disconnect (struct icipc_sender *self) +{ + if (icipc_sender_is_connected (self)) { + icipc_epoll_thread_stop (&self->epoll_thread); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + } +} + +bool +icipc_sender_is_connected (struct icipc_sender *self) +{ + return self->is_connected; +} + +bool +icipc_sender_send (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + icipc_sender_reply_func_t func, + void *data) +{ + int id = -1; + + if (buffer == NULL || size == 0) + return false; + + if (!icipc_sender_is_connected (self)) + return false; + + /* add the task in the queue */ + if (func) { + id = push_sync_task (self, func, data); + if (id == -1) + return false; + } + + /* write buffer and remove task if it fails */ + if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) { + if (id != -1) + self->async_tasks[id].func = NULL; + return false; + } + + return true; +} + +void * +icipc_sender_get_user_data (struct icipc_sender *self) +{ + return self->user_data; +} |