/* Extracted from a cgit web view.
 * path: root/tests/client-server.c
 * blob: 1345ca93644845bdf4884158f81604aa8d4e50c3
 */
/* PipeWire AGL Cluster IPC
 *
 * Copyright © 2021 Collabora Ltd.
 *    @author Julian Bouzas <julian.bouzas@collabora.com>
 *
 * SPDX-License-Identifier: MIT
 */

#include "test.h"
#include "data.h"
#include <icipc.h>
#include <unistd.h>
#include <pthread.h>

/* Integer message used by this test: an icipc_data header followed by a
 * single int. Instances are cast to struct icipc_data * when they are
 * handed to the icipc request/reply functions. */
typedef struct DataInt {
        struct icipc_data hdr;  /* header; the test fills in its size and type */
        int value;              /* the integer carried by the message */
} DataInt;

/* Build a fresh address string of the form "icipc-test-<pid>-<rand>".
 *
 * Returns a heap-allocated string that the caller releases with free().
 * The result is checked with test_ptr_notnull() before it is returned.
 */
static inline char *new_address(void) {
        char *address = NULL;

        /* When asprintf() fails the contents of `address` are unspecified,
         * so force it back to NULL to make the check below reliable. */
        if (asprintf(&address, "icipc-test-%d-%d", (int)getpid(), rand()) < 0)
                address = NULL;

        test_ptr_notnull(address);
        return address;
}

/* Request handler that answers with the received integer plus one.
 *
 * Verifies that `args` is typed DATA_TYPE_INT and has an int-sized
 * (alignment-rounded) body, reads that int, and sends `value + 1` back to
 * `client_fd` as an OK reply. `name` and `data` are not used.
 *
 * Returns whatever icipc_server_reply_ok() returns.
 */
static bool increment_request_handler(
                struct icipc_server *self,
                int client_fd,
                const char *name,
                const struct icipc_data *args,
                void *data) {
        (void)name;
        (void)data;

        /* Validate the incoming payload before touching its body. */
        test_cmpint(args->type, ==, (uint32_t) DATA_TYPE_INT);
        test_cmpint(ICIPC_DATA_BODY_SIZE(args), ==,
                    ROUND_UP_TO_ALIGN(sizeof(int)));

        const int32_t received = *((const int *)ICIPC_DATA_BODY_CONST(args));

        DataInt reply = {
                .hdr = {
                        .size = sizeof(int),
                        .type = DATA_TYPE_INT,
                },
                .value = received + 1,
        };
        return icipc_server_reply_ok(self, client_fd,
                                     (struct icipc_data *)&reply);
}

/* Request handler that always answers with an error.
 *
 * Sends the fixed text "error message" to `client_fd` as an error reply;
 * `name`, `args` and `data` are ignored.
 *
 * Returns whatever icipc_server_reply_error() returns.
 */
static bool error_request_handler(
                struct icipc_server *self,
                int client_fd,
                const char *name,
                const struct icipc_data *args,
                void *data) {
        (void)name;
        (void)args;
        (void)data;

        const bool replied =
            icipc_server_reply_error(self, client_fd, "error message");
        return replied;
}

/* State shared between the test body and reply_handler(). */
typedef struct ReplyData {
        int32_t incremented;    /* int copied out of a successful reply body */
        const char *error;      /* filled in by icipc_client_send_request_finish() */
        int n_replies;          /* replies handled since the test last reset it to 0 */
        pthread_mutex_t mutex;  /* held by reply_handler() and wait_for_reply() */
        pthread_cond_t cond;    /* signalled after n_replies is incremented */
} ReplyData;

/* Block the caller until data->n_replies has reached `n_replies`.
 *
 * The counter is examined with data->mutex held; while it is still too
 * small the function sleeps on data->cond and re-checks after every
 * wake-up.
 */
static void wait_for_reply(ReplyData *data, int n_replies) {
        pthread_mutex_lock(&data->mutex);
        for (;;) {
                if (data->n_replies >= n_replies)
                        break;
                pthread_cond_wait(&data->cond, &data->mutex);
        }
        pthread_mutex_unlock(&data->mutex);
}

