diff options
-rw-r--r-- | lib/client.c | 84 | ||||
-rw-r--r-- | lib/client.h | 56 | ||||
-rw-r--r-- | lib/defs.h | 26 | ||||
-rw-r--r-- | lib/icipc.h | 18 | ||||
-rw-r--r-- | lib/meson.build | 40 | ||||
-rw-r--r-- | lib/private.h | 97 | ||||
-rw-r--r-- | lib/protocol.c | 217 | ||||
-rw-r--r-- | lib/protocol.h | 87 | ||||
-rw-r--r-- | lib/receiver.c | 213 | ||||
-rw-r--r-- | lib/receiver.h | 78 | ||||
-rw-r--r-- | lib/sender.c | 251 | ||||
-rw-r--r-- | lib/sender.h | 75 | ||||
-rw-r--r-- | lib/server.c | 258 | ||||
-rw-r--r-- | lib/server.h | 85 | ||||
-rw-r--r-- | lib/utils.c | 269 | ||||
-rw-r--r-- | src/icipc-client.c | 104 | ||||
-rw-r--r-- | tests/client-server.c | 124 | ||||
-rw-r--r-- | tests/meson.build | 26 | ||||
-rw-r--r-- | tests/protocol.c | 77 | ||||
-rw-r--r-- | tests/sender-receiver.c | 286 |
20 files changed, 2471 insertions, 0 deletions
diff --git a/lib/client.c b/lib/client.c new file mode 100644 index 0000000..735796f --- /dev/null +++ b/lib/client.c @@ -0,0 +1,84 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include "private.h" +#include "protocol.h" +#include "sender.h" +#include "client.h" + +#define BUFFER_SIZE 1024 + +static void +on_lost_connection (struct icipc_sender *self, + int receiver_fd, + void *data) +{ + icipc_log_warn ("client: lost connection with server %d", receiver_fd); +} + +/* API */ + +struct icipc_client * +icipc_client_new (const char *path, bool connect) +{ + struct icipc_sender *base; + base = icipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0); + + if (connect) + icipc_sender_connect (base); + + return (struct icipc_client *)base; +} + +void +icipc_client_free (struct icipc_client *self) +{ + struct icipc_sender *base = icipc_client_to_sender (self); + icipc_sender_free (base); +} + +bool +icipc_client_send_request (struct icipc_client *self, + const char *name, + const struct spa_pod *args, + icipc_sender_reply_func_t reply, + void *data) +{ + struct icipc_sender *base = icipc_client_to_sender (self); + + /* check params */ + if (name == NULL) + return false; + + const size_t size = icipc_protocol_calculate_request_size (name, args); + uint8_t buffer[size]; + icipc_protocol_build_request (buffer, size, name, args); + return icipc_sender_send (base, buffer, size, reply, data); +} + +const struct spa_pod * +icipc_client_send_request_finish (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + const char **error) +{ + /* error */ + if (icipc_protocol_is_reply_error (buffer, size)) { + icipc_protocol_parse_reply_error (buffer, size, error); + return NULL; + } + + /* ok */ + if (icipc_protocol_is_reply_ok (buffer, size)) { + const struct spa_pod *value = NULL; + if (icipc_protocol_parse_reply_ok (buffer, size, &value)) + return value; + } + + return NULL; +} diff --git a/lib/client.h b/lib/client.h new file mode 100644 index 0000000..7ac8e1a --- /dev/null +++ b/lib/client.h @@ -0,0 +1,56 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_CLIENT_H__ +#define __ICIPC_CLIENT_H__ + +#include <spa/pod/pod.h> + +#include <stddef.h> + +#include "sender.h" +#include "defs.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define icipc_client_to_sender(self) ((struct icipc_sender *)(self)) + +struct icipc_client; + +ICIPC_API +struct icipc_client * +icipc_client_new (const char *path, bool connect); + +ICIPC_API +void +icipc_client_free (struct icipc_client *self); + +ICIPC_API +bool +icipc_client_send_request (struct icipc_client *self, + const char *name, + const struct spa_pod *args, + icipc_sender_reply_func_t reply, + void *data); + +/* for reply handlers only */ + +ICIPC_API +const struct spa_pod * +icipc_client_send_request_finish (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + const char **error); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/defs.h b/lib/defs.h new file mode 100644 index 0000000..e962db7 --- /dev/null +++ b/lib/defs.h @@ -0,0 +1,26 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_DEFS_H__ +#define __ICIPC_DEFS_H__ + +#if defined(__GNUC__) +# define ICIPC_API_EXPORT extern __attribute__ ((visibility ("default"))) +#else +# define ICIPC_API_EXPORT extern +#endif + +#ifndef ICIPC_API +# define ICIPC_API ICIPC_API_EXPORT +#endif + +#ifndef ICIPC_PRIVATE_API +# define ICIPC_PRIVATE_API __attribute__ ((deprecated ("Private API"))) +#endif + +#endif diff --git a/lib/icipc.h b/lib/icipc.h new file mode 100644 index 0000000..a619513 --- /dev/null +++ b/lib/icipc.h @@ -0,0 +1,18 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_H__ +#define __ICIPC_H__ + +#include "protocol.h" +#include "receiver.h" +#include "sender.h" +#include "server.h" +#include "client.h" + +#endif diff --git a/lib/meson.build b/lib/meson.build new file mode 100644 index 0000000..318b304 --- /dev/null +++ b/lib/meson.build @@ -0,0 +1,40 @@ +icipc_lib_sources = files( + 'utils.c', + 'protocol.c', + 'receiver.c', + 'sender.c', + 'client.c', + 'server.c', +) + +icipc_lib_headers = files( + 'protocol.h', + 'receiver.h', + 'sender.h', + 'client.h', + 'server.h', + 'icipc.h', +) + + + +install_headers(icipc_lib_headers, + install_dir : join_paths(get_option('includedir'), 'icipc-' + wireplumber_api_version, 'icipc') +) + +icipc_lib = library('icipc-' + wireplumber_api_version, + icipc_lib_sources, + c_args : [ + '-D_GNU_SOURCE', + '-DG_LOG_USE_STRUCTURED', + '-DG_LOG_DOMAIN="icipc"', + ], + install: true, + dependencies : [dependency('threads'), spa_dep], +) + +icipc_dep = declare_dependency( + link_with: icipc_lib, + include_directories: wp_lib_include_dir, + dependencies: [spa_dep], +) diff --git a/lib/private.h b/lib/private.h new file mode 100644 index 0000000..55d87a3 --- /dev/null +++ b/lib/private.h @@ -0,0 +1,97 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_PRIVATE_H__ +#define __ICIPC_PRIVATE_H__ + +#include <stdbool.h> +#include <stdint.h> +#include <pthread.h> +#include <stddef.h> +#include <sys/types.h> +#include <stdarg.h> + +#include "defs.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* log */ + +#define icipc_log_info(F, ...) \ + icipc_log(ICIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__) +#define icipc_log_warn(F, ...) \ + icipc_log(ICIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__) +#define icipc_log_error(F, ...) \ + icipc_log(ICIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__) + +enum icipc_log_level { + ICIPC_LOG_LEVEL_NONE = 0, + ICIPC_LOG_LEVEL_ERROR, + ICIPC_LOG_LEVEL_WARN, + ICIPC_LOG_LEVEL_INFO, +}; + +void +icipc_logv (enum icipc_log_level level, + const char *fmt, + va_list args) __attribute__ ((format (printf, 2, 0))); + +void +icipc_log (enum icipc_log_level level, + const char *fmt, + ...) __attribute__ ((format (printf, 2, 3))); + +/* socket */ + +ssize_t +icipc_socket_write (int fd, const uint8_t *buffer, size_t size); + +ssize_t +icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size); + +/* epoll thread */ + +struct epoll_thread; + +typedef void (*icipc_epoll_thread_event_funct_t) (struct epoll_thread *self, + int fd, + void *data); + +struct epoll_thread { + int socket_fd; + int epoll_fd; + int event_fd; + pthread_t thread; + icipc_epoll_thread_event_funct_t socket_event_func; + icipc_epoll_thread_event_funct_t other_event_func; + void *event_data; +}; + +bool +icipc_epoll_thread_init (struct epoll_thread *self, + int socket_fd, + icipc_epoll_thread_event_funct_t sock_func, + icipc_epoll_thread_event_funct_t other_func, + void *data); + +bool +icipc_epoll_thread_start (struct epoll_thread *self); + +void +icipc_epoll_thread_stop (struct epoll_thread *self); + +void +icipc_epoll_thread_destroy (struct epoll_thread *self); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/protocol.c b/lib/protocol.c new file mode 100644 index 0000000..6de4bf5 --- /dev/null +++ b/lib/protocol.c @@ -0,0 +1,217 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <assert.h> + +#include <spa/pod/builder.h> +#include <spa/pod/parser.h> + +#include "protocol.h" + +#define SIZE_PADDING 128 + +enum icipc_protocol_reply_code { + REPLY_CODE_ERROR = 0, + REPLY_CODE_OK, +}; + +static bool +is_reply (const uint8_t *buffer, size_t size, int code) +{ + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; + + /* check if struct */ + if (!spa_pod_is_struct (pod)) + return false; + + /* parse */ + spa_pod_parser_pod (&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int (&p, &parsed_code); + + return parsed_code == code; +} + +/* API */ + +size_t +icipc_protocol_calculate_request_size (const char *name, + const struct spa_pod *args) +{ + assert (name); + return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING; +} + +void +icipc_protocol_build_request (uint8_t *buffer, + size_t size, + const char *name, + const struct spa_pod *args) +{ + const struct spa_pod none = SPA_POD_INIT_None(); + struct spa_pod_builder b; + struct spa_pod_frame f; + + if (args == NULL) + args = &none; + + spa_pod_builder_init (&b, buffer, size); + spa_pod_builder_push_struct (&b, &f); + spa_pod_builder_string (&b, name); + spa_pod_builder_primitive (&b, args); + spa_pod_builder_pop(&b, &f); +} + +bool +icipc_protocol_parse_request (const uint8_t *buffer, + size_t size, + const char **name, + const struct spa_pod **args) +{ + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + const char *parsed_name = NULL; + struct spa_pod *parsed_args = NULL; + + /* check if struct */ + if (!spa_pod_is_struct (pod)) + return false; + + /* parse */ + spa_pod_parser_pod (&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_string (&p, &parsed_name); + spa_pod_parser_get_pod (&p, &parsed_args); + spa_pod_parser_pop(&p, &f); + + /* check name and args */ + if (name == NULL || args == NULL) + return false; + + if (name != NULL) + *name = parsed_name; + if (args != NULL) + *args = parsed_args; + return true; +} + +size_t +icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value) +{ + return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING; +} + +size_t +icipc_protocol_calculate_reply_error_size (const char *msg) +{ + assert (msg); + return strlen(msg) + SIZE_PADDING; +} + +void +icipc_protocol_build_reply_ok (uint8_t *buffer, + size_t size, + const struct spa_pod *value) +{ + const struct spa_pod none = SPA_POD_INIT_None(); + struct spa_pod_builder b; + struct spa_pod_frame f; + + if (value == NULL) + value = &none; + + spa_pod_builder_init (&b, buffer, size); + spa_pod_builder_push_struct (&b, &f); + spa_pod_builder_int (&b, REPLY_CODE_OK); + spa_pod_builder_primitive (&b, value); + spa_pod_builder_pop(&b, &f); +} + +void +icipc_protocol_build_reply_error (uint8_t *buffer, + size_t size, + const char *msg) +{ + struct spa_pod_builder b; + struct spa_pod_frame f; + spa_pod_builder_init (&b, buffer, size); + spa_pod_builder_push_struct (&b, &f); + spa_pod_builder_int (&b, REPLY_CODE_ERROR); + spa_pod_builder_string (&b, msg); + spa_pod_builder_pop(&b, &f); +} + +bool +icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size) +{ + return is_reply (buffer, size, REPLY_CODE_OK); +} + +bool +icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size) +{ + return is_reply (buffer, size, REPLY_CODE_ERROR); +} + +bool +icipc_protocol_parse_reply_ok (const uint8_t *buffer, + size_t size, + const struct spa_pod **value) +{ + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; + struct spa_pod *parsed_value = NULL; + + /* check if struct */ + if (!spa_pod_is_struct (pod)) + return false; + + /* parse */ + spa_pod_parser_pod (&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int (&p, &parsed_code); + spa_pod_parser_get_pod (&p, &parsed_value); + spa_pod_parser_pop (&p, &f); + + if (value != NULL) + *value = parsed_value; + return true; +} + +bool +icipc_protocol_parse_reply_error (const uint8_t *buffer, + size_t size, + const char **msg) +{ + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; + const char *parsed_msg = NULL; + + /* check if struct */ + if (!spa_pod_is_struct (pod)) + return false; + + /* parse */ + spa_pod_parser_pod (&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int (&p, &parsed_code); + spa_pod_parser_get_string (&p, &parsed_msg); + spa_pod_parser_pop (&p, &f); + + if (msg != NULL) + *msg = parsed_msg; + return true; +} diff --git a/lib/protocol.h b/lib/protocol.h new file mode 100644 index 0000000..730c918 --- /dev/null +++ b/lib/protocol.h @@ -0,0 +1,87 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_PROTOCOL_H__ +#define __ICIPC_PROTOCOL_H__ + +#include <spa/pod/pod.h> + +#include "defs.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* request */ + +ICIPC_API +size_t +icipc_protocol_calculate_request_size (const char *name, + const struct spa_pod *args); + +ICIPC_API +void +icipc_protocol_build_request (uint8_t *buffer, + size_t size, + const char *name, + const struct spa_pod *args); + +ICIPC_API +bool +icipc_protocol_parse_request (const uint8_t *buffer, + size_t size, + const char **name, + const struct spa_pod **args); + +/* reply */ + +ICIPC_API +size_t +icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value); + +ICIPC_API +size_t +icipc_protocol_calculate_reply_error_size (const char *msg); + +ICIPC_API +void +icipc_protocol_build_reply_ok (uint8_t *buffer, + size_t size, + const struct spa_pod *value); + +ICIPC_API +void +icipc_protocol_build_reply_error (uint8_t *buffer, + size_t size, + const char *msg); + +ICIPC_API +bool +icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size); + +ICIPC_API +bool +icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size); + +ICIPC_API +bool +icipc_protocol_parse_reply_ok (const uint8_t *buffer, + size_t size, + const struct spa_pod **value); + +ICIPC_API +bool +icipc_protocol_parse_reply_error (const uint8_t *buffer, + size_t size, + const char **msg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/receiver.c b/lib/receiver.c new file mode 100644 index 0000000..f6c46c5 --- /dev/null +++ b/lib/receiver.c @@ -0,0 +1,213 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/epoll.h> +#include <string.h> +#include <errno.h> +#include <assert.h> + +#include "private.h" +#include "receiver.h" + +#include "icipc.h" + +#define MAX_SENDERS 128 + +struct icipc_receiver { + struct sockaddr_un addr; + int socket_fd; + + uint8_t *buffer_read; + size_t buffer_size; + + struct epoll_thread epoll_thread; + bool thread_running; + + const struct icipc_receiver_events *events; + void *events_data; + + /* for subclasses */ + void *user_data; +}; + +static bool +reply_message (struct icipc_receiver *self, + int sender_fd, + uint8_t *buffer, + size_t size) +{ + return self->events && self->events->handle_message ? + self->events->handle_message (self, sender_fd, buffer, size, self->events_data) : + icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size; +} + +static void +socket_event_received (struct epoll_thread *t, int fd, void *data) +{ + /* sender wants to connect, accept connection */ + struct icipc_receiver *self = data; + socklen_t addr_size = sizeof(self->addr); + int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size, + SOCK_CLOEXEC | SOCK_NONBLOCK); + struct epoll_event event; + event.events = EPOLLIN; + event.data.fd = sender_fd; + epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event); + if (self->events && self->events->sender_state) + self->events->sender_state (self, sender_fd, + ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data); +} + +static void +other_event_received (struct epoll_thread *t, int fd, void *data) +{ + struct icipc_receiver *self = data; + + /* sender sends a message, read it and reply */ + ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); + if (size < 0) { + icipc_log_error ("receiver: could not read message: %s", strerror(errno)); + return; + } + + if (size == 0) { + /* client disconnected */ + epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + close (fd); + if (self->events && self->events->sender_state) + self->events->sender_state (self, fd, + ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data); + return; + } + + /* reply */ + if (!reply_message (self, fd, self->buffer_read, size)) + icipc_log_error ("receiver: could not reply message: %s", strerror(errno)); + + return; +} + +/* API */ + +struct icipc_receiver * +icipc_receiver_new (const char *path, + size_t buffer_size, + const struct icipc_receiver_events *events, + void *events_data, + size_t user_size) +{ + struct icipc_receiver *self; + int name_size; + + /* check params */ + if (path == NULL || buffer_size == 0) + return NULL; + + unlink (path); + + self = calloc (1, sizeof (struct icipc_receiver) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s", + path) + 1; + if (name_size > (int) sizeof(self->addr.sun_path)) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* bind socket */ + if (bind (self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) != 0) + goto error; + + /* listen socket */ + if (listen (self->socket_fd, MAX_SENDERS) != 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, other_event_received, self)) + goto error; + + self->events = events; + self->events_data = events_data; + if (user_size > 0) + self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver)); + + return self; + +error: + if (self->buffer_read) + free (self->buffer_read); + if (self->socket_fd != -1) + close (self->socket_fd); + free (self); + return NULL; +} + +void +icipc_receiver_free (struct icipc_receiver *self) +{ + icipc_receiver_stop (self); + + icipc_epoll_thread_destroy (&self->epoll_thread); + free (self->buffer_read); + close (self->socket_fd); + free (self); +} + +bool +icipc_receiver_start (struct icipc_receiver *self) +{ + if (icipc_receiver_is_running (self)) + return true; + + self->thread_running = icipc_epoll_thread_start (&self->epoll_thread); + return self->thread_running; +} + +void +icipc_receiver_stop (struct icipc_receiver *self) +{ + if (icipc_receiver_is_running (self)) { + icipc_epoll_thread_stop (&self->epoll_thread); + self->thread_running = false; + } +} + +bool +icipc_receiver_is_running (struct icipc_receiver *self) +{ + return self->thread_running; +} + +void * +icipc_receiver_get_user_data (struct icipc_receiver *self) +{ + return self->user_data; +} diff --git a/lib/receiver.h b/lib/receiver.h new file mode 100644 index 0000000..cf13e1b --- /dev/null +++ b/lib/receiver.h @@ -0,0 +1,78 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_RECEIVER_H__ +#define __ICIPC_RECEIVER_H__ + +#include <stdbool.h> +#include <stdint.h> +#include <stddef.h> + +#include "defs.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct icipc_receiver; + +enum icipc_receiver_sender_state { + ICIPC_RECEIVER_SENDER_STATE_CONNECTED = 0, + ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED +}; + +struct icipc_receiver_events { + /* emitted when a sender state changes */ + void (*sender_state) (struct icipc_receiver *self, + int sender_fd, + enum icipc_receiver_sender_state state, + void *data); + + /* emitted when message is received and needs to be handled */ + bool (*handle_message) (struct icipc_receiver *self, + int sender_fd, + const uint8_t *buffer, + size_t size, + void *data); +}; + +ICIPC_API +struct icipc_receiver * +icipc_receiver_new (const char *path, + size_t buffer_size, + const struct icipc_receiver_events *events, + void *events_data, + size_t user_size); + +ICIPC_API +void +icipc_receiver_free (struct icipc_receiver *self); + +ICIPC_API +bool +icipc_receiver_start (struct icipc_receiver *self); + +ICIPC_API +void +icipc_receiver_stop (struct icipc_receiver *self); + +ICIPC_API +bool +icipc_receiver_is_running (struct icipc_receiver *self); + +/* for subclasses only */ + +ICIPC_API +void * +icipc_receiver_get_user_data (struct icipc_receiver *self); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/sender.c b/lib/sender.c new file mode 100644 index 0000000..7b5b0c9 --- /dev/null +++ b/lib/sender.c @@ -0,0 +1,251 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <string.h> +#include <errno.h> +#include <assert.h> + +#include "private.h" +#include "sender.h" + +#define MAX_ASYNC_TASKS 128 + +struct icipc_sender_task { + icipc_sender_reply_func_t func; + void *data; +}; + +struct icipc_sender { + struct sockaddr_un addr; + int socket_fd; + + uint8_t *buffer_read; + size_t buffer_size; + + struct epoll_thread epoll_thread; + bool is_connected; + + icipc_sender_lost_conn_func_t lost_func; + void *lost_data; + + struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; + + /* for subclasses */ + void *user_data; +}; + +static int +push_sync_task (struct icipc_sender *self, + icipc_sender_reply_func_t func, + void *data) +{ + size_t i; + for (i = MAX_ASYNC_TASKS; i > 1; i--) { + struct icipc_sender_task *curr = self->async_tasks + i - 1; + struct icipc_sender_task *next = self->async_tasks + i - 2; + if (next->func != NULL && curr->func == NULL) { + curr->func = func; + curr->data = data; + return i - 1; + } else if (i - 2 == 0 && next->func == NULL) { + /* empty queue */ + next->func = func; + next->data = data; + return 0; + } + } + return -1; +} + +static void +pop_sync_task (struct icipc_sender *self, + bool trigger, + const uint8_t *buffer, + size_t size) +{ + size_t i; + for (i = 0; i < MAX_ASYNC_TASKS; i++) { + struct icipc_sender_task *task = self->async_tasks + i; + if (task->func != NULL) { + if (trigger) + task->func (self, buffer, size, task->data); + task->func = NULL; + return; + } + } +} + +static void +socket_event_received (struct epoll_thread *t, int fd, void *data) +{ + struct icipc_sender *self = data; + + /* receiver sends a reply, read it trigger corresponding task */ + ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); + if (size < 0) { + icipc_log_error ("sender: could not read reply: %s", strerror(errno)); + return; + } + + if (size == 0) { + if (self->lost_func) + self->lost_func (self, fd, self->lost_data); + return; + } + + /* trigger async task */ + pop_sync_task (self, true, self->buffer_read, size); + return; +} + +/* API */ + +struct icipc_sender * +icipc_sender_new (const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size) +{ + struct icipc_sender *self; + int name_size; + + if (path == NULL) + return NULL; + + self = calloc (1, sizeof (struct icipc_sender) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + name_size = snprintf(self->addr.sun_path, sizeof(self->addr.sun_path), "%s", + path) + 1; + if (name_size > (int) sizeof(self->addr.sun_path)) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) + goto error; + + self->lost_func = lost_func; + self->lost_data = lost_data; + if (user_size > 0) + self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); + + return self; + +error: + if (self->buffer_read) + free (self->buffer_read); + if (self->socket_fd != -1) + close (self->socket_fd); + free (self); + return NULL; +} + +void +icipc_sender_free (struct icipc_sender *self) +{ + icipc_sender_disconnect (self); + + icipc_epoll_thread_destroy (&self->epoll_thread); + free (self->buffer_read); + close (self->socket_fd); + free (self); +} + +bool +icipc_sender_connect (struct icipc_sender *self) +{ + if (icipc_sender_is_connected (self)) + return true; + + if (connect(self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) == 0 && + icipc_epoll_thread_start (&self->epoll_thread)) { + self->is_connected = true; + return true; + } + + return false; +} + +void +icipc_sender_disconnect (struct icipc_sender *self) +{ + if (icipc_sender_is_connected (self)) { + icipc_epoll_thread_stop (&self->epoll_thread); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + } +} + +bool +icipc_sender_is_connected (struct icipc_sender *self) +{ + return self->is_connected; +} + +bool +icipc_sender_send (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + icipc_sender_reply_func_t func, + void *data) +{ + int id = -1; + + if (buffer == NULL || size == 0) + return false; + + if (!icipc_sender_is_connected (self)) + return false; + + /* add the task in the queue */ + if (func) { + id = push_sync_task (self, func, data); + if (id == -1) + return false; + } + + /* write buffer and remove task if it fails */ + if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) { + if (id != -1) + self->async_tasks[id].func = NULL; + return false; + } + + return true; +} + +void * +icipc_sender_get_user_data (struct icipc_sender *self) +{ + return self->user_data; +} diff --git a/lib/sender.h b/lib/sender.h new file mode 100644 index 0000000..88aaf3a --- /dev/null +++ b/lib/sender.h @@ -0,0 +1,75 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_SENDER_H__ +#define __ICIPC_SENDER_H__ + +#include <stdbool.h> +#include <stdint.h> +#include <stddef.h> + +#include "defs.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct icipc_sender; + +typedef void (*icipc_sender_lost_conn_func_t) (struct icipc_sender *self, + int receiver_fd, + void *data); + +typedef void (*icipc_sender_reply_func_t) (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + void *data); + +ICIPC_API +struct icipc_sender * +icipc_sender_new (const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size); + +ICIPC_API +void +icipc_sender_free (struct icipc_sender *self); + +ICIPC_API +bool +icipc_sender_connect (struct icipc_sender *self); + +ICIPC_API +void +icipc_sender_disconnect (struct icipc_sender *self); + +ICIPC_API +bool +icipc_sender_is_connected (struct icipc_sender *self); + +ICIPC_API +bool +icipc_sender_send (struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + icipc_sender_reply_func_t reply, + void *data); + +/* for subclasses only */ + +ICIPC_API +void * +icipc_sender_get_user_data (struct icipc_sender *self); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/server.c b/lib/server.c new file mode 100644 index 0000000..a798894 --- /dev/null +++ b/lib/server.c @@ -0,0 +1,258 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <pthread.h> + +#include "private.h" +#include "protocol.h" +#include "receiver.h" +#include "server.h" + +#define BUFFER_SIZE 1024 +#define MAX_REQUEST_HANDLERS 128 + +struct icipc_server_client_handler +{ + icipc_server_client_handler_func_t handler; + void *data; +}; + +struct icipc_server_request_handler +{ + const char *name; + icipc_server_request_handler_func_t handler; + void *data; +}; + +struct icipc_server_priv { + pthread_mutex_t mutex; + struct icipc_server_client_handler client_handler; + size_t n_request_handlers; + struct icipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS]; +}; + +static void +sender_state (struct icipc_receiver *base, + int sender_fd, + enum icipc_receiver_sender_state sender_state, + void *data) +{ + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + icipc_log_info ("server: new state %d on client %d", sender_state, sender_fd); + + pthread_mutex_lock (&priv->mutex); + if (priv->client_handler.handler) + priv->client_handler.handler ((struct icipc_server *)base, sender_fd, + sender_state, priv->client_handler.data); + pthread_mutex_unlock (&priv->mutex); +} + +static bool +handle_message (struct icipc_receiver *base, + int sender_fd, + const uint8_t *buffer, + size_t size, + void *data) +{ + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + const char *name = NULL; + const struct spa_pod *args = NULL; + + icipc_log_info ("server: message from client %d received", sender_fd); + + /* parse */ + if (!icipc_protocol_parse_request (buffer, size, &name, &args)) { + const char *msg = "could not parse request"; + const size_t s = icipc_protocol_calculate_reply_error_size (msg); + uint8_t b[s]; + icipc_protocol_build_reply_error (b, s, msg); + return icipc_socket_write (sender_fd, b, s) == (ssize_t)s; + } + + /* handle */ + size_t i; + bool res = false; + pthread_mutex_lock (&priv->mutex); + + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0 && + rh->handler != NULL) { + res = rh->handler ((struct icipc_server *)base, sender_fd, name, args, + rh->data); + pthread_mutex_unlock (&priv->mutex); + return res; + } + } + + /* handler was not found, reply with error */ + res = icipc_server_reply_error ((struct icipc_server *)base, sender_fd, + "request handler not found"); + + pthread_mutex_unlock (&priv->mutex); + return res; +} + +static struct icipc_receiver_events events = { + .sender_state = sender_state, + .handle_message = handle_message, +}; + +/* API */ + +struct icipc_server * +icipc_server_new (const char *path, bool start) +{ + struct icipc_server_priv * priv = NULL; + struct icipc_receiver *base = NULL; + + base = icipc_receiver_new (path, BUFFER_SIZE, &events, NULL, + sizeof (struct icipc_server_priv)); + if (base == NULL) + return NULL; + + priv = icipc_receiver_get_user_data (base); + pthread_mutex_init (&priv->mutex, NULL); + priv->n_request_handlers = 0; + + if (start) + icipc_receiver_start (base); + + return (struct icipc_server *)base; +} + +void +icipc_server_free (struct icipc_server *self) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_destroy (&priv->mutex); + + icipc_receiver_free (base); +} + +void +icipc_server_set_client_handler (struct icipc_server *self, + icipc_server_client_handler_func_t handler, + void *data) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_lock (&priv->mutex); + priv->client_handler.handler = handler; + priv->client_handler.data = data; + pthread_mutex_unlock (&priv->mutex); +} + +void +icipc_server_clear_client_handler (struct icipc_server *self) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_lock (&priv->mutex); + priv->client_handler.handler = NULL; + priv->client_handler.data = NULL; + pthread_mutex_unlock (&priv->mutex); +} + +bool +icipc_server_set_request_handler (struct icipc_server *self, + const char *name, + icipc_server_request_handler_func_t handler, + void *data) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + size_t i; + + /* check params */ + if (name == NULL) + return false; + + pthread_mutex_lock (&priv->mutex); + + /* make sure handler does not exist */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0) { + pthread_mutex_unlock (&priv->mutex); + return false; + } + } + + /* set handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name == NULL) { + rh->name = name; + rh->handler = handler; + rh->data = data; + pthread_mutex_unlock (&priv->mutex); + return true; + } + } + + pthread_mutex_unlock (&priv->mutex); + + return false; +} + +void +icipc_server_clear_request_handler (struct icipc_server *self, + const char *name) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + size_t i; + + /* check params */ + if (name == NULL) + return; + + pthread_mutex_lock (&priv->mutex); + + /* clear handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0) { + rh->name = NULL; + break; + } + } + + pthread_mutex_unlock (&priv->mutex); +} + +bool +icipc_server_reply_ok (struct icipc_server *self, + int client_fd, + const struct spa_pod *value) +{ + const size_t s = icipc_protocol_calculate_reply_ok_size (value); + uint8_t b[s]; + icipc_protocol_build_reply_ok (b, s, value); + return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +} + +bool +icipc_server_reply_error (struct icipc_server *self, + int client_fd, + const char *msg) +{ + if (msg == NULL) + return false; + + const size_t s = icipc_protocol_calculate_reply_error_size (msg); + uint8_t b[s]; + icipc_protocol_build_reply_error (b, s, msg); + return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +} diff --git a/lib/server.h b/lib/server.h new file mode 100644 index 0000000..3cc0ed9 --- /dev/null +++ b/lib/server.h @@ -0,0 +1,85 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#ifndef __ICIPC_SERVER_H__ +#define __ICIPC_SERVER_H__ + +#include <spa/pod/pod.h> + +#include "defs.h" + +#include "receiver.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define icipc_server_to_receiver(self) ((struct icipc_receiver *)(self)) + +struct icipc_server; + +typedef void (*icipc_server_client_handler_func_t) (struct icipc_server *self, + int client_fd, + enum icipc_receiver_sender_state client_state, + void *data); + +typedef bool (*icipc_server_request_handler_func_t) (struct icipc_server *self, + int client_fd, + const char *name, + const struct spa_pod *args, + void *data); + +ICIPC_API +struct icipc_server * +icipc_server_new (const char *path, bool start); + +ICIPC_API +void +icipc_server_free (struct icipc_server *self); + +ICIPC_API +void +icipc_server_set_client_handler (struct icipc_server *self, + icipc_server_client_handler_func_t handler, + void *data); + +ICIPC_API +void +icipc_server_clear_client_handler (struct icipc_server *self); + +ICIPC_API +bool +icipc_server_set_request_handler (struct icipc_server *self, + const char *name, + icipc_server_request_handler_func_t handler, + void *data); + +ICIPC_API +void +icipc_server_clear_request_handler (struct icipc_server *self, + const char *name); + +/* for request handlers only */ + +ICIPC_API +bool +icipc_server_reply_ok (struct icipc_server *self, + int client_fd, + const struct spa_pod *value); + +ICIPC_API +bool +icipc_server_reply_error (struct icipc_server *self, + int client_fd, + const char *msg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/utils.c b/lib/utils.c new file mode 100644 index 0000000..7c683d1 --- /dev/null +++ b/lib/utils.c @@ -0,0 +1,269 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> +#include <string.h> +#include <pthread.h> +#include <time.h> +#include <errno.h> +#include <assert.h> + +#include "private.h" + +#define MAX_POLL_EVENTS 128 +#define MAX_LOG_MESSAGE 1024 + +/* log */ + +const char *icipc_logger_level_text[] = { + [ICIPC_LOG_LEVEL_ERROR] = "E", + [ICIPC_LOG_LEVEL_WARN] = "W", + [ICIPC_LOG_LEVEL_INFO] = "I", +}; + +struct icipc_logger { + enum icipc_log_level level; +}; + +static const struct icipc_logger * +icipc_log_get_instance (void) +{ + static struct icipc_logger logger_ = { 0, }; + static struct icipc_logger* instance_ = NULL; + + if (instance_ == NULL) { + char * val_str = NULL; + enum icipc_log_level val = 0; + + /* default to error */ + logger_.level = ICIPC_LOG_LEVEL_WARN; + + /* get level from env */ + val_str = getenv ("ICIPC_DEBUG"); + if (val_str && sscanf (val_str, "%u", &val) == 1 && + val >= ICIPC_LOG_LEVEL_NONE) + logger_.level = val; + + instance_ = &logger_; + } + + return instance_; +} + +void +icipc_logv (enum icipc_log_level level, const char *fmt, va_list args) +{ + const struct icipc_logger *logger = NULL; + + logger = icipc_log_get_instance (); + assert (logger); + + if (logger->level >= level) { + assert (level > 0); + char msg[MAX_LOG_MESSAGE]; + struct timespec time; + clock_gettime (CLOCK_REALTIME, &time); + vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args); + fprintf (stderr, "[%s][%lu.%lu] %s\n", icipc_logger_level_text[level], + time.tv_sec, time.tv_sec, msg); + } +} + +void +icipc_log (enum icipc_log_level level, const char *fmt, ...) +{ + va_list args; + va_start (args, fmt); + icipc_logv (level, fmt, args); + va_end (args); +} + +/* socket */ + +ssize_t +icipc_socket_write (int fd, const uint8_t *buffer, size_t size) +{ + size_t total_written = 0; + size_t n; + + assert (fd >= 0); + assert (buffer != NULL); + assert (size > 0); + + do { + n = write(fd, buffer, size); + if (n < size) { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return total_written; + return -1; + } + total_written += n; + } while (total_written < size); + + return total_written; +} + +ssize_t +icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size) +{ + ssize_t n; + ssize_t size; + size_t offset = 0; + + assert (buffer); + assert (*buffer); + assert (max_size); + assert (*max_size > 0); + +again: + size = *max_size - offset; + n = read (fd, *buffer + offset, size); + if (n == 0) + return 0; + + /* check for errors */ + if (n < 0) { + if (errno == EINTR) + goto again; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return offset; + return -1; + } + + /* realloc if we need more space, and read again */ + if (n >= size) { + *max_size += *max_size; + *buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t)); + offset += n; + goto again; + } + + return offset + n; +} + +/* epoll thread */ + +bool +icipc_epoll_thread_init (struct epoll_thread *self, + int socket_fd, + icipc_epoll_thread_event_funct_t sock_func, + icipc_epoll_thread_event_funct_t other_func, + void *data) +{ + struct epoll_event event; + + self->socket_fd = socket_fd; + self->event_fd = -1; + self->epoll_fd = -1; + + /* create event fd */ + self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + if (self->event_fd == -1) + goto error; + + /* create epoll fd */ + self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC); + if (self->epoll_fd == -1) + goto error; + + /* poll socket fd */ + event.events = EPOLLIN; + event.data.fd = self->socket_fd; + if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0) + goto error; + + /* poll event fd */ + event.events = EPOLLIN; + event.data.fd = self->event_fd; + if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0) + goto error; + + self->socket_event_func = sock_func; + self->other_event_func = other_func; + self->event_data = data; + return true; + +error: + if (self->epoll_fd != -1) + close (self->epoll_fd); + if (self->event_fd != -1) + close (self->event_fd); + return false; +} + +static void * +epoll_thread_run (void *data) +{ + struct epoll_thread *self = data; + bool exit = false; + + while (!exit) { + /* wait for events */ + struct epoll_event ep[MAX_POLL_EVENTS]; + int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1); + if (n < 0) { + icipc_log_error ("epoll_thread: failed to wait for event: %s", + strerror(errno)); + continue; + } + + for (int i = 0; i < n; i++) { + /* socket fd */ + if (ep[i].data.fd == self->socket_fd) { + if (self->socket_event_func) + self->socket_event_func (self, ep[i].data.fd, self->event_data); + } + + /* event fd */ + else if (ep[i].data.fd == self->event_fd) { + uint64_t stop = 0; + ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t)); + if (res == sizeof(uint64_t) && stop == 1) + exit = true; + } + + /* other */ + else { + if (self->other_event_func) + self->other_event_func (self, ep[i].data.fd, self->event_data); + } + } + } + + return NULL; +} + +bool +icipc_epoll_thread_start (struct epoll_thread *self) +{ + return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0; +} + +void +icipc_epoll_thread_stop (struct epoll_thread *self) +{ + uint64_t value = 1; + ssize_t res = write (self->event_fd, &value, sizeof(uint64_t)); + if (res == sizeof(uint64_t)) + pthread_join (self->thread, NULL); +} + +void +icipc_epoll_thread_destroy (struct epoll_thread *self) +{ + close (self->epoll_fd); + close (self->event_fd); +} diff --git a/src/icipc-client.c b/src/icipc-client.c new file mode 100644 index 0000000..aa71b1f --- /dev/null +++ b/src/icipc-client.c @@ -0,0 +1,104 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <pthread.h> +#include <assert.h> + +#include <spa/pod/builder.h> +#include <icipc/icipc.h> + +struct client_data { + struct icipc_client *c; + pthread_mutex_t mutex; + pthread_cond_t cond; + bool reply_received; +}; + +static void +reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) +{ + struct client_data *data = p; + const char *error = NULL; + const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &error); + if (!pod) + printf ("error: %s\n", error ? error : "unknown"); + else + printf ("success!\n"); + + /* signal reply received */ + pthread_mutex_lock (&data->mutex); + data->reply_received = true; + pthread_cond_signal (&data->cond); + pthread_mutex_unlock (&data->mutex); +} + +int +main (int argc, char *argv[]) +{ + struct client_data data; + + if (argc < 2) { + printf ("usage: <server-path>\n"); + return -1; + } + + /* init */ + data.c = icipc_client_new (argv[1], true); + pthread_mutex_init (&data.mutex, NULL); + pthread_cond_init (&data.cond, NULL); + data.reply_received = false; + + while (true) { + char str[1024]; + + printf ("> "); + fgets (str, 1023, stdin); + + if (strncmp (str, "help", 4) == 0) { + printf ("help\tprints this message\n"); + printf ("quit\texits the client\n"); + printf ("send\tsends a request, usage: send <request-name> [args]\n"); + } else if (strncmp (str, "quit", 4) == 0) { + printf ("exiting...\n"); + break; + } else if (strncmp (str, "send", 4) == 0) { + char request_name[128]; + char request_args[1024]; + int n = sscanf(str, "send %s %s", request_name, request_args); + if (n <= 0) + continue; + + /* send request */ + if (n >= 2) { + /* TODO: for now we always create a string pod for args */ + struct { + struct spa_pod_string pod; + char str[1024]; + } args; + args.pod = SPA_POD_INIT_String(1024); + strncpy (args.str, request_args, 1024); + icipc_client_send_request (data.c, request_name, (const struct spa_pod *)&args, + reply_handler, &data); + } else { + icipc_client_send_request (data.c, request_name, NULL, reply_handler, &data); + } + + /* wait for reply */ + pthread_mutex_lock (&data.mutex); + while (!data.reply_received) + pthread_cond_wait (&data.cond, &data.mutex); + pthread_mutex_unlock (&data.mutex); + } + } + + /* clean up */ + pthread_cond_destroy (&data.cond); + pthread_mutex_destroy (&data.mutex); + icipc_client_free (data.c); + return 0; +} diff --git a/tests/client-server.c b/tests/client-server.c new file mode 100644 index 0000000..ca6d1c3 --- /dev/null +++ b/tests/client-server.c @@ -0,0 +1,124 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <glib.h> +#include <spa/pod/builder.h> +#include <spa/pod/parser.h> +#include <icipc/icipc.h> + +#define TEST_ADDRESS "/tmp/icipc-client-server" + +static bool +increment_request_handler (struct icipc_server *self, int client_fd, + const char *name, const struct spa_pod *args, void *data) +{ + int32_t val = 0; + g_assert_true (spa_pod_is_int (args)); + g_assert_true (spa_pod_get_int (args, &val) == 0); + struct spa_pod_int res = SPA_POD_INIT_Int (val + 1); + return icipc_server_reply_ok (self, client_fd, (struct spa_pod *)&res); +} + +static bool +error_request_handler (struct icipc_server *self, int client_fd, + const char *name, const struct spa_pod *args, void *data) +{ + return icipc_server_reply_error (self, client_fd, "error message"); +} + +struct reply_data { + int32_t incremented; + const char *error; + int n_replies; + GMutex mutex; +}; + +static void +wait_for_reply (struct reply_data *data, int n_replies) +{ + while (true) { + g_mutex_lock (&data->mutex); + if (data->n_replies == n_replies) { + g_mutex_unlock (&data->mutex); + break; + } + g_mutex_unlock (&data->mutex); + } +} + +static void +reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) +{ + struct reply_data *data = p; + g_assert_nonnull (data); + + g_mutex_lock (&data->mutex); + + const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &data->error); + if (pod) { + g_assert_true (spa_pod_is_int (pod)); + g_assert_true (spa_pod_get_int (pod, &data->incremented) == 0); + } + data->n_replies++; + + g_mutex_unlock (&data->mutex); +} + +static void +test_icipc_server_client () +{ + struct icipc_server *s = icipc_server_new (TEST_ADDRESS, true); + g_assert_nonnull (s); + struct icipc_client *c = icipc_client_new (TEST_ADDRESS, true); + g_assert_nonnull (c); + struct reply_data data; + g_mutex_init (&data.mutex); + + /* add request handlers */ + g_assert_true (icipc_server_set_request_handler (s, "INCREMENT", increment_request_handler, NULL)); + g_assert_true (icipc_server_set_request_handler (s, "ERROR", error_request_handler, NULL)); + + /* send an INCREMENT request of 3, and make sure the returned value is 4 */ + data.incremented = -1; + data.error = NULL; + data.n_replies = 0; + struct spa_pod_int i = SPA_POD_INIT_Int (3); + g_assert_true (icipc_client_send_request (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, &data)); + wait_for_reply (&data, 1); + g_assert_null (data.error); + g_assert_cmpint (data.incremented, ==, 4); + + /* send an ERROR request, and make sure the returned value is an error */ + data.error = NULL; + data.n_replies = 0; + g_assert_true (icipc_client_send_request (c, "ERROR", NULL, reply_handler, &data)); + wait_for_reply (&data, 1); + g_assert_cmpstr (data.error, ==, "error message"); + + /* send an unhandled request, and make sure the server replies with an error */ + data.error = NULL; + data.n_replies = 0; + g_assert_true (icipc_client_send_request (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data)); + wait_for_reply (&data, 1); + g_assert_cmpstr (data.error, ==, "request handler not found"); + + /* clean up */ + g_mutex_clear (&data.mutex); + icipc_client_free (c); + icipc_server_free (s); +} + +gint +main (gint argc, gchar *argv[]) +{ + g_test_init (&argc, &argv, NULL); + + g_test_add_func ("/icipc/icipc-server-client", test_icipc_server_client); + + return g_test_run (); +} diff --git a/tests/meson.build b/tests/meson.build new file mode 100644 index 0000000..12771c0 --- /dev/null +++ b/tests/meson.build @@ -0,0 +1,26 @@ +common_deps = [icipc_dep, glib_dep] +common_env = [ + 'G_TEST_SRCDIR=@0@'.format(meson.current_source_dir()), + 'G_TEST_BUILDDIR=@0@'.format(meson.current_build_dir()), +] + +test( + 'test-icipc-sender-receiver', + executable('test-sender-receiver', 'sender-receiver.c', dependencies: common_deps), + env: common_env, + workdir : meson.current_source_dir(), +) + +test( + 'test-icipc-protocol', + executable('test-protocol', 'protocol.c', dependencies: common_deps), + env: common_env, + workdir : meson.current_source_dir(), +) + +test( + 'test-icipc-client-server', + executable('test-client-server', 'client-server.c', dependencies: common_deps), + env: common_env, + workdir : meson.current_source_dir(), +) diff --git a/tests/protocol.c b/tests/protocol.c new file mode 100644 index 0000000..5e272a1 --- /dev/null +++ b/tests/protocol.c @@ -0,0 +1,77 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <glib.h> +#include <spa/pod/builder.h> +#include <spa/pod/parser.h> +#include <icipc/icipc.h> + +static void +test_icipc_protocol () +{ + uint8_t b[1024]; + + /* request null value */ + { + icipc_protocol_build_request (b, sizeof(b), "name", NULL); + const char *name = NULL; + const struct spa_pod *value = NULL; + g_assert_true (icipc_protocol_parse_request (b, sizeof(b), &name, &value)); + g_assert_cmpstr (name, ==, "name"); + g_assert_true (spa_pod_is_none (value)); + } + + /* request */ + { + struct spa_pod_int i = SPA_POD_INIT_Int (8); + icipc_protocol_build_request (b, sizeof(b), "name", (struct spa_pod *)&i); + const char *name = NULL; + const struct spa_pod_int *value = NULL; + g_assert_true (icipc_protocol_parse_request (b, sizeof(b), &name, (const struct spa_pod **)&value)); + g_assert_cmpstr (name, ==, "name"); + g_assert_cmpint (value->value, ==, 8); + } + + /* reply error */ + { + icipc_protocol_build_reply_error (b, sizeof(b), "error message"); + g_assert_true (icipc_protocol_is_reply_error (b, sizeof(b))); + const char *msg = NULL; + g_assert_true (icipc_protocol_parse_reply_error (b, sizeof(b), &msg)); + g_assert_cmpstr (msg, ==, "error message"); + } + + /* reply ok null value */ + { + icipc_protocol_build_reply_ok (b, sizeof(b), NULL); + g_assert_true (icipc_protocol_is_reply_ok (b, sizeof(b))); + const struct spa_pod *value = NULL; + g_assert_true (icipc_protocol_parse_reply_ok (b, sizeof(b), &value)); + g_assert_true (spa_pod_is_none (value)); + } + + /* reply ok */ + { + struct spa_pod_int i = SPA_POD_INIT_Int (3); + icipc_protocol_build_reply_ok (b, sizeof(b), (struct spa_pod *)&i); + g_assert_true (icipc_protocol_is_reply_ok (b, sizeof(b))); + const struct spa_pod_int *value = NULL; + g_assert_true (icipc_protocol_parse_reply_ok (b, sizeof(b), (const struct spa_pod **)&value)); + g_assert_cmpint (value->value, ==, 3); + } +} + +gint +main (gint argc, gchar *argv[]) +{ + g_test_init (&argc, &argv, NULL); + + g_test_add_func ("/icipc/icipc-protocol", test_icipc_protocol); + + return g_test_run (); +} diff --git a/tests/sender-receiver.c b/tests/sender-receiver.c new file mode 100644 index 0000000..bd8721d --- /dev/null +++ b/tests/sender-receiver.c @@ -0,0 +1,286 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <glib.h> +#include <icipc/icipc.h> + +#define TEST_ADDRESS "/tmp/icipc-sender-receiver" + +struct event_data { + const uint8_t * expected_data; + size_t expected_size; + int connections; + int n_events; + GMutex mutex; +}; + +static void +wait_for_event (struct event_data *data, int n_events) +{ + while (true) { + g_mutex_lock (&data->mutex); + if (data->n_events == n_events) { + g_mutex_unlock (&data->mutex); + break; + } + g_mutex_unlock (&data->mutex); + } +} + +static void +sender_state_callback (struct icipc_receiver *self, int sender_fd, + enum icipc_receiver_sender_state sender_state, void *p) +{ + struct event_data *data = p; + g_assert_nonnull (data); + + g_mutex_lock (&data->mutex); + switch (sender_state) { + case ICIPC_RECEIVER_SENDER_STATE_CONNECTED: + data->connections++; + break; + case ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED: + data->connections--; + break; + default: + g_assert_not_reached (); + break; + } + data->n_events++; + g_mutex_unlock (&data->mutex); +} + +static void +reply_callback (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) +{ + struct event_data *data = p; + g_assert_nonnull (data); + g_assert_nonnull (buffer); + + g_mutex_lock (&data->mutex); + g_assert_cmpmem (buffer, size, data->expected_data, data->expected_size); + data->n_events++; + g_mutex_unlock (&data->mutex); +} + +static void +test_icipc_receiver_basic () +{ + struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (r); + + /* start and stop */ + g_assert_false (icipc_receiver_is_running (r)); + g_assert_true (icipc_receiver_start (r)); + g_assert_true (icipc_receiver_is_running (r)); + icipc_receiver_stop (r); + g_assert_false (icipc_receiver_is_running (r)); + + /* clean up */ + icipc_receiver_free (r); +} + +static void +test_icipc_sender_basic () +{ + struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (s); + + /* clean up */ + icipc_sender_free (s); +} + +static void +test_icipc_sender_connect () +{ + static struct icipc_receiver_events events = { + .sender_state = sender_state_callback, + .handle_message = NULL, + }; + struct event_data data; + g_mutex_init (&data.mutex); + data.n_events = 0; + data.connections = 0; + struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, &events, &data, 0); + g_assert_nonnull (r); + struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (s); + + /* start receiver */ + g_assert_true (icipc_receiver_start (r)); + + /* connect sender */ + g_assert_true (icipc_sender_connect (s)); + g_assert_true (icipc_sender_is_connected (s)); + wait_for_event (&data, 1); + g_assert_cmpint (data.connections, ==, 1); + + /* disconnect sender */ + icipc_sender_disconnect (s); + g_assert_false (icipc_sender_is_connected (s)); + wait_for_event (&data, 2); + g_assert_cmpint (data.connections, ==, 0); + + /* stop receiver */ + icipc_receiver_stop (r); + + /* clean up */ + g_mutex_clear (&data.mutex); + icipc_sender_free (s); + icipc_receiver_free (r); +} + +static void +lost_connection_handler (struct icipc_sender *self, int receiver_fd, void *p) +{ + struct event_data *data = p; + g_assert_nonnull (data); + + g_mutex_lock (&data->mutex); + data->n_events++; + g_mutex_unlock (&data->mutex); +} + +static void +test_icipc_sender_lost_connection () +{ + struct event_data data; + g_mutex_init (&data.mutex); + struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (r); + struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 16, lost_connection_handler, &data, 0); + g_assert_nonnull (s); + + /* connect sender */ + g_assert_true (icipc_sender_connect (s)); + g_assert_true (icipc_sender_is_connected (s)); + + /* destroy receiver and make sure the lost connection handler is triggered */ + data.n_events = 0; + icipc_receiver_free (r); + wait_for_event (&data, 1); + + /* clean up */ + g_mutex_clear (&data.mutex); + icipc_sender_free (s); +} + +static void +test_icipc_sender_send () +{ + struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 2, NULL, NULL, 0); + g_assert_nonnull (r); + struct icipc_sender *s = icipc_sender_new (TEST_ADDRESS, 2, NULL, NULL, 0); + g_assert_nonnull (s); + struct event_data data; + g_mutex_init (&data.mutex); + data.n_events = 0; + + /* start receiver */ + g_assert_true (icipc_receiver_start (r)); + + /* connect */ + g_assert_true (icipc_sender_connect (s)); + g_assert_true (icipc_sender_is_connected (s)); + + /* send 1 byte message (should not realloc) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"h"; + data.expected_size = 1; + g_assert_true (icipc_sender_send (s, (const uint8_t *)"h1", 1, reply_callback, &data)); + wait_for_event (&data, 1); + + /* send 2 bytes message (should realloc once to 4) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hi"; + data.expected_size = 2; + g_assert_true (icipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data)); + wait_for_event (&data, 1); + + /* send 3 bytes message (should not realloc) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hii"; + data.expected_size = 3; + g_assert_true (icipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data)); + wait_for_event (&data, 1); + + /* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"bigger than 16 bytes message"; + data.expected_size = 28; + g_assert_true (icipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data)); + wait_for_event (&data, 1); + + /* don't allow empty messages */ + data.n_events = 0; + g_assert_false (icipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL)); + + /* stop receiver */ + icipc_receiver_stop (r); + + /* clean up */ + g_mutex_clear (&data.mutex); + icipc_sender_free (s); + icipc_receiver_free (r); +} + +static void +test_icipc_multiple_senders_send () +{ + struct icipc_receiver *r = icipc_receiver_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (r); + struct icipc_sender *senders[50]; + struct event_data data; + g_mutex_init (&data.mutex); + data.n_events = 0; + + /* start receiver */ + g_assert_true (icipc_receiver_start (r)); + + /* create and connect 50 senders */ + for (int i = 0; i < 50; i++) { + senders[i] = icipc_sender_new (TEST_ADDRESS, 16, NULL, NULL, 0); + g_assert_nonnull (senders[i]); + g_assert_true (icipc_sender_connect (senders[i])); + g_assert_true (icipc_sender_is_connected (senders[i])); + } + + /* send 50 messages (1 per sender) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hello"; + data.expected_size = 5; + for (int i = 0; i < 50; i++) + g_assert_true (icipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data)); + wait_for_event (&data, 50); + + /* stop receiver */ + icipc_receiver_stop (r); + + /* clean up */ + g_mutex_clear (&data.mutex); + for (int i = 0; i < 50; i++) + icipc_sender_free (senders[i]); + icipc_receiver_free (r); +} + +gint +main (gint argc, gchar *argv[]) +{ + g_test_init (&argc, &argv, NULL); + + g_test_add_func ("/icipc/receiver-basic", test_icipc_receiver_basic); + g_test_add_func ("/icipc/sender-basic", test_icipc_sender_basic); + g_test_add_func ("/icipc/sender-connect", test_icipc_sender_connect); + g_test_add_func ("/icipc/sender-lost-connection", + test_icipc_sender_lost_connection); + g_test_add_func ("/icipc/sender-send", test_icipc_sender_send); + g_test_add_func ("/icipc/multiple-senders-send", + test_icipc_multiple_senders_send); + + return g_test_run (); +} |