/* PipeWire AGL Cluster IPC * * Copyright © 2021 Collabora Ltd. * @author Julian Bouzas * * SPDX-License-Identifier: MIT */ #define _GNU_SOURCE #include "test.h" #include #include #include typedef struct EventData { const uint8_t *expected_data; size_t expected_size; int connections; int n_events; pthread_mutex_t mutex; pthread_cond_t cond; } EventData; static inline char *new_address() { char *address = NULL; (void)asprintf(&address, "icipc-test-%d-%d", getpid(), rand()); test_ptr_notnull(address); return address; } static void wait_for_event(EventData *data, int n_events) { pthread_mutex_lock(&data->mutex); while (data->n_events < n_events) pthread_cond_wait(&data->cond, &data->mutex); pthread_mutex_unlock(&data->mutex); } static void sender_state_callback( struct icipc_receiver *self, int sender_fd, enum icipc_receiver_sender_state sender_state, void *p) { EventData *data = p; test_ptr_notnull(data); pthread_mutex_lock(&data->mutex); switch (sender_state) { case ICIPC_RECEIVER_SENDER_STATE_CONNECTED: data->connections++; break; case ICIPC_RECEIVER_SENDER_STATE_DISCONNECTED: data->connections--; break; default: test_fail_if_reached(); break; } data->n_events++; pthread_cond_signal(&data->cond); pthread_mutex_unlock(&data->mutex); } static void reply_callback( struct icipc_sender *self, const uint8_t * buffer, size_t size, void *p) { EventData *data = p; test_ptr_notnull(data); test_ptr_notnull(buffer); pthread_mutex_lock(&data->mutex); test_cmpint(size, ==, data->expected_size); test_cmpint(memcmp(buffer, data->expected_data, size), ==, 0); data->n_events++; pthread_cond_signal(&data->cond); pthread_mutex_unlock(&data->mutex); } static void test_icipc_receiver_basic() { char *address = new_address(); struct icipc_receiver *r = icipc_receiver_new(address, 16, NULL, NULL, 0); test_ptr_notnull(r); /* start and stop */ test_bool_false(icipc_receiver_is_running(r)); test_bool_true(icipc_receiver_start(r)); test_bool_true(icipc_receiver_is_running(r)); icipc_receiver_stop(r); test_bool_false(icipc_receiver_is_running(r)); /* clean up */ icipc_receiver_free(r); free(address); } static void test_icipc_sender_basic() { char *address = new_address(); struct icipc_sender *s = icipc_sender_new(address, 16, NULL, NULL, 0); test_ptr_notnull(s); /* clean up */ icipc_sender_free(s); free(address); } static void test_icipc_sender_connect() { static struct icipc_receiver_events events = { .sender_state = sender_state_callback, .handle_message = NULL, }; EventData data = { 0 }; pthread_mutex_init(&data.mutex, NULL); pthread_cond_init(&data.cond, NULL); char *address = new_address(); struct icipc_receiver *r = icipc_receiver_new(address, 16, &events, &data, 0); test_ptr_notnull(r); struct icipc_sender *s = icipc_sender_new(address, 16, NULL, NULL, 0); test_ptr_notnull(s); /* start receiver */ test_bool_true(icipc_receiver_start(r)); /* connect sender */ test_bool_true(icipc_sender_connect(s)); test_bool_true(icipc_sender_is_connected(s)); wait_for_event(&data, 1); test_cmpint(data.connections, ==, 1); /* disconnect sender */ icipc_sender_disconnect(s); test_bool_false(icipc_sender_is_connected(s)); wait_for_event(&data, 2); test_cmpint(data.connections, ==, 0); /* stop receiver */ icipc_receiver_stop(r); /* clean up */ pthread_cond_destroy(&data.cond); pthread_mutex_destroy(&data.mutex); icipc_sender_free(s); icipc_receiver_free(r); free(address); } static void lost_connection_handler( struct icipc_sender *self, int receiver_fd, void *p) { EventData *data = p; test_ptr_notnull(data); pthread_mutex_lock(&data->mutex); data->n_events++; pthread_cond_signal(&data->cond); pthread_mutex_unlock(&data->mutex); } static void test_icipc_sender_lost_connection() { EventData data = { 0 }; pthread_mutex_init(&data.mutex, NULL); pthread_cond_init(&data.cond, NULL); char *address = new_address(); struct icipc_receiver *r = icipc_receiver_new(address, 16, NULL, NULL, 0); test_ptr_notnull(r); struct icipc_sender *s = icipc_sender_new(address, 16, lost_connection_handler, &data, 0); test_ptr_notnull(s); /* connect sender */ test_bool_true(icipc_sender_connect(s)); test_bool_true(icipc_sender_is_connected(s)); /* destroy receiver and make sure the lost connection handler is triggered */ data.n_events = 0; icipc_receiver_free(r); wait_for_event(&data, 1); /* make sure the connection was lost */ test_bool_false(icipc_sender_is_connected(s)); /* create a new receiver */ struct icipc_receiver *r2 = icipc_receiver_new(address, 16, NULL, NULL, 0); test_ptr_notnull(r2); /* re-connect sender with new receiver */ test_bool_true(icipc_sender_connect(s)); test_bool_true(icipc_sender_is_connected(s)); /* clean up */ pthread_cond_destroy(&data.cond); pthread_mutex_destroy(&data.mutex); icipc_sender_free(s); icipc_receiver_free(r2); free(address); } static void test_icipc_sender_send() { char *address = new_address(); struct icipc_receiver *r = icipc_receiver_new(address, 2, NULL, NULL, 0); test_ptr_notnull(r); struct icipc_sender *s = icipc_sender_new(address, 2, NULL, NULL, 0); test_ptr_notnull(s); EventData data = { 0 }; pthread_mutex_init(&data.mutex, NULL); pthread_cond_init(&data.cond, NULL); /* start receiver */ test_bool_true(icipc_receiver_start(r)); /* connect */ test_bool_true(icipc_sender_connect(s)); test_bool_true(icipc_sender_is_connected(s)); /* send 1 byte message (should not realloc) */ data.n_events = 0; data.expected_data = (const uint8_t *)"h"; data.expected_size = 1; test_bool_true(icipc_sender_send (s, (const uint8_t *)"h", 1, reply_callback, &data)); wait_for_event(&data, 1); /* send 2 bytes message (should realloc once to 4) */ data.n_events = 0; data.expected_data = (const uint8_t *)"hi"; data.expected_size = 2; test_bool_true(icipc_sender_send (s, (const uint8_t *)"hi", 2, reply_callback, &data)); wait_for_event(&data, 1); /* send 3 bytes message (should not realloc) */ data.n_events = 0; data.expected_data = (const uint8_t *)"hii"; data.expected_size = 3; test_bool_true(icipc_sender_send (s, (const uint8_t *)"hii", 3, reply_callback, &data)); wait_for_event(&data, 1); /* send 28 bytes message (should realloc 3 times: first to 8, then to 16 and finally to 32) */ data.n_events = 0; data.expected_data = (const uint8_t *)"bigger than 16 bytes message"; data.expected_size = 28; test_bool_true(icipc_sender_send (s, (const uint8_t *)"bigger than 16 bytes message", 28, reply_callback, &data)); wait_for_event(&data, 1); /* don't allow empty messages */ data.n_events = 0; test_bool_false(icipc_sender_send (s, (const uint8_t *)"", 0, NULL, NULL)); /* stop receiver */ icipc_receiver_stop(r); /* clean up */ pthread_cond_destroy(&data.cond); pthread_mutex_destroy(&data.mutex); icipc_sender_free(s); icipc_receiver_free(r); free(address); } static void test_icipc_multiple_senders_send() { char *address = new_address(); struct icipc_receiver *r = icipc_receiver_new(address, 16, NULL, NULL, 0); test_ptr_notnull(r); struct icipc_sender *senders[50]; EventData data; pthread_mutex_init(&data.mutex, NULL); pthread_cond_init(&data.cond, NULL); data.n_events = 0; /* start receiver */ test_bool_true(icipc_receiver_start(r)); /* create and connect 50 senders */ for (int i = 0; i < 50; i++) { senders[i] = icipc_sender_new(address, 16, NULL, NULL, 0); test_ptr_notnull(senders[i]); test_bool_true(icipc_sender_connect(senders[i])); test_bool_true(icipc_sender_is_connected(senders[i])); } /* send 50 messages (1 per sender) */ data.n_events = 0; data.expected_data = (const uint8_t *)"hello"; data.expected_size = 5; for (int i = 0; i < 50; i++) test_bool_true(icipc_sender_send (senders[i], (const uint8_t *)"hello", 5, reply_callback, &data)); wait_for_event(&data, 50); /* stop receiver */ icipc_receiver_stop(r); /* clean up */ pthread_cond_destroy(&data.cond); pthread_mutex_destroy(&data.mutex); for (int i = 0; i < 50; i++) icipc_sender_free(senders[i]); icipc_receiver_free(r); free(address); } int main(int argc, char *argv[]) { test_icipc_receiver_basic(); test_icipc_sender_basic(); test_icipc_sender_connect(); test_icipc_sender_lost_connection(); test_icipc_sender_send(); test_icipc_multiple_senders_send(); return TEST_PASS; }