static void reply_handler(
                struct icipc_sender *self,
                const uint8_t * buffer,
                size_t size,
                void *p) {
        ReplyData *data = p;
        test_ptr_notnull(data);

        pthread_mutex_lock(&data->mutex);

        const struct icipc_data *args =
            icipc_client_send_request_finish(self, buffer, size, &data->error);
        if (args) {
                test_cmpint(args->type, ==, (uint32_t) DATA_TYPE_INT);
                test_cmpint(ICIPC_DATA_BODY_SIZE(args), ==,
                            ROUND_UP_TO_ALIGN(sizeof(int)));
                data->incremented = *((const int*)ICIPC_DATA_BODY_CONST(args));
        }
        data->n_replies++;
        pthread_cond_signal(&data->cond);

        pthread_mutex_unlock(&data->mutex);
}

/* End-to-end test of an icipc server and client sharing one address.
 *
 * Registers the "INCREMENT" and "ERROR" request handlers on the server and
 * then sends three requests from the client, waiting for each reply before
 * checking it:
 *   1. "INCREMENT" with the value 3 -> no error, reply value 4
 *   2. "ERROR"                      -> error "error message"
 *   3. "UNHANDLED-REQUEST"          -> error "request handler not found"
 */
static void test_icipc_server_client() {
        char *address = new_address();
        /* NOTE(review): the meaning of the `true` argument to the two
         * constructors is not visible here; check the icipc API. */
        struct icipc_server *s = icipc_server_new(address, true);
        test_ptr_notnull(s);
        struct icipc_client *c = icipc_client_new(address, true);
        test_ptr_notnull(c);
        /* Only the mutex and condition variable are initialised here; the
         * remaining fields are assigned before each request below. */
        ReplyData data;
        pthread_mutex_init(&data.mutex, NULL);
        pthread_cond_init(&data.cond, NULL);

        /* add request handlers */
        test_bool_true(icipc_server_set_request_handler
                       (s, "INCREMENT", increment_request_handler, NULL));
        test_bool_true(icipc_server_set_request_handler
                       (s, "ERROR", error_request_handler, NULL));

        /* send an INCREMENT request of 3, and make sure the returned value is 4 */
        data.incremented = -1;  /* sentinel: differs from the expected 4 */
        data.error = NULL;
        data.n_replies = 0;
        DataInt i = {
                .hdr.size = sizeof(int),
                .hdr.type = DATA_TYPE_INT,
                .value = 3,
        };
        test_bool_true(icipc_client_send_request
                       (c, "INCREMENT", (struct icipc_data *)&i, reply_handler,
                        &data));
        /* reply_handler() updates `data`; block until it has run once */
        wait_for_reply(&data, 1);
        test_ptr_null(data.error);
        test_cmpint(data.incremented, ==, 4);

        /* send an ERROR request, and make sure the returned value is an error */
        data.error = NULL;
        data.n_replies = 0;
        test_bool_true(icipc_client_send_request
                       (c, "ERROR", NULL, reply_handler, &data));
        wait_for_reply(&data, 1);
        test_str_eq(data.error, "error message");

        /* send an unhandled request, and make sure the server replies with an error */
        data.error = NULL;
        data.n_replies = 0;
        test_bool_true(icipc_client_send_request
                       (c, "UNHANDLED-REQUEST", NULL, reply_handler, &data));
        wait_for_reply(&data, 1);
        test_str_eq(data.error, "request handler not found");

        /* clean up */
        pthread_cond_destroy(&data.cond);
        pthread_mutex_destroy(&data.mutex);
        icipc_client_free(c);
        icipc_server_free(s);
        free(address);  /* string allocated by new_address() */
}

/* Test entry point: runs the client/server test and reports TEST_PASS.
 * Command-line arguments are not used. */
int main(int argc, char *argv[]) {
        (void)argc;
        (void)argv;

        test_icipc_server_client();

        return TEST_PASS;
}