diff options
-rw-r--r-- | .editorconfig | 13 | ||||
-rw-r--r-- | lib/client.c | 97 | ||||
-rw-r--r-- | lib/client.h | 28 | ||||
-rw-r--r-- | lib/defs.h | 4 | ||||
-rw-r--r-- | lib/private.h | 93 | ||||
-rw-r--r-- | lib/protocol.c | 337 | ||||
-rw-r--r-- | lib/protocol.h | 70 | ||||
-rw-r--r-- | lib/receiver.c | 312 | ||||
-rw-r--r-- | lib/receiver.h | 49 | ||||
-rw-r--r-- | lib/sender.c | 433 | ||||
-rw-r--r-- | lib/sender.h | 55 | ||||
-rw-r--r-- | lib/server.c | 411 | ||||
-rw-r--r-- | lib/server.h | 70 | ||||
-rw-r--r-- | lib/utils.c | 479 | ||||
-rw-r--r-- | src/icipc-client.c | 168 | ||||
-rw-r--r-- | tests/client-server.c | 212 | ||||
-rw-r--r-- | tests/protocol.c | 112 | ||||
-rw-r--r-- | tests/sender-receiver.c | 573 |
18 files changed, 1731 insertions, 1785 deletions
diff --git a/.editorconfig b/.editorconfig index 1923d41..ecd6aac 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,8 +1,15 @@ root = true [*] +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true +charset = utf-8 + +[*.{c,h}] +indent_style = space +indent_size = 8 + +[meson.build] indent_style = space indent_size = 2 -charset = utf-8 -trim_trailing_whitespace = true -insert_final_newline = true diff --git a/lib/client.c b/lib/client.c index 735796f..4186ce4 100644 --- a/lib/client.c +++ b/lib/client.c @@ -13,72 +13,65 @@ #define BUFFER_SIZE 1024 -static void -on_lost_connection (struct icipc_sender *self, - int receiver_fd, - void *data) -{ - icipc_log_warn ("client: lost connection with server %d", receiver_fd); +static void on_lost_connection( + struct icipc_sender *self, + int receiver_fd, + void *data) { + icipc_log_warn("client: lost connection with server %d", receiver_fd); } /* API */ -struct icipc_client * -icipc_client_new (const char *path, bool connect) -{ - struct icipc_sender *base; - base = icipc_sender_new (path, BUFFER_SIZE, on_lost_connection, NULL, 0); +struct icipc_client *icipc_client_new(const char *path, bool connect) { + struct icipc_sender *base; + base = icipc_sender_new(path, BUFFER_SIZE, on_lost_connection, NULL, 0); - if (connect) - icipc_sender_connect (base); + if (connect) + icipc_sender_connect(base); - return (struct icipc_client *)base; + return (struct icipc_client *)base; } -void -icipc_client_free (struct icipc_client *self) -{ - struct icipc_sender *base = icipc_client_to_sender (self); - icipc_sender_free (base); +void icipc_client_free(struct icipc_client *self) { + struct icipc_sender *base = icipc_client_to_sender(self); + icipc_sender_free(base); } -bool -icipc_client_send_request (struct icipc_client *self, - const char *name, - const struct spa_pod *args, - icipc_sender_reply_func_t reply, - void *data) -{ - struct icipc_sender *base = icipc_client_to_sender (self); +bool icipc_client_send_request( + struct icipc_client *self, + const char *name, + const struct spa_pod *args, + icipc_sender_reply_func_t reply, + void *data) { + struct icipc_sender *base = icipc_client_to_sender(self); - /* check params */ - if (name == NULL) - return false; + /* check params */ + if (name == NULL) + return false; - const size_t size = icipc_protocol_calculate_request_size (name, args); - uint8_t buffer[size]; - icipc_protocol_build_request (buffer, size, name, args); - return icipc_sender_send (base, buffer, size, reply, data); + const size_t size = icipc_protocol_calculate_request_size(name, args); + uint8_t buffer[size]; + icipc_protocol_build_request(buffer, size, name, args); + return icipc_sender_send(base, buffer, size, reply, data); } -const struct spa_pod * -icipc_client_send_request_finish (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - const char **error) -{ - /* error */ - if (icipc_protocol_is_reply_error (buffer, size)) { - icipc_protocol_parse_reply_error (buffer, size, error); - return NULL; - } +const struct spa_pod *icipc_client_send_request_finish( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + const char **error) { + /* error */ + if (icipc_protocol_is_reply_error(buffer, size)) { + icipc_protocol_parse_reply_error(buffer, size, error); + return NULL; + } - /* ok */ - if (icipc_protocol_is_reply_ok (buffer, size)) { - const struct spa_pod *value = NULL; - if (icipc_protocol_parse_reply_ok (buffer, size, &value)) - return value; - } + /* ok */ + if (icipc_protocol_is_reply_ok(buffer, size)) { + const struct spa_pod *value = NULL; + if (icipc_protocol_parse_reply_ok(buffer, size, &value)) + return value; + } - return NULL; + return NULL; } diff --git a/lib/client.h b/lib/client.h index 7ac8e1a..597552c 100644 --- a/lib/client.h +++ b/lib/client.h @@ -25,29 +25,27 @@ extern "C" { struct icipc_client; ICIPC_API -struct icipc_client * -icipc_client_new (const char *path, bool connect); +struct icipc_client *icipc_client_new(const char *path, bool connect); ICIPC_API -void -icipc_client_free (struct icipc_client *self); +void icipc_client_free(struct icipc_client *self); ICIPC_API -bool -icipc_client_send_request (struct icipc_client *self, - const char *name, - const struct spa_pod *args, - icipc_sender_reply_func_t reply, - void *data); +bool icipc_client_send_request( + struct icipc_client *self, + const char *name, + const struct spa_pod *args, + icipc_sender_reply_func_t reply, + void *data); /* for reply handlers only */ ICIPC_API -const struct spa_pod * -icipc_client_send_request_finish (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - const char **error); +const struct spa_pod *icipc_client_send_request_finish( + struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + const char **error); #ifdef __cplusplus } @@ -10,7 +10,7 @@ #define __ICIPC_DEFS_H__ #if defined(__GNUC__) -# define ICIPC_API_EXPORT extern __attribute__ ((visibility ("default"))) +# define ICIPC_API_EXPORT extern __attribute__((visibility("default"))) #else # define ICIPC_API_EXPORT extern #endif @@ -20,7 +20,7 @@ #endif #ifndef ICIPC_PRIVATE_API -# define ICIPC_PRIVATE_API __attribute__ ((deprecated ("Private API"))) +# define ICIPC_PRIVATE_API __attribute__((deprecated("Private API"))) #endif #endif diff --git a/lib/private.h b/lib/private.h index a5b1b7d..fcdb448 100644 --- a/lib/private.h +++ b/lib/private.h @@ -25,75 +25,70 @@ extern "C" { /* log */ #define icipc_log_info(F, ...) \ - icipc_log(ICIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__) + icipc_log(ICIPC_LOG_LEVEL_INFO, (F), ##__VA_ARGS__) #define icipc_log_warn(F, ...) \ - icipc_log(ICIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__) + icipc_log(ICIPC_LOG_LEVEL_WARN, (F), ##__VA_ARGS__) #define icipc_log_error(F, ...) \ - icipc_log(ICIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__) + icipc_log(ICIPC_LOG_LEVEL_ERROR, (F), ##__VA_ARGS__) -enum icipc_log_level { - ICIPC_LOG_LEVEL_NONE = 0, - ICIPC_LOG_LEVEL_ERROR, - ICIPC_LOG_LEVEL_WARN, - ICIPC_LOG_LEVEL_INFO, -}; +typedef enum LogLevel { + ICIPC_LOG_LEVEL_NONE = 0, + ICIPC_LOG_LEVEL_ERROR, + ICIPC_LOG_LEVEL_WARN, + ICIPC_LOG_LEVEL_INFO, +} LogLevel; -void -icipc_logv (enum icipc_log_level level, - const char *fmt, - va_list args) __attribute__ ((format (printf, 2, 0))); +void icipc_logv ( + LogLevel level, + const char *fmt, + va_list args) __attribute__((format(printf, 2, 0))); -void -icipc_log (enum icipc_log_level level, - const char *fmt, - ...) __attribute__ ((format (printf, 2, 3))); +void icipc_log( + LogLevel level, + const char *fmt, + ...) __attribute__((format(printf, 2, 3))); /* socket path */ -int -icipc_construct_socket_path (const char *name, char *buf, size_t buf_size); +int icipc_construct_socket_path(const char *name, char *buf, size_t buf_size); /* socket */ -ssize_t -icipc_socket_write (int fd, const uint8_t *buffer, size_t size); +ssize_t icipc_socket_write(int fd, const uint8_t *buffer, size_t size); -ssize_t -icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size); +ssize_t icipc_socket_read(int fd, uint8_t **buffer, size_t *max_size); /* epoll thread */ -struct epoll_thread; - -typedef void (*icipc_epoll_thread_event_func_t) (struct epoll_thread *self, - int fd, - void *data); - -struct epoll_thread { - int socket_fd; - int epoll_fd; - int event_fd; - pthread_t thread; - icipc_epoll_thread_event_func_t socket_event_func; - icipc_epoll_thread_event_func_t other_event_func; - void *event_data; +typedef struct EpollThread EpollThread; + +typedef void (*icipc_epoll_thread_event_func_t)( + struct EpollThread *self, + int fd, + void *data); + +struct EpollThread { + int socket_fd; + int epoll_fd; + int event_fd; + pthread_t thread; + icipc_epoll_thread_event_func_t socket_event_func; + icipc_epoll_thread_event_func_t other_event_func; + void *event_data; }; -bool -icipc_epoll_thread_init (struct epoll_thread *self, - int socket_fd, - icipc_epoll_thread_event_func_t sock_func, - icipc_epoll_thread_event_func_t other_func, - void *data); +bool icipc_epoll_thread_init( + EpollThread *self, + int socket_fd, + icipc_epoll_thread_event_func_t sock_func, + icipc_epoll_thread_event_func_t other_func, + void *data); -bool -icipc_epoll_thread_start (struct epoll_thread *self); +bool icipc_epoll_thread_start(EpollThread *self); -void -icipc_epoll_thread_stop (struct epoll_thread *self); +void icipc_epoll_thread_stop(EpollThread *self); -void -icipc_epoll_thread_destroy (struct epoll_thread *self); +void icipc_epoll_thread_destroy(EpollThread *self); #ifdef __cplusplus } diff --git a/lib/protocol.c b/lib/protocol.c index 6b2bcf1..35a9485 100644 --- a/lib/protocol.c +++ b/lib/protocol.c @@ -15,210 +15,193 @@ #define SIZE_PADDING 128 -enum icipc_protocol_reply_code { - REPLY_CODE_ERROR = 0, - REPLY_CODE_OK, +enum { + REPLY_CODE_ERROR = 0, + REPLY_CODE_OK, }; -static bool -is_reply (const uint8_t *buffer, size_t size, int code) -{ - const struct spa_pod *pod = (const struct spa_pod *)buffer; - struct spa_pod_parser p; - struct spa_pod_frame f; - int parsed_code = 0; +static bool is_reply(const uint8_t *buffer, size_t size, int code) { + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; - /* check if struct */ - if (!spa_pod_is_struct (pod)) - return false; + /* check if struct */ + if (!spa_pod_is_struct(pod)) + return false; - /* parse */ - spa_pod_parser_pod (&p, pod); - spa_pod_parser_push_struct(&p, &f); - spa_pod_parser_get_int (&p, &parsed_code); + /* parse */ + spa_pod_parser_pod(&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int(&p, &parsed_code); - return parsed_code == code; + return parsed_code == code; } /* API */ -size_t -icipc_protocol_calculate_request_size (const char *name, - const struct spa_pod *args) -{ - assert (name); - return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING; +size_t icipc_protocol_calculate_request_size( + const char *name, + const struct spa_pod *args) { + assert (name); + return strlen(name) + (args ? SPA_POD_SIZE(args) : 8) + SIZE_PADDING; } -void -icipc_protocol_build_request (uint8_t *buffer, - size_t size, - const char *name, - const struct spa_pod *args) -{ - const struct spa_pod none = SPA_POD_INIT_None(); - struct spa_pod_builder b; - struct spa_pod_frame f; - - memset (buffer, 0, size); - - if (args == NULL) - args = &none; - - spa_pod_builder_init (&b, buffer, size); - spa_pod_builder_push_struct (&b, &f); - spa_pod_builder_string (&b, name); - spa_pod_builder_primitive (&b, args); - spa_pod_builder_pop(&b, &f); +void icipc_protocol_build_request( + uint8_t *buffer, + size_t size, + const char *name, + const struct spa_pod *args) { + const struct spa_pod none = SPA_POD_INIT_None(); + struct spa_pod_builder b; + struct spa_pod_frame f; + + memset(buffer, 0, size); + + if (args == NULL) + args = &none; + + spa_pod_builder_init(&b, buffer, size); + spa_pod_builder_push_struct(&b, &f); + spa_pod_builder_string(&b, name); + spa_pod_builder_primitive (&b, args); + spa_pod_builder_pop(&b, &f); } -bool -icipc_protocol_parse_request (const uint8_t *buffer, - size_t size, - const char **name, - const struct spa_pod **args) -{ - const struct spa_pod *pod = (const struct spa_pod *)buffer; - struct spa_pod_parser p; - struct spa_pod_frame f; - const char *parsed_name = NULL; - struct spa_pod *parsed_args = NULL; - - /* check if struct */ - if (!spa_pod_is_struct (pod)) - return false; - - /* parse */ - spa_pod_parser_pod (&p, pod); - spa_pod_parser_push_struct(&p, &f); - spa_pod_parser_get_string (&p, &parsed_name); - spa_pod_parser_get_pod (&p, &parsed_args); - spa_pod_parser_pop(&p, &f); - - /* check name and args */ - if (name == NULL || args == NULL) - return false; - - if (name != NULL) - *name = parsed_name; - if (args != NULL) - *args = parsed_args; - return true; +bool icipc_protocol_parse_request( + const uint8_t *buffer, + size_t size, + const char **name, + const struct spa_pod **args) { + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + const char *parsed_name = NULL; + struct spa_pod *parsed_args = NULL; + + /* check if struct */ + if (!spa_pod_is_struct(pod)) + return false; + + /* parse */ + spa_pod_parser_pod(&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_string(&p, &parsed_name); + spa_pod_parser_get_pod(&p, &parsed_args); + spa_pod_parser_pop(&p, &f); + + /* check name and args */ + if (name == NULL || args == NULL) + return false; + + if (name != NULL) + *name = parsed_name; + if (args != NULL) + *args = parsed_args; + return true; } -size_t -icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value) -{ - return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING; +size_t icipc_protocol_calculate_reply_ok_size(const struct spa_pod *value) { + return (value ? SPA_POD_SIZE(value) : 8) + SIZE_PADDING; } -size_t -icipc_protocol_calculate_reply_error_size (const char *msg) -{ - assert (msg); - return strlen(msg) + SIZE_PADDING; +size_t icipc_protocol_calculate_reply_error_size(const char *msg) { + assert (msg); + return strlen(msg) + SIZE_PADDING; } -void -icipc_protocol_build_reply_ok (uint8_t *buffer, - size_t size, - const struct spa_pod *value) -{ - const struct spa_pod none = SPA_POD_INIT_None(); - struct spa_pod_builder b; - struct spa_pod_frame f; - - memset (buffer, 0, size); - - if (value == NULL) - value = &none; - - spa_pod_builder_init (&b, buffer, size); - spa_pod_builder_push_struct (&b, &f); - spa_pod_builder_int (&b, REPLY_CODE_OK); - spa_pod_builder_primitive (&b, value); - spa_pod_builder_pop(&b, &f); +void icipc_protocol_build_reply_ok( + uint8_t *buffer, + size_t size, + const struct spa_pod *value) { + const struct spa_pod none = SPA_POD_INIT_None(); + struct spa_pod_builder b; + struct spa_pod_frame f; + + memset(buffer, 0, size); + + if (value == NULL) + value = &none; + + spa_pod_builder_init(&b, buffer, size); + spa_pod_builder_push_struct(&b, &f); + spa_pod_builder_int(&b, REPLY_CODE_OK); + spa_pod_builder_primitive(&b, value); + spa_pod_builder_pop(&b, &f); } -void -icipc_protocol_build_reply_error (uint8_t *buffer, - size_t size, - const char *msg) -{ - struct spa_pod_builder b; - struct spa_pod_frame f; - - memset (buffer, 0, size); - - spa_pod_builder_init (&b, buffer, size); - spa_pod_builder_push_struct (&b, &f); - spa_pod_builder_int (&b, REPLY_CODE_ERROR); - spa_pod_builder_string (&b, msg); - spa_pod_builder_pop(&b, &f); +void icipc_protocol_build_reply_error( + uint8_t *buffer, + size_t size, + const char *msg) { + struct spa_pod_builder b; + struct spa_pod_frame f; + + memset(buffer, 0, size); + + spa_pod_builder_init(&b, buffer, size); + spa_pod_builder_push_struct(&b, &f); + spa_pod_builder_int(&b, REPLY_CODE_ERROR); + spa_pod_builder_string (&b, msg); + spa_pod_builder_pop(&b, &f); } -bool -icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size) -{ - return is_reply (buffer, size, REPLY_CODE_OK); +bool icipc_protocol_is_reply_ok(const uint8_t *buffer, size_t size) { + return is_reply (buffer, size, REPLY_CODE_OK); } -bool -icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size) -{ - return is_reply (buffer, size, REPLY_CODE_ERROR); +bool icipc_protocol_is_reply_error(const uint8_t *buffer, size_t size) { + return is_reply (buffer, size, REPLY_CODE_ERROR); } -bool -icipc_protocol_parse_reply_ok (const uint8_t *buffer, - size_t size, - const struct spa_pod **value) -{ - const struct spa_pod *pod = (const struct spa_pod *)buffer; - struct spa_pod_parser p; - struct spa_pod_frame f; - int parsed_code = 0; - struct spa_pod *parsed_value = NULL; - - /* check if struct */ - if (!spa_pod_is_struct (pod)) - return false; - - /* parse */ - spa_pod_parser_pod (&p, pod); - spa_pod_parser_push_struct(&p, &f); - spa_pod_parser_get_int (&p, &parsed_code); - spa_pod_parser_get_pod (&p, &parsed_value); - spa_pod_parser_pop (&p, &f); - - if (value != NULL) - *value = parsed_value; - return true; +bool icipc_protocol_parse_reply_ok ( + const uint8_t *buffer, + size_t size, + const struct spa_pod **value) { + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; + struct spa_pod *parsed_value = NULL; + + /* check if struct */ + if (!spa_pod_is_struct(pod)) + return false; + + /* parse */ + spa_pod_parser_pod(&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int(&p, &parsed_code); + spa_pod_parser_get_pod(&p, &parsed_value); + spa_pod_parser_pop(&p, &f); + + if (value != NULL) + *value = parsed_value; + return true; } -bool -icipc_protocol_parse_reply_error (const uint8_t *buffer, - size_t size, - const char **msg) -{ - const struct spa_pod *pod = (const struct spa_pod *)buffer; - struct spa_pod_parser p; - struct spa_pod_frame f; - int parsed_code = 0; - const char *parsed_msg = NULL; - - /* check if struct */ - if (!spa_pod_is_struct (pod)) - return false; - - /* parse */ - spa_pod_parser_pod (&p, pod); - spa_pod_parser_push_struct(&p, &f); - spa_pod_parser_get_int (&p, &parsed_code); - spa_pod_parser_get_string (&p, &parsed_msg); - spa_pod_parser_pop (&p, &f); - - if (msg != NULL) - *msg = parsed_msg; - return true; +bool icipc_protocol_parse_reply_error ( + const uint8_t *buffer, + size_t size, + const char **msg) { + const struct spa_pod *pod = (const struct spa_pod *)buffer; + struct spa_pod_parser p; + struct spa_pod_frame f; + int parsed_code = 0; + const char *parsed_msg = NULL; + + /* check if struct */ + if (!spa_pod_is_struct(pod)) + return false; + + /* parse */ + spa_pod_parser_pod(&p, pod); + spa_pod_parser_push_struct(&p, &f); + spa_pod_parser_get_int(&p, &parsed_code); + spa_pod_parser_get_string(&p, &parsed_msg); + spa_pod_parser_pop(&p, &f); + + if (msg != NULL) + *msg = parsed_msg; + return true; } diff --git a/lib/protocol.h b/lib/protocol.h index 730c918..34a92a0 100644 --- a/lib/protocol.h +++ b/lib/protocol.h @@ -20,65 +20,61 @@ extern "C" { /* request */ ICIPC_API -size_t -icipc_protocol_calculate_request_size (const char *name, - const struct spa_pod *args); +size_t icipc_protocol_calculate_request_size( + const char *name, + const struct spa_pod *args); ICIPC_API -void -icipc_protocol_build_request (uint8_t *buffer, - size_t size, - const char *name, - const struct spa_pod *args); +void icipc_protocol_build_request( + uint8_t *buffer, + size_t size, + const char *name, + const struct spa_pod *args); ICIPC_API -bool -icipc_protocol_parse_request (const uint8_t *buffer, - size_t size, - const char **name, - const struct spa_pod **args); +bool icipc_protocol_parse_request( + const uint8_t *buffer, + size_t size, + const char **name, + const struct spa_pod **args); /* reply */ ICIPC_API -size_t -icipc_protocol_calculate_reply_ok_size (const struct spa_pod *value); +size_t icipc_protocol_calculate_reply_ok_size(const struct spa_pod *value); ICIPC_API -size_t -icipc_protocol_calculate_reply_error_size (const char *msg); +size_t icipc_protocol_calculate_reply_error_size(const char *msg); ICIPC_API -void -icipc_protocol_build_reply_ok (uint8_t *buffer, - size_t size, - const struct spa_pod *value); +void icipc_protocol_build_reply_ok( + uint8_t *buffer, + size_t size, + const struct spa_pod *value); ICIPC_API -void -icipc_protocol_build_reply_error (uint8_t *buffer, - size_t size, - const char *msg); +void icipc_protocol_build_reply_error( + uint8_t *buffer, + size_t size, + const char *msg); ICIPC_API -bool -icipc_protocol_is_reply_ok (const uint8_t *buffer, size_t size); +bool icipc_protocol_is_reply_ok(const uint8_t *buffer, size_t size); ICIPC_API -bool -icipc_protocol_is_reply_error (const uint8_t *buffer, size_t size); +bool icipc_protocol_is_reply_error(const uint8_t *buffer, size_t size); ICIPC_API -bool -icipc_protocol_parse_reply_ok (const uint8_t *buffer, - size_t size, - const struct spa_pod **value); +bool icipc_protocol_parse_reply_ok( + const uint8_t *buffer, + size_t size, + const struct spa_pod **value); ICIPC_API -bool -icipc_protocol_parse_reply_error (const uint8_t *buffer, - size_t size, - const char **msg); +bool icipc_protocol_parse_reply_error( + const uint8_t *buffer, + size_t size, + const char **msg); #ifdef __cplusplus } diff --git a/lib/receiver.c b/lib/receiver.c index ba7edda..9c21063 100644 --- a/lib/receiver.c +++ b/lib/receiver.c @@ -1,4 +1,4 @@ -/* PipeWire AGL Cluster IPC +/* PipeWire AGL Cluster IPC * * Copyright © 2021 Collabora Ltd. * @author Julian Bouzas <julian.bouzas@collabora.com> @@ -15,196 +15,188 @@ #include <string.h> #include <errno.h> #include <assert.h> - #include "private.h" #include "receiver.h" - #include "icipc.h" #define MAX_SENDERS 128 struct icipc_receiver { - struct sockaddr_un addr; - int socket_fd; + struct sockaddr_un addr; + int socket_fd; - uint8_t *buffer_read; - size_t buffer_size; + uint8_t *buffer_read; + size_t buffer_size; - struct epoll_thread epoll_thread; - bool thread_running; + EpollThread epoll_thread; + bool thread_running; - const struct icipc_receiver_events *events; - void *events_data; + const struct icipc_receiver_events *events; + void *events_data; - /* for subclasses */ - void *user_data; + /* for subclasses */ + void *user_data; }; -static bool -reply_message (struct icipc_receiver *self, - int sender_fd, - uint8_t *buffer, - size_t size) -{ - return self->events && self->events->handle_message ? - self->events->handle_message (self, sender_fd, buffer, size, self->events_data) : - icipc_socket_write (sender_fd, buffer, size) == (ssize_t)size; +static bool reply_message( + struct icipc_receiver *self, + int sender_fd, + uint8_t * buffer, + size_t size) { + return self->events && self->events->handle_message ? + self->events->handle_message(self, sender_fd, buffer, size, + self->events_data) : + icipc_socket_write(sender_fd, buffer, size) == (ssize_t) size; } -static void -socket_event_received (struct epoll_thread *t, int fd, void *data) -{ - /* sender wants to connect, accept connection */ - struct icipc_receiver *self = data; - socklen_t addr_size = sizeof(self->addr); - int sender_fd = accept4 (fd, (struct sockaddr*)&self->addr, &addr_size, - SOCK_CLOEXEC | SOCK_NONBLOCK); - struct epoll_event event; - event.events = EPOLLIN; - event.data.fd = sender_fd; - epoll_ctl (t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event); - if (self->events && self->events->sender_state) - self->events->sender_state (self, sender_fd, - ICIPC_RECEIVER_SENDER_STATE_CONNECTED, self->events_data); +static void socket_event_received(EpollThread *t, int fd, void *data) { + /* sender wants to connect, accept connection */ + struct icipc_receiver *self = data; + socklen_t addr_size = sizeof(self->addr); + int sender_fd = accept4(fd, (struct sockaddr *)&self->addr, &addr_size, + SOCK_CLOEXEC | SOCK_NONBLOCK); + struct epoll_event event; + event.events = EPOLLIN; + event.data.fd = sender_fd; + epoll_ctl(t->epoll_fd, EPOLL_CTL_ADD, sender_fd, &event); + if (self->events && self->events->sender_state) + self->events->sender_state(self, sender_fd, + ICIPC_RECEIVER_SENDER_STATE_CONNECTED, + self->events_data); } -static void -other_event_received (struct epoll_thread *t, int fd, void *data) -{ - struct icipc_receiver *self = data; - - /* sender sends a message, read it and reply */ - ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); - if (size <= 0) { - if (size < 0) - icipc_log_error ("receiver: could not read message: %s", strerror(errno)); - /* client disconnected */ - epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); - close (fd); - if (self->events && self->events->sender_state) - self->events->sender_state (self, fd, - ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, self->events_data); - return; - } - - /* reply */ - if (!reply_message (self, fd, self->buffer_read, size)) - icipc_log_error ("receiver: could not reply message: %s", strerror(errno)); - - return; +static void other_event_received(EpollThread *t, int fd, void *data) { + struct icipc_receiver *self = data; + + /* sender sends a message, read it and reply */ + ssize_t size = + icipc_socket_read(fd, &self->buffer_read, &self->buffer_size); + if (size <= 0) { + if (size < 0) + icipc_log_error("receiver: could not read message: %s", + strerror(errno)); + /* client disconnected */ + epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + close(fd); + if (self->events && self->events->sender_state) + self->events->sender_state(self, fd, + ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED, + self->events_data); + return; + } + + /* reply */ + if (!reply_message(self, fd, self->buffer_read, size)) + icipc_log_error("receiver: could not reply message: %s", + strerror(errno)); + + return; } /* API */ -struct icipc_receiver * -icipc_receiver_new (const char *path, - size_t buffer_size, - const struct icipc_receiver_events *events, - void *events_data, - size_t user_size) -{ - struct icipc_receiver *self; - int res; - - /* check params */ - if (path == NULL || buffer_size == 0) - return NULL; - - self = calloc (1, sizeof (struct icipc_receiver) + user_size); - if (self == NULL) - return NULL; - - self->socket_fd = -1; - - /* set address */ - self->addr.sun_family = AF_LOCAL; - res = icipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path)); - if (res < 0) - goto error; - - unlink (self->addr.sun_path); - - /* create socket */ - self->socket_fd = - socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); - if (self->socket_fd < 0) - goto error; - - /* bind socket */ - if (bind (self->socket_fd, (struct sockaddr *)&self->addr, - sizeof(self->addr)) != 0) - goto error; - - /* listen socket */ - if (listen (self->socket_fd, MAX_SENDERS) != 0) - goto error; - - /* alloc buffer read */ - self->buffer_size = buffer_size; - self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); - if (self->buffer_read == NULL) - goto error; - - /* init epoll thread */ - if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, - socket_event_received, other_event_received, self)) - goto error; - - self->events = events; - self->events_data = events_data; - if (user_size > 0) - self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_receiver)); - - return self; - -error: - if (self->buffer_read) - free (self->buffer_read); - if (self->socket_fd != -1) - close (self->socket_fd); - free (self); - return NULL; +struct icipc_receiver *icipc_receiver_new( + const char *path, + size_t buffer_size, + const struct icipc_receiver_events *events, + void *events_data, + size_t user_size) { + struct icipc_receiver *self; + int res; + + /* check params */ + if (path == NULL || buffer_size == 0) + return NULL; + + self = calloc(1, sizeof(struct icipc_receiver) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + res = + icipc_construct_socket_path(path, self->addr.sun_path, + sizeof(self->addr.sun_path)); + if (res < 0) + goto error; + + unlink(self->addr.sun_path); + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* bind socket */ + if (bind(self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) != 0) + goto error; + + /* listen socket */ + if (listen(self->socket_fd, MAX_SENDERS) != 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc(buffer_size, sizeof(uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init(&self->epoll_thread, self->socket_fd, + socket_event_received, + other_event_received, self)) + goto error; + + self->events = events; + self->events_data = events_data; + if (user_size > 0) + self->user_data = + (void *)((uint8_t *) self + sizeof(struct icipc_receiver)); + + return self; + + error: + if (self->buffer_read) + free(self->buffer_read); + if (self->socket_fd != -1) + close(self->socket_fd); + free(self); + return NULL; } -void -icipc_receiver_free (struct icipc_receiver *self) -{ - icipc_receiver_stop (self); +void icipc_receiver_free(struct icipc_receiver *self) { + icipc_receiver_stop(self); - icipc_epoll_thread_destroy (&self->epoll_thread); - free (self->buffer_read); - close (self->socket_fd); - unlink (self->addr.sun_path); - free (self); + icipc_epoll_thread_destroy(&self->epoll_thread); + free(self->buffer_read); + close(self->socket_fd); + unlink(self->addr.sun_path); + free(self); } -bool -icipc_receiver_start (struct icipc_receiver *self) -{ - if (icipc_receiver_is_running (self)) - return true; +bool icipc_receiver_start(struct icipc_receiver *self) { + if (icipc_receiver_is_running(self)) + return true; - self->thread_running = icipc_epoll_thread_start (&self->epoll_thread); - return self->thread_running; + self->thread_running = icipc_epoll_thread_start(&self->epoll_thread); + return self->thread_running; } -void -icipc_receiver_stop (struct icipc_receiver *self) -{ - if (icipc_receiver_is_running (self)) { - icipc_epoll_thread_stop (&self->epoll_thread); - self->thread_running = false; - } +void icipc_receiver_stop(struct icipc_receiver *self) { + if (icipc_receiver_is_running(self)) { + icipc_epoll_thread_stop(&self->epoll_thread); + self->thread_running = false; + } } -bool -icipc_receiver_is_running (struct icipc_receiver *self) -{ - return self->thread_running; +bool icipc_receiver_is_running(struct icipc_receiver *self) { + return self->thread_running; } -void * -icipc_receiver_get_user_data (struct icipc_receiver *self) -{ - return self->user_data; +void *icipc_receiver_get_user_data(struct icipc_receiver *self) { + return self->user_data; } diff --git a/lib/receiver.h b/lib/receiver.h index cf13e1b..b14da35 100644 --- a/lib/receiver.h +++ b/lib/receiver.h @@ -22,54 +22,51 @@ extern "C" { struct icipc_receiver; enum icipc_receiver_sender_state { - ICIPC_RECEIVER_SENDER_STATE_CONNECTED = 0, - ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED + ICIPC_RECEIVER_SENDER_STATE_CONNECTED = 0, + ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED }; struct icipc_receiver_events { - /* emitted when a sender state changes */ - void (*sender_state) (struct icipc_receiver *self, + /* emitted when a sender state changes */ + void (*sender_state)( + struct icipc_receiver *self, int sender_fd, enum icipc_receiver_sender_state state, void *data); - /* emitted when message is received and needs to be handled */ - bool (*handle_message) (struct icipc_receiver *self, - int sender_fd, - const uint8_t *buffer, - size_t size, - void *data); + /* emitted when message is received and needs to be handled */ + bool (*handle_message)( + struct icipc_receiver *self, + int sender_fd, + const uint8_t *buffer, + size_t size, + void *data); }; ICIPC_API -struct icipc_receiver * -icipc_receiver_new (const char *path, - size_t buffer_size, - const struct icipc_receiver_events *events, - void *events_data, - size_t user_size); +struct icipc_receiver *icipc_receiver_new( + const char *path, + size_t buffer_size, + const struct icipc_receiver_events *events, + void *events_data, + size_t user_size); ICIPC_API -void -icipc_receiver_free (struct icipc_receiver *self); +void icipc_receiver_free(struct icipc_receiver *self); ICIPC_API -bool -icipc_receiver_start (struct icipc_receiver *self); +bool icipc_receiver_start(struct icipc_receiver *self); ICIPC_API -void -icipc_receiver_stop (struct icipc_receiver *self); +void icipc_receiver_stop(struct icipc_receiver *self); ICIPC_API -bool -icipc_receiver_is_running (struct icipc_receiver *self); +bool icipc_receiver_is_running(struct icipc_receiver *self); /* for subclasses only */ ICIPC_API -void * -icipc_receiver_get_user_data (struct icipc_receiver *self); +void *icipc_receiver_get_user_data(struct icipc_receiver *self); #ifdef __cplusplus } diff --git a/lib/sender.c b/lib/sender.c index e795bc4..fa2882e 100644 --- a/lib/sender.c +++ b/lib/sender.c @@ -21,256 +21,247 @@ #define MAX_ASYNC_TASKS 128 -struct icipc_sender_task { - icipc_sender_reply_func_t func; - void *data; -}; +typedef struct SenderTask { + icipc_sender_reply_func_t func; + void *data; +} SenderTask; struct icipc_sender { - struct sockaddr_un addr; - int socket_fd; + struct sockaddr_un addr; + int socket_fd; - uint8_t *buffer_read; - size_t buffer_size; + uint8_t *buffer_read; + size_t buffer_size; - struct epoll_thread epoll_thread; - bool is_connected; + EpollThread epoll_thread; + bool is_connected; - icipc_sender_lost_conn_func_t lost_func; - void *lost_data; - bool lost_connection; + icipc_sender_lost_conn_func_t lost_func; + void *lost_data; + bool lost_connection; - struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; + SenderTask async_tasks[MAX_ASYNC_TASKS]; - /* for subclasses */ - void *user_data; + /* for subclasses */ + void *user_data; }; -static int -push_sync_task (struct icipc_sender *self, +static int push_sync_task( + struct icipc_sender *self, icipc_sender_reply_func_t func, - void *data) -{ - size_t i; - for (i = MAX_ASYNC_TASKS; i > 1; i--) { - struct icipc_sender_task *curr = self->async_tasks + i - 1; - struct icipc_sender_task *next = self->async_tasks + i - 2; - if (next->func != NULL && curr->func == NULL) { - curr->func = func; - curr->data = data; - return i - 1; - } else if (i - 2 == 0 && next->func == NULL) { - /* empty queue */ - next->func = func; - next->data = data; - return 0; - } - } - return -1; + void *data) { + size_t i; + for (i = MAX_ASYNC_TASKS; i > 1; i--) { + SenderTask *curr = self->async_tasks + i - 1; + SenderTask *next = self->async_tasks + i - 2; + if (next->func != NULL && curr->func == NULL) { + curr->func = func; + curr->data = data; + return i - 1; + } else if (i - 2 == 0 && next->func == NULL) { + /* empty queue */ + next->func = func; + next->data = data; + return 0; + } + } + return -1; } -static void -pop_sync_task (struct icipc_sender *self, - bool trigger, - bool all, - const uint8_t *buffer, - size_t size) -{ - size_t i; - for (i = 0; i < MAX_ASYNC_TASKS; i++) { - struct icipc_sender_task *task = self->async_tasks + i; - if (task->func != NULL) { - if (trigger) - task->func (self, buffer, size, task->data); - task->func = NULL; - if (!all) - return; - } - } +static void pop_sync_task( + struct icipc_sender *self, + bool trigger, + bool all, + const uint8_t * buffer, + size_t size) { + size_t i; + for (i = 0; i < MAX_ASYNC_TASKS; i++) { + SenderTask *task = self->async_tasks + i; + if (task->func != NULL) { + if (trigger) + task->func(self, buffer, size, task->data); + task->func = NULL; + if (!all) + return; + } + } } -static void -socket_event_received (struct epoll_thread *t, int fd, void *data) -{ - struct icipc_sender *self = data; - - /* receiver sends a reply, read it trigger corresponding task */ - ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); - if (size <= 0) { - if (size < 0) - icipc_log_error ("sender: could not read reply: %s", strerror(errno)); - /* receiver disconnected */ - epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); - shutdown(self->socket_fd, SHUT_RDWR); - self->is_connected = false; - self->lost_connection = true; - if (self->lost_func) - self->lost_func (self, fd, self->lost_data); - /* clear queue */ - pop_sync_task (self, true, true, NULL, 0); - return; - } - - /* trigger async task */ - pop_sync_task (self, true, false, self->buffer_read, size); - return; +static void socket_event_received(EpollThread *t, int fd, void *data) { + struct icipc_sender *self = data; + + /* receiver sends a reply, read it trigger corresponding task */ + ssize_t size = + icipc_socket_read(fd, &self->buffer_read, &self->buffer_size); + if (size <= 0) { + if (size < 0) + icipc_log_error("sender: could not read reply: %s", + strerror(errno)); + /* receiver disconnected */ + epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + self->lost_connection = true; + if (self->lost_func) + self->lost_func(self, fd, self->lost_data); + /* clear queue */ + pop_sync_task(self, true, true, NULL, 0); + return; + } + + /* trigger async task */ + pop_sync_task(self, true, false, self->buffer_read, size); + return; } /* API */ -struct icipc_sender * -icipc_sender_new (const char *path, - size_t buffer_size, - icipc_sender_lost_conn_func_t lost_func, - void *lost_data, - size_t user_size) -{ - struct icipc_sender *self; - int res; - - if (path == NULL) - return NULL; - - self = calloc (1, sizeof (struct icipc_sender) + user_size); - if (self == NULL) - return NULL; - - self->socket_fd = -1; - - /* set address */ - self->addr.sun_family = AF_LOCAL; - res = icipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path)); - if (res < 0) - goto error; - - /* create socket */ - self->socket_fd = - socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); - if (self->socket_fd < 0) - goto error; - - /* alloc buffer read */ - self->buffer_size = buffer_size; - self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); - if (self->buffer_read == NULL) - goto error; - - /* init epoll thread */ - if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, - socket_event_received, NULL, self)) - goto error; - - self->lost_func = lost_func; - self->lost_data = lost_data; - self->lost_connection = false; - if (user_size > 0) - self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); - - return self; - -error: - if (self->buffer_read) - free (self->buffer_read); - if (self->socket_fd != -1) - close (self->socket_fd); - free (self); - return NULL; +struct icipc_sender *icipc_sender_new( + const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size) { + struct icipc_sender *self; + int res; + + if (path == NULL) + return NULL; + + self = calloc(1, sizeof(struct icipc_sender) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + res = + icipc_construct_socket_path(path, self->addr.sun_path, + sizeof(self->addr.sun_path)); + if (res < 0) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc(buffer_size, sizeof(uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init(&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) + goto error; + + self->lost_func = lost_func; + self->lost_data = lost_data; + self->lost_connection = false; + if (user_size > 0) + self->user_data = + (void *)((uint8_t *) self + sizeof(struct icipc_sender)); + + return self; + + error: + if (self->buffer_read) + free(self->buffer_read); + if (self->socket_fd != -1) + close(self->socket_fd); + free(self); + return NULL; } -void -icipc_sender_free (struct icipc_sender *self) -{ - icipc_sender_disconnect (self); +void icipc_sender_free(struct icipc_sender *self) { + icipc_sender_disconnect(self); - icipc_epoll_thread_destroy (&self->epoll_thread); - free (self->buffer_read); - close (self->socket_fd); - free (self); + icipc_epoll_thread_destroy(&self->epoll_thread); + free(self->buffer_read); + close(self->socket_fd); + free(self); } -bool -icipc_sender_connect (struct icipc_sender *self) -{ - if (icipc_sender_is_connected (self)) - return true; - - /* if connection was lost, re-init epoll thread with new socket */ - if (self->lost_connection) { - icipc_epoll_thread_stop (&self->epoll_thread); - icipc_epoll_thread_destroy (&self->epoll_thread); - self->socket_fd = - socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); - if (self->socket_fd < 0) - return false; - if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, - socket_event_received, NULL, self)) { - close (self->socket_fd); - return false; - } - self->lost_connection = false; - } - - /* connect */ - if (connect(self->socket_fd, (struct sockaddr *)&self->addr, - sizeof(self->addr)) == 0 && - icipc_epoll_thread_start (&self->epoll_thread)) { - self->is_connected = true; - return true; - } - - return false; +bool icipc_sender_connect(struct icipc_sender *self) { + if (icipc_sender_is_connected(self)) + return true; + + /* if connection was lost, re-init epoll thread with new socket */ + if (self->lost_connection) { + icipc_epoll_thread_stop(&self->epoll_thread); + icipc_epoll_thread_destroy(&self->epoll_thread); + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, + 0); + if (self->socket_fd < 0) + return false; + if (!icipc_epoll_thread_init + (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) { + close(self->socket_fd); + return false; + } + self->lost_connection = false; + } + + /* connect */ + if (connect(self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) == 0 && + icipc_epoll_thread_start(&self->epoll_thread)) { + self->is_connected = true; + return true; + } + + return false; } -void -icipc_sender_disconnect (struct icipc_sender *self) -{ - if (icipc_sender_is_connected (self)) { - icipc_epoll_thread_stop (&self->epoll_thread); - shutdown(self->socket_fd, SHUT_RDWR); - self->is_connected = false; - } +void icipc_sender_disconnect(struct icipc_sender *self) { + if (icipc_sender_is_connected(self)) { + icipc_epoll_thread_stop(&self->epoll_thread); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + } } -bool -icipc_sender_is_connected (struct icipc_sender *self) -{ - return self->is_connected; +bool icipc_sender_is_connected(struct icipc_sender *self) { + return self->is_connected; } -bool -icipc_sender_send (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - icipc_sender_reply_func_t func, - void *data) -{ - int id = -1; - - if (buffer == NULL || size == 0) - return false; - - if (!icipc_sender_is_connected (self)) - return false; - - /* add the task in the queue */ - if (func) { - id = push_sync_task (self, func, data); - if (id == -1) - return false; - } - - /* write buffer and remove task if it fails */ - if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) { - if (id != -1) - self->async_tasks[id].func = NULL; - return false; - } - - return true; +bool icipc_sender_send( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + icipc_sender_reply_func_t func, + void *data) { + int id = -1; + + if (buffer == NULL || size == 0) + return false; + + if (!icipc_sender_is_connected(self)) + return false; + + /* add the task in the queue */ + if (func) { + id = push_sync_task(self, func, data); + if (id == -1) + return false; + } + + /* write buffer and remove task if it fails */ + if (icipc_socket_write(self->socket_fd, buffer, size) <= 0) { + if (id != -1) + self->async_tasks[id].func = NULL; + return false; + } + + return true; } -void * -icipc_sender_get_user_data (struct icipc_sender *self) -{ - return self->user_data; +void *icipc_sender_get_user_data(struct icipc_sender *self) { + return self->user_data; } diff --git a/lib/sender.h b/lib/sender.h index 88aaf3a..d1b4083 100644 --- a/lib/sender.h +++ b/lib/sender.h @@ -21,52 +21,49 @@ extern "C" { struct icipc_sender; -typedef void (*icipc_sender_lost_conn_func_t) (struct icipc_sender *self, - int receiver_fd, - void *data); +typedef void (*icipc_sender_lost_conn_func_t)( + struct icipc_sender *self, + int receiver_fd, + void *data); -typedef void (*icipc_sender_reply_func_t) (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - void *data); +typedef void (*icipc_sender_reply_func_t)( + struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + void *data); ICIPC_API -struct icipc_sender * -icipc_sender_new (const char *path, - size_t buffer_size, - icipc_sender_lost_conn_func_t lost_func, - void *lost_data, - size_t user_size); +struct icipc_sender *icipc_sender_new( + const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size); ICIPC_API -void -icipc_sender_free (struct icipc_sender *self); +void icipc_sender_free(struct icipc_sender *self); ICIPC_API -bool -icipc_sender_connect (struct icipc_sender *self); +bool icipc_sender_connect(struct icipc_sender *self); ICIPC_API -void -icipc_sender_disconnect (struct icipc_sender *self); +void icipc_sender_disconnect(struct icipc_sender *self); ICIPC_API -bool -icipc_sender_is_connected (struct icipc_sender *self); +bool icipc_sender_is_connected(struct icipc_sender *self); ICIPC_API -bool -icipc_sender_send (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - icipc_sender_reply_func_t reply, - void *data); +bool icipc_sender_send( + struct icipc_sender *self, + const uint8_t *buffer, + size_t size, + icipc_sender_reply_func_t reply, + void *data); /* for subclasses only */ ICIPC_API -void * -icipc_sender_get_user_data (struct icipc_sender *self); +void *icipc_sender_get_user_data(struct icipc_sender *self); #ifdef __cplusplus } diff --git a/lib/server.c b/lib/server.c index 86f8322..747b829 100644 --- a/lib/server.c +++ b/lib/server.c @@ -16,246 +16,237 @@ #define BUFFER_SIZE 1024 #define MAX_REQUEST_HANDLERS 128 -struct icipc_server_client_handler -{ - icipc_server_client_handler_func_t handler; - void *data; -}; - -struct icipc_server_request_handler -{ - const char *name; - icipc_server_request_handler_func_t handler; - void *data; -}; - -struct icipc_server_priv { - pthread_mutex_t mutex; - struct icipc_server_client_handler client_handler; - size_t n_request_handlers; - struct icipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS]; -}; - -static void -sender_state (struct icipc_receiver *base, - int sender_fd, - enum icipc_receiver_sender_state sender_state, - void *data) -{ - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); - - icipc_log_info ("server: new state %d on client %d", sender_state, sender_fd); - - pthread_mutex_lock (&priv->mutex); - if (priv->client_handler.handler) - priv->client_handler.handler ((struct icipc_server *)base, sender_fd, - sender_state, priv->client_handler.data); - pthread_mutex_unlock (&priv->mutex); +typedef struct ClientHandler { + icipc_server_client_handler_func_t handler; + void *data; +} ClientHandler; + +typedef struct RequestHandler { + const char *name; + icipc_server_request_handler_func_t handler; + void *data; +} RequestHandler; + +typedef struct ServerPriv { + pthread_mutex_t mutex; + ClientHandler client_handler; + size_t n_request_handlers; + RequestHandler request_handlers[MAX_REQUEST_HANDLERS]; +} ServerPriv; + +static void sender_state( + struct icipc_receiver *base, + int sender_fd, + enum icipc_receiver_sender_state sender_state, + void *data) { + ServerPriv *priv = icipc_receiver_get_user_data(base); + + icipc_log_info("server: new state %d on client %d", sender_state, + sender_fd); + + pthread_mutex_lock(&priv->mutex); + if (priv->client_handler.handler) + priv->client_handler.handler((struct icipc_server *)base, + sender_fd, sender_state, + priv->client_handler.data); + pthread_mutex_unlock(&priv->mutex); } -static bool -handle_message (struct icipc_receiver *base, +static bool handle_message( + struct icipc_receiver *base, int sender_fd, - const uint8_t *buffer, + const uint8_t * buffer, size_t size, - void *data) -{ - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); - const char *name = NULL; - const struct spa_pod *args = NULL; - - icipc_log_info ("server: message from client %d received", sender_fd); - - /* parse */ - if (!icipc_protocol_parse_request (buffer, size, &name, &args)) { - const char *msg = "could not parse request"; - const size_t s = icipc_protocol_calculate_reply_error_size (msg); - uint8_t b[s]; - icipc_protocol_build_reply_error (b, s, msg); - return icipc_socket_write (sender_fd, b, s) == (ssize_t)s; - } - - /* handle */ - size_t i; - bool res = false; - pthread_mutex_lock (&priv->mutex); - - for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { - struct icipc_server_request_handler *rh = priv->request_handlers + i; - if (rh->name != NULL && strcmp (rh->name, name) == 0 && - rh->handler != NULL) { - res = rh->handler ((struct icipc_server *)base, sender_fd, name, args, - rh->data); - pthread_mutex_unlock (&priv->mutex); - return res; - } - } - - /* handler was not found, reply with error */ - res = icipc_server_reply_error ((struct icipc_server *)base, sender_fd, - "request handler not found"); - - pthread_mutex_unlock (&priv->mutex); - return res; + void *data) { + ServerPriv *priv = icipc_receiver_get_user_data(base); + const char *name = NULL; + const struct spa_pod *args = NULL; + + icipc_log_info("server: message from client %d received", sender_fd); + + /* parse */ + if (!icipc_protocol_parse_request(buffer, size, &name, &args)) { + const char *msg = "could not parse request"; + const size_t s = icipc_protocol_calculate_reply_error_size(msg); + uint8_t b[s]; + icipc_protocol_build_reply_error(b, s, msg); + return icipc_socket_write(sender_fd, b, s) == (ssize_t) s; + } + + /* handle */ + size_t i; + bool res = false; + pthread_mutex_lock(&priv->mutex); + + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + RequestHandler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp(rh->name, name) == 0 + && rh->handler != NULL) { + res = + rh->handler((struct icipc_server *)base, sender_fd, + name, args, rh->data); + pthread_mutex_unlock(&priv->mutex); + return res; + } + } + + /* handler was not found, reply with error */ + res = icipc_server_reply_error((struct icipc_server *)base, sender_fd, + "request handler not found"); + + pthread_mutex_unlock(&priv->mutex); + return res; } static struct icipc_receiver_events events = { - .sender_state = sender_state, - .handle_message = handle_message, + .sender_state = sender_state, + .handle_message = handle_message, }; /* API */ -struct icipc_server * -icipc_server_new (const char *path, bool start) -{ - struct icipc_server_priv * priv = NULL; - struct icipc_receiver *base = NULL; +struct icipc_server *icipc_server_new(const char *path, bool start) { + ServerPriv *priv = NULL; + struct icipc_receiver *base = NULL; - base = icipc_receiver_new (path, BUFFER_SIZE, &events, NULL, - sizeof (struct icipc_server_priv)); - if (base == NULL) - return NULL; + base = icipc_receiver_new(path, BUFFER_SIZE, &events, NULL, + sizeof(ServerPriv)); + if (base == NULL) + return NULL; - priv = icipc_receiver_get_user_data (base); - pthread_mutex_init (&priv->mutex, NULL); - priv->n_request_handlers = 0; + priv = icipc_receiver_get_user_data(base); + pthread_mutex_init(&priv->mutex, NULL); + priv->n_request_handlers = 0; - if (start && !icipc_receiver_start (base)) { - icipc_log_error ("failed to start receiver"); - icipc_server_free ((struct icipc_server *)base); - return NULL; - } + if (start && !icipc_receiver_start(base)) { + icipc_log_error("failed to start receiver"); + icipc_server_free((struct icipc_server *)base); + return NULL; + } - return (struct icipc_server *)base; + return (struct icipc_server *)base; } -void -icipc_server_free (struct icipc_server *self) -{ - struct icipc_receiver *base = icipc_server_to_receiver (self); - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); +void icipc_server_free(struct icipc_server *self) { + struct icipc_receiver *base = icipc_server_to_receiver(self); + ServerPriv *priv = icipc_receiver_get_user_data(base); - pthread_mutex_destroy (&priv->mutex); + pthread_mutex_destroy(&priv->mutex); - icipc_receiver_free (base); + icipc_receiver_free(base); } -void -icipc_server_set_client_handler (struct icipc_server *self, - icipc_server_client_handler_func_t handler, - void *data) -{ - struct icipc_receiver *base = icipc_server_to_receiver (self); - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); - - pthread_mutex_lock (&priv->mutex); - priv->client_handler.handler = handler; - priv->client_handler.data = data; - pthread_mutex_unlock (&priv->mutex); +void icipc_server_set_client_handler( + struct icipc_server *self, + icipc_server_client_handler_func_t handler, + void *data) { + struct icipc_receiver *base = icipc_server_to_receiver(self); + ServerPriv *priv = icipc_receiver_get_user_data(base); + + pthread_mutex_lock(&priv->mutex); + priv->client_handler.handler = handler; + priv->client_handler.data = data; + pthread_mutex_unlock(&priv->mutex); } -void -icipc_server_clear_client_handler (struct icipc_server *self) -{ - struct icipc_receiver *base = icipc_server_to_receiver (self); - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); +void icipc_server_clear_client_handler(struct icipc_server *self) { + struct icipc_receiver *base = icipc_server_to_receiver(self); + ServerPriv *priv = icipc_receiver_get_user_data(base); - pthread_mutex_lock (&priv->mutex); - priv->client_handler.handler = NULL; - priv->client_handler.data = NULL; - pthread_mutex_unlock (&priv->mutex); + pthread_mutex_lock(&priv->mutex); + priv->client_handler.handler = NULL; + priv->client_handler.data = NULL; + pthread_mutex_unlock(&priv->mutex); } -bool -icipc_server_set_request_handler (struct icipc_server *self, - const char *name, - icipc_server_request_handler_func_t handler, - void *data) -{ - struct icipc_receiver *base = icipc_server_to_receiver (self); - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); - size_t i; - - /* check params */ - if (name == NULL) - return false; - - pthread_mutex_lock (&priv->mutex); - - /* make sure handler does not exist */ - for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { - struct icipc_server_request_handler *rh = priv->request_handlers + i; - if (rh->name != NULL && strcmp (rh->name, name) == 0) { - pthread_mutex_unlock (&priv->mutex); - return false; - } - } - - /* set handler */ - for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { - struct icipc_server_request_handler *rh = priv->request_handlers + i; - if (rh->name == NULL) { - rh->name = name; - rh->handler = handler; - rh->data = data; - pthread_mutex_unlock (&priv->mutex); - return true; - } - } - - pthread_mutex_unlock (&priv->mutex); - - return false; +bool icipc_server_set_request_handler( + struct icipc_server *self, + const char *name, + icipc_server_request_handler_func_t handler, + void *data) { + struct icipc_receiver *base = icipc_server_to_receiver(self); + ServerPriv *priv = icipc_receiver_get_user_data(base); + size_t i; + + /* check params */ + if (name == NULL) + return false; + + pthread_mutex_lock(&priv->mutex); + + /* make sure handler does not exist */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + RequestHandler *rh = + priv->request_handlers + i; + if (rh->name != NULL && strcmp(rh->name, name) == 0) { + pthread_mutex_unlock(&priv->mutex); + return false; + } + } + + /* set handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + RequestHandler *rh = + priv->request_handlers + i; + if (rh->name == NULL) { + rh->name = name; + rh->handler = handler; + rh->data = data; + pthread_mutex_unlock(&priv->mutex); + return true; + } + } + + pthread_mutex_unlock(&priv->mutex); + + return false; } -void -icipc_server_clear_request_handler (struct icipc_server *self, - const char *name) -{ - struct icipc_receiver *base = icipc_server_to_receiver (self); - struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); - size_t i; - - /* check params */ - if (name == NULL) - return; - - pthread_mutex_lock (&priv->mutex); - - /* clear handler */ - for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { - struct icipc_server_request_handler *rh = priv->request_handlers + i; - if (rh->name != NULL && strcmp (rh->name, name) == 0) { - rh->name = NULL; - break; - } - } - - pthread_mutex_unlock (&priv->mutex); +void icipc_server_clear_request_handler( + struct icipc_server *self, + const char *name) { + struct icipc_receiver *base = icipc_server_to_receiver(self); + ServerPriv *priv = icipc_receiver_get_user_data(base); + size_t i; + + /* check params */ + if (name == NULL) + return; + + pthread_mutex_lock(&priv->mutex); + + /* clear handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + RequestHandler *rh = + priv->request_handlers + i; + if (rh->name != NULL && strcmp(rh->name, name) == 0) { + rh->name = NULL; + break; + } + } + + pthread_mutex_unlock(&priv->mutex); } -bool -icipc_server_reply_ok (struct icipc_server *self, - int client_fd, - const struct spa_pod *value) -{ - const size_t s = icipc_protocol_calculate_reply_ok_size (value); - uint8_t b[s]; - icipc_protocol_build_reply_ok (b, s, value); - return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +bool icipc_server_reply_ok( + struct icipc_server *self, + int client_fd, + const struct spa_pod *value) { + const size_t s = icipc_protocol_calculate_reply_ok_size(value); + uint8_t b[s]; + icipc_protocol_build_reply_ok(b, s, value); + return icipc_socket_write(client_fd, b, s) == (ssize_t) s; } -bool -icipc_server_reply_error (struct icipc_server *self, - int client_fd, - const char *msg) -{ - if (msg == NULL) - return false; - - const size_t s = icipc_protocol_calculate_reply_error_size (msg); - uint8_t b[s]; - icipc_protocol_build_reply_error (b, s, msg); - return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +bool icipc_server_reply_error( + struct icipc_server *self, + int client_fd, + const char *msg) { + if (msg == NULL) + return false; + + const size_t s = icipc_protocol_calculate_reply_error_size(msg); + uint8_t b[s]; + icipc_protocol_build_reply_error(b, s, msg); + return icipc_socket_write(client_fd, b, s) == (ssize_t) s; } diff --git a/lib/server.h b/lib/server.h index 3cc0ed9..6345035 100644 --- a/lib/server.h +++ b/lib/server.h @@ -12,7 +12,6 @@ #include <spa/pod/pod.h> #include "defs.h" - #include "receiver.h" #ifdef __cplusplus @@ -23,60 +22,59 @@ extern "C" { struct icipc_server; -typedef void (*icipc_server_client_handler_func_t) (struct icipc_server *self, - int client_fd, - enum icipc_receiver_sender_state client_state, - void *data); +typedef void (*icipc_server_client_handler_func_t)( + struct icipc_server *self, + int client_fd, + enum icipc_receiver_sender_state client_state, + void *data); -typedef bool (*icipc_server_request_handler_func_t) (struct icipc_server *self, - int client_fd, - const char *name, - const struct spa_pod *args, - void *data); +typedef bool (*icipc_server_request_handler_func_t)( + struct icipc_server *self, + int client_fd, + const char *name, + const struct spa_pod *args, + void *data); ICIPC_API -struct icipc_server * -icipc_server_new (const char *path, bool start); +struct icipc_server *icipc_server_new(const char *path, bool start); ICIPC_API -void -icipc_server_free (struct icipc_server *self); +void icipc_server_free(struct icipc_server *self); ICIPC_API -void -icipc_server_set_client_handler (struct icipc_server *self, - icipc_server_client_handler_func_t handler, - void *data); +void icipc_server_set_client_handler( + struct icipc_server *self, + icipc_server_client_handler_func_t handler, + void *data); ICIPC_API -void -icipc_server_clear_client_handler (struct icipc_server *self); +void icipc_server_clear_client_handler(struct icipc_server *self); ICIPC_API -bool -icipc_server_set_request_handler (struct icipc_server *self, - const char *name, - icipc_server_request_handler_func_t handler, - void *data); +bool icipc_server_set_request_handler( + struct icipc_server *self, + const char *name, + icipc_server_request_handler_func_t handler, + void *data); ICIPC_API -void -icipc_server_clear_request_handler (struct icipc_server *self, - const char *name); +void icipc_server_clear_request_handler( + struct icipc_server *self, + const char *name); /* for request handlers only */ ICIPC_API -bool -icipc_server_reply_ok (struct icipc_server *self, - int client_fd, - const struct spa_pod *value); +bool icipc_server_reply_ok( + struct icipc_server *self, + int client_fd, + const struct spa_pod *value); ICIPC_API -bool -icipc_server_reply_error (struct icipc_server *self, - int client_fd, - const char *msg); +bool icipc_server_reply_error( + struct icipc_server *self, + int client_fd, + const char *msg); #ifdef __cplusplus } diff --git a/lib/utils.c b/lib/utils.c index fd9cdff..4d91241 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -28,284 +28,273 @@ /* log */ const char *icipc_logger_level_text[] = { - [ICIPC_LOG_LEVEL_ERROR] = "E", - [ICIPC_LOG_LEVEL_WARN] = "W", - [ICIPC_LOG_LEVEL_INFO] = "I", + [ICIPC_LOG_LEVEL_ERROR] = "E", + [ICIPC_LOG_LEVEL_WARN] = "W", + [ICIPC_LOG_LEVEL_INFO] = "I", }; -struct icipc_logger { - enum icipc_log_level level; -}; +typedef struct Logger { + LogLevel level; +} Logger; -static const struct icipc_logger * -icipc_log_get_instance (void) -{ - static struct icipc_logger logger_ = { 0, }; - static struct icipc_logger* instance_ = NULL; +static const Logger *icipc_log_get_instance(void) { + static Logger logger_ = { 0, }; + static Logger *instance_ = NULL; - if (instance_ == NULL) { - char * val_str = NULL; - enum icipc_log_level val = 0; + if (instance_ == NULL) { + char *val_str = NULL; + LogLevel val = 0; - /* default to error */ - logger_.level = ICIPC_LOG_LEVEL_WARN; + /* default to error */ + logger_.level = ICIPC_LOG_LEVEL_WARN; - /* get level from env */ - val_str = getenv ("ICIPC_DEBUG"); - if (val_str && sscanf (val_str, "%u", &val) == 1 && - val >= ICIPC_LOG_LEVEL_NONE) - logger_.level = val; + /* get level from env */ + val_str = getenv("ICIPC_DEBUG"); + if (val_str && sscanf(val_str, "%u", &val) == 1 && + val >= ICIPC_LOG_LEVEL_NONE) + logger_.level = val; - instance_ = &logger_; - } + instance_ = &logger_; + } - return instance_; + return instance_; } -void -icipc_logv (enum icipc_log_level level, const char *fmt, va_list args) -{ - const struct icipc_logger *logger = NULL; - - logger = icipc_log_get_instance (); - assert (logger); - - if (logger->level >= level) { - assert (level > 0); - char msg[MAX_LOG_MESSAGE]; - struct timespec time; - clock_gettime (CLOCK_REALTIME, &time); - vsnprintf (msg, MAX_LOG_MESSAGE, fmt, args); - fprintf (stderr, "[%s][%lu.%lu] %s\n", icipc_logger_level_text[level], - time.tv_sec, time.tv_sec, msg); - } +void icipc_logv(LogLevel level, const char *fmt, va_list args) { + const Logger *logger = NULL; + + logger = icipc_log_get_instance(); + assert(logger); + + if (logger->level >= level) { + assert(level > 0); + char msg[MAX_LOG_MESSAGE]; + struct timespec time; + clock_gettime(CLOCK_REALTIME, &time); + vsnprintf(msg, MAX_LOG_MESSAGE, fmt, args); + fprintf(stderr, "[%s][%lu.%lu] %s\n", + icipc_logger_level_text[level], time.tv_sec, + time.tv_sec, msg); + } } -void -icipc_log (enum icipc_log_level level, const char *fmt, ...) -{ - va_list args; - va_start (args, fmt); - icipc_logv (level, fmt, args); - va_end (args); +void icipc_log(LogLevel level, const char *fmt, ...) { + va_list args; + va_start(args, fmt); + icipc_logv(level, fmt, args); + va_end(args); } /* socket path */ -int -icipc_construct_socket_path (const char *name, char *buf, size_t buf_size) -{ - bool path_is_absolute; - const char *runtime_dir = NULL; - struct passwd pwd, *result = NULL; - char buffer[4096]; - int name_size; - - path_is_absolute = name[0] == '/'; - - if (!path_is_absolute) { - runtime_dir = getenv("PIPEWIRE_RUNTIME_DIR"); - if (runtime_dir == NULL) - runtime_dir = getenv("XDG_RUNTIME_DIR"); - if (runtime_dir == NULL) - runtime_dir = getenv("HOME"); - if (runtime_dir == NULL) - runtime_dir = getenv("USERPROFILE"); - if (runtime_dir == NULL) { - if (getpwuid_r(getuid(), &pwd, buffer, sizeof(buffer), &result) == 0) - runtime_dir = result ? result->pw_dir : NULL; - } - } - - if (runtime_dir == NULL && !path_is_absolute) - return -ENOENT; - - if (path_is_absolute) - name_size = snprintf (buf, buf_size, "%s", name) + 1; - else - name_size = snprintf (buf, buf_size, "%s/%s", runtime_dir, name) + 1; - - if (name_size > (int) buf_size) - return -ENAMETOOLONG; - - return 0; +int icipc_construct_socket_path(const char *name, char *buf, size_t buf_size) { + bool path_is_absolute; + const char *runtime_dir = NULL; + struct passwd pwd, *result = NULL; + char buffer[4096]; + int name_size; + + path_is_absolute = name[0] == '/'; + + if (!path_is_absolute) { + runtime_dir = getenv("PIPEWIRE_RUNTIME_DIR"); + if (runtime_dir == NULL) + runtime_dir = getenv("XDG_RUNTIME_DIR"); + if (runtime_dir == NULL) + runtime_dir = getenv("HOME"); + if (runtime_dir == NULL) + runtime_dir = getenv("USERPROFILE"); + if (runtime_dir == NULL) { + if (getpwuid_r + (getuid(), &pwd, buffer, sizeof(buffer), + &result) == 0) + runtime_dir = result ? result->pw_dir : NULL; + } + } + + if (runtime_dir == NULL && !path_is_absolute) + return -ENOENT; + + if (path_is_absolute) + name_size = snprintf(buf, buf_size, "%s", name) + 1; + else + name_size = + snprintf(buf, buf_size, "%s/%s", runtime_dir, name) + 1; + + if (name_size > (int)buf_size) + return -ENAMETOOLONG; + + return 0; } /* socket */ -ssize_t -icipc_socket_write (int fd, const uint8_t *buffer, size_t size) -{ - size_t total_written = 0; - size_t n; - - assert (fd >= 0); - assert (buffer != NULL); - assert (size > 0); - - do { - n = write(fd, buffer, size); - if (n < size) { - if (errno == EINTR) - continue; - if (errno == EAGAIN || errno == EWOULDBLOCK) - return total_written; - return -1; - } - total_written += n; - } while (total_written < size); +ssize_t icipc_socket_write(int fd, const uint8_t * buffer, size_t size) { + size_t total_written = 0; + size_t n; + + assert(fd >= 0); + assert(buffer != NULL); + assert(size > 0); + + do { + n = write(fd, buffer, size); + if (n < size) { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return total_written; + return -1; + } + total_written += n; + } while (total_written < size); - return total_written; + return total_written; } -ssize_t -icipc_socket_read (int fd, uint8_t **buffer, size_t *max_size) -{ - ssize_t n; - ssize_t size; - size_t offset = 0; - - assert (buffer); - assert (*buffer); - assert (max_size); - assert (*max_size > 0); - -again: - size = *max_size - offset; - n = read (fd, *buffer + offset, size); - if (n == 0) - return 0; - - /* check for errors */ - if (n < 0) { - if (errno == EINTR) - goto again; - if (errno == EAGAIN || errno == EWOULDBLOCK) - return offset; - return -1; - } - - /* realloc if we need more space, and read again */ - if (n >= size) { - *max_size += *max_size; - *buffer = reallocarray (*buffer, *max_size, sizeof (uint8_t)); - offset += n; - goto again; - } - - return offset + n; +ssize_t icipc_socket_read(int fd, uint8_t ** buffer, size_t *max_size) { + ssize_t n; + ssize_t size; + size_t offset = 0; + + assert(buffer); + assert(*buffer); + assert(max_size); + assert(*max_size > 0); + + again: + size = *max_size - offset; + n = read(fd, *buffer + offset, size); + if (n == 0) + return 0; + + /* check for errors */ + if (n < 0) { + if (errno == EINTR) + goto again; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return offset; + return -1; + } + + /* realloc if we need more space, and read again */ + if (n >= size) { + *max_size += *max_size; + *buffer = reallocarray(*buffer, *max_size, sizeof(uint8_t)); + offset += n; + goto again; + } + + return offset + n; } /* epoll thread */ -bool -icipc_epoll_thread_init (struct epoll_thread *self, - int socket_fd, - icipc_epoll_thread_event_func_t sock_func, - icipc_epoll_thread_event_func_t other_func, - void *data) -{ - struct epoll_event event; - - self->socket_fd = socket_fd; - self->event_fd = -1; - self->epoll_fd = -1; - - /* create event fd */ - self->event_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); - if (self->event_fd == -1) - goto error; - - /* create epoll fd */ - self->epoll_fd = epoll_create1 (EPOLL_CLOEXEC); - if (self->epoll_fd == -1) - goto error; - - /* poll socket fd */ - event.events = EPOLLIN; - event.data.fd = self->socket_fd; - if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0) - goto error; - - /* poll event fd */ - event.events = EPOLLIN; - event.data.fd = self->event_fd; - if (epoll_ctl (self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0) - goto error; - - self->socket_event_func = sock_func; - self->other_event_func = other_func; - self->event_data = data; - return true; - -error: - if (self->epoll_fd != -1) - close (self->epoll_fd); - if (self->event_fd != -1) - close (self->event_fd); - return false; +bool icipc_epoll_thread_init( + EpollThread *self, + int socket_fd, + icipc_epoll_thread_event_func_t sock_func, + icipc_epoll_thread_event_func_t other_func, + void *data) { + struct epoll_event event; + + self->socket_fd = socket_fd; + self->event_fd = -1; + self->epoll_fd = -1; + + /* create event fd */ + self->event_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + if (self->event_fd == -1) + goto error; + + /* create epoll fd */ + self->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (self->epoll_fd == -1) + goto error; + + /* poll socket fd */ + event.events = EPOLLIN; + event.data.fd = self->socket_fd; + if (epoll_ctl(self->epoll_fd, EPOLL_CTL_ADD, self->socket_fd, &event) != 0) + goto error; + + /* poll event fd */ + event.events = EPOLLIN; + event.data.fd = self->event_fd; + if (epoll_ctl(self->epoll_fd, EPOLL_CTL_ADD, self->event_fd, &event) != 0) + goto error; + + self->socket_event_func = sock_func; + self->other_event_func = other_func; + self->event_data = data; + return true; + + error: + if (self->epoll_fd != -1) + close(self->epoll_fd); + if (self->event_fd != -1) + close(self->event_fd); + return false; } -static void * -epoll_thread_run (void *data) -{ - struct epoll_thread *self = data; - bool exit = false; - - while (!exit) { - /* wait for events */ - struct epoll_event ep[MAX_POLL_EVENTS]; - int n = epoll_wait (self->epoll_fd, ep, MAX_POLL_EVENTS, -1); - if (n < 0) { - icipc_log_error ("epoll_thread: failed to wait for event: %s", - strerror(errno)); - continue; - } - - for (int i = 0; i < n; i++) { - /* socket fd */ - if (ep[i].data.fd == self->socket_fd) { - if (self->socket_event_func) - self->socket_event_func (self, ep[i].data.fd, self->event_data); - } - - /* event fd */ - else if (ep[i].data.fd == self->event_fd) { - uint64_t stop = 0; - ssize_t res = read (ep[i].data.fd, &stop, sizeof(uint64_t)); - if (res == sizeof(uint64_t) && stop == 1) - exit = true; - } - - /* other */ - else { - if (self->other_event_func) - self->other_event_func (self, ep[i].data.fd, self->event_data); - } - } - } - - return NULL; +static void *epoll_thread_run(void *data) { + EpollThread *self = data; + bool exit = false; + + while (!exit) { + /* wait for events */ + struct epoll_event ep[MAX_POLL_EVENTS]; + int n = epoll_wait(self->epoll_fd, ep, MAX_POLL_EVENTS, -1); + if (n < 0) { + icipc_log_error + ("epoll_thread: failed to wait for event: %s", + strerror(errno)); + continue; + } + + for (int i = 0; i < n; i++) { + /* socket fd */ + if (ep[i].data.fd == self->socket_fd) { + if (self->socket_event_func) + self->socket_event_func(self, + ep[i].data.fd, + self->event_data); + } + + /* event fd */ + else if (ep[i].data.fd == self->event_fd) { + uint64_t stop = 0; + ssize_t res = read(ep[i].data.fd, &stop, + sizeof(uint64_t)); + if (res == sizeof(uint64_t) && stop == 1) + exit = true; + } + + /* other */ + else { + if (self->other_event_func) + self->other_event_func(self, + ep[i].data.fd, + self->event_data); + } + } + } + + return NULL; } -bool -icipc_epoll_thread_start (struct epoll_thread *self) -{ - return pthread_create (&self->thread, NULL, epoll_thread_run, self) == 0; +bool icipc_epoll_thread_start(EpollThread *self) { + return pthread_create(&self->thread, NULL, epoll_thread_run, self) == 0; } -void -icipc_epoll_thread_stop (struct epoll_thread *self) -{ - uint64_t value = 1; - ssize_t res = write (self->event_fd, &value, sizeof(uint64_t)); - if (res == sizeof(uint64_t)) - pthread_join (self->thread, NULL); +void icipc_epoll_thread_stop(EpollThread *self) { + uint64_t value = 1; + ssize_t res = write(self->event_fd, &value, sizeof(uint64_t)); + if (res == sizeof(uint64_t)) + pthread_join(self->thread, NULL); } -void -icipc_epoll_thread_destroy (struct epoll_thread *self) -{ - close (self->epoll_fd); - close (self->event_fd); +void icipc_epoll_thread_destroy(EpollThread *self) { + close(self->epoll_fd); + close(self->event_fd); } diff --git a/src/icipc-client.c b/src/icipc-client.c index 4626754..c1d2de7 100644 --- a/src/icipc-client.c +++ b/src/icipc-client.c @@ -12,98 +12,104 @@ #include <spa/pod/builder.h> #include <icipc.h> -struct client_data { - struct icipc_client *c; - pthread_mutex_t mutex; - pthread_cond_t cond; - bool reply_received; -}; +typedef struct ClientData { + struct icipc_client *c; + pthread_mutex_t mutex; + pthread_cond_t cond; + bool reply_received; +} ClientData; -static void -reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) -{ - struct client_data *data = p; - const char *error = NULL; +static void reply_handler( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + void *p) { + ClientData *data = p; + const char *error = NULL; - if (buffer) { - const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &error); - if (!pod) - printf ("error: %s\n", error ? error : "unknown"); - else - printf ("success!\n"); - } else { - printf ("error: lost connection with server\n"); - } + if (buffer) { + const struct spa_pod *pod = + icipc_client_send_request_finish(self, buffer, size, &error); + if (!pod) + printf("error: %s\n", error ? error : "unknown"); + else + printf("success!\n"); + } else { + printf("error: lost connection with server\n"); + } - /* signal reply received */ - pthread_mutex_lock (&data->mutex); - data->reply_received = true; - pthread_cond_signal (&data->cond); - pthread_mutex_unlock (&data->mutex); + /* signal reply received */ + pthread_mutex_lock(&data->mutex); + data->reply_received = true; + pthread_cond_signal(&data->cond); + pthread_mutex_unlock(&data->mutex); } -int -main (int argc, char *argv[]) -{ - struct client_data data; +int main(int argc, char *argv[]) { + ClientData data; - if (argc < 2) { - printf ("usage: <server-path>\n"); - return -1; - } + if (argc < 2) { + printf("usage: <server-path>\n"); + return -1; + } - /* init */ - data.c = icipc_client_new (argv[1], true); - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - data.reply_received = false; + /* init */ + data.c = icipc_client_new(argv[1], true); + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + data.reply_received = false; - while (true) { - char str[1024]; + while (true) { + char str[1024]; - printf ("> "); - fgets (str, 1023, stdin); + printf("> "); + fgets(str, 1023, stdin); - if (strncmp (str, "help", 4) == 0) { - printf ("help\tprints this message\n"); - printf ("quit\texits the client\n"); - printf ("send\tsends a request, usage: send <request-name> [args]\n"); - } else if (strncmp (str, "quit", 4) == 0) { - printf ("exiting...\n"); - break; - } else if (strncmp (str, "send", 4) == 0) { - char request_name[128]; - char request_args[1024]; - int n = sscanf(str, "send %s %s", request_name, request_args); - if (n <= 0) - continue; + if (strncmp(str, "help", 4) == 0) { + printf("help\tprints this message\n"); + printf("quit\texits the client\n"); + printf("send\tsends a request, usage: send <request-name> [args]\n"); + } else if (strncmp(str, "quit", 4) == 0) { + printf("exiting...\n"); + break; + } else if (strncmp(str, "send", 4) == 0) { + char request_name[128]; + char request_args[1024]; + int n = + sscanf(str, "send %s %s", request_name, + request_args); + if (n <= 0) + continue; - /* send request */ - if (n >= 2) { - /* TODO: for now we always create a string pod for args */ - struct { - struct spa_pod_string pod; - char str[1024]; - } args; - args.pod = SPA_POD_INIT_String(1024); - strncpy (args.str, request_args, 1024); - icipc_client_send_request (data.c, request_name, (const struct spa_pod *)&args, - reply_handler, &data); - } else { - icipc_client_send_request (data.c, request_name, NULL, reply_handler, &data); - } + /* send request */ + if (n >= 2) { + /* TODO: for now we always create a string pod for args */ + struct { + struct spa_pod_string pod; + char str[1024]; + } args; + args.pod = SPA_POD_INIT_String(1024); + strncpy(args.str, request_args, 1024); + icipc_client_send_request(data.c, request_name, + (const struct spa_pod *)&args, + reply_handler, &data); + } else { + icipc_client_send_request(data.c, request_name, + NULL, reply_handler, + &data); + } - /* wait for reply */ - pthread_mutex_lock (&data.mutex); - while (!data.reply_received) - pthread_cond_wait (&data.cond, &data.mutex); - pthread_mutex_unlock (&data.mutex); - } - } + /* wait for reply */ + pthread_mutex_lock(&data.mutex); + while (!data.reply_received) + pthread_cond_wait(&data.cond, &data.mutex); + pthread_mutex_unlock(&data.mutex); + } + } - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - icipc_client_free (data.c); - return 0; + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + icipc_client_free(data.c); + return 0; } diff --git a/tests/client-server.c b/tests/client-server.c index 97f2f6d..9954b86 100644 --- a/tests/client-server.c +++ b/tests/client-server.c @@ -14,119 +14,125 @@ #include <unistd.h> #include <pthread.h> -static inline char *new_address() -{ - char *address = NULL; - (void) asprintf(&address, "icipc-test-%d-%d", getpid(), rand()); - test_ptr_notnull(address); - return address; +static inline char *new_address() { + char *address = NULL; + (void)asprintf(&address, "icipc-test-%d-%d", getpid(), rand()); + test_ptr_notnull(address); + return address; } -static bool -increment_request_handler (struct icipc_server *self, int client_fd, - const char *name, const struct spa_pod *args, void *data) -{ - int32_t val = 0; - test_bool_true (spa_pod_is_int (args)); - test_bool_true (spa_pod_get_int (args, &val) == 0); - struct spa_pod_int res = SPA_POD_INIT_Int (val + 1); - return icipc_server_reply_ok (self, client_fd, (struct spa_pod *)&res); +static bool increment_request_handler( + struct icipc_server *self, + int client_fd, + const char *name, + const struct spa_pod *args, + void *data) { + int32_t val = 0; + test_bool_true(spa_pod_is_int(args)); + test_bool_true(spa_pod_get_int(args, &val) == 0); + struct spa_pod_int res = SPA_POD_INIT_Int(val + 1); + return icipc_server_reply_ok(self, client_fd, (struct spa_pod *)&res); } -static bool -error_request_handler (struct icipc_server *self, int client_fd, - const char *name, const struct spa_pod *args, void *data) -{ - return icipc_server_reply_error (self, client_fd, "error message"); +static bool error_request_handler( + struct icipc_server *self, + int client_fd, + const char *name, + const struct spa_pod *args, + void *data) { + return icipc_server_reply_error(self, client_fd, "error message"); } -struct reply_data { - int32_t incremented; - const char *error; - int n_replies; - pthread_mutex_t mutex; - pthread_cond_t cond; -}; - -static void -wait_for_reply (struct reply_data *data, int n_replies) -{ - pthread_mutex_lock (&data->mutex); - while (data->n_replies < n_replies) - pthread_cond_wait (&data->cond, &data->mutex); - pthread_mutex_unlock (&data->mutex); +typedef struct ReplyData { + int32_t incremented; + const char *error; + int n_replies; + pthread_mutex_t mutex; + pthread_cond_t cond; +} ReplyData; + +static void wait_for_reply(ReplyData *data, int n_replies) { + pthread_mutex_lock(&data->mutex); + while (data->n_replies < n_replies) + pthread_cond_wait(&data->cond, &data->mutex); + pthread_mutex_unlock(&data->mutex); } -static void -reply_handler (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) -{ - struct reply_data *data = p; - test_ptr_notnull(data); - - pthread_mutex_lock (&data->mutex); - - const struct spa_pod *pod = icipc_client_send_request_finish (self, buffer, size, &data->error); - if (pod) { - test_bool_true (spa_pod_is_int (pod)); - test_bool_true (spa_pod_get_int (pod, &data->incremented) == 0); - } - data->n_replies++; - pthread_cond_signal (&data->cond); - - pthread_mutex_unlock (&data->mutex); +static void reply_handler( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + void *p) { + ReplyData *data = p; + test_ptr_notnull(data); + + pthread_mutex_lock(&data->mutex); + + const struct spa_pod *pod = + icipc_client_send_request_finish(self, buffer, size, &data->error); + if (pod) { + test_bool_true(spa_pod_is_int(pod)); + test_bool_true(spa_pod_get_int(pod, &data->incremented) == 0); + } + data->n_replies++; + pthread_cond_signal(&data->cond); + + pthread_mutex_unlock(&data->mutex); } -static void -test_icipc_server_client () -{ - char *address = new_address(); - struct icipc_server *s = icipc_server_new (address, true); - test_ptr_notnull(s); - struct icipc_client *c = icipc_client_new (address, true); - test_ptr_notnull(c); - struct reply_data data; - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - - /* add request handlers */ - test_bool_true (icipc_server_set_request_handler (s, "INCREMENT", increment_request_handler, NULL)); - test_bool_true (icipc_server_set_request_handler (s, "ERROR", error_request_handler, NULL)); - - /* send an INCREMENT request of 3, and make sure the returned value is 4 */ - data.incremented = -1; - data.error = NULL; - data.n_replies = 0; - struct spa_pod_int i = SPA_POD_INIT_Int (3); - test_bool_true (icipc_client_send_request (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, &data)); - wait_for_reply (&data, 1); - test_ptr_null(data.error); - test_cmpint(data.incremented, ==, 4); - - /* send an ERROR request, and make sure the returned value is an error */ - data.error = NULL; - data.n_replies = 0; - test_bool_true (icipc_client_send_request (c, "ERROR", NULL, reply_handler, &data)); - wait_for_reply (&data, 1); - test_str_eq(data.error, "error message"); - - /* send an unhandled request, and make sure the server replies with an error */ - data.error = NULL; - data.n_replies = 0; - test_bool_true (icipc_client_send_request (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data)); - wait_for_reply (&data, 1); - test_str_eq(data.error, "request handler not found"); - - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - icipc_client_free (c); - icipc_server_free (s); - free(address); +static void test_icipc_server_client() { + char *address = new_address(); + struct icipc_server *s = icipc_server_new(address, true); + test_ptr_notnull(s); + struct icipc_client *c = icipc_client_new(address, true); + test_ptr_notnull(c); + ReplyData data; + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + + /* add request handlers */ + test_bool_true(icipc_server_set_request_handler + (s, "INCREMENT", increment_request_handler, NULL)); + test_bool_true(icipc_server_set_request_handler + (s, "ERROR", error_request_handler, NULL)); + + /* send an INCREMENT request of 3, and make sure the returned value is 4 */ + data.incremented = -1; + data.error = NULL; + data.n_replies = 0; + struct spa_pod_int i = SPA_POD_INIT_Int(3); + test_bool_true(icipc_client_send_request + (c, "INCREMENT", (struct spa_pod *)&i, reply_handler, + &data)); + wait_for_reply(&data, 1); + test_ptr_null(data.error); + test_cmpint(data.incremented, ==, 4); + + /* send an ERROR request, and make sure the returned value is an error */ + data.error = NULL; + data.n_replies = 0; + test_bool_true(icipc_client_send_request + (c, "ERROR", NULL, reply_handler, &data)); + wait_for_reply(&data, 1); + test_str_eq(data.error, "error message"); + + /* send an unhandled request, and make sure the server replies with an error */ + data.error = NULL; + data.n_replies = 0; + test_bool_true(icipc_client_send_request + (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data)); + wait_for_reply(&data, 1); + test_str_eq(data.error, "request handler not found"); + + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + icipc_client_free(c); + icipc_server_free(s); + free(address); } -int -main (int argc, char *argv[]) -{ - test_icipc_server_client(); - return TEST_PASS; +int main(int argc, char *argv[]) { + test_icipc_server_client(); + return TEST_PASS; } diff --git a/tests/protocol.c b/tests/protocol.c index 334c511..1d29db3 100644 --- a/tests/protocol.c +++ b/tests/protocol.c @@ -11,65 +11,69 @@ #include <spa/pod/parser.h> #include <icipc.h> -static void -test_icipc_protocol () -{ - uint8_t b[1024]; +static void test_icipc_protocol() { + uint8_t b[1024]; - /* request null value */ - { - icipc_protocol_build_request (b, sizeof(b), "name", NULL); - const char *name = NULL; - const struct spa_pod *value = NULL; - test_bool_true (icipc_protocol_parse_request (b, sizeof(b), &name, &value)); - test_str_eq (name, "name"); - test_bool_true (spa_pod_is_none (value)); - } + /* request null value */ + { + icipc_protocol_build_request(b, sizeof(b), "name", NULL); + const char *name = NULL; + const struct spa_pod *value = NULL; + test_bool_true(icipc_protocol_parse_request + (b, sizeof(b), &name, &value)); + test_str_eq(name, "name"); + test_bool_true(spa_pod_is_none(value)); + } - /* request */ - { - struct spa_pod_int i = SPA_POD_INIT_Int (8); - icipc_protocol_build_request (b, sizeof(b), "name", (struct spa_pod *)&i); - const char *name = NULL; - const struct spa_pod_int *value = NULL; - test_bool_true (icipc_protocol_parse_request (b, sizeof(b), &name, (const struct spa_pod **)&value)); - test_str_eq (name, "name"); - test_cmpint (value->value, ==, 8); - } + /* request */ + { + struct spa_pod_int i = SPA_POD_INIT_Int(8); + icipc_protocol_build_request(b, sizeof(b), "name", + (struct spa_pod *)&i); + const char *name = NULL; + const struct spa_pod_int *value = NULL; + test_bool_true(icipc_protocol_parse_request + (b, sizeof(b), &name, + (const struct spa_pod **)&value)); + test_str_eq(name, "name"); + test_cmpint(value->value, ==, 8); + } - /* reply error */ - { - icipc_protocol_build_reply_error (b, sizeof(b), "error message"); - test_bool_true (icipc_protocol_is_reply_error (b, sizeof(b))); - const char *msg = NULL; - test_bool_true (icipc_protocol_parse_reply_error (b, sizeof(b), &msg)); - test_str_eq (msg, "error message"); - } + /* reply error */ + { + icipc_protocol_build_reply_error(b, sizeof(b), "error message"); + test_bool_true(icipc_protocol_is_reply_error(b, sizeof(b))); + const char *msg = NULL; + test_bool_true(icipc_protocol_parse_reply_error + (b, sizeof(b), &msg)); + test_str_eq(msg, "error message"); + } - /* reply ok null value */ - { - icipc_protocol_build_reply_ok (b, sizeof(b), NULL); - test_bool_true (icipc_protocol_is_reply_ok (b, sizeof(b))); - const struct spa_pod *value = NULL; - test_bool_true (icipc_protocol_parse_reply_ok (b, sizeof(b), &value)); - test_ptr_notnull (value); - test_bool_true (spa_pod_is_none (value)); - } + /* reply ok null value */ + { + icipc_protocol_build_reply_ok(b, sizeof(b), NULL); + test_bool_true(icipc_protocol_is_reply_ok(b, sizeof(b))); + const struct spa_pod *value = NULL; + test_bool_true(icipc_protocol_parse_reply_ok + (b, sizeof(b), &value)); + test_ptr_notnull(value); + test_bool_true(spa_pod_is_none(value)); + } - /* reply ok */ - { - struct spa_pod_int i = SPA_POD_INIT_Int (3); - icipc_protocol_build_reply_ok (b, sizeof(b), (struct spa_pod *)&i); - test_bool_true (icipc_protocol_is_reply_ok (b, sizeof(b))); - const struct spa_pod_int *value = NULL; - test_bool_true (icipc_protocol_parse_reply_ok (b, sizeof(b), (const struct spa_pod **)&value)); - test_cmpint (value->value, ==, 3); - } + /* reply ok */ + { + struct spa_pod_int i = SPA_POD_INIT_Int(3); + icipc_protocol_build_reply_ok(b, sizeof(b), + (struct spa_pod *)&i); + test_bool_true(icipc_protocol_is_reply_ok(b, sizeof(b))); + const struct spa_pod_int *value = NULL; + test_bool_true(icipc_protocol_parse_reply_ok + (b, sizeof(b), (const struct spa_pod **)&value)); + test_cmpint(value->value, ==, 3); + } } -int -main (int argc, char *argv[]) -{ - test_icipc_protocol(); - return TEST_PASS; +int main(int argc, char *argv[]) { + test_icipc_protocol(); + return TEST_PASS; } diff --git a/tests/sender-receiver.c b/tests/sender-receiver.c index 19a989d..9858ef7 100644 --- a/tests/sender-receiver.c +++ b/tests/sender-receiver.c @@ -12,312 +12,315 @@ #include <unistd.h> #include <pthread.h> -struct event_data { - const uint8_t * expected_data; - size_t expected_size; - int connections; - int n_events; - pthread_mutex_t mutex; - pthread_cond_t cond; -}; - -static inline char *new_address() -{ - char *address = NULL; - (void) asprintf(&address, "icipc-test-%d-%d", getpid(), rand()); - test_ptr_notnull(address); - return address; +typedef struct EventData { + const uint8_t *expected_data; + size_t expected_size; + int connections; + int n_events; + pthread_mutex_t mutex; + pthread_cond_t cond; +} EventData; + +static inline char *new_address() { + char *address = NULL; + (void)asprintf(&address, "icipc-test-%d-%d", getpid(), rand()); + test_ptr_notnull(address); + return address; } -static void -wait_for_event (struct event_data *data, int n_events) -{ - pthread_mutex_lock (&data->mutex); - while (data->n_events < n_events) - pthread_cond_wait (&data->cond, &data->mutex); - pthread_mutex_unlock (&data->mutex); +static void wait_for_event(EventData *data, int n_events) { + pthread_mutex_lock(&data->mutex); + while (data->n_events < n_events) + pthread_cond_wait(&data->cond, &data->mutex); + pthread_mutex_unlock(&data->mutex); } -static void -sender_state_callback (struct icipc_receiver *self, int sender_fd, - enum icipc_receiver_sender_state sender_state, void *p) -{ - struct event_data *data = p; - test_ptr_notnull (data); - - pthread_mutex_lock (&data->mutex); - switch (sender_state) { - case ICIPC_RECEIVER_SENDER_STATE_CONNECTED: - data->connections++; - break; - case ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED: - data->connections--; - break; - default: - test_fail_if_reached (); - break; - } - data->n_events++; - pthread_cond_signal (&data->cond); - pthread_mutex_unlock (&data->mutex); +static void sender_state_callback( + struct icipc_receiver *self, + int sender_fd, + enum icipc_receiver_sender_state sender_state, + void *p) { + EventData *data = p; + test_ptr_notnull(data); + + pthread_mutex_lock(&data->mutex); + switch (sender_state) { + case ICIPC_RECEIVER_SENDER_STATE_CONNECTED: + data->connections++; + break; + case ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED: + data->connections--; + break; + default: + test_fail_if_reached(); + break; + } + data->n_events++; + pthread_cond_signal(&data->cond); + pthread_mutex_unlock(&data->mutex); } -static void -reply_callback (struct icipc_sender *self, const uint8_t *buffer, size_t size, void *p) -{ - struct event_data *data = p; - test_ptr_notnull (data); - test_ptr_notnull (buffer); - - pthread_mutex_lock (&data->mutex); - test_cmpint(size, ==, data->expected_size); - test_cmpint(memcmp(buffer, data->expected_data, size), ==, 0); - data->n_events++; - pthread_cond_signal (&data->cond); - pthread_mutex_unlock (&data->mutex); +static void reply_callback( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + void *p) { + EventData *data = p; + test_ptr_notnull(data); + test_ptr_notnull(buffer); + + pthread_mutex_lock(&data->mutex); + test_cmpint(size, ==, data->expected_size); + test_cmpint(memcmp(buffer, data->expected_data, size), ==, 0); + data->n_events++; + pthread_cond_signal(&data->cond); + pthread_mutex_unlock(&data->mutex); } -static void -test_icipc_receiver_basic () -{ - char *address = new_address(); - struct icipc_receiver *r = icipc_receiver_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (r); - - /* start and stop */ - test_bool_false (icipc_receiver_is_running (r)); - test_bool_true (icipc_receiver_start (r)); - test_bool_true (icipc_receiver_is_running (r)); - icipc_receiver_stop (r); - test_bool_false (icipc_receiver_is_running (r)); - - /* clean up */ - icipc_receiver_free (r); - free(address); +static void test_icipc_receiver_basic() { + char *address = new_address(); + struct icipc_receiver *r = + icipc_receiver_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(r); + + /* start and stop */ + test_bool_false(icipc_receiver_is_running(r)); + test_bool_true(icipc_receiver_start(r)); + test_bool_true(icipc_receiver_is_running(r)); + icipc_receiver_stop(r); + test_bool_false(icipc_receiver_is_running(r)); + + /* clean up */ + icipc_receiver_free(r); + free(address); } -static void -test_icipc_sender_basic () -{ - char *address = new_address(); - struct icipc_sender *s = icipc_sender_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (s); +static void test_icipc_sender_basic() { + char *address = new_address(); + struct icipc_sender *s = icipc_sender_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(s); - /* clean up */ - icipc_sender_free (s); - free(address); + /* clean up */ + icipc_sender_free(s); + free(address); } -static void -test_icipc_sender_connect () -{ - static struct icipc_receiver_events events = { - .sender_state = sender_state_callback, - .handle_message = NULL, - }; - struct event_data data = {0}; - - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - - char *address = new_address(); - struct icipc_receiver *r = icipc_receiver_new (address, 16, &events, &data, 0); - test_ptr_notnull (r); - struct icipc_sender *s = icipc_sender_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (s); - - /* start receiver */ - test_bool_true (icipc_receiver_start (r)); - - /* connect sender */ - test_bool_true (icipc_sender_connect (s)); - test_bool_true (icipc_sender_is_connected (s)); - wait_for_event (&data, 1); - test_cmpint (data.connections, ==, 1); - - /* disconnect sender */ - icipc_sender_disconnect (s); - test_bool_false (icipc_sender_is_connected (s)); - wait_for_event (&data, 2); - test_cmpint (data.connections, ==, 0); - - /* stop receiver */ - icipc_receiver_stop (r); - - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - icipc_sender_free (s); - icipc_receiver_free (r); - free(address); +static void test_icipc_sender_connect() { + static struct icipc_receiver_events events = { + .sender_state = sender_state_callback, + .handle_message = NULL, + }; + EventData data = { 0 }; + + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + + char *address = new_address(); + struct icipc_receiver *r = + icipc_receiver_new(address, 16, &events, &data, 0); + test_ptr_notnull(r); + struct icipc_sender *s = icipc_sender_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(s); + + /* start receiver */ + test_bool_true(icipc_receiver_start(r)); + + /* connect sender */ + test_bool_true(icipc_sender_connect(s)); + test_bool_true(icipc_sender_is_connected(s)); + wait_for_event(&data, 1); + test_cmpint(data.connections, ==, 1); + + /* disconnect sender */ + icipc_sender_disconnect(s); + test_bool_false(icipc_sender_is_connected(s)); + wait_for_event(&data, 2); + test_cmpint(data.connections, ==, 0); + + /* stop receiver */ + icipc_receiver_stop(r); + + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + icipc_sender_free(s); + icipc_receiver_free(r); + free(address); } -static void -lost_connection_handler (struct icipc_sender *self, int receiver_fd, void *p) -{ - struct event_data *data = p; - test_ptr_notnull (data); - - pthread_mutex_lock (&data->mutex); - data->n_events++; - pthread_cond_signal (&data->cond); - pthread_mutex_unlock (&data->mutex); +static void lost_connection_handler( + struct icipc_sender *self, + int receiver_fd, + void *p) { + EventData *data = p; + test_ptr_notnull(data); + + pthread_mutex_lock(&data->mutex); + data->n_events++; + pthread_cond_signal(&data->cond); + pthread_mutex_unlock(&data->mutex); } -static void -test_icipc_sender_lost_connection () -{ - struct event_data data = {0}; - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - - char *address = new_address(); - struct icipc_receiver *r = icipc_receiver_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (r); - struct icipc_sender *s = icipc_sender_new (address, 16, lost_connection_handler, &data, 0); - test_ptr_notnull (s); - - /* connect sender */ - test_bool_true (icipc_sender_connect (s)); - test_bool_true (icipc_sender_is_connected (s)); - - /* destroy receiver and make sure the lost connection handler is triggered */ - data.n_events = 0; - icipc_receiver_free (r); - wait_for_event (&data, 1); - - /* make sure the connection was lost */ - test_bool_false (icipc_sender_is_connected (s)); - - /* create a new receiver */ - struct icipc_receiver *r2 = icipc_receiver_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (r2); - - /* re-connect sender with new receiver */ - test_bool_true (icipc_sender_connect (s)); - test_bool_true (icipc_sender_is_connected (s)); - - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - icipc_sender_free (s); - icipc_receiver_free (r2); - free(address); +static void test_icipc_sender_lost_connection() { + EventData data = { 0 }; + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + + char *address = new_address(); + struct icipc_receiver *r = + icipc_receiver_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(r); + struct icipc_sender *s = + icipc_sender_new(address, 16, lost_connection_handler, &data, 0); + test_ptr_notnull(s); + + /* connect sender */ + test_bool_true(icipc_sender_connect(s)); + test_bool_true(icipc_sender_is_connected(s)); + + /* destroy receiver and make sure the lost connection handler is triggered */ + data.n_events = 0; + icipc_receiver_free(r); + wait_for_event(&data, 1); + + /* make sure the connection was lost */ + test_bool_false(icipc_sender_is_connected(s)); + + /* create a new receiver */ + struct icipc_receiver *r2 = + icipc_receiver_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(r2); + + /* re-connect sender with new receiver */ + test_bool_true(icipc_sender_connect(s)); + test_bool_true(icipc_sender_is_connected(s)); + + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + icipc_sender_free(s); + icipc_receiver_free(r2); + free(address); } -static void -test_icipc_sender_send () -{ - char *address = new_address(); - struct icipc_receiver *r = icipc_receiver_new (address, 2, NULL, NULL, 0); - test_ptr_notnull (r); - struct icipc_sender *s = icipc_sender_new (address, 2, NULL, NULL, 0); - test_ptr_notnull (s); - struct event_data data = {0}; - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - - /* start receiver */ - test_bool_true (icipc_receiver_start (r)); - - /* connect */ - test_bool_true (icipc_sender_connect (s)); - test_bool_true (icipc_sender_is_connected (s)); - - /* send 1 byte message (should not realloc) */ - data.n_events = 0; - data.expected_data = (const uint8_t *)"h"; - data.expected_size = 1; - test_bool_true (icipc_sender_send (s, (const uint8_t *)"h", 1, reply_callback, &data)); - wait_for_event (&data, 1); - - /* send 2 bytes message (should realloc once to 4) */ - data.n_events = 0; - data.expected_data = (const uint8_t *)"hi"; - data.expected_size = 2; - test_bool_true (icipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data)); - wait_for_event (&data, 1); - - /* send 3 bytes message (should not realloc) */ - data.n_events = 0; - data.expected_data = (const uint8_t *)"hii"; - data.expected_size = 3; - test_bool_true (icipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data)); - wait_for_event (&data, 1); - - /* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */ - data.n_events = 0; - data.expected_data = (const uint8_t *)"bigger than 16 bytes message"; - data.expected_size = 28; - test_bool_true (icipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data)); - wait_for_event (&data, 1); - - /* don't allow empty messages */ - data.n_events = 0; - test_bool_false (icipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL)); - - /* stop receiver */ - icipc_receiver_stop (r); - - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - icipc_sender_free (s); - icipc_receiver_free (r); - free(address); +static void test_icipc_sender_send() { + char *address = new_address(); + struct icipc_receiver *r = + icipc_receiver_new(address, 2, NULL, NULL, 0); + test_ptr_notnull(r); + struct icipc_sender *s = icipc_sender_new(address, 2, NULL, NULL, 0); + test_ptr_notnull(s); + EventData data = { 0 }; + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + + /* start receiver */ + test_bool_true(icipc_receiver_start(r)); + + /* connect */ + test_bool_true(icipc_sender_connect(s)); + test_bool_true(icipc_sender_is_connected(s)); + + /* send 1 byte message (should not realloc) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"h"; + data.expected_size = 1; + test_bool_true(icipc_sender_send + (s, (const uint8_t *)"h", 1, reply_callback, &data)); + wait_for_event(&data, 1); + + /* send 2 bytes message (should realloc once to 4) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hi"; + data.expected_size = 2; + test_bool_true(icipc_sender_send + (s, (const uint8_t *)"hi", 2, reply_callback, &data)); + wait_for_event(&data, 1); + + /* send 3 bytes message (should not realloc) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hii"; + data.expected_size = 3; + test_bool_true(icipc_sender_send + (s, (const uint8_t *)"hii", 3, reply_callback, &data)); + wait_for_event(&data, 1); + + /* send 28 bytes message (should realloc 3 times: + first to 8, then to 16 and finally to 32) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"bigger than 16 bytes message"; + data.expected_size = 28; + test_bool_true(icipc_sender_send + (s, (const uint8_t *)"bigger than 16 bytes message", 28, + reply_callback, &data)); + wait_for_event(&data, 1); + + /* don't allow empty messages */ + data.n_events = 0; + test_bool_false(icipc_sender_send + (s, (const uint8_t *)"", 0, NULL, NULL)); + + /* stop receiver */ + icipc_receiver_stop(r); + + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + icipc_sender_free(s); + icipc_receiver_free(r); + free(address); } -static void -test_icipc_multiple_senders_send () -{ - char *address = new_address(); - struct icipc_receiver *r = icipc_receiver_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (r); - struct icipc_sender *senders[50]; - struct event_data data; - pthread_mutex_init (&data.mutex, NULL); - pthread_cond_init (&data.cond, NULL); - data.n_events = 0; - - /* start receiver */ - test_bool_true (icipc_receiver_start (r)); - - /* create and connect 50 senders */ - for (int i = 0; i < 50; i++) { - senders[i] = icipc_sender_new (address, 16, NULL, NULL, 0); - test_ptr_notnull (senders[i]); - test_bool_true (icipc_sender_connect (senders[i])); - test_bool_true (icipc_sender_is_connected (senders[i])); - } - - /* send 50 messages (1 per sender) */ - data.n_events = 0; - data.expected_data = (const uint8_t *)"hello"; - data.expected_size = 5; - for (int i = 0; i < 50; i++) - test_bool_true (icipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data)); - wait_for_event (&data, 50); - - /* stop receiver */ - icipc_receiver_stop (r); - - /* clean up */ - pthread_cond_destroy (&data.cond); - pthread_mutex_destroy (&data.mutex); - for (int i = 0; i < 50; i++) - icipc_sender_free (senders[i]); - icipc_receiver_free (r); - free(address); +static void test_icipc_multiple_senders_send() { + char *address = new_address(); + struct icipc_receiver *r = + icipc_receiver_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(r); + struct icipc_sender *senders[50]; + EventData data; + pthread_mutex_init(&data.mutex, NULL); + pthread_cond_init(&data.cond, NULL); + data.n_events = 0; + + /* start receiver */ + test_bool_true(icipc_receiver_start(r)); + + /* create and connect 50 senders */ + for (int i = 0; i < 50; i++) { + senders[i] = icipc_sender_new(address, 16, NULL, NULL, 0); + test_ptr_notnull(senders[i]); + test_bool_true(icipc_sender_connect(senders[i])); + test_bool_true(icipc_sender_is_connected(senders[i])); + } + + /* send 50 messages (1 per sender) */ + data.n_events = 0; + data.expected_data = (const uint8_t *)"hello"; + data.expected_size = 5; + for (int i = 0; i < 50; i++) + test_bool_true(icipc_sender_send + (senders[i], (const uint8_t *)"hello", 5, + reply_callback, &data)); + wait_for_event(&data, 50); + + /* stop receiver */ + icipc_receiver_stop(r); + + /* clean up */ + pthread_cond_destroy(&data.cond); + pthread_mutex_destroy(&data.mutex); + for (int i = 0; i < 50; i++) + icipc_sender_free(senders[i]); + icipc_receiver_free(r); + free(address); } -int -main (int argc, char *argv[]) -{ - test_icipc_receiver_basic(); - test_icipc_sender_basic(); - test_icipc_sender_connect(); - test_icipc_sender_lost_connection(); - test_icipc_sender_send(); - test_icipc_multiple_senders_send(); - return TEST_PASS; +int main(int argc, char *argv[]) { + test_icipc_receiver_basic(); + test_icipc_sender_basic(); + test_icipc_sender_connect(); + test_icipc_sender_lost_connection(); + test_icipc_sender_send(); + test_icipc_multiple_senders_send(); + return TEST_PASS; } |