diff options
Diffstat (limited to 'lib/sender.c')
-rw-r--r-- | lib/sender.c | 433 |
1 files changed, 212 insertions, 221 deletions
diff --git a/lib/sender.c b/lib/sender.c index e795bc4..fa2882e 100644 --- a/lib/sender.c +++ b/lib/sender.c @@ -21,256 +21,247 @@ #define MAX_ASYNC_TASKS 128 -struct icipc_sender_task { - icipc_sender_reply_func_t func; - void *data; -}; +typedef struct SenderTask { + icipc_sender_reply_func_t func; + void *data; +} SenderTask; struct icipc_sender { - struct sockaddr_un addr; - int socket_fd; + struct sockaddr_un addr; + int socket_fd; - uint8_t *buffer_read; - size_t buffer_size; + uint8_t *buffer_read; + size_t buffer_size; - struct epoll_thread epoll_thread; - bool is_connected; + EpollThread epoll_thread; + bool is_connected; - icipc_sender_lost_conn_func_t lost_func; - void *lost_data; - bool lost_connection; + icipc_sender_lost_conn_func_t lost_func; + void *lost_data; + bool lost_connection; - struct icipc_sender_task async_tasks[MAX_ASYNC_TASKS]; + SenderTask async_tasks[MAX_ASYNC_TASKS]; - /* for subclasses */ - void *user_data; + /* for subclasses */ + void *user_data; }; -static int -push_sync_task (struct icipc_sender *self, +static int push_sync_task( + struct icipc_sender *self, icipc_sender_reply_func_t func, - void *data) -{ - size_t i; - for (i = MAX_ASYNC_TASKS; i > 1; i--) { - struct icipc_sender_task *curr = self->async_tasks + i - 1; - struct icipc_sender_task *next = self->async_tasks + i - 2; - if (next->func != NULL && curr->func == NULL) { - curr->func = func; - curr->data = data; - return i - 1; - } else if (i - 2 == 0 && next->func == NULL) { - /* empty queue */ - next->func = func; - next->data = data; - return 0; - } - } - return -1; + void *data) { + size_t i; + for (i = MAX_ASYNC_TASKS; i > 1; i--) { + SenderTask *curr = self->async_tasks + i - 1; + SenderTask *next = self->async_tasks + i - 2; + if (next->func != NULL && curr->func == NULL) { + curr->func = func; + curr->data = data; + return i - 1; + } else if (i - 2 == 0 && next->func == NULL) { + /* empty queue */ + next->func = func; + next->data = data; + return 0; + } + } + return -1; } -static void -pop_sync_task (struct icipc_sender *self, - bool trigger, - bool all, - const uint8_t *buffer, - size_t size) -{ - size_t i; - for (i = 0; i < MAX_ASYNC_TASKS; i++) { - struct icipc_sender_task *task = self->async_tasks + i; - if (task->func != NULL) { - if (trigger) - task->func (self, buffer, size, task->data); - task->func = NULL; - if (!all) - return; - } - } +static void pop_sync_task( + struct icipc_sender *self, + bool trigger, + bool all, + const uint8_t * buffer, + size_t size) { + size_t i; + for (i = 0; i < MAX_ASYNC_TASKS; i++) { + SenderTask *task = self->async_tasks + i; + if (task->func != NULL) { + if (trigger) + task->func(self, buffer, size, task->data); + task->func = NULL; + if (!all) + return; + } + } } -static void -socket_event_received (struct epoll_thread *t, int fd, void *data) -{ - struct icipc_sender *self = data; - - /* receiver sends a reply, read it trigger corresponding task */ - ssize_t size = icipc_socket_read (fd, &self->buffer_read, &self->buffer_size); - if (size <= 0) { - if (size < 0) - icipc_log_error ("sender: could not read reply: %s", strerror(errno)); - /* receiver disconnected */ - epoll_ctl (t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); - shutdown(self->socket_fd, SHUT_RDWR); - self->is_connected = false; - self->lost_connection = true; - if (self->lost_func) - self->lost_func (self, fd, self->lost_data); - /* clear queue */ - pop_sync_task (self, true, true, NULL, 0); - return; - } - - /* trigger async task */ - pop_sync_task (self, true, false, self->buffer_read, size); - return; +static void socket_event_received(EpollThread *t, int fd, void *data) { + struct icipc_sender *self = data; + + /* receiver sends a reply, read it trigger corresponding task */ + ssize_t size = + icipc_socket_read(fd, &self->buffer_read, &self->buffer_size); + if (size <= 0) { + if (size < 0) + icipc_log_error("sender: could not read reply: %s", + strerror(errno)); + /* receiver disconnected */ + epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + self->lost_connection = true; + if (self->lost_func) + self->lost_func(self, fd, self->lost_data); + /* clear queue */ + pop_sync_task(self, true, true, NULL, 0); + return; + } + + /* trigger async task */ + pop_sync_task(self, true, false, self->buffer_read, size); + return; } /* API */ -struct icipc_sender * -icipc_sender_new (const char *path, - size_t buffer_size, - icipc_sender_lost_conn_func_t lost_func, - void *lost_data, - size_t user_size) -{ - struct icipc_sender *self; - int res; - - if (path == NULL) - return NULL; - - self = calloc (1, sizeof (struct icipc_sender) + user_size); - if (self == NULL) - return NULL; - - self->socket_fd = -1; - - /* set address */ - self->addr.sun_family = AF_LOCAL; - res = icipc_construct_socket_path (path, self->addr.sun_path, sizeof(self->addr.sun_path)); - if (res < 0) - goto error; - - /* create socket */ - self->socket_fd = - socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); - if (self->socket_fd < 0) - goto error; - - /* alloc buffer read */ - self->buffer_size = buffer_size; - self->buffer_read = calloc (buffer_size, sizeof (uint8_t)); - if (self->buffer_read == NULL) - goto error; - - /* init epoll thread */ - if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, - socket_event_received, NULL, self)) - goto error; - - self->lost_func = lost_func; - self->lost_data = lost_data; - self->lost_connection = false; - if (user_size > 0) - self->user_data = (void *)((uint8_t *)self + sizeof (struct icipc_sender)); - - return self; - -error: - if (self->buffer_read) - free (self->buffer_read); - if (self->socket_fd != -1) - close (self->socket_fd); - free (self); - return NULL; +struct icipc_sender *icipc_sender_new( + const char *path, + size_t buffer_size, + icipc_sender_lost_conn_func_t lost_func, + void *lost_data, + size_t user_size) { + struct icipc_sender *self; + int res; + + if (path == NULL) + return NULL; + + self = calloc(1, sizeof(struct icipc_sender) + user_size); + if (self == NULL) + return NULL; + + self->socket_fd = -1; + + /* set address */ + self->addr.sun_family = AF_LOCAL; + res = + icipc_construct_socket_path(path, self->addr.sun_path, + sizeof(self->addr.sun_path)); + if (res < 0) + goto error; + + /* create socket */ + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (self->socket_fd < 0) + goto error; + + /* alloc buffer read */ + self->buffer_size = buffer_size; + self->buffer_read = calloc(buffer_size, sizeof(uint8_t)); + if (self->buffer_read == NULL) + goto error; + + /* init epoll thread */ + if (!icipc_epoll_thread_init(&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) + goto error; + + self->lost_func = lost_func; + self->lost_data = lost_data; + self->lost_connection = false; + if (user_size > 0) + self->user_data = + (void *)((uint8_t *) self + sizeof(struct icipc_sender)); + + return self; + + error: + if (self->buffer_read) + free(self->buffer_read); + if (self->socket_fd != -1) + close(self->socket_fd); + free(self); + return NULL; } -void -icipc_sender_free (struct icipc_sender *self) -{ - icipc_sender_disconnect (self); +void icipc_sender_free(struct icipc_sender *self) { + icipc_sender_disconnect(self); - icipc_epoll_thread_destroy (&self->epoll_thread); - free (self->buffer_read); - close (self->socket_fd); - free (self); + icipc_epoll_thread_destroy(&self->epoll_thread); + free(self->buffer_read); + close(self->socket_fd); + free(self); } -bool -icipc_sender_connect (struct icipc_sender *self) -{ - if (icipc_sender_is_connected (self)) - return true; - - /* if connection was lost, re-init epoll thread with new socket */ - if (self->lost_connection) { - icipc_epoll_thread_stop (&self->epoll_thread); - icipc_epoll_thread_destroy (&self->epoll_thread); - self->socket_fd = - socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC| SOCK_NONBLOCK, 0); - if (self->socket_fd < 0) - return false; - if (!icipc_epoll_thread_init (&self->epoll_thread, self->socket_fd, - socket_event_received, NULL, self)) { - close (self->socket_fd); - return false; - } - self->lost_connection = false; - } - - /* connect */ - if (connect(self->socket_fd, (struct sockaddr *)&self->addr, - sizeof(self->addr)) == 0 && - icipc_epoll_thread_start (&self->epoll_thread)) { - self->is_connected = true; - return true; - } - - return false; +bool icipc_sender_connect(struct icipc_sender *self) { + if (icipc_sender_is_connected(self)) + return true; + + /* if connection was lost, re-init epoll thread with new socket */ + if (self->lost_connection) { + icipc_epoll_thread_stop(&self->epoll_thread); + icipc_epoll_thread_destroy(&self->epoll_thread); + self->socket_fd = + socket(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, + 0); + if (self->socket_fd < 0) + return false; + if (!icipc_epoll_thread_init + (&self->epoll_thread, self->socket_fd, + socket_event_received, NULL, self)) { + close(self->socket_fd); + return false; + } + self->lost_connection = false; + } + + /* connect */ + if (connect(self->socket_fd, (struct sockaddr *)&self->addr, + sizeof(self->addr)) == 0 && + icipc_epoll_thread_start(&self->epoll_thread)) { + self->is_connected = true; + return true; + } + + return false; } -void -icipc_sender_disconnect (struct icipc_sender *self) -{ - if (icipc_sender_is_connected (self)) { - icipc_epoll_thread_stop (&self->epoll_thread); - shutdown(self->socket_fd, SHUT_RDWR); - self->is_connected = false; - } +void icipc_sender_disconnect(struct icipc_sender *self) { + if (icipc_sender_is_connected(self)) { + icipc_epoll_thread_stop(&self->epoll_thread); + shutdown(self->socket_fd, SHUT_RDWR); + self->is_connected = false; + } } -bool -icipc_sender_is_connected (struct icipc_sender *self) -{ - return self->is_connected; +bool icipc_sender_is_connected(struct icipc_sender *self) { + return self->is_connected; } -bool -icipc_sender_send (struct icipc_sender *self, - const uint8_t *buffer, - size_t size, - icipc_sender_reply_func_t func, - void *data) -{ - int id = -1; - - if (buffer == NULL || size == 0) - return false; - - if (!icipc_sender_is_connected (self)) - return false; - - /* add the task in the queue */ - if (func) { - id = push_sync_task (self, func, data); - if (id == -1) - return false; - } - - /* write buffer and remove task if it fails */ - if (icipc_socket_write (self->socket_fd, buffer, size) <= 0) { - if (id != -1) - self->async_tasks[id].func = NULL; - return false; - } - - return true; +bool icipc_sender_send( + struct icipc_sender *self, + const uint8_t * buffer, + size_t size, + icipc_sender_reply_func_t func, + void *data) { + int id = -1; + + if (buffer == NULL || size == 0) + return false; + + if (!icipc_sender_is_connected(self)) + return false; + + /* add the task in the queue */ + if (func) { + id = push_sync_task(self, func, data); + if (id == -1) + return false; + } + + /* write buffer and remove task if it fails */ + if (icipc_socket_write(self->socket_fd, buffer, size) <= 0) { + if (id != -1) + self->async_tasks[id].func = NULL; + return false; + } + + return true; } -void * -icipc_sender_get_user_data (struct icipc_sender *self) -{ - return self->user_data; +void *icipc_sender_get_user_data(struct icipc_sender *self) { + return self->user_data; } |