diff options
Diffstat (limited to 'lib/server.c')
-rw-r--r-- | lib/server.c | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/lib/server.c b/lib/server.c new file mode 100644 index 0000000..a798894 --- /dev/null +++ b/lib/server.c @@ -0,0 +1,258 @@ +/* PipeWire AGL Cluster IPC + * + * Copyright © 2021 Collabora Ltd. + * @author Julian Bouzas <julian.bouzas@collabora.com> + * + * SPDX-License-Identifier: MIT + */ + +#include <pthread.h> + +#include "private.h" +#include "protocol.h" +#include "receiver.h" +#include "server.h" + +#define BUFFER_SIZE 1024 +#define MAX_REQUEST_HANDLERS 128 + +struct icipc_server_client_handler +{ + icipc_server_client_handler_func_t handler; + void *data; +}; + +struct icipc_server_request_handler +{ + const char *name; + icipc_server_request_handler_func_t handler; + void *data; +}; + +struct icipc_server_priv { + pthread_mutex_t mutex; + struct icipc_server_client_handler client_handler; + size_t n_request_handlers; + struct icipc_server_request_handler request_handlers[MAX_REQUEST_HANDLERS]; +}; + +static void +sender_state (struct icipc_receiver *base, + int sender_fd, + enum icipc_receiver_sender_state sender_state, + void *data) +{ + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + icipc_log_info ("server: new state %d on client %d", sender_state, sender_fd); + + pthread_mutex_lock (&priv->mutex); + if (priv->client_handler.handler) + priv->client_handler.handler ((struct icipc_server *)base, sender_fd, + sender_state, priv->client_handler.data); + pthread_mutex_unlock (&priv->mutex); +} + +static bool +handle_message (struct icipc_receiver *base, + int sender_fd, + const uint8_t *buffer, + size_t size, + void *data) +{ + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + const char *name = NULL; + const struct spa_pod *args = NULL; + + icipc_log_info ("server: message from client %d received", sender_fd); + + /* parse */ + if (!icipc_protocol_parse_request (buffer, size, &name, &args)) { + const char *msg = "could not parse request"; + const size_t s = icipc_protocol_calculate_reply_error_size (msg); + uint8_t b[s]; + icipc_protocol_build_reply_error (b, s, msg); + return icipc_socket_write (sender_fd, b, s) == (ssize_t)s; + } + + /* handle */ + size_t i; + bool res = false; + pthread_mutex_lock (&priv->mutex); + + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0 && + rh->handler != NULL) { + res = rh->handler ((struct icipc_server *)base, sender_fd, name, args, + rh->data); + pthread_mutex_unlock (&priv->mutex); + return res; + } + } + + /* handler was not found, reply with error */ + res = icipc_server_reply_error ((struct icipc_server *)base, sender_fd, + "request handler not found"); + + pthread_mutex_unlock (&priv->mutex); + return res; +} + +static struct icipc_receiver_events events = { + .sender_state = sender_state, + .handle_message = handle_message, +}; + +/* API */ + +struct icipc_server * +icipc_server_new (const char *path, bool start) +{ + struct icipc_server_priv * priv = NULL; + struct icipc_receiver *base = NULL; + + base = icipc_receiver_new (path, BUFFER_SIZE, &events, NULL, + sizeof (struct icipc_server_priv)); + if (base == NULL) + return NULL; + + priv = icipc_receiver_get_user_data (base); + pthread_mutex_init (&priv->mutex, NULL); + priv->n_request_handlers = 0; + + if (start) + icipc_receiver_start (base); + + return (struct icipc_server *)base; +} + +void +icipc_server_free (struct icipc_server *self) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_destroy (&priv->mutex); + + icipc_receiver_free (base); +} + +void +icipc_server_set_client_handler (struct icipc_server *self, + icipc_server_client_handler_func_t handler, + void *data) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_lock (&priv->mutex); + priv->client_handler.handler = handler; + priv->client_handler.data = data; + pthread_mutex_unlock (&priv->mutex); +} + +void +icipc_server_clear_client_handler (struct icipc_server *self) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + + pthread_mutex_lock (&priv->mutex); + priv->client_handler.handler = NULL; + priv->client_handler.data = NULL; + pthread_mutex_unlock (&priv->mutex); +} + +bool +icipc_server_set_request_handler (struct icipc_server *self, + const char *name, + icipc_server_request_handler_func_t handler, + void *data) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + size_t i; + + /* check params */ + if (name == NULL) + return false; + + pthread_mutex_lock (&priv->mutex); + + /* make sure handler does not exist */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0) { + pthread_mutex_unlock (&priv->mutex); + return false; + } + } + + /* set handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name == NULL) { + rh->name = name; + rh->handler = handler; + rh->data = data; + pthread_mutex_unlock (&priv->mutex); + return true; + } + } + + pthread_mutex_unlock (&priv->mutex); + + return false; +} + +void +icipc_server_clear_request_handler (struct icipc_server *self, + const char *name) +{ + struct icipc_receiver *base = icipc_server_to_receiver (self); + struct icipc_server_priv *priv = icipc_receiver_get_user_data (base); + size_t i; + + /* check params */ + if (name == NULL) + return; + + pthread_mutex_lock (&priv->mutex); + + /* clear handler */ + for (i = 0; i < MAX_REQUEST_HANDLERS; i++) { + struct icipc_server_request_handler *rh = priv->request_handlers + i; + if (rh->name != NULL && strcmp (rh->name, name) == 0) { + rh->name = NULL; + break; + } + } + + pthread_mutex_unlock (&priv->mutex); +} + +bool +icipc_server_reply_ok (struct icipc_server *self, + int client_fd, + const struct spa_pod *value) +{ + const size_t s = icipc_protocol_calculate_reply_ok_size (value); + uint8_t b[s]; + icipc_protocol_build_reply_ok (b, s, value); + return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +} + +bool +icipc_server_reply_error (struct icipc_server *self, + int client_fd, + const char *msg) +{ + if (msg == NULL) + return false; + + const size_t s = icipc_protocol_calculate_reply_error_size (msg); + uint8_t b[s]; + icipc_protocol_build_reply_error (b, s, msg); + return icipc_socket_write (client_fd, b, s) == (ssize_t)s; +} |