From dffeeeb74f5b0a0385c4131b7ff349ff04fcebce Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 1 Oct 2019 12:53:56 +0200 Subject: [PATCH] protocol: improve flushing Use the IO_OUT flag to schedule flushing instead of a flush_event. Handle EGAIN and wait for IO_OUT to try again. Fixes #111 Upstream-Status: Backport [cc8e992cd155b4f19312a5036c7b744fc547410f] --- src/modules/module-protocol-native.c | 99 ++++++++++++------- .../module-protocol-native/connection.c | 2 - 2 files changed, 62 insertions(+), 39 deletions(-) diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c index cc8501d5..be2d4f57 100644 --- a/src/modules/module-protocol-native.c +++ b/src/modules/module-protocol-native.c @@ -82,9 +82,8 @@ struct client { struct pw_protocol_native_connection *connection; struct spa_hook conn_listener; - struct spa_source *flush_event; unsigned int disconnecting:1; - unsigned int flush_signaled:1; + unsigned int flushing:1; }; struct server { @@ -106,6 +105,7 @@ struct client_data { struct spa_source *source; struct pw_protocol_native_connection *connection; unsigned int busy:1; + unsigned int need_flush:1; }; static void @@ -200,12 +200,14 @@ client_busy_changed(void *data, bool busy) { struct client_data *c = data; struct pw_client *client = c->client; - uint32_t mask = SPA_IO_ERR | SPA_IO_HUP; + uint32_t mask = c->source->mask; c->busy = busy; - if (!busy) - mask |= SPA_IO_IN; + if (busy) + SPA_FLAG_UNSET(mask, SPA_IO_IN); + else + SPA_FLAG_SET(mask, SPA_IO_IN); pw_log_debug(NAME" %p: busy changed %d", client->protocol, busy); pw_loop_update_io(client->core->main_loop, c->source, mask); @@ -220,13 +222,32 @@ connection_data(void *data, int fd, uint32_t mask) { struct client_data *this = data; struct pw_client *client = this->client; + int res; - if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + if (mask & SPA_IO_HUP) { pw_log_info(NAME" %p: client %p disconnected", client->protocol, client); pw_client_destroy(client); return; } - + if (mask & SPA_IO_ERR) { + pw_log_error(NAME" %p: client %p error", client->protocol, client); + pw_client_destroy(client); + return; + } + if (mask & SPA_IO_OUT) { + res = pw_protocol_native_connection_flush(this->connection); + if (res >= 0) { + int mask = this->source->mask; + SPA_FLAG_UNSET(mask, SPA_IO_OUT); + pw_loop_update_io(client->protocol->core->main_loop, + this->source, mask); + } else if (res != EAGAIN) { + pw_log_error("client %p: could not flush: %s", + client, spa_strerror(res)); + pw_client_destroy(client); + return; + } + } if (mask & SPA_IO_IN) process_messages(this); } @@ -296,7 +317,8 @@ static struct pw_client *client_new(struct server *s, int fd) this->client = client; this->source = pw_loop_add_io(pw_core_get_main_loop(core), - fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this); + fd, SPA_IO_ERR | SPA_IO_HUP, true, + connection_data, this); if (this->source == NULL) goto cleanup_client; @@ -408,7 +430,7 @@ socket_data(void *data, int fd, uint32_t mask) if (!client->busy) pw_loop_update_io(client->protocol->core->main_loop, - c->source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); + c->source, c->source->mask | SPA_IO_IN); } static int add_socket(struct pw_protocol *protocol, struct server *s) @@ -514,6 +536,17 @@ on_remote_data(void *data, int fd, uint32_t mask) res = -EPIPE; goto error; } + if (mask & SPA_IO_OUT) { + res = pw_protocol_native_connection_flush(conn); + if (res >= 0) { + int mask = impl->source->mask; + SPA_FLAG_UNSET(mask, SPA_IO_OUT); + pw_loop_update_io(core->main_loop, + impl->source, mask); + impl->flushing = false; + } else if (res != EAGAIN) + goto error; + } if (mask & SPA_IO_IN) { const struct pw_protocol_native_message *msg; @@ -588,23 +621,17 @@ error: } -static void do_flush_event(void *data, uint64_t count) -{ - struct client *impl = data; - impl->flush_signaled = false; - if (impl->connection) - if (pw_protocol_native_connection_flush(impl->connection) < 0) - impl->this.disconnect(&impl->this); -} - static void on_need_flush(void *data) { struct client *impl = data; struct pw_remote *remote = impl->this.remote; - if (!impl->flush_signaled) { - impl->flush_signaled = true; - pw_loop_signal_event(remote->core->main_loop, impl->flush_event); + if (!impl->flushing) { + int mask = impl->source->mask; + impl->flushing = true; + SPA_FLAG_SET(mask, SPA_IO_OUT); + pw_loop_update_io(remote->core->main_loop, + impl->source, mask); } } @@ -669,12 +696,9 @@ static void impl_disconnect(struct pw_protocol_client *client) static void impl_destroy(struct pw_protocol_client *client) { struct client *impl = SPA_CONTAINER_OF(client, struct client, this); - struct pw_remote *remote = client->remote; impl_disconnect(client); - pw_loop_destroy_source(remote->core->main_loop, impl->flush_event); - spa_list_remove(&client->link); free(impl); } @@ -687,7 +711,6 @@ impl_new_client(struct pw_protocol *protocol, struct client *impl; struct pw_protocol_client *this; const char *str = NULL; - int res; if ((impl = calloc(1, sizeof(struct client))) == NULL) return NULL; @@ -711,20 +734,9 @@ impl_new_client(struct pw_protocol *protocol, this->disconnect = impl_disconnect; this->destroy = impl_destroy; - impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl); - if (impl->flush_event == NULL) { - res = -errno; - goto error_cleanup; - } - spa_list_append(&protocol->client_list, &this->link); return this; - -error_cleanup: - free(impl); - errno = -res; - return NULL; } static void destroy_server(struct pw_protocol_server *server) @@ -757,10 +769,23 @@ static void on_before_hook(void *_data) struct pw_protocol_server *this = &server->this; struct pw_client *client, *tmp; struct client_data *data; + int res; spa_list_for_each_safe(client, tmp, &this->client_list, protocol_link) { data = client->user_data; - pw_protocol_native_connection_flush(data->connection); + + res = pw_protocol_native_connection_flush(data->connection); + if (res == -EAGAIN) { + int mask = data->source->mask; + SPA_FLAG_SET(mask, SPA_IO_OUT); + pw_loop_update_io(client->protocol->core->main_loop, + data->source, mask); + } else if (res < 0) { + pw_log_warn("client %p: could not flush: %s", + data->client, spa_strerror(res)); + pw_client_destroy(client); + } + } } diff --git a/src/modules/module-protocol-native/connection.c b/src/modules/module-protocol-native/connection.c index 7b6cf112..116457e3 100644 --- a/src/modules/module-protocol-native/connection.c +++ b/src/modules/module-protocol-native/connection.c @@ -502,8 +502,6 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co continue; else { res = -errno; - pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s", - conn->fd, n_fds, spa_strerror(res)); goto exit; } } -- 2.23.0