summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorJulian Bouzas <julian.bouzas@collabora.com>2021-04-20 04:08:58 -0400
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2021-07-28 13:19:02 +0300
commitf25bb13718f334bc0c96d29ea9f3a57c0a6f3a34 (patch)
treeeaa30eafe2df92e3d5d75571d022c3a775246bff /lib
parent7bf96bda703dd157385cbb175ec90bd6f38af404 (diff)
lib: add wpipc library
Simple library that uses sockets for inter-process communication. It provides an API to create server and client objects. Users can add custom handlers in the server, and clients can send requests for those custom handlers. Signed-off-by: George Kiagiadakis <george.kiagiadakis@collabora.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/client.c84
-rw-r--r--lib/client.h56
-rw-r--r--lib/defs.h26
-rw-r--r--lib/icipc.h18
-rw-r--r--lib/meson.build40
-rw-r--r--lib/private.h97
-rw-r--r--lib/protocol.c217
-rw-r--r--lib/protocol.h87
-rw-r--r--lib/receiver.c213
-rw-r--r--lib/receiver.h78
-rw-r--r--lib/sender.c251
-rw-r--r--lib/sender.h75
-rw-r--r--lib/server.c258
-rw-r--r--lib/server.h85
-rw-r--r--lib/utils.c269
15 files changed, 1854 insertions, 0 deletions
diff --git a/lib/client.c b/lib/client.c
new file mode 100644
index 0000000..735796f
--- /dev/null
+++ b/lib/client.c
@@ -0,0 +1,84 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include "private.h"
+#include "protocol.h"
+#include "sender.h"
+#include "client.h"
+
+#define BUFFER_SIZE 1024
+
+static void
+on_lost_connection (struct icipc_sender *self,
+ int receiver_fd,
+ void *data)
+{
+ icipc_log_warn ("client: lost connection with server %d", receiver_fd);
+}
+
+/* API */
+
+struct icipc_client *
+icipc_client_new (const char *path, bool connect)
+{
+ struct icipc_sender *base;
+ base = icipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0);
+
+ if (connect)
+ icipc_sender_connect (base);
+
+ return (struct icipc_client *)base;
+}
+
+void
+icipc_client_free (struct icipc_client *self)
+{
+ struct icipc_sender *base = icipc_client_to_sender (self);
+ icipc_sender_free (base);
+}
+
+bool
+icipc_client_send_request (struct icipc_client *self,
+ const char *name,
+ const struct spa_pod *args,
+ icipc_sender_reply_func_t reply,
+ void *data)
+{
+ struct icipc_sender *base = icipc_client_to_sender (self);
+
+ /* check params */
+ if (name == NULL)
+ return false;
+
+ const size_t size = icipc_protocol_calculate_request_size (name, args);
+ uint8_t buffer[size];
+ icipc_protocol_build_request (buffer, size, name, args);
+ return icipc_sender_send (base, buffer, size, reply, data);
+}
+
+const struct spa_pod *
+icipc_client_send_request_finish (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ const char **error)
+{
+ /* error */
+ if (icipc_protocol_is_reply_error (buffer, size)) {
+ icipc_protocol_parse_reply_error (buffer, size, error);
+ return NULL;
+ }
+
+ /* ok */
+ if (icipc_protocol_is_reply_ok (buffer, size)) {
+ const struct spa_pod *value = NULL;
+ if (icipc_protocol_parse_reply_ok (buffer, size, &value))
+ return value;
+ }
+
+ return NULL;
+}
diff --git a/lib/client.h b/lib/client.h
new file mode 100644
index 0000000..7ac8e1a
--- /dev/null
+++ b/lib/client.h
@@ -0,0 +1,56 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_CLIENT_H__
+#define __ICIPC_CLIENT_H__
+
+#include <spa/pod/pod.h>
+
+#include <stddef.h>
+
+#include "sender.h"
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define icipc_client_to_sender(self) ((struct icipc_sender *)(self))
+
+struct icipc_client;
+
+ICIPC_API
+struct icipc_client *
+icipc_client_new (const char *path, bool connect);
+
+ICIPC_API
+void
+icipc_client_free (struct icipc_client *self);
+
+ICIPC_API
+bool
+icipc_client_send_request (struct icipc_client *self,
+ const char *name,
+ const struct spa_pod *args,
+ icipc_sender_reply_func_t reply,
+ void *data);
+
+/* for reply handlers only */
+
+ICIPC_API
+const struct spa_pod *
+icipc_client_send_request_finish (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ const char **error);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/defs.h b/lib/defs.h
new file mode 100644
index 0000000..e962db7
--- /dev/null
+++ b/lib/defs.h
@@ -0,0 +1,26 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_DEFS_H__
+#define __ICIPC_DEFS_H__
+
+#if defined(__GNUC__)
+# define ICIPC_API_EXPORT extern __attribute__ ((visibility ("default")))
+#else
+# define ICIPC_API_EXPORT extern
+#endif
+
+#ifndef ICIPC_API
+# define ICIPC_API ICIPC_API_EXPORT
+#endif
+
+#ifndef ICIPC_PRIVATE_API
+# define ICIPC_PRIVATE_API __attribute__ ((deprecated ("Private API")))
+#endif
+
+#endif
diff --git a/lib/icipc.h b/lib/icipc.h
new file mode 100644
index 0000000..a619513
--- /dev/null
+++ b/lib/icipc.h
@@ -0,0 +1,18 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_H__
+#define __ICIPC_H__
+
+#include "protocol.h"
+#include "receiver.h"
+#include "sender.h"
+#include "server.h"
+#include "client.h"
+
+#endif
diff --git a/lib/meson.build b/lib/meson.build
new file mode 100644
index 0000000..318b304
--- /dev/null
+++ b/lib/meson.build
@@ -0,0 +1,40 @@
+icipc_lib_sources = files(
+ 'utils.c',
+ 'protocol.c',
+ 'receiver.c',
+ 'sender.c',
+ 'client.c',
+ 'server.c',
+)
+
+icipc_lib_headers = files(
+ 'protocol.h',
+ 'receiver.h',
+ 'sender.h',
+ 'client.h',
+ 'server.h',
+ 'icipc.h',
+)
+
+
+
+install_headers(icipc_lib_headers,
+ install_dir : join_paths(get_option('includedir'), 'icipc-' + wireplumber_api_version, 'icipc')
+)
+
+icipc_lib = library('icipc-' + wireplumber_api_version,
+ icipc_lib_sources,
+ c_args : [
+ '-D_GNU_SOURCE',
+ '-DG_LOG_USE_STRUCTURED',
+ '-DG_LOG_DOMAIN="icipc"',
+ ],
+ install: true,
+ dependencies : [dependency('threads'), spa_dep],
+)
+
+icipc_dep = declare_dependency(
+ link_with: icipc_lib,
+ include_directories: wp_lib_include_dir,
+ dependencies: [spa_dep],
+)
diff --git a/lib/private.h b/lib/private.h
new file mode 100644
index 0000000..55d87a3
--- /dev/null
+++ b/lib/private.h
@@ -0,0 +1,97 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_PRIVATE_H__
+#define __ICIPC_PRIVATE_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <pthread.h>
+#include <stddef.h>
+#include <sys/types.h>
+#include <stdarg.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* log */
+
+#define icipc_log_info(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__)
+#define icipc_log_warn(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__)
+#define icipc_log_error(F, ...) \
+ icipc_log(ICIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__)
+
+enum icipc_log_level {
+ ICIPC_LOG_LEVEL_NONE = 0,
+ ICIPC_LOG_LEVEL_ERROR,
+ ICIPC_LOG_LEVEL_WARN,
+ ICIPC_LOG_LEVEL_INFO,
+};
+
+void
+icipc_logv (enum icipc_log_level level,
+ const char *fmt,
+ va_list args) __attribute__ ((format (printf, 2, 0)));
+
+void
+icipc_log (enum icipc_log_level level,
+ const char *fmt,
+ ...) __attribute__ ((format (printf, 2, 3)));
+
+/* socket */
+
+ssize_t
+icipc_socket_write (int fd, const uint8_t *buffer, size_t size);
+
+ssize_t
+icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size);
+
+/* epoll thread */
+
+struct epoll_thread;
+
+typedef void (*icipc_epoll_thread_event_funct_t) (struct epoll_thread *self,
+ int fd,
+ void *data);
+
+struct epoll_thread {
+ int socket_fd;
+ int epoll_fd;
+ int event_fd;
+ pthread_t thread;
+ icipc_epoll_thread_event_funct_t socket_event_func;
+ icipc_epoll_thread_event_funct_t other_event_func;
+ void *event_data;
+};
+
+bool
+icipc_epoll_thread_init (struct epoll_thread *self,
+ int socket_fd,
+ icipc_epoll_thread_event_funct_t sock_func,
+ icipc_epoll_thread_event_funct_t other_func,
+ void *data);
+
+bool
+icipc_epoll_thread_start (struct epoll_thread *self);
+
+void
+icipc_epoll_thread_stop (struct epoll_thread *self);
+
+void
+icipc_epoll_thread_destroy (struct epoll_thread *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/protocol.c b/lib/protocol.c
new file mode 100644
index 0000000..6de4bf5
--- /dev/null
+++ b/lib/protocol.c
@@ -0,0 +1,217 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <assert.h>
+
+#include <spa/pod/builder.h>
+#include <spa/pod/parser.h>
+
+#include "protocol.h"
+
+#define SIZE_PADDING 128
+
+enum icipc_protocol_reply_code {
+ REPLY_CODE_ERROR = 0,
+ REPLY_CODE_OK,
+};
+
+static bool
+is_reply (const uint8_t *buffer, size_t size, int code)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+
+ return parsed_code == code;
+}
+
+/* API */
+
+size_t
+icipc_protocol_calculate_request_size (const char *name,
+ const struct spa_pod *args)
+{
+ assert (name);
+ return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING;
+}
+
+void
+icipc_protocol_build_request (uint8_t *buffer,
+ size_t size,
+ const char *name,
+ const struct spa_pod *args)
+{
+ const struct spa_pod none = SPA_POD_INIT_None();
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+
+ if (args == NULL)
+ args = &none;
+
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_string (&b, name);
+ spa_pod_builder_primitive (&b, args);
+ spa_pod_builder_pop(&b, &f);
+}
+
+bool
+icipc_protocol_parse_request (const uint8_t *buffer,
+ size_t size,
+ const char **name,
+ const struct spa_pod **args)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ const char *parsed_name = NULL;
+ struct spa_pod *parsed_args = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_string (&p, &parsed_name);
+ spa_pod_parser_get_pod (&p, &parsed_args);
+ spa_pod_parser_pop(&p, &f);
+
+ /* check name and args */
+ if (name == NULL || args == NULL)
+ return false;
+
+ if (name != NULL)
+ *name = parsed_name;
+ if (args != NULL)
+ *args = parsed_args;
+ return true;
+}
+
+size_t
+icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value)
+{
+ return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING;
+}
+
+size_t
+icipc_protocol_calculate_reply_error_size (const char *msg)
+{
+ assert (msg);
+ return strlen(msg) + SIZE_PADDING;
+}
+
+void
+icipc_protocol_build_reply_ok (uint8_t *buffer,
+ size_t size,
+ const struct spa_pod *value)
+{
+ const struct spa_pod none = SPA_POD_INIT_None();
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+
+ if (value == NULL)
+ value = &none;
+
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_int (&b, REPLY_CODE_OK);
+ spa_pod_builder_primitive (&b, value);
+ spa_pod_builder_pop(&b, &f);
+}
+
+void
+icipc_protocol_build_reply_error (uint8_t *buffer,
+ size_t size,
+ const char *msg)
+{
+ struct spa_pod_builder b;
+ struct spa_pod_frame f;
+ spa_pod_builder_init (&b, buffer, size);
+ spa_pod_builder_push_struct (&b, &f);
+ spa_pod_builder_int (&b, REPLY_CODE_ERROR);
+ spa_pod_builder_string (&b, msg);
+ spa_pod_builder_pop(&b, &f);
+}
+
+bool
+icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size)
+{
+ return is_reply (buffer, size, REPLY_CODE_OK);
+}
+
+bool
+icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size)
+{
+ return is_reply (buffer, size, REPLY_CODE_ERROR);
+}
+
+bool
+icipc_protocol_parse_reply_ok (const uint8_t *buffer,
+ size_t size,
+ const struct spa_pod **value)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+ struct spa_pod *parsed_value = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+ spa_pod_parser_get_pod (&p, &parsed_value);
+ spa_pod_parser_pop (&p, &f);
+
+ if (value != NULL)
+ *value = parsed_value;
+ return true;
+}
+
+bool
+icipc_protocol_parse_reply_error (const uint8_t *buffer,
+ size_t size,
+ const char **msg)
+{
+ const struct spa_pod *pod = (const struct spa_pod *)buffer;
+ struct spa_pod_parser p;
+ struct spa_pod_frame f;
+ int parsed_code = 0;
+ const char *parsed_msg = NULL;
+
+ /* check if struct */
+ if (!spa_pod_is_struct (pod))
+ return false;
+
+ /* parse */
+ spa_pod_parser_pod (&p, pod);
+ spa_pod_parser_push_struct(&p, &f);
+ spa_pod_parser_get_int (&p, &parsed_code);
+ spa_pod_parser_get_string (&p, &parsed_msg);
+ spa_pod_parser_pop (&p, &f);
+
+ if (msg != NULL)
+ *msg = parsed_msg;
+ return true;
+}
diff --git a/lib/protocol.h b/lib/protocol.h
new file mode 100644
index 0000000..730c918
--- /dev/null
+++ b/lib/protocol.h
@@ -0,0 +1,87 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_PROTOCOL_H__
+#define __ICIPC_PROTOCOL_H__
+
+#include <spa/pod/pod.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* request */
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_request_size (const char *name,
+ const struct spa_pod *args);
+
+ICIPC_API
+void
+icipc_protocol_build_request (uint8_t *buffer,
+ size_t size,
+ const char *name,
+ const struct spa_pod *args);
+
+ICIPC_API
+bool
+icipc_protocol_parse_request (const uint8_t *buffer,
+ size_t size,
+ const char **name,
+ const struct spa_pod **args);
+
+/* reply */
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value);
+
+ICIPC_API
+size_t
+icipc_protocol_calculate_reply_error_size (const char *msg);
+
+ICIPC_API
+void
+icipc_protocol_build_reply_ok (uint8_t *buffer,
+ size_t size,
+ const struct spa_pod *value);
+
+ICIPC_API
+void
+icipc_protocol_build_reply_error (uint8_t *buffer,
+ size_t size,
+ const char *msg);
+
+ICIPC_API
+bool
+icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size);
+
+ICIPC_API
+bool
+icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size);
+
+ICIPC_API
+bool
+icipc_protocol_parse_reply_ok (const uint8_t *buffer,
+ size_t size,
+ const struct spa_pod **value);
+
+ICIPC_API
+bool
+icipc_protocol_parse_reply_error (const uint8_t *buffer,
+ size_t size,
+ const char **msg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/receiver.c b/lib/receiver.c
new file mode 100644
index 0000000..f6c46c5
--- /dev/null
+++ b/lib/receiver.c
@@ -0,0 +1,213 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "receiver.h"
+
+#include "icipc.h"
+
+#define MAX_SENDERS 128
+
+struct icipc_receiver {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool thread_running;
+
+ const struct icipc_receiver_events *events;
+ void *events_data;
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static bool
+reply_message (struct icipc_receiver *self,
+ int sender_fd,
+ uint8_t *buffer,
+ size_t size)
+{
+ return self->events && self->events->handle_message ?
+ self->events->handle_message (self, sender_fd, buffer, size, self->events_data) :
+ icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size;
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ /* sender wants to connect, accept connection */
+ struct icipc_receiver *self = data;
+ socklen_t addr_size = sizeof(self->addr);
+ int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size,
+ SOCK_CLOEXEC | SOCK_NONBLOCK);
+ struct epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = sender_fd;
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, sender_fd,
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data);
+}
+
+static void
+other_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_receiver *self = data;
+
+ /* sender sends a message, read it and reply */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("receiver: could not read message: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ /* client disconnected */
+ epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ close (fd);
+ if (self->events && self->events->sender_state)
+ self->events->sender_state (self, fd,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data);
+ return;
+ }
+
+ /* reply */
+ if (!reply_message (self, fd, self->buffer_read, size))
+ icipc_log_error ("receiver: could not reply message: %s", strerror(errno));
+
+ return;
+}
+
+/* API */
+
+struct icipc_receiver *
+icipc_receiver_new (const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size)
+{
+ struct icipc_receiver *self;
+ int name_size;
+
+ /* check params */
+ if (path == NULL || buffer_size == 0)
+ return NULL;
+
+ unlink (path);
+
+ self = calloc (1, sizeof (struct icipc_receiver) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* bind socket */
+ if (bind (self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) != 0)
+ goto error;
+
+ /* listen socket */
+ if (listen (self->socket_fd, MAX_SENDERS) != 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, other_event_received, self))
+ goto error;
+
+ self->events = events;
+ self->events_data = events_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_receiver_free (struct icipc_receiver *self)
+{
+ icipc_receiver_stop (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_receiver_start (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self))
+ return true;
+
+ self->thread_running = icipc_epoll_thread_start (&self->epoll_thread);
+ return self->thread_running;
+}
+
+void
+icipc_receiver_stop (struct icipc_receiver *self)
+{
+ if (icipc_receiver_is_running (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ self->thread_running = false;
+ }
+}
+
+bool
+icipc_receiver_is_running (struct icipc_receiver *self)
+{
+ return self->thread_running;
+}
+
+void *
+icipc_receiver_get_user_data (struct icipc_receiver *self)
+{
+ return self->user_data;
+}
diff --git a/lib/receiver.h b/lib/receiver.h
new file mode 100644
index 0000000..cf13e1b
--- /dev/null
+++ b/lib/receiver.h
@@ -0,0 +1,78 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_RECEIVER_H__
+#define __ICIPC_RECEIVER_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct icipc_receiver;
+
+enum icipc_receiver_sender_state {
+ ICIPC_RECEIVER_SENDER_STATE_CONNECTED = 0,
+ ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED
+};
+
+struct icipc_receiver_events {
+ /* emitted when a sender state changes */
+ void (*sender_state) (struct icipc_receiver *self,
+ int sender_fd,
+ enum icipc_receiver_sender_state state,
+ void *data);
+
+ /* emitted when message is received and needs to be handled */
+ bool (*handle_message) (struct icipc_receiver *self,
+ int sender_fd,
+ const uint8_t *buffer,
+ size_t size,
+ void *data);
+};
+
+ICIPC_API
+struct icipc_receiver *
+icipc_receiver_new (const char *path,
+ size_t buffer_size,
+ const struct icipc_receiver_events *events,
+ void *events_data,
+ size_t user_size);
+
+ICIPC_API
+void
+icipc_receiver_free (struct icipc_receiver *self);
+
+ICIPC_API
+bool
+icipc_receiver_start (struct icipc_receiver *self);
+
+ICIPC_API
+void
+icipc_receiver_stop (struct icipc_receiver *self);
+
+ICIPC_API
+bool
+icipc_receiver_is_running (struct icipc_receiver *self);
+
+/* for subclasses only */
+
+ICIPC_API
+void *
+icipc_receiver_get_user_data (struct icipc_receiver *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/sender.c b/lib/sender.c
new file mode 100644
index 0000000..7b5b0c9
--- /dev/null
+++ b/lib/sender.c
@@ -0,0 +1,251 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+#include "sender.h"
+
+#define MAX_ASYNC_TASKS 128
+
+struct icipc_sender_task {
+ icipc_sender_reply_func_t func;
+ void *data;
+};
+
+struct icipc_sender {
+ struct sockaddr_un addr;
+ int socket_fd;
+
+ uint8_t *buffer_read;
+ size_t buffer_size;
+
+ struct epoll_thread epoll_thread;
+ bool is_connected;
+
+ icipc_sender_lost_conn_func_t lost_func;
+ void *lost_data;
+
+ struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS];
+
+ /* for subclasses */
+ void *user_data;
+};
+
+static int
+push_sync_task (struct icipc_sender *self,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ size_t i;
+ for (i = MAX_ASYNC_TASKS; i > 1; i--) {
+ struct icipc_sender_task *curr = self->async_tasks + i - 1;
+ struct icipc_sender_task *next = self->async_tasks + i - 2;
+ if (next->func != NULL && curr->func == NULL) {
+ curr->func = func;
+ curr->data = data;
+ return i - 1;
+ } else if (i - 2 == 0 && next->func == NULL) {
+ /* empty queue */
+ next->func = func;
+ next->data = data;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static void
+pop_sync_task (struct icipc_sender *self,
+ bool trigger,
+ const uint8_t *buffer,
+ size_t size)
+{
+ size_t i;
+ for (i = 0; i < MAX_ASYNC_TASKS; i++) {
+ struct icipc_sender_task *task = self->async_tasks + i;
+ if (task->func != NULL) {
+ if (trigger)
+ task->func (self, buffer, size, task->data);
+ task->func = NULL;
+ return;
+ }
+ }
+}
+
+static void
+socket_event_received (struct epoll_thread *t, int fd, void *data)
+{
+ struct icipc_sender *self = data;
+
+ /* receiver sends a reply, read it trigger corresponding task */
+ ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size);
+ if (size < 0) {
+ icipc_log_error ("sender: could not read reply: %s", strerror(errno));
+ return;
+ }
+
+ if (size == 0) {
+ if (self->lost_func)
+ self->lost_func (self, fd, self->lost_data);
+ return;
+ }
+
+ /* trigger async task */
+ pop_sync_task (self, true, self->buffer_read, size);
+ return;
+}
+
+/* API */
+
+struct icipc_sender *
+icipc_sender_new (const char *path,
+ size_t buffer_size,
+ icipc_sender_lost_conn_func_t lost_func,
+ void *lost_data,
+ size_t user_size)
+{
+ struct icipc_sender *self;
+ int name_size;
+
+ if (path == NULL)
+ return NULL;
+
+ self = calloc (1, sizeof (struct icipc_sender) + user_size);
+ if (self == NULL)
+ return NULL;
+
+ self->socket_fd = -1;
+
+ /* set address */
+ self->addr.sun_family = AF_LOCAL;
+ name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s",
+ path) + 1;
+ if (name_size > (int) sizeof(self->addr.sun_path))
+ goto error;
+
+ /* create socket */
+ self->socket_fd =
+ socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0);
+ if (self->socket_fd < 0)
+ goto error;
+
+ /* alloc buffer read */
+ self->buffer_size = buffer_size;
+ self->buffer_read = calloc (buffer_size, sizeof (uint8_t));
+ if (self->buffer_read == NULL)
+ goto error;
+
+ /* init epoll thread */
+ if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd,
+ socket_event_received, NULL, self))
+ goto error;
+
+ self->lost_func = lost_func;
+ self->lost_data = lost_data;
+ if (user_size > 0)
+ self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender));
+
+ return self;
+
+error:
+ if (self->buffer_read)
+ free (self->buffer_read);
+ if (self->socket_fd != -1)
+ close (self->socket_fd);
+ free (self);
+ return NULL;
+}
+
+void
+icipc_sender_free (struct icipc_sender *self)
+{
+ icipc_sender_disconnect (self);
+
+ icipc_epoll_thread_destroy (&self->epoll_thread);
+ free (self->buffer_read);
+ close (self->socket_fd);
+ free (self);
+}
+
+bool
+icipc_sender_connect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self))
+ return true;
+
+ if (connect(self->socket_fd, (struct sockaddr *)&self->addr,
+ sizeof(self->addr)) == 0 &&
+ icipc_epoll_thread_start (&self->epoll_thread)) {
+ self->is_connected = true;
+ return true;
+ }
+
+ return false;
+}
+
+void
+icipc_sender_disconnect (struct icipc_sender *self)
+{
+ if (icipc_sender_is_connected (self)) {
+ icipc_epoll_thread_stop (&self->epoll_thread);
+ shutdown(self->socket_fd, SHUT_RDWR);
+ self->is_connected = false;
+ }
+}
+
+bool
+icipc_sender_is_connected (struct icipc_sender *self)
+{
+ return self->is_connected;
+}
+
+bool
+icipc_sender_send (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ icipc_sender_reply_func_t func,
+ void *data)
+{
+ int id = -1;
+
+ if (buffer == NULL || size == 0)
+ return false;
+
+ if (!icipc_sender_is_connected (self))
+ return false;
+
+ /* add the task in the queue */
+ if (func) {
+ id = push_sync_task (self, func, data);
+ if (id == -1)
+ return false;
+ }
+
+ /* write buffer and remove task if it fails */
+ if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) {
+ if (id != -1)
+ self->async_tasks[id].func = NULL;
+ return false;
+ }
+
+ return true;
+}
+
+void *
+icipc_sender_get_user_data (struct icipc_sender *self)
+{
+ return self->user_data;
+}
diff --git a/lib/sender.h b/lib/sender.h
new file mode 100644
index 0000000..88aaf3a
--- /dev/null
+++ b/lib/sender.h
@@ -0,0 +1,75 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_SENDER_H__
+#define __ICIPC_SENDER_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include "defs.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct icipc_sender;
+
+typedef void (*icipc_sender_lost_conn_func_t) (struct icipc_sender *self,
+ int receiver_fd,
+ void *data);
+
+typedef void (*icipc_sender_reply_func_t) (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ void *data);
+
+ICIPC_API
+struct icipc_sender *
+icipc_sender_new (const char *path,
+ size_t buffer_size,
+ icipc_sender_lost_conn_func_t lost_func,
+ void *lost_data,
+ size_t user_size);
+
+ICIPC_API
+void
+icipc_sender_free (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_connect (struct icipc_sender *self);
+
+ICIPC_API
+void
+icipc_sender_disconnect (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_is_connected (struct icipc_sender *self);
+
+ICIPC_API
+bool
+icipc_sender_send (struct icipc_sender *self,
+ const uint8_t *buffer,
+ size_t size,
+ icipc_sender_reply_func_t reply,
+ void *data);
+
+/* for subclasses only */
+
+ICIPC_API
+void *
+icipc_sender_get_user_data (struct icipc_sender *self);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/server.c b/lib/server.c
new file mode 100644
index 0000000..a798894
--- /dev/null
+++ b/lib/server.c
@@ -0,0 +1,258 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <pthread.h>
+
+#include "private.h"
+#include "protocol.h"
+#include "receiver.h"
+#include "server.h"
+
+#define BUFFER_SIZE 1024
+#define MAX_REQUEST_HANDLERS 128
+
+struct icipc_server_client_handler
+{
+ icipc_server_client_handler_func_t handler;
+ void *data;
+};
+
+struct icipc_server_request_handler
+{
+ const char *name;
+ icipc_server_request_handler_func_t handler;
+ void *data;
+};
+
+struct icipc_server_priv {
+ pthread_mutex_t mutex;
+ struct icipc_server_client_handler client_handler;
+ size_t n_request_handlers;
+ struct icipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS];
+};
+
+static void
+sender_state (struct icipc_receiver *base,
+ int sender_fd,
+ enum icipc_receiver_sender_state sender_state,
+ void *data)
+{
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ icipc_log_info ("server: new state %d on client %d", sender_state, sender_fd);
+
+ pthread_mutex_lock (&priv->mutex);
+ if (priv->client_handler.handler)
+ priv->client_handler.handler ((struct icipc_server *)base, sender_fd,
+ sender_state, priv->client_handler.data);
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+static bool
+handle_message (struct icipc_receiver *base,
+ int sender_fd,
+ const uint8_t *buffer,
+ size_t size,
+ void *data)
+{
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ const char *name = NULL;
+ const struct spa_pod *args = NULL;
+
+ icipc_log_info ("server: message from client %d received", sender_fd);
+
+ /* parse */
+ if (!icipc_protocol_parse_request (buffer, size, &name, &args)) {
+ const char *msg = "could not parse request";
+ const size_t s = icipc_protocol_calculate_reply_error_size (msg);
+ uint8_t b[s];
+ icipc_protocol_build_reply_error (b, s, msg);
+ return icipc_socket_write (sender_fd, b, s) == (ssize_t)s;
+ }
+
+ /* handle */
+ size_t i;
+ bool res = false;
+ pthread_mutex_lock (&priv->mutex);
+
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0 &&
+ rh->handler != NULL) {
+ res = rh->handler ((struct icipc_server *)base, sender_fd, name, args,
+ rh->data);
+ pthread_mutex_unlock (&priv->mutex);
+ return res;
+ }
+ }
+
+ /* handler was not found, reply with error */
+ res = icipc_server_reply_error ((struct icipc_server *)base, sender_fd,
+ "request handler not found");
+
+ pthread_mutex_unlock (&priv->mutex);
+ return res;
+}
+
+static struct icipc_receiver_events events = {
+ .sender_state = sender_state,
+ .handle_message = handle_message,
+};
+
+/* API */
+
+struct icipc_server *
+icipc_server_new (const char *path, bool start)
+{
+ struct icipc_server_priv * priv = NULL;
+ struct icipc_receiver *base = NULL;
+
+ base = icipc_receiver_new (path, BUFFER_SIZE, &events, NULL,
+ sizeof (struct icipc_server_priv));
+ if (base == NULL)
+ return NULL;
+
+ priv = icipc_receiver_get_user_data (base);
+ pthread_mutex_init (&priv->mutex, NULL);
+ priv->n_request_handlers = 0;
+
+ if (start)
+ icipc_receiver_start (base);
+
+ return (struct icipc_server *)base;
+}
+
+void
+icipc_server_free (struct icipc_server *self)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_destroy (&priv->mutex);
+
+ icipc_receiver_free (base);
+}
+
+void
+icipc_server_set_client_handler (struct icipc_server *self,
+ icipc_server_client_handler_func_t handler,
+ void *data)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_lock (&priv->mutex);
+ priv->client_handler.handler = handler;
+ priv->client_handler.data = data;
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+void
+icipc_server_clear_client_handler (struct icipc_server *self)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+
+ pthread_mutex_lock (&priv->mutex);
+ priv->client_handler.handler = NULL;
+ priv->client_handler.data = NULL;
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+bool
+icipc_server_set_request_handler (struct icipc_server *self,
+ const char *name,
+ icipc_server_request_handler_func_t handler,
+ void *data)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ size_t i;
+
+ /* check params */
+ if (name == NULL)
+ return false;
+
+ pthread_mutex_lock (&priv->mutex);
+
+ /* make sure handler does not exist */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0) {
+ pthread_mutex_unlock (&priv->mutex);
+ return false;
+ }
+ }
+
+ /* set handler */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name == NULL) {
+ rh->name = name;
+ rh->handler = handler;
+ rh->data = data;
+ pthread_mutex_unlock (&priv->mutex);
+ return true;
+ }
+ }
+
+ pthread_mutex_unlock (&priv->mutex);
+
+ return false;
+}
+
+void
+icipc_server_clear_request_handler (struct icipc_server *self,
+ const char *name)
+{
+ struct icipc_receiver *base = icipc_server_to_receiver (self);
+ struct icipc_server_priv *priv = icipc_receiver_get_user_data (base);
+ size_t i;
+
+ /* check params */
+ if (name == NULL)
+ return;
+
+ pthread_mutex_lock (&priv->mutex);
+
+ /* clear handler */
+ for (i = 0; i < MAX_REQUEST_HANDLERS; i++) {
+ struct icipc_server_request_handler *rh = priv->request_handlers + i;
+ if (rh->name != NULL && strcmp (rh->name, name) == 0) {
+ rh->name = NULL;
+ break;
+ }
+ }
+
+ pthread_mutex_unlock (&priv->mutex);
+}
+
+bool
+icipc_server_reply_ok (struct icipc_server *self,
+ int client_fd,
+ const struct spa_pod *value)
+{
+ const size_t s = icipc_protocol_calculate_reply_ok_size (value);
+ uint8_t b[s];
+ icipc_protocol_build_reply_ok (b, s, value);
+ return icipc_socket_write (client_fd, b, s) == (ssize_t)s;
+}
+
+bool
+icipc_server_reply_error (struct icipc_server *self,
+ int client_fd,
+ const char *msg)
+{
+ if (msg == NULL)
+ return false;
+
+ const size_t s = icipc_protocol_calculate_reply_error_size (msg);
+ uint8_t b[s];
+ icipc_protocol_build_reply_error (b, s, msg);
+ return icipc_socket_write (client_fd, b, s) == (ssize_t)s;
+}
diff --git a/lib/server.h b/lib/server.h
new file mode 100644
index 0000000..3cc0ed9
--- /dev/null
+++ b/lib/server.h
@@ -0,0 +1,85 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#ifndef __ICIPC_SERVER_H__
+#define __ICIPC_SERVER_H__
+
+#include <spa/pod/pod.h>
+
+#include "defs.h"
+
+#include "receiver.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define icipc_server_to_receiver(self) ((struct icipc_receiver *)(self))
+
+struct icipc_server;
+
+typedef void (*icipc_server_client_handler_func_t) (struct icipc_server *self,
+ int client_fd,
+ enum icipc_receiver_sender_state client_state,
+ void *data);
+
+typedef bool (*icipc_server_request_handler_func_t) (struct icipc_server *self,
+ int client_fd,
+ const char *name,
+ const struct spa_pod *args,
+ void *data);
+
+ICIPC_API
+struct icipc_server *
+icipc_server_new (const char *path, bool start);
+
+ICIPC_API
+void
+icipc_server_free (struct icipc_server *self);
+
+ICIPC_API
+void
+icipc_server_set_client_handler (struct icipc_server *self,
+ icipc_server_client_handler_func_t handler,
+ void *data);
+
+ICIPC_API
+void
+icipc_server_clear_client_handler (struct icipc_server *self);
+
+ICIPC_API
+bool
+icipc_server_set_request_handler (struct icipc_server *self,
+ const char *name,
+ icipc_server_request_handler_func_t handler,
+ void *data);
+
+ICIPC_API
+void
+icipc_server_clear_request_handler (struct icipc_server *self,
+ const char *name);
+
+/* for request handlers only */
+
+ICIPC_API
+bool
+icipc_server_reply_ok (struct icipc_server *self,
+ int client_fd,
+ const struct spa_pod *value);
+
+ICIPC_API
+bool
+icipc_server_reply_error (struct icipc_server *self,
+ int client_fd,
+ const char *msg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/utils.c b/lib/utils.c
new file mode 100644
index 0000000..7c683d1
--- /dev/null
+++ b/lib/utils.c
@@ -0,0 +1,269 @@
+/* PipeWire AGL Cluster IPC
+ *
+ * Copyright © 2021 Collabora Ltd.
+ * @author Julian Bouzas <julian.bouzas@collabora.com>
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <string.h>
+#include <pthread.h>
+#include <time.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "private.h"
+
+#define MAX_POLL_EVENTS 128
+#define MAX_LOG_MESSAGE 1024
+
+/* log */
+
+const char *icipc_logger_level_text[] = {
+ [ICIPC_LOG_LEVEL_ERROR] = "E",
+ [ICIPC_LOG_LEVEL_WARN] = "W",
+ [ICIPC_LOG_LEVEL_INFO] = "I",
+};
+
+struct icipc_logger {
+ enum icipc_log_level level;
+};
+
+static const struct icipc_logger *
+icipc_log_get_instance (void)
+{
+ static struct icipc_logger logger_ = { 0, };
+ static struct icipc_logger* instance_ = NULL;
+
+ if (instance_ == NULL) {
+ char * val_str = NULL;
+ enum icipc_log_level val = 0;
+
+ /* default to error */
+ logger_.level = ICIPC_LOG_LEVEL_WARN;
+
+ /* get level from env */
+ val_str = getenv ("ICIPC_DEBUG");
+ if (val_str && sscanf (val_str, "%u", &val) == 1 &&
+ val >= ICIPC_LOG_LEVEL_NONE)
+ logger_.level = val;
+
+ instance_ = &logger_;
+ }
+
+ return instance_;
+}
+
+void
+icipc_logv (enum icipc_log_level level, const char *fmt, va_list args)
+{
+ const struct icipc_logger *logger = NULL;
+
+ logger = icipc_log_get_instance ();
+ assert (logger);
+
+ if (logger->level >= level) {
+ assert (level > 0);
+ char msg[MAX_LOG_MESSAGE];
+ struct timespec time;
+ clock_gettime (CLOCK_REALTIME, &time);
+ vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args);
+ fprintf (stderr, "[%s][%lu.%lu] %s\n", icipc_logger_level_text[level],
+ time.tv_sec, time.tv_sec, msg);
+ }
+}
+
+void
+icipc_log (enum icipc_log_level level, const char *fmt, ...)
+{
+ va_list args;
+ va_start (args, fmt);
+ icipc_logv (level, fmt, args);
+ va_end (args);
+}
+
+/* socket */
+
+ssize_t
+icipc_socket_write (int fd, const uint8_t *buffer, size_t size)
+{
+ size_t total_written = 0;
+ size_t n;
+
+ assert (fd >= 0);
+ assert (buffer != NULL);
+ assert (size > 0);
+
+ do {
+ n = write(fd, buffer, size);
+ if (n < size) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return total_written;
+ return -1;
+ }
+ total_written += n;
+ } while (total_written < size);
+
+ return total_written;
+}
+
+ssize_t
+icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size)
+{
+ ssize_t n;
+ ssize_t size;
+ size_t offset = 0;
+
+ assert (buffer);
+ assert (*buffer);
+ assert (max_size);
+ assert (*max_size > 0);
+
+again:
+ size = *max_size - offset;
+ n = read (fd, *buffer + offset, size);
+ if (n == 0)
+ return 0;
+
+ /* check for errors */
+ if (n < 0) {
+ if (errno == EINTR)
+ goto again;
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return offset;
+ return -1;
+ }
+
+ /* realloc if we need more space, and read again */
+ if (n >= size) {
+ *max_size += *max_size;
+ *buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t));
+ offset += n;
+ goto again;
+ }
+
+ return offset + n;
+}
+
+/* epoll thread */
+
+bool
+icipc_epoll_thread_init (struct epoll_thread *self,
+ int socket_fd,
+ icipc_epoll_thread_event_funct_t sock_func,
+ icipc_epoll_thread_event_funct_t other_func,
+ void *data)
+{
+ struct epoll_event event;
+
+ self->socket_fd = socket_fd;
+ self->event_fd = -1;
+ self->epoll_fd = -1;
+
+ /* create event fd */
+ self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
+ if (self->event_fd == -1)
+ goto error;
+
+ /* create epoll fd */
+ self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
+ if (self->epoll_fd == -1)
+ goto error;
+
+ /* poll socket fd */
+ event.events = EPOLLIN;
+ event.data.fd = self->socket_fd;
+ if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0)
+ goto error;
+
+ /* poll event fd */
+ event.events = EPOLLIN;
+ event.data.fd = self->event_fd;
+ if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0)
+ goto error;
+
+ self->socket_event_func = sock_func;
+ self->other_event_func = other_func;
+ self->event_data = data;
+ return true;
+
+error:
+ if (self->epoll_fd != -1)
+ close (self->epoll_fd);
+ if (self->event_fd != -1)
+ close (self->event_fd);
+ return false;
+}
+
+static void *
+epoll_thread_run (void *data)
+{
+ struct epoll_thread *self = data;
+ bool exit = false;
+
+ while (!exit) {
+ /* wait for events */
+ struct epoll_event ep[MAX_POLL_EVENTS];
+ int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1);
+ if (n < 0) {
+ icipc_log_error ("epoll_thread: failed to wait for event: %s",
+ strerror(errno));
+ continue;
+ }
+
+ for (int i = 0; i < n; i++) {
+ /* socket fd */
+ if (ep[i].data.fd == self->socket_fd) {
+ if (self->socket_event_func)
+ self->socket_event_func (self, ep[i].data.fd, self->event_data);
+ }
+
+ /* event fd */
+ else if (ep[i].data.fd == self->event_fd) {
+ uint64_t stop = 0;
+ ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t));
+ if (res == sizeof(uint64_t) && stop == 1)
+ exit = true;
+ }
+
+ /* other */
+ else {
+ if (self->other_event_func)
+ self->other_event_func (self, ep[i].data.fd, self->event_data);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+bool
+icipc_epoll_thread_start (struct epoll_thread *self)
+{
+ return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0;
+}
+
+void
+icipc_epoll_thread_stop (struct epoll_thread *self)
+{
+ uint64_t value = 1;
+ ssize_t res = write (self->event_fd, &value, sizeof(uint64_t));
+ if (res == sizeof(uint64_t))
+ pthread_join (self->thread, NULL);
+}
+
+void
+icipc_epoll_thread_destroy (struct epoll_thread *self)
+{
+ close (self->epoll_fd);
+ close (self->event_fd);
+}