summaryrefslogtreecommitdiffstats
path: root/lib/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sender.c')
-rw-r--r--lib/sender.c251
1 files changed, 251 insertions, 0 deletions
diff --git a/lib/sender.c b/lib/sender.c
new file mode 100644
index 0000000..7b5b0c9
--- /dev/null
+++ b/lib/sender.c
@@ -0,0 +1,251 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "sender.h"
+
+#define MAX_ASYNC_TASKS 128
+
+struct icipc_sender_task {
+ icipc_sender_reply_func_t func;
+ void *data;
+};
+
+struct icipc_sender {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool is_connected;
+
+ icipc_sender_lost_conn_func_t lost_func;
+ void *lost_data;
+
+ struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS];
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static int
+push_sync_task (struct icipc_sender *self,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ size_t i;
+ for (i = MAX_ASYNC_TASKS; i > 1; i--) {
+ struct icipc_sender_task *curr = self->async_tasks + i - 1;
+ struct icipc_sender_task *next = self->async_tasks + i - 2;
+ if (next->func != NULL && curr->func == NULL) {
+ curr->func = func;
+ curr->data = data;
+ return i - 1;
+ } else if (i - 2 == 0 && next->func == NULL) {
+ /* empty queue */
+ next->func = func;
+ next->data = data;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static void
+pop_sync_task (struct icipc_sender *self,
+ bool trigger,
+ const uint8_t *buffer,
+ size_t size)
+{
+ size_t i;
+ for (i = 0; i < MAX_ASYNC_TASKS; i++) {
+ struct icipc_sender_task *task = self->async_tasks + i;
+ if (task->func != NULL) {
+ if (trigger)
+ task->func (self, buffer, size, task->data);
+ task->func = NULL;
+ return;
+ }
+ }
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_sender *self = data;
+
+ /* receiver sends a reply, read it trigger corresponding task */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("sender: could not read reply: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ if (self->lost_func)
+ self->lost_func (self, fd, self->lost_data);
+ return;
+ }
+
+ /* trigger async task */
+ pop_sync_task (self, true, self->buffer_read, size);
+ return;
+}
+
+/* API */
+
+struct icipc_sender *
+icipc_sender_new (const char *path,
+ size_t buffer_size,
+ icipc_sender_lost_conn_func_t lost_func,
+ void *lost_data,
+ size_t user_size)
+{
+ struct icipc_sender *self;
+ int name_size;
+
+ if (path == NULL)
+ return NULL;
+
+ self = calloc (1, sizeof (struct icipc_sender) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, NULL, self))
+ goto error;
+
+ self->lost_func = lost_func;
+ self->lost_data = lost_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_sender_free (struct icipc_sender *self)
+{
+ icipc_sender_disconnect (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_sender_connect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self))
+ return true;
+
+ if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) == 0 &&
+ icipc_epoll_thread_start (&self->epoll_thread)) {
+ self->is_connected = true;
+ return true;
+ }
+
+ return false;
+}
+
+void
+icipc_sender_disconnect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ shutdown(self->socket_fd, SHUT_RDWR);
+ self->is_connected = false;
+ }
+}
+
+bool
+icipc_sender_is_connected (struct icipc_sender *self)
+{
+ return self->is_connected;
+}
+
+bool
+icipc_sender_send (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ int id = -1;
+
+ if (buffer == NULL || size == 0)
+ return false;
+
+ if (!icipc_sender_is_connected (self))
+ return false;
+
+ /* add the task in the queue */
+ if (func) {
+ id = push_sync_task (self, func, data);
+ if (id == -1)
+ return false;
+ }
+
+ /* write buffer and remove task if it fails */
+ if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) {
+ if (id != -1)
+ self->async_tasks[id].func = NULL;
+ return false;
+ }
+
+ return true;
+}
+
+void *
+icipc_sender_get_user_data (struct icipc_sender *self)
+{
+ return self->user_data;
+}