aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJulian Bouzas <julian.bouzas@collabora.com>2021-04-20 04:08:58 -0400
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2021-07-28 13:19:02 +0300
commitf25bb13718f334bc0c96d29ea9f3a57c0a6f3a34 (patch)
treeeaa30eafe2df92e3d5d75571d022c3a775246bff
parent7bf96bda703dd157385cbb175ec90bd6f38af404 (diff)
lib: add wpipc library
Simple library that uses sockets for inter-process communication. It provides an API to create server and client objects. Users can add custom handlers in the server, and clients can send requests for those custom handlers. Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
-rw-r--r--lib/client.c84
-rw-r--r--lib/client.h56
-rw-r--r--lib/defs.h26
-rw-r--r--lib/icipc.h18
-rw-r--r--lib/meson.build40
-rw-r--r--lib/private.h97
-rw-r--r--lib/protocol.c217
-rw-r--r--lib/protocol.h87
-rw-r--r--lib/receiver.c213
-rw-r--r--lib/receiver.h78
-rw-r--r--lib/sender.c251
-rw-r--r--lib/sender.h75
-rw-r--r--lib/server.c258
-rw-r--r--lib/server.h85
-rw-r--r--lib/utils.c269
-rw-r--r--src/icipc-client.c104
-rw-r--r--tests/client-server.c124
-rw-r--r--tests/meson.build26
-rw-r--r--tests/protocol.c77
-rw-r--r--tests/sender-receiver.c286
20 files changed, 2471 insertions, 0 deletions
diff --git a/lib/client.c b/lib/client.c
new file mode 100644
index 0000000..735796f
--- /dev/null
+++ b/lib/client.c
@@ -0,0 +1,84 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include "private.h"
+#include "protocol.h"
+#include "sender.h"
+#include "client.h"
+
+#define BUFFER_SIZE 1024
+
+static void
+on_lost_connection (struct icipc_sender *self,
+ int receiver_fd,
+ void *data)
+{
+ icipc_log_warn ("client: lost connection with server %d", receiver_fd);
+}
+
+/* API */
+
+struct icipc_client *
+icipc_client_new (const char *path, bool connect)
+{
+ struct icipc_sender *base;
+ base = icipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0);
+
+ if (connect)
+ icipc_sender_connect (base);
+
+ return (struct icipc_client *)base;
+}
+
+void
+icipc_client_free (struct icipc_client *self)
+{
+ struct icipc_sender *base = icipc_client_to_sender (self);
+ icipc_sender_free (base);
+}
+
+bool
+icipc_client_send_request (struct icipc_client *self,
+ const char *name,
+ const struct spa_pod *args,
+ icipc_sender_reply_func_t reply,
+ void *data)
+{
+ struct icipc_sender *base = icipc_client_to_sender (self);
+
+ /* check params */
+ if (name == NULL)
+ return false;
+
+ const size_t size = icipc_protocol_calculate_request_size (name, args);
+ uint8_t buffer[size];
+ icipc_protocol_build_request (buffer, size, name, args);
+ return icipc_sender_send (base, buffer, size, reply, data);
+}
+
+const struct spa_pod *
+icipc_client_send_request_finish (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ const char **error)
+{
+ /* error */
+ if (icipc_protocol_is_reply_error (buffer, size)) {
+ icipc_protocol_parse_reply_error (buffer, size, error);
+ return NULL;
+ }
+
+ /* ok */
+ if (icipc_protocol_is_reply_ok (buffer, size)) {
+ const struct spa_pod *value = NULL;
+ if (icipc_protocol_parse_reply_ok (buffer, size, &value))
+ return value;
+ }
+
+ return NULL;
+}
diff --git a/lib/client.h b/lib/client.h
new file mode 100644
index 0000000..7ac8e1a
--- /dev/null
+++ b/lib/client.h
@@ -0,0 +1,56 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_CLIENT_H__
+#define __ICIPC_CLIENT_H__
+
+#include <spa/pod/pod.h>
+
+#include <stddef.h>
+
+#include "sender.h"
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define icipc_client_to_sender(self) ((struct icipc_sender *)(self))
+
+struct icipc_client;
+
+ICIPC_API
+struct icipc_client *
+icipc_client_new (const char *path, bool connect);
+
+ICIPC_API
+void
+icipc_client_free (struct icipc_client *self);
+
+ICIPC_API
+bool
+icipc_client_send_request (struct icipc_client *self,
+ const char *name,
+ const struct spa_pod *args,
+ icipc_sender_reply_func_t reply,
+ void *data);
+
+/* for reply handlers only */
+
+ICIPC_API
+const struct spa_pod *
+icipc_client_send_request_finish (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ const char **error);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/defs.h b/lib/defs.h
new file mode 100644
index 0000000..e962db7
--- /dev/null
+++ b/lib/defs.h
@@ -0,0 +1,26 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_DEFS_H__
+#define __ICIPC_DEFS_H__
+
+#if defined(__GNUC__)
+# define ICIPC_API_EXPORT extern __attribute__ ((visibility ("default")))
+#else
+# define ICIPC_API_EXPORT extern
+#endif
+
+#ifndef ICIPC_API
+# define ICIPC_API ICIPC_API_EXPORT
+#endif
+
+#ifndef ICIPC_PRIVATE_API
+# define ICIPC_PRIVATE_API __attribute__ ((deprecated ("Private API")))
+#endif
+
+#endif
diff --git a/lib/icipc.h b/lib/icipc.h
new file mode 100644
index 0000000..a619513
--- /dev/null
+++ b/lib/icipc.h
@@ -0,0 +1,18 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_H__
+#define __ICIPC_H__
+
+#include "protocol.h"
+#include "receiver.h"
+#include "sender.h"
+#include "server.h"
+#include "client.h"
+
+#endif
diff --git a/lib/meson.build b/lib/meson.build
new file mode 100644
index 0000000..318b304
--- /dev/null
+++ b/lib/meson.build
@@ -0,0 +1,40 @@
+icipc_lib_sources = files(
+ 'utils.c',
+ 'protocol.c',
+ 'receiver.c',
+ 'sender.c',
+ 'client.c',
+ 'server.c',
+)
+
+icipc_lib_headers = files(
+ 'protocol.h',
+ 'receiver.h',
+ 'sender.h',
+ 'client.h',
+ 'server.h',
+ 'icipc.h',
+)
+
+
+
+install_headers(icipc_lib_headers,
+ install_dir : join_paths(get_option('includedir'), 'icipc-' + wireplumber_api_version, 'icipc')
+)
+
+icipc_lib = library('icipc-' + wireplumber_api_version,
+ icipc_lib_sources,
+ c_args : [
+ '-D_GNU_SOURCE',
+ '-DG_LOG_USE_STRUCTURED',
+ '-DG_LOG_DOMAIN="icipc"',
+ ],
+ install: true,
+ dependencies : [dependency('threads'), spa_dep],
+)
+
+icipc_dep = declare_dependency(
+ link_with: icipc_lib,
+ include_directories: wp_lib_include_dir,
+ dependencies: [spa_dep],
+)
diff --git a/lib/private.h b/lib/private.h
new file mode 100644
index 0000000..55d87a3
--- /dev/null
+++ b/lib/private.h
@@ -0,0 +1,97 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_PRIVATE_H__
+#define __ICIPC_PRIVATE_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <pthread.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <stdarg.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* log */
+
+#define icipc_log_info(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__)
+#define icipc_log_warn(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__)
+#define icipc_log_error(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__)
+
+enum icipc_log_level {
+ ICIPC_LOG_LEVEL_NONE = 0,
+ ICIPC_LOG_LEVEL_ERROR,
+ ICIPC_LOG_LEVEL_WARN,
+ ICIPC_LOG_LEVEL_INFO,
+};
+
+void
+icipc_logv (enum icipc_log_level level,
+ const char *fmt,
+ va_list args) __attribute__ ((format (printf, 2, 0)));
+
+void
+icipc_log (enum icipc_log_level level,
+ const char *fmt,
+ ...) __attribute__ ((format (printf, 2, 3)));
+
+/* socket */
+
+ssize_t
+icipc_socket_write (int fd, const uint8_t *buffer, size_t size);
+
+ssize_t
+icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size);
+
+/* epoll thread */
+
+struct epoll_thread;
+
+typedef void (*icipc_epoll_thread_event_funct_t) (struct epoll_thread *self,
+ int fd,
+ void *data);
+
+struct epoll_thread {
+ int socket_fd;
+ int epoll_fd;
+ int event_fd;
+ pthread_t thread;
+ icipc_epoll_thread_event_funct_t socket_event_func;
+ icipc_epoll_thread_event_funct_t other_event_func;
+ void *event_data;
+};
+
+bool
+icipc_epoll_thread_init (struct epoll_thread *self,
+ int socket_fd,
+ icipc_epoll_thread_event_funct_t sock_func,
+ icipc_epoll_thread_event_funct_t other_func,
+ void *data);
+
+bool
+icipc_epoll_thread_start (struct epoll_thread *self);
+
+void
+icipc_epoll_thread_stop (struct epoll_thread *self);
+
+void
+icipc_epoll_thread_destroy (struct epoll_thread *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/protocol.c b/lib/protocol.c
new file mode 100644
index 0000000..6de4bf5
--- /dev/null
+++ b/lib/protocol.c
@@ -0,0 +1,217 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <assert.h>
+
+#include <spa/pod/builder.h>
+#include <spa/pod/parser.h>
+
+#include "protocol.h"
+
+#define SIZE_PADDING 128
+
+enum icipc_protocol_reply_code {
+ REPLY_CODE_ERROR = 0,
+ REPLY_CODE_OK,
+};
+
+static bool
+is_reply (const uint8_t *buffer, size_t size, int code)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+
+ return parsed_code == code;
+}
+
+/* API */
+
+size_t
+icipc_protocol_calculate_request_size (const char *name,
+ const struct spa_pod *args)
+{
+ assert (name);
+ return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING;
+}
+
+void
+icipc_protocol_build_request (uint8_t *buffer,
+ size_t size,
+ const char *name,
+ const struct spa_pod *args)
+{
+ const struct spa_pod none = SPA_POD_INIT_None();
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+
+ if (args == NULL)
+ args = &none;
+
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_string (&b, name);
+ spa_pod_builder_primitive (&b, args);
+ spa_pod_builder_pop(&b, &f);
+}
+
+bool
+icipc_protocol_parse_request (const uint8_t *buffer,
+ size_t size,
+ const char **name,
+ const struct spa_pod **args)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ const char *parsed_name = NULL;
+ struct spa_pod *parsed_args = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_string (&p, &parsed_name);
+ spa_pod_parser_get_pod (&p, &parsed_args);
+ spa_pod_parser_pop(&p, &f);
+
+ /* check name and args */
+ if (name == NULL || args == NULL)
+ return false;
+
+ if (name != NULL)
+ *name = parsed_name;
+ if (args != NULL)
+ *args = parsed_args;
+ return true;
+}
+
+size_t
+icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value)
+{
+ return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING;
+}
+
+size_t
+icipc_protocol_calculate_reply_error_size (const char *msg)
+{
+ assert (msg);
+ return strlen(msg) + SIZE_PADDING;
+}
+
+void
+icipc_protocol_build_reply_ok (uint8_t *buffer,
+ size_t size,
+ const struct spa_pod *value)
+{
+ const struct spa_pod none = SPA_POD_INIT_None();
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+
+ if (value == NULL)
+ value = &none;
+
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_int (&b, REPLY_CODE_OK);
+ spa_pod_builder_primitive (&b, value);
+ spa_pod_builder_pop(&b, &f);
+}
+
+void
+icipc_protocol_build_reply_error (uint8_t *buffer,
+ size_t size,
+ const char *msg)
+{
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_int (&b, REPLY_CODE_ERROR);
+ spa_pod_builder_string (&b, msg);
+ spa_pod_builder_pop(&b, &f);
+}
+
+bool
+icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size)
+{
+ return is_reply (buffer, size, REPLY_CODE_OK);
+}
+
+bool
+icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size)
+{
+ return is_reply (buffer, size, REPLY_CODE_ERROR);
+}
+
+bool
+icipc_protocol_parse_reply_ok (const uint8_t *buffer,
+ size_t size,
+ const struct spa_pod **value)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+ struct spa_pod *parsed_value = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+ spa_pod_parser_get_pod (&p, &parsed_value);
+ spa_pod_parser_pop (&p, &f);
+
+ if (value != NULL)
+ *value = parsed_value;
+ return true;
+}
+
+bool
+icipc_protocol_parse_reply_error (const uint8_t *buffer,
+ size_t size,
+ const char **msg)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+ const char *parsed_msg = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+ spa_pod_parser_get_string (&p, &parsed_msg);
+ spa_pod_parser_pop (&p, &f);
+
+ if (msg != NULL)
+ *msg = parsed_msg;
+ return true;
+}
diff --git a/lib/protocol.h b/lib/protocol.h
new file mode 100644
index 0000000..730c918
--- /dev/null
+++ b/lib/protocol.h
@@ -0,0 +1,87 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_PROTOCOL_H__
+#define __ICIPC_PROTOCOL_H__
+
+#include <spa/pod/pod.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* request */
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_request_size (const char *name,
+ const struct spa_pod *args);
+
+ICIPC_API
+void
+icipc_protocol_build_request (uint8_t *buffer,
+ size_t size,
+ const char *name,
+ const struct spa_pod *args);
+
+ICIPC_API
+bool
+icipc_protocol_parse_request (const uint8_t *buffer,
+ size_t size,
+ const char **name,
+ const struct spa_pod **args);
+
+/* reply */
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value);
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_reply_error_size (const char *msg);
+
+ICIPC_API
+void
+icipc_protocol_build_reply_ok (uint8_t *buffer,
+ size_t size,
+ const struct spa_pod *value);
+
+ICIPC_API
+void
+icipc_protocol_build_reply_error (uint8_t *buffer,
+ size_t size,
+ const char *msg);
+
+ICIPC_API
+bool
+icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size);
+
+ICIPC_API
+bool
+icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size);
+
+ICIPC_API
+bool
+icipc_protocol_parse_reply_ok (const uint8_t *buffer,
+ size_t size,
+ const struct spa_pod **value);
+
+ICIPC_API
+bool
+icipc_protocol_parse_reply_error (const uint8_t *buffer,
+ size_t size,
+ const char **msg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/receiver.c b/lib/receiver.c
new file mode 100644
index 0000000..f6c46c5
--- /dev/null
+++ b/lib/receiver.c
@@ -0,0 +1,213 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "receiver.h"
+
+#include "icipc.h"
+
+#define MAX_SENDERS 128
+
+struct icipc_receiver {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool thread_running;
+
+ const struct icipc_receiver_events *events;
+ void *events_data;
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static bool
+reply_message (struct icipc_receiver *self,
+ int sender_fd,
+ uint8_t *buffer,
+ size_t size)
+{
+ return self->events && self->events->handle_message ?
+ self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
+ icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ /* sender wants to connect, accept connection */
+ struct icipc_receiver *self = data;
+ socklen_t addr_size = sizeof(self->addr);
+ int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
+ SOCK_CLOEXEC | SOCK_NONBLOCK);
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = sender_fd;
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, sender_fd,
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
+}
+
+static void
+other_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_receiver *self = data;
+
+ /* sender sends a message, read it and reply */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("receiver: could not read message: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ /* client disconnected */
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ close (fd);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, fd,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
+ return;
+ }
+
+ /* reply */
+ if (!reply_message (self, fd, self->buffer_read, size))
+ icipc_log_error ("receiver: could not reply message: %s", strerror(errno));
+
+ return;
+}
+
+/* API */
+
+struct icipc_receiver *
+icipc_receiver_new (const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size)
+{
+ struct icipc_receiver *self;
+ int name_size;
+
+ /* check params */
+ if (path == NULL || buffer_size == 0)
+ return NULL;
+
+ unlink (path);
+
+ self = calloc (1, sizeof (struct icipc_receiver) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* bind socket */
+ if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) != 0)
+ goto error;
+
+ /* listen socket */
+ if (listen (self->socket_fd, MAX_SENDERS) != 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, other_event_received, self))
+ goto error;
+
+ self->events = events;
+ self->events_data = events_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_receiver_free (struct icipc_receiver *self)
+{
+ icipc_receiver_stop (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_receiver_start (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self))
+ return true;
+
+ self->thread_running = icipc_epoll_thread_start (&self->epoll_thread);
+ return self->thread_running;
+}
+
+void
+icipc_receiver_stop (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ self->thread_running = false;
+ }
+}
+
+bool
+icipc_receiver_is_running (struct icipc_receiver *self)
+{
+ return self->thread_running;
+}
+
+void *
+icipc_receiver_get_user_data (struct icipc_receiver *self)
+{
+ return self->user_data;
+}
diff --git a/lib/receiver.h b/lib/receiver.h
new file mode 100644
index 0000000..cf13e1b
--- /dev/null
+++ b/lib/receiver.h
@@ -0,0 +1,78 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_RECEIVER_H__
+#define __ICIPC_RECEIVER_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct icipc_receiver;
+
+enum icipc_receiver_sender_state {
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED = 0,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED
+};
+
+struct icipc_receiver_events {
+ /* emitted when a sender state changes */
+ void (*sender_state) (struct icipc_receiver *self,
+ int sender_fd,
+ enum icipc_receiver_sender_state state,
+ void *data);
+
+ /* emitted when message is received and needs to be handled */
+ bool (*handle_message) (struct icipc_receiver *self,
+ int sender_fd,
+ const uint8_t *buffer,
+ size_t size,
+ void *data);
+};
+
+ICIPC_API
+struct icipc_receiver *
+icipc_receiver_new (const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size);
+
+ICIPC_API
+void
+icipc_receiver_free (struct icipc_receiver *self);
+
+ICIPC_API
+bool
+icipc_receiver_start (struct icipc_receiver *self);
+
+ICIPC_API
+void
+icipc_receiver_stop (struct icipc_receiver *self);
+
+ICIPC_API
+bool
+icipc_receiver_is_running (struct icipc_receiver *self);
+
+/* for subclasses only */
+
+ICIPC_API
+void *
+icipc_receiver_get_user_data (struct icipc_receiver *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/sender.c b/lib/sender.c
new file mode 100644
index 0000000..7b5b0c9
--- /dev/null
+++ b/lib/sender.c
@@ -0,0 +1,251 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "sender.h"
+
+#define MAX_ASYNC_TASKS 128
+
+struct icipc_sender_task {
+ icipc_sender_reply_func_t func;
+ void *data;
+};
+
+struct icipc_sender {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool is_connected;
+
+ icipc_sender_lost_conn_func_t lost_func;
+ void *lost_data;
+
+ struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS];
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static int
+push_sync_task (struct icipc_sender *self,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ size_t i;
+ for (i = MAX_ASYNC_TASKS; i > 1; i--) {
+ struct icipc_sender_task *curr = self->async_tasks + i - 1;
+ struct icipc_sender_task *next = self->async_tasks + i - 2;
+ if (next->func != NULL && curr->func == NULL) {
+ curr->func = func;
+ curr->data = data;
+ return i - 1;
+ } else if (i - 2 == 0 && next->func == NULL) {
+ /* empty queue */
+ next->func = func;
+ next->data = data;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static void
+pop_sync_task (struct icipc_sender *self,
+ bool trigger,
+ const uint8_t *buffer,
+ size_t size)
+{
+ size_t i;
+ for (i = 0; i < MAX_ASYNC_TASKS; i++) {
+ struct icipc_sender_task *task = self->async_tasks + i;
+ if (task->func != NULL) {
+ if (trigger)
+ task->func (self, buffer, size, task->data);
+ task->func = NULL;
+ return;
+ }
+ }
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_sender *self = data;
+
+ /* receiver sends a reply, read it trigger corresponding task */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("sender: could not read reply: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ if (self->lost_func)
+ self->lost_func (self, fd, self->lost_data);
+ return;
+ }
+
+ /* trigger async task */
+ pop_sync_task (self, true, self->buffer_read, size);
+ return;
+}
+
+/* API */
+
+struct icipc_sender *
+icipc_sender_new (const char *path,
+ size_t buffer_size,
+ icipc_sender_lost_conn_func_t lost_func,
+ void *lost_data,
+ size_t user_size)
+{
+ struct icipc_sender *self;
+ int name_size;
+
+ if (path == NULL)
+ return NULL;
+
+ self = calloc (1, sizeof (struct icipc_sender) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, NULL, self))
+ goto error;
+
+ self->lost_func = lost_func;
+ self->lost_data = lost_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_sender_free (struct icipc_sender *self)
+{
+ icipc_sender_disconnect (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_sender_connect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self))
+ return true;
+
+ if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) == 0 &&
+ icipc_epoll_thread_start (&self->epoll_thread)) {
+ self->is_connected = true;
+ return true;
+ }
+
+ return false;
+}
+
+void
+icipc_sender_disconnect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ shutdown(self->socket_fd, SHUT_RDWR);
+ self->is_connected = false;
+ }
+}
+
+bool
+icipc_sender_is_connected (struct icipc_sender *self)
+{
+ return self->is_connected;
+}
+
+bool
+icipc_sender_send (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ int id = -1;
+
+ if (buffer == NULL || size == 0)
+ return false;
+
+ if (!icipc_sender_is_connected (self))
+ return false;
+
+ /* add the task in the queue */
+ if (func) {
+ id = push_sync_task (self, func, data);
+ if (id == -1)
+ return false;
+ }
+
+ /* write buffer and remove task if it fails */
+ if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) {
+ if (id != -1)
+ self->async_tasks[id].func = NULL;
+ return false;
+ }
+
+ return true;
+}
+
+void *
+icipc_sender_get_user_data (struct icipc_sender *self)
+{
+ return self->user_data;
+}
diff --git a/lib/sender.h b/lib/sender.h
new file mode 100644
index 0000000..88aaf3a
--- /dev/null
+++ b/lib/sender.h
@@ -0,0 +1,75 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_SENDER_H__
+#define __ICIPC_SENDER_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct icipc_sender;
+
+typedef void (*icipc_sender_lost_conn_func_t) (struct icipc_sender *self,
+ int receiver_fd,
+ void *data);
+
+typedef void (*icipc_sender_reply_func_t) (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ void *data);
+
+ICIPC_API
+struct icipc_sender *
+icipc_sender_new (const char *path,
+ size_t buffer_size,
+ icipc_sender_lost_conn_func_t lost_func,
+ void *lost_data,
+ size_t user_size);
+
+ICIPC_API
+void
+icipc_sender_free (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_connect (struct icipc_sender *self);
+
+ICIPC_API
+void
+icipc_sender_disconnect (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_is_connected (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_send (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ icipc_sender_reply_func_t reply,
+ void *data);
+
+/* for subclasses only */
+
+ICIPC_API
+void *
+icipc_sender_get_user_data (struct icipc_sender *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/server.c b/lib/server.c
new file mode 100644
index 0000000..a798894
--- /dev/null
+++ b/lib/server.c
@@ -0,0 +1,258 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <pthread.h>
+
+#include "private.h"
+#include "protocol.h"
+#include "receiver.h"
+#include "server.h"
+
+#define BUFFER_SIZE 1024
+#define MAX_REQUEST_HANDLERS 128
+
+struct icipc_server_client_handler
+{
+ icipc_server_client_handler_func_t handler;
+ void *data;
+};
+
+struct icipc_server_request_handler
+{
+ const char *name;
+ icipc_server_request_handler_func_t handler;
+ void *data;
+};
+
+struct icipc_server_priv {
+ pthread_mutex_t mutex;
+ struct icipc_server_client_handler client_handler;
+ size_t n_request_handlers;
+ struct icipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS];
+};
+
+static void
+sender_state (struct icipc_receiver *base,
+ int sender_fd,
+ enum icipc_receiver_sender_state sender_state,
+ void *data)
+{
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ icipc_log_info ("server: new state %d on client %d", sender_state, sender_fd);
+
+ pthread_mutex_lock (&priv->mutex);
+ if (priv->client_handler.handler)
+ priv->client_handler.handler ((struct icipc_server *)base, sender_fd,
+ sender_state, priv->client_handler.data);
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+static bool
+handle_message (struct icipc_receiver *base,
+ int sender_fd,
+ const uint8_t *buffer,
+ size_t size,
+ void *data)
+{
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ const char *name = NULL;
+ const struct spa_pod *args = NULL;
+
+ icipc_log_info ("server: message from client %d received", sender_fd);
+
+ /* parse */
+ if (!icipc_protocol_parse_request (buffer, size, &name, &args)) {
+ const char *msg = "could not parse request";
+ const size_t s = icipc_protocol_calculate_reply_error_size (msg);
+ uint8_t b[s];
+ icipc_protocol_build_reply_error (b, s, msg);
+ return icipc_socket_write (sender_fd, b, s) == (ssize_t)s;
+ }
+
+ /* handle */
+ size_t i;
+ bool res = false;
+ pthread_mutex_lock (&priv->mutex);
+
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0 &&
+ rh->handler != NULL) {
+ res = rh->handler ((struct icipc_server *)base, sender_fd, name, args,
+ rh->data);
+ pthread_mutex_unlock (&priv->mutex);
+ return res;
+ }
+ }
+
+ /* handler was not found, reply with error */
+ res = icipc_server_reply_error ((struct icipc_server *)base, sender_fd,
+ "request handler not found");
+
+ pthread_mutex_unlock (&priv->mutex);
+ return res;
+}
+
+static struct icipc_receiver_events events = {
+ .sender_state = sender_state,
+ .handle_message = handle_message,
+};
+
+/* API */
+
+struct icipc_server *
+icipc_server_new (const char *path, bool start)
+{
+ struct icipc_server_priv * priv = NULL;
+ struct icipc_receiver *base = NULL;
+
+ base = icipc_receiver_new (path, BUFFER_SIZE, &events, NULL,
+ sizeof (struct icipc_server_priv));
+ if (base == NULL)
+ return NULL;
+
+ priv = icipc_receiver_get_user_data (base);
+ pthread_mutex_init (&priv->mutex, NULL);
+ priv->n_request_handlers = 0;
+
+ if (start)
+ icipc_receiver_start (base);
+
+ return (struct icipc_server *)base;
+}
+
+void
+icipc_server_free (struct icipc_server *self)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_destroy (&priv->mutex);
+
+ icipc_receiver_free (base);
+}
+
+void
+icipc_server_set_client_handler (struct icipc_server *self,
+ icipc_server_client_handler_func_t handler,
+ void *data)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_lock (&priv->mutex);
+ priv->client_handler.handler = handler;
+ priv->client_handler.data = data;
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+void
+icipc_server_clear_client_handler (struct icipc_server *self)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_lock (&priv->mutex);
+ priv->client_handler.handler = NULL;
+ priv->client_handler.data = NULL;
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+bool
+icipc_server_set_request_handler (struct icipc_server *self,
+ const char *name,
+ icipc_server_request_handler_func_t handler,
+ void *data)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ size_t i;
+
+ /* check params */
+ if (name == NULL)
+ return false;
+
+ pthread_mutex_lock (&priv->mutex);
+
+ /* make sure handler does not exist */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0) {
+ pthread_mutex_unlock (&priv->mutex);
+ return false;
+ }
+ }
+
+ /* set handler */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name == NULL) {
+ rh->name = name;
+ rh->handler = handler;
+ rh->data = data;
+ pthread_mutex_unlock (&priv->mutex);
+ return true;
+ }
+ }
+
+ pthread_mutex_unlock (&priv->mutex);
+
+ return false;
+}
+
+void
+icipc_server_clear_request_handler (struct icipc_server *self,
+ const char *name)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ size_t i;
+
+ /* check params */
+ if (name == NULL)
+ return;
+
+ pthread_mutex_lock (&priv->mutex);
+
+ /* clear handler */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0) {
+ rh->name = NULL;
+ break;
+ }
+ }
+
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+bool
+icipc_server_reply_ok (struct icipc_server *self,
+ int client_fd,
+ const struct spa_pod *value)
+{
+ const size_t s = icipc_protocol_calculate_reply_ok_size (value);
+ uint8_t b[s];
+ icipc_protocol_build_reply_ok (b, s, value);
+ return icipc_socket_write (client_fd, b, s) == (ssize_t)s;
+}
+
+bool
+icipc_server_reply_error (struct icipc_server *self,
+ int client_fd,
+ const char *msg)
+{
+ if (msg == NULL)
+ return false;
+
+ const size_t s = icipc_protocol_calculate_reply_error_size (msg);
+ uint8_t b[s];
+ icipc_protocol_build_reply_error (b, s, msg);
+ return icipc_socket_write (client_fd, b, s) == (ssize_t)s;
+}
diff --git a/lib/server.h b/lib/server.h
new file mode 100644
index 0000000..3cc0ed9
--- /dev/null
+++ b/lib/server.h
@@ -0,0 +1,85 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_SERVER_H__
+#define __ICIPC_SERVER_H__
+
+#include <spa/pod/pod.h>
+
+#include "defs.h"
+
+#include "receiver.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define icipc_server_to_receiver(self) ((struct icipc_receiver *)(self))
+
+struct icipc_server;
+
+typedef void (*icipc_server_client_handler_func_t) (struct icipc_server *self,
+ int client_fd,
+ enum icipc_receiver_sender_state client_state,
+ void *data);
+
+typedef bool (*icipc_server_request_handler_func_t) (struct icipc_server *self,
+ int client_fd,
+ const char *name,
+ const struct spa_pod *args,
+ void *data);
+
+ICIPC_API
+struct icipc_server *
+icipc_server_new (const char *path, bool start);
+
+ICIPC_API
+void
+icipc_server_free (struct icipc_server *self);
+
+ICIPC_API
+void
+icipc_server_set_client_handler (struct icipc_server *self,
+ icipc_server_client_handler_func_t handler,
+ void *data);
+
+ICIPC_API
+void
+icipc_server_clear_client_handler (struct icipc_server *self);
+
+ICIPC_API
+bool
+icipc_server_set_request_handler (struct icipc_server *self,
+ const char *name,
+ icipc_server_request_handler_func_t handler,
+ void *data);
+
+ICIPC_API
+void
+icipc_server_clear_request_handler (struct icipc_server *self,
+ const char *name);
+
+/* for request handlers only */
+
+ICIPC_API
+bool
+icipc_server_reply_ok (struct icipc_server *self,
+ int client_fd,
+ const struct spa_pod *value);
+
+ICIPC_API
+bool
+icipc_server_reply_error (struct icipc_server *self,
+ int client_fd,
+ const char *msg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/utils.c b/lib/utils.c
new file mode 100644
index 0000000..7c683d1
--- /dev/null
+++ b/lib/utils.c
@@ -0,0 +1,269 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <string.h>
+#include <pthread.h>
+#include <time.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+
+#define MAX_POLL_EVENTS 128
+#define MAX_LOG_MESSAGE 1024
+
+/* log */
+
+const char *icipc_logger_level_text[] = {
+ [ICIPC_LOG_LEVEL_ERROR] = "E",
+ [ICIPC_LOG_LEVEL_WARN] = "W",
+ [ICIPC_LOG_LEVEL_INFO] = "I",
+};
+
+struct icipc_logger {
+ enum icipc_log_level level;
+};
+
+static const struct icipc_logger *
+icipc_log_get_instance (void)
+{
+ static struct icipc_logger logger_ = { 0, };
+ static struct icipc_logger* instance_ = NULL;
+
+ if (instance_ == NULL) {
+ char * val_str = NULL;
+ enum icipc_log_level val = 0;
+
+ /* default to error */
+ logger_.level = ICIPC_LOG_LEVEL_WARN;
+
+ /* get level from env */
+ val_str = getenv ("ICIPC_DEBUG");
+ if (val_str && sscanf (val_str, "%u", &val) == 1 &&
+ val >= ICIPC_LOG_LEVEL_NONE)
+ logger_.level = val;
+
+ instance_ = &logger_;
+ }
+
+ return instance_;
+}
+
+void
+icipc_logv (enum icipc_log_level level, const char *fmt, va_list args)
+{
+ const struct icipc_logger *logger = NULL;
+
+ logger = icipc_log_get_instance ();
+ assert (logger);
+
+ if (logger->level >= level) {
+ assert (level > 0);
+ char msg[MAX_LOG_MESSAGE];
+ struct timespec time;
+ clock_gettime (CLOCK_REALTIME, &time);
+ vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args);
+ fprintf (stderr, "[%s][%lu.%lu] %s\n", icipc_logger_level_text[level],
+ time.tv_sec, time.tv_sec, msg);
+ }
+}
+
+void
+icipc_log (enum icipc_log_level level, const char *fmt, ...)
+{
+ va_list args;
+ va_start (args, fmt);
+ icipc_logv (level, fmt, args);
+ va_end (args);
+}
+
+/* socket */
+
+ssize_t
+icipc_socket_write (int fd, const uint8_t *buffer, size_t size)
+{
+ size_t total_written = 0;
+ size_t n;
+
+ assert (fd >= 0);
+ assert (buffer != NULL);
+ assert (size > 0);
+
+ do {
+ n = write(fd, buffer, size);
+ if (n < size) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return total_written;
+ return -1;
+ }
+ total_written += n;
+ } while (total_written < size);
+
+ return total_written;
+}
+
+ssize_t
+icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size)
+{
+ ssize_t n;
+ ssize_t size;
+ size_t offset = 0;
+
+ assert (buffer);
+ assert (*buffer);
+ assert (max_size);
+ assert (*max_size > 0);
+
+again:
+ size = *max_size - offset;
+ n = read (fd, *buffer + offset, size);
+ if (n == 0)
+ return 0;
+
+ /* check for errors */
+ if (n < 0) {
+ if (errno == EINTR)
+ goto again;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return offset;
+ return -1;
+ }
+
+ /* realloc if we need more space, and read again */
+ if (n >= size) {
+ *max_size += *max_size;
+ *buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t));
+ offset += n;
+ goto again;
+ }
+
+ return offset + n;
+}
+
+/* epoll thread */
+
+bool
+icipc_epoll_thread_init (struct epoll_thread *self,
+ int socket_fd,
+ icipc_epoll_thread_event_funct_t sock_func,
+ icipc_epoll_thread_event_funct_t other_func,
+ void *data)
+{
+ struct epoll_event event;
+
+ self->socket_fd = socket_fd;
+ self->event_fd = -1;
+ self->epoll_fd = -1;
+
+ /* create event fd */
+ self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
+ if (self->event_fd == -1)
+ goto error;
+
+ /* create epoll fd */
+ self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
+ if (self->epoll_fd == -1)
+ goto error;
+
+ /* poll socket fd */
+ event.events = EPOLLIN;
+ event.data.fd = self->socket_fd;
+ if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0)
+ goto error;
+
+ /* poll event fd */
+ event.events = EPOLLIN;
+ event.data.fd = self->event_fd;
+ if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0)
+ goto error;
+
+ self->socket_event_func = sock_func;
+ self->other_event_func = other_func;
+ self->event_data = data;
+ return true;
+
+error:
+ if (self->epoll_fd != -1)
+ close (self->epoll_fd);
+ if (self->event_fd != -1)
+ close (self->event_fd);
+ return false;
+}
+
+static void *
+epoll_thread_run (void *data)
+{
+ struct epoll_thread *self = data;
+ bool exit = false;
+
+ while (!exit) {
+ /* wait for events */
+ struct epoll_event ep[MAX_POLL_EVENTS];
+ int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1);
+ if (n < 0) {
+ icipc_log_error ("epoll_thread: failed to wait for event: %s",
+ strerror(errno));
+ continue;
+ }
+
+ for (int i = 0; i < n; i++) {
+ /* socket fd */
+ if (ep[i].data.fd == self->socket_fd) {
+ if (self->socket_event_func)
+ self->socket_event_func (self, ep[i].data.fd, self->event_data);
+ }
+
+ /* event fd */
+ else if (ep[i].data.fd == self->event_fd) {
+ uint64_t stop = 0;
+ ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t));
+ if (res == sizeof(uint64_t) && stop == 1)
+ exit = true;
+ }
+
+ /* other */
+ else {
+ if (self->other_event_func)
+ self->other_event_func (self, ep[i].data.fd, self->event_data);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+bool
+icipc_epoll_thread_start (struct epoll_thread *self)
+{
+ return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0;
+}
+
+void
+icipc_epoll_thread_stop (struct epoll_thread *self)
+{
+ uint64_t value = 1;
+ ssize_t res = write (self->event_fd, &value, sizeof(uint64_t));
+ if (res == sizeof(uint64_t))
+ pthread_join (self->thread, NULL);
+}
+
+void
+icipc_epoll_thread_destroy (struct epoll_thread *self)
+{
+ close (self->epoll_fd);
+ close (self->event_fd);
+}
diff --git a/src/icipc-client.c b/src/icipc-client.c
new file mode 100644
index 0000000..aa71b1f
--- /dev/null
+++ b/src/icipc-client.c
@@ -0,0 +1,104 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <pthread.h>
+#include <assert.h>
+
+#include <spa/pod/builder.h>
+#include <icipc/icipc.h>
+
+struct client_data {
+ struct icipc_client *c;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ bool reply_received;
+};
+
+static void
+reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p)
+{
+ struct client_data *data = p;
+ const char *error = NULL;
+ const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &error);
+ if (!pod)
+ printf ("error: %s\n", error ? error : "unknown");
+ else
+ printf ("success!\n");
+
+ /* signal reply received */
+ pthread_mutex_lock (&data->mutex);
+ data->reply_received = true;
+ pthread_cond_signal (&data->cond);
+ pthread_mutex_unlock (&data->mutex);
+}
+
+int
+main (int argc, char *argv[])
+{
+ struct client_data data;
+
+ if (argc < 2) {
+ printf ("usage: <server-path>\n");
+ return -1;
+ }
+
+ /* init */
+ data.c = icipc_client_new (argv[1], true);
+ pthread_mutex_init (&data.mutex, NULL);
+ pthread_cond_init (&data.cond, NULL);
+ data.reply_received = false;
+
+ while (true) {
+ char str[1024];
+
+ printf ("> ");
+ fgets (str, 1023, stdin);
+
+ if (strncmp (str, "help", 4) == 0) {
+ printf ("help\tprints this message\n");
+ printf ("quit\texits the client\n");
+ printf ("send\tsends a request, usage: send <request-name> [args]\n");
+ } else if (strncmp (str, "quit", 4) == 0) {
+ printf ("exiting...\n");
+ break;
+ } else if (strncmp (str, "send", 4) == 0) {
+ char request_name[128];
+ char request_args[1024];
+ int n = sscanf(str, "send %s %s", request_name, request_args);
+ if (n <= 0)
+ continue;
+
+ /* send request */
+ if (n >= 2) {
+ /* TODO: for now we always create a string pod for args */
+ struct {
+ struct spa_pod_string pod;
+ char str[1024];
+ } args;
+ args.pod = SPA_POD_INIT_String(1024);
+ strncpy (args.str, request_args, 1024);
+ icipc_client_send_request (data.c, request_name, (const struct spa_pod *)&args,
+ reply_handler, &data);
+ } else {
+ icipc_client_send_request (data.c, request_name, NULL, reply_handler, &data);
+ }
+
+ /* wait for reply */
+ pthread_mutex_lock (&data.mutex);
+ while (!data.reply_received)
+ pthread_cond_wait (&data.cond, &data.mutex);
+ pthread_mutex_unlock (&data.mutex);
+ }
+ }
+
+ /* clean up */
+ pthread_cond_destroy (&data.cond);
+ pthread_mutex_destroy (&data.mutex);
+ icipc_client_free (data.c);
+ return 0;
+}
diff --git a/tests/client-server.c b/tests/client-server.c
new file mode 100644
index 0000000..ca6d1c3
--- /dev/null
+++ b/tests/client-server.c
@@ -0,0 +1,124 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <glib.h>
+#include <spa/pod/builder.h>
+#include <spa/pod/parser.h>
+#include <icipc/icipc.h>
+
+#define TEST_ADDRESS "/tmp/icipc-client-server"
+
+static bool
+increment_request_handler (struct icipc_server *self, int client_fd,
+ const char *name, const struct spa_pod *args, void *data)
+{
+ int32_t val = 0;
+ g_assert_true (spa_pod_is_int (args));
+ g_assert_true (spa_pod_get_int (args, &val) == 0);
+ struct spa_pod_int res = SPA_POD_INIT_Int (val + 1);
+ return icipc_server_reply_ok (self, client_fd, (struct spa_pod *)&res);
+}
+
+static bool
+error_request_handler (struct icipc_server *self, int client_fd,
+ const char *name, const struct spa_pod *args, void *data)
+{
+ return icipc_server_reply_error (self, client_fd, "error message");
+}
+
+struct reply_data {
+ int32_t incremented;
+ const char *error;
+ int n_replies;
+ GMutex mutex;
+};
+
+static void
+wait_for_reply (struct reply_data *data, int n_replies)
+{
+ while (true) {
+ g_mutex_lock (&data->mutex);
+ if (data->n_replies == n_replies) {
+ g_mutex_unlock (&data->mutex);
+ break;
+ }
+ g_mutex_unlock (&data->mutex);
+ }
+}
+
+static void
+reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p)
+{
+ struct reply_data *data = p;
+ g_assert_nonnull (data);
+
+ g_mutex_lock (&data->mutex);
+
+ const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &data->error);
+ if (pod) {
+ g_assert_true (spa_pod_is_int (pod));
+ g_assert_true (spa_pod_get_int (pod, &data->incremented) == 0);
+ }
+ data->n_replies++;
+
+ g_mutex_unlock (&data->mutex);
+}
+
+static void
+test_icipc_server_client ()
+{
+ struct icipc_server *s = icipc_server_new (TEST_ADDRESS, true);
+ g_assert_nonnull (s);
+ struct icipc_client *c = icipc_client_new (TEST_ADDRESS, true);
+ g_assert_nonnull (c);
+ struct reply_data data;
+ g_mutex_init (&data.mutex);
+
+ /* add request handlers */
+ g_assert_true (icipc_server_set_request_handler (s, "INCREMENT", increment_request_handler, NULL));
+ g_assert_true (icipc_server_set_request_handler (s, "ERROR", error_request_handler, NULL));
+
+ /* send an INCREMENT request of 3, and make sure the returned value is 4 */
+ data.incremented = -1;
+ data.error = NULL;
+ data.n_replies = 0;
+ struct spa_pod_int i = SPA_POD_INIT_Int (3);
+ g_assert_true (icipc_client_send_request (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, &data));
+ wait_for_reply (&data, 1);
+ g_assert_null (data.error);
+ g_assert_cmpint (data.incremented, ==, 4);
+
+ /* send an ERROR request, and make sure the returned value is an error */
+ data.error = NULL;
+ data.n_replies = 0;
+ g_assert_true (icipc_client_send_request (c, "ERROR", NULL, reply_handler, &data));
+ wait_for_reply (&data, 1);
+ g_assert_cmpstr (data.error, ==, "error message");
+
+ /* send an unhandled request, and make sure the server replies with an error */
+ data.error = NULL;
+ data.n_replies = 0;
+ g_assert_true (icipc_client_send_request (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data));
+ wait_for_reply (&data, 1);
+ g_assert_cmpstr (data.error, ==, "request handler not found");
+
+ /* clean up */
+ g_mutex_clear (&data.mutex);
+ icipc_client_free (c);
+ icipc_server_free (s);
+}
+
+gint
+main (gint argc, gchar *argv[])
+{
+ g_test_init (&argc, &argv, NULL);
+
+ g_test_add_func ("/icipc/icipc-server-client", test_icipc_server_client);
+
+ return g_test_run ();
+}
diff --git a/tests/meson.build b/tests/meson.build
new file mode 100644
index 0000000..12771c0
--- /dev/null
+++ b/tests/meson.build
@@ -0,0 +1,26 @@
+common_deps = [icipc_dep, glib_dep]
+common_env = [
+ 'G_TEST_SRCDIR=@0@'.format(meson.current_source_dir()),
+ 'G_TEST_BUILDDIR=@0@'.format(meson.current_build_dir()),
+]
+
+test(
+ 'test-icipc-sender-receiver',
+ executable('test-sender-receiver', 'sender-receiver.c', dependencies: common_deps),
+ env: common_env,
+ workdir : meson.current_source_dir(),
+)
+
+test(
+ 'test-icipc-protocol',
+ executable('test-protocol', 'protocol.c', dependencies: common_deps),
+ env: common_env,
+ workdir : meson.current_source_dir(),
+)
+
+test(
+ 'test-icipc-client-server',
+ executable('test-client-server', 'client-server.c', dependencies: common_deps),
+ env: common_env,
+ workdir : meson.current_source_dir(),
+)
diff --git a/tests/protocol.c b/tests/protocol.c
new file mode 100644
index 0000000..5e272a1
--- /dev/null
+++ b/tests/protocol.c
@@ -0,0 +1,77 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <glib.h>
+#include <spa/pod/builder.h>
+#include <spa/pod/parser.h>
+#include <icipc/icipc.h>
+
+static void
+test_icipc_protocol ()
+{
+ uint8_t b[1024];
+
+ /* request null value */
+ {
+ icipc_protocol_build_request (b, sizeof(b), "name", NULL);
+ const char *name = NULL;
+ const struct spa_pod *value = NULL;
+ g_assert_true (icipc_protocol_parse_request (b, sizeof(b), &name, &value));
+ g_assert_cmpstr (name, ==, "name");
+ g_assert_true (spa_pod_is_none (value));
+ }
+
+ /* request */
+ {
+ struct spa_pod_int i = SPA_POD_INIT_Int (8);
+ icipc_protocol_build_request (b, sizeof(b), "name", (struct spa_pod *)&i);
+ const char *name = NULL;
+ const struct spa_pod_int *value = NULL;
+ g_assert_true (icipc_protocol_parse_request (b, sizeof(b), &name, (const struct spa_pod **)&value));
+ g_assert_cmpstr (name, ==, "name");
+ g_assert_cmpint (value->value, ==, 8);
+ }
+
+ /* reply error */
+ {
+ icipc_protocol_build_reply_error (b, sizeof(b), "error message");
+ g_assert_true (icipc_protocol_is_reply_error (b, sizeof(b)));
+ const char *msg = NULL;
+ g_assert_true (icipc_protocol_parse_reply_error (b, sizeof(b), &msg));
+ g_assert_cmpstr (msg, ==, "error message");
+ }
+
+ /* reply ok null value */
+ {
+ icipc_protocol_build_reply_ok (b, sizeof(b), NULL);
+ g_assert_true (icipc_protocol_is_reply_ok (b, sizeof(b)));
+ const struct spa_pod *value = NULL;
+ g_assert_true (icipc_protocol_parse_reply_ok (b, sizeof(b), &value));
+ g_assert_true (spa_pod_is_none (value));
+ }
+
+ /* reply ok */
+ {
+ struct spa_pod_int i = SPA_POD_INIT_Int (3);
+ icipc_protocol_build_reply_ok (b, sizeof(b), (struct spa_pod *)&i);
+ g_assert_true (icipc_protocol_is_reply_ok (b, sizeof(b)));
+ const struct spa_pod_int *value = NULL;
+ g_assert_true (icipc_protocol_parse_reply_ok (b, sizeof(b), (const struct spa_pod **)&value));
+ g_assert_cmpint (value->value, ==, 3);
+ }
+}
+
+gint
+main (gint argc, gchar *argv[])
+{
+ g_test_init (&argc, &argv, NULL);
+
+ g_test_add_func ("/icipc/icipc-protocol", test_icipc_protocol);
+
+ return g_test_run ();
+}
diff --git a/tests/sender-receiver.c b/tests/sender-receiver.c
new file mode 100644
index 0000000..bd8721d
--- /dev/null
+++ b/tests/sender-receiver.c
@@ -0,0 +1,286 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <glib.h>
+#include <icipc/icipc.h>
+
+#define TEST_ADDRESS "/tmp/icipc-sender-receiver"
+
+struct event_data {
+ const uint8_t * expected_data;
+ size_t expected_size;
+ int connections;
+ int n_events;
+ GMutex mutex;
+};
+
+static void
+wait_for_event (struct event_data *data, int n_events)
+{
+ while (true) {
+ g_mutex_lock (&data->mutex);
+ if (data->n_events == n_events) {
+ g_mutex_unlock (&data->mutex);
+ break;
+ }
+ g_mutex_unlock (&data->mutex);
+ }
+}
+
+static void
+sender_state_callback (struct icipc_receiver *self, int sender_fd,
+ enum icipc_receiver_sender_state sender_state, void *p)
+{
+ struct event_data *data = p;
+ g_assert_nonnull (data);
+
+ g_mutex_lock (&data->mutex);
+ switch (sender_state) {
+ case ICIPC_RECEIVER_SENDER_STATE_CONNECTED:
+ data->connections++;
+ break;
+ case ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED:
+ data->connections--;
+ break;
+ default:
+ g_assert_not_reached ();
+ break;
+ }
+ data->n_events++;
+ g_mutex_unlock (&data->mutex);
+}
+
+static void
+reply_callback (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p)
+{
+ struct event_data *data = p;
+ g_assert_nonnull (data);
+ g_assert_nonnull (buffer);
+
+ g_mutex_lock (&data->mutex);
+ g_assert_cmpmem (buffer, size, data->expected_data, data->expected_size);
+ data->n_events++;
+ g_mutex_unlock (&data->mutex);
+}
+
+static void
+test_icipc_receiver_basic ()
+{
+ struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (r);
+
+ /* start and stop */
+ g_assert_false (icipc_receiver_is_running (r));
+ g_assert_true (icipc_receiver_start (r));
+ g_assert_true (icipc_receiver_is_running (r));
+ icipc_receiver_stop (r);
+ g_assert_false (icipc_receiver_is_running (r));
+
+ /* clean up */
+ icipc_receiver_free (r);
+}
+
+static void
+test_icipc_sender_basic ()
+{
+ struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (s);
+
+ /* clean up */
+ icipc_sender_free (s);
+}
+
+static void
+test_icipc_sender_connect ()
+{
+ static struct icipc_receiver_events events = {
+ .sender_state = sender_state_callback,
+ .handle_message = NULL,
+ };
+ struct event_data data;
+ g_mutex_init (&data.mutex);
+ data.n_events = 0;
+ data.connections = 0;
+ struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, &events, &data, 0);
+ g_assert_nonnull (r);
+ struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (s);
+
+ /* start receiver */
+ g_assert_true (icipc_receiver_start (r));
+
+ /* connect sender */
+ g_assert_true (icipc_sender_connect (s));
+ g_assert_true (icipc_sender_is_connected (s));
+ wait_for_event (&data, 1);
+ g_assert_cmpint (data.connections, ==, 1);
+
+ /* disconnect sender */
+ icipc_sender_disconnect (s);
+ g_assert_false (icipc_sender_is_connected (s));
+ wait_for_event (&data, 2);
+ g_assert_cmpint (data.connections, ==, 0);
+
+ /* stop receiver */
+ icipc_receiver_stop (r);
+
+ /* clean up */
+ g_mutex_clear (&data.mutex);
+ icipc_sender_free (s);
+ icipc_receiver_free (r);
+}
+
+static void
+lost_connection_handler (struct icipc_sender *self, int receiver_fd, void *p)
+{
+ struct event_data *data = p;
+ g_assert_nonnull (data);
+
+ g_mutex_lock (&data->mutex);
+ data->n_events++;
+ g_mutex_unlock (&data->mutex);
+}
+
+static void
+test_icipc_sender_lost_connection ()
+{
+ struct event_data data;
+ g_mutex_init (&data.mutex);
+ struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (r);
+ struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, lost_connection_handler, &data, 0);
+ g_assert_nonnull (s);
+
+ /* connect sender */
+ g_assert_true (icipc_sender_connect (s));
+ g_assert_true (icipc_sender_is_connected (s));
+
+ /* destroy receiver and make sure the lost connection handler is triggered */
+ data.n_events = 0;
+ icipc_receiver_free (r);
+ wait_for_event (&data, 1);
+
+ /* clean up */
+ g_mutex_clear (&data.mutex);
+ icipc_sender_free (s);
+}
+
+static void
+test_icipc_sender_send ()
+{
+ struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 2, NULL, NULL, 0);
+ g_assert_nonnull (r);
+ struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 2, NULL, NULL, 0);
+ g_assert_nonnull (s);
+ struct event_data data;
+ g_mutex_init (&data.mutex);
+ data.n_events = 0;
+
+ /* start receiver */
+ g_assert_true (icipc_receiver_start (r));
+
+ /* connect */
+ g_assert_true (icipc_sender_connect (s));
+ g_assert_true (icipc_sender_is_connected (s));
+
+ /* send 1 byte message (should not realloc) */
+ data.n_events = 0;
+ data.expected_data = (const uint8_t *)"h";
+ data.expected_size = 1;
+ g_assert_true (icipc_sender_send (s, (const uint8_t *)"h1", 1, reply_callback, &data));
+ wait_for_event (&data, 1);
+
+ /* send 2 bytes message (should realloc once to 4) */
+ data.n_events = 0;
+ data.expected_data = (const uint8_t *)"hi";
+ data.expected_size = 2;
+ g_assert_true (icipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data));
+ wait_for_event (&data, 1);
+
+ /* send 3 bytes message (should not realloc) */
+ data.n_events = 0;
+ data.expected_data = (const uint8_t *)"hii";
+ data.expected_size = 3;
+ g_assert_true (icipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data));
+ wait_for_event (&data, 1);
+
+ /* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */
+ data.n_events = 0;
+ data.expected_data = (const uint8_t *)"bigger than 16 bytes message";
+ data.expected_size = 28;
+ g_assert_true (icipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data));
+ wait_for_event (&data, 1);
+
+ /* don't allow empty messages */
+ data.n_events = 0;
+ g_assert_false (icipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL));
+
+ /* stop receiver */
+ icipc_receiver_stop (r);
+
+ /* clean up */
+ g_mutex_clear (&data.mutex);
+ icipc_sender_free (s);
+ icipc_receiver_free (r);
+}
+
+static void
+test_icipc_multiple_senders_send ()
+{
+ struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (r);
+ struct icipc_sender *senders[50];
+ struct event_data data;
+ g_mutex_init (&data.mutex);
+ data.n_events = 0;
+
+ /* start receiver */
+ g_assert_true (icipc_receiver_start (r));
+
+ /* create and connect 50 senders */
+ for (int i = 0; i < 50; i++) {
+ senders[i] = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0);
+ g_assert_nonnull (senders[i]);
+ g_assert_true (icipc_sender_connect (senders[i]));
+ g_assert_true (icipc_sender_is_connected (senders[i]));
+ }
+
+ /* send 50 messages (1 per sender) */
+ data.n_events = 0;
+ data.expected_data = (const uint8_t *)"hello";
+ data.expected_size = 5;
+ for (int i = 0; i < 50; i++)
+ g_assert_true (icipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data));
+ wait_for_event (&data, 50);
+
+ /* stop receiver */
+ icipc_receiver_stop (r);
+
+ /* clean up */
+ g_mutex_clear (&data.mutex);
+ for (int i = 0; i < 50; i++)
+ icipc_sender_free (senders[i]);
+ icipc_receiver_free (r);
+}
+
+gint
+main (gint argc, gchar *argv[])
+{
+ g_test_init (&argc, &argv, NULL);
+
+ g_test_add_func ("/icipc/receiver-basic", test_icipc_receiver_basic);
+ g_test_add_func ("/icipc/sender-basic", test_icipc_sender_basic);
+ g_test_add_func ("/icipc/sender-connect", test_icipc_sender_connect);
+ g_test_add_func ("/icipc/sender-lost-connection",
+ test_icipc_sender_lost_connection);
+ g_test_add_func ("/icipc/sender-send", test_icipc_sender_send);
+ g_test_add_func ("/icipc/multiple-senders-send",
+ test_icipc_multiple_senders_send);
+
+ return g_test_run ();
+}