aboutsummaryrefslogtreecommitdiffstats
path: root/lib/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/receiver.c')
-rw-r--r--lib/receiver.c312
1 files changed, 152 insertions, 160 deletions
diff --git a/lib/receiver.c b/lib/receiver.c
index ba7edda..9c21063 100644
--- a/lib/receiver.c
+++ b/lib/receiver.c
@@ -1,4 +1,4 @@
-/* PipeWire AGL Cluster IPC
+/* PipeWire AGL Cluster IPC
*
* Copyright © 2021 Collabora Ltd.
* @author Julian Bouzas <julian.bouzas@collabora.com>
@@ -15,196 +15,188 @@
#include <string.h>
#include <errno.h>
#include <assert.h>
-
#include "private.h"
#include "receiver.h"
-
#include "icipc.h"
#define MAX_SENDERS 128
struct icipc_receiver {
- struct sockaddr_un addr;
- int socket_fd;
+ struct sockaddr_un addr;
+ int socket_fd;
- uint8_t *buffer_read;
- size_t buffer_size;
+ uint8_t *buffer_read;
+ size_t buffer_size;
- struct epoll_thread epoll_thread;
- bool thread_running;
+ EpollThread epoll_thread;
+ bool thread_running;
- const struct icipc_receiver_events *events;
- void *events_data;
+ const struct icipc_receiver_events *events;
+ void *events_data;
- /* for subclasses */
- void *user_data;
+ /* for subclasses */
+ void *user_data;
};
-static bool
-reply_message (struct icipc_receiver *self,
- int sender_fd,
- uint8_t *buffer,
- size_t size)
-{
- return self->events && self->events->handle_message ?
- self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
- icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
+static bool reply_message(
+ struct icipc_receiver *self,
+ int sender_fd,
+ uint8_t * buffer,
+ size_t size) {
+ return self->events && self->events->handle_message ?
+ self->events->handle_message(self, sender_fd, buffer, size,
+ self->events_data) :
+ icipc_socket_write(sender_fd, buffer, size) == (ssize_t) size;
}
-static void
-socket_event_received (struct epoll_thread *t, int fd, void *data)
-{
- /* sender wants to connect, accept connection */
- struct icipc_receiver *self = data;
- socklen_t addr_size = sizeof(self->addr);
- int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
- SOCK_CLOEXEC | SOCK_NONBLOCK);
- struct epoll_event event;
- event.events = EPOLLIN;
- event.data.fd = sender_fd;
- epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
- if (self->events && self->events->sender_state)
- self->events->sender_state (self, sender_fd,
- ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
+static void socket_event_received(EpollThread *t, int fd, void *data) {
+ /* sender wants to connect, accept connection */
+ struct icipc_receiver *self = data;
+ socklen_t addr_size = sizeof(self->addr);
+ int sender_fd = accept4(fd, (struct sockaddr *)&self->addr, &addr_size,
+ SOCK_CLOEXEC | SOCK_NONBLOCK);
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = sender_fd;
+ epoll_ctl(t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state(self, sender_fd,
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED,
+ self->events_data);
}
-static void
-other_event_received (struct epoll_thread *t, int fd, void *data)
-{
- struct icipc_receiver *self = data;
-
- /* sender sends a message, read it and reply */
- ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
- if (size <= 0) {
- if (size < 0)
- icipc_log_error ("receiver: could not read message: %s", strerror(errno));
- /* client disconnected */
- epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
- close (fd);
- if (self->events && self->events->sender_state)
- self->events->sender_state (self, fd,
- ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
- return;
- }
-
- /* reply */
- if (!reply_message (self, fd, self->buffer_read, size))
- icipc_log_error ("receiver: could not reply message: %s", strerror(errno));
-
- return;
+static void other_event_received(EpollThread *t, int fd, void *data) {
+ struct icipc_receiver *self = data;
+
+ /* sender sends a message, read it and reply */
+ ssize_t size =
+ icipc_socket_read(fd, &self->buffer_read, &self->buffer_size);
+ if (size <= 0) {
+ if (size < 0)
+ icipc_log_error("receiver: could not read message: %s",
+ strerror(errno));
+ /* client disconnected */
+ epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ close(fd);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state(self, fd,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED,
+ self->events_data);
+ return;
+ }
+
+ /* reply */
+ if (!reply_message(self, fd, self->buffer_read, size))
+ icipc_log_error("receiver: could not reply message: %s",
+ strerror(errno));
+
+ return;
}
/* API */
-struct icipc_receiver *
-icipc_receiver_new (const char *path,
- size_t buffer_size,
- const struct icipc_receiver_events *events,
- void *events_data,
- size_t user_size)
-{
- struct icipc_receiver *self;
- int res;
-
- /* check params */
- if (path == NULL || buffer_size == 0)
- return NULL;
-
- self = calloc (1, sizeof (struct icipc_receiver) + user_size);
- if (self == NULL)
- return NULL;
-
- self->socket_fd = -1;
-
- /* set address */
- self->addr.sun_family = AF_LOCAL;
- res = icipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path));
- if (res < 0)
- goto error;
-
- unlink (self->addr.sun_path);
-
- /* create socket */
- self->socket_fd =
- socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
- if (self->socket_fd < 0)
- goto error;
-
- /* bind socket */
- if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
- sizeof(self->addr)) != 0)
- goto error;
-
- /* listen socket */
- if (listen (self->socket_fd, MAX_SENDERS) != 0)
- goto error;
-
- /* alloc buffer read */
- self->buffer_size = buffer_size;
- self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
- if (self->buffer_read == NULL)
- goto error;
-
- /* init epoll thread */
- if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
- socket_event_received, other_event_received, self))
- goto error;
-
- self->events = events;
- self->events_data = events_data;
- if (user_size > 0)
- self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver));
-
- return self;
-
-error:
- if (self->buffer_read)
- free (self->buffer_read);
- if (self->socket_fd != -1)
- close (self->socket_fd);
- free (self);
- return NULL;
+struct icipc_receiver *icipc_receiver_new(
+ const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size) {
+ struct icipc_receiver *self;
+ int res;
+
+ /* check params */
+ if (path == NULL || buffer_size == 0)
+ return NULL;
+
+ self = calloc(1, sizeof(struct icipc_receiver) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ res =
+ icipc_construct_socket_path(path, self->addr.sun_path,
+ sizeof(self->addr.sun_path));
+ if (res < 0)
+ goto error;
+
+ unlink(self->addr.sun_path);
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* bind socket */
+ if (bind(self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) != 0)
+ goto error;
+
+ /* listen socket */
+ if (listen(self->socket_fd, MAX_SENDERS) != 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc(buffer_size, sizeof(uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init(&self->epoll_thread, self->socket_fd,
+ socket_event_received,
+ other_event_received, self))
+ goto error;
+
+ self->events = events;
+ self->events_data = events_data;
+ if (user_size > 0)
+ self->user_data =
+ (void *)((uint8_t *) self + sizeof(struct icipc_receiver));
+
+ return self;
+
+ error:
+ if (self->buffer_read)
+ free(self->buffer_read);
+ if (self->socket_fd != -1)
+ close(self->socket_fd);
+ free(self);
+ return NULL;
}
-void
-icipc_receiver_free (struct icipc_receiver *self)
-{
- icipc_receiver_stop (self);
+void icipc_receiver_free(struct icipc_receiver *self) {
+ icipc_receiver_stop(self);
- icipc_epoll_thread_destroy (&self->epoll_thread);
- free (self->buffer_read);
- close (self->socket_fd);
- unlink (self->addr.sun_path);
- free (self);
+ icipc_epoll_thread_destroy(&self->epoll_thread);
+ free(self->buffer_read);
+ close(self->socket_fd);
+ unlink(self->addr.sun_path);
+ free(self);
}
-bool
-icipc_receiver_start (struct icipc_receiver *self)
-{
- if (icipc_receiver_is_running (self))
- return true;
+bool icipc_receiver_start(struct icipc_receiver *self) {
+ if (icipc_receiver_is_running(self))
+ return true;
- self->thread_running = icipc_epoll_thread_start (&self->epoll_thread);
- return self->thread_running;
+ self->thread_running = icipc_epoll_thread_start(&self->epoll_thread);
+ return self->thread_running;
}
-void
-icipc_receiver_stop (struct icipc_receiver *self)
-{
- if (icipc_receiver_is_running (self)) {
- icipc_epoll_thread_stop (&self->epoll_thread);
- self->thread_running = false;
- }
+void icipc_receiver_stop(struct icipc_receiver *self) {
+ if (icipc_receiver_is_running(self)) {
+ icipc_epoll_thread_stop(&self->epoll_thread);
+ self->thread_running = false;
+ }
}
-bool
-icipc_receiver_is_running (struct icipc_receiver *self)
-{
- return self->thread_running;
+bool icipc_receiver_is_running(struct icipc_receiver *self) {
+ return self->thread_running;
}
-void *
-icipc_receiver_get_user_data (struct icipc_receiver *self)
-{
- return self->user_data;
+void *icipc_receiver_get_user_data(struct icipc_receiver *self) {
+ return self->user_data;
